ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.88
Committed: Sun Jul 3 02:46:10 2011 UTC (12 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.87: +8 -8 lines
Log Message:
rename some private methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.Condition;
9 import java.util.concurrent.locks.ReentrantLock;
10 import java.util.AbstractQueue;
11 import java.util.Collection;
12 import java.util.Iterator;
13 import java.util.NoSuchElementException;
14
15 /**
16 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
17 * array. This queue orders elements FIFO (first-in-first-out). The
18 * <em>head</em> of the queue is that element that has been on the
19 * queue the longest time. The <em>tail</em> of the queue is that
20 * element that has been on the queue the shortest time. New elements
21 * are inserted at the tail of the queue, and the queue retrieval
22 * operations obtain elements at the head of the queue.
23 *
24 * <p>This is a classic &quot;bounded buffer&quot;, in which a
25 * fixed-sized array holds elements inserted by producers and
26 * extracted by consumers. Once created, the capacity cannot be
27 * changed. Attempts to {@code put} an element into a full queue
28 * will result in the operation blocking; attempts to {@code take} an
29 * element from an empty queue will similarly block.
30 *
31 * <p>This class supports an optional fairness policy for ordering
32 * waiting producer and consumer threads. By default, this ordering
33 * is not guaranteed. However, a queue constructed with fairness set
34 * to {@code true} grants threads access in FIFO order. Fairness
35 * generally decreases throughput but reduces variability and avoids
36 * starvation.
37 *
38 * <p>This class and its iterator implement all of the
39 * <em>optional</em> methods of the {@link Collection} and {@link
40 * Iterator} interfaces.
41 *
42 * <p>This class is a member of the
43 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
44 * Java Collections Framework</a>.
45 *
46 * @since 1.5
47 * @author Doug Lea
48 * @param <E> the type of elements held in this collection
49 */
50 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
51 implements BlockingQueue<E>, java.io.Serializable {
52
53 /**
54 * Serialization ID. This class relies on default serialization
55 * even for the items array, which is default-serialized, even if
56 * it is empty. Otherwise it could not be declared final, which is
57 * necessary here.
58 */
59 private static final long serialVersionUID = -817911632652898426L;
60
61 /** The queued items */
62 final Object[] items;
63
64 /** items index for next take, poll, peek or remove */
65 int takeIndex;
66
67 /** items index for next put, offer, or add */
68 int putIndex;
69
70 /** Number of elements in the queue */
71 int count;
72
73 /*
74 * Concurrency control uses the classic two-condition algorithm
75 * found in any textbook.
76 */
77
78 /** Main lock guarding all access */
79 final ReentrantLock lock;
80
81 /** Condition for waiting takes */
82 private final Condition notEmpty;
83
84 /** Condition for waiting puts */
85 private final Condition notFull;
86
87 // Internal helper methods
88
89 /**
90 * Circularly increment i.
91 */
92 final int inc(int i) {
93 return (++i == items.length) ? 0 : i;
94 }
95
96 /**
97 * Circularly decrement i.
98 */
99 final int dec(int i) {
100 return ((i == 0) ? items.length : i) - 1;
101 }
102
103 /**
104 * Returns item at index i.
105 */
106 final E itemAt(int i) {
107 @SuppressWarnings("unchecked") E x = (E) items[i];
108 return x;
109 }
110
111 /**
112 * Throws NullPointerException if argument is null.
113 *
114 * @param v the element
115 */
116 private static void checkNotNull(Object v) {
117 if (v == null)
118 throw new NullPointerException();
119 }
120
121 /**
122 * Inserts element at current put position, advances, and signals.
123 * Call only when holding lock.
124 */
125 private void enqueue(E x) {
126 items[putIndex] = x;
127 putIndex = inc(putIndex);
128 ++count;
129 notEmpty.signal();
130 }
131
132 /**
133 * Extracts element at current take position, advances, and signals.
134 * Call only when holding lock.
135 */
136 private E dequeue() {
137 final Object[] items = this.items;
138 @SuppressWarnings("unchecked") E x = (E) items[takeIndex];
139 items[takeIndex] = null;
140 takeIndex = inc(takeIndex);
141 --count;
142 notFull.signal();
143 return x;
144 }
145
146 /**
147 * Deletes item at position i.
148 * Utility for remove and iterator.remove.
149 * Call only when holding lock.
150 */
151 void removeAt(int i) {
152 final Object[] items = this.items;
153 // if removing front item, just advance
154 if (i == takeIndex) {
155 items[takeIndex] = null;
156 takeIndex = inc(takeIndex);
157 } else {
158 // slide over all others up through putIndex.
159 for (;;) {
160 int nexti = inc(i);
161 if (nexti != putIndex) {
162 items[i] = items[nexti];
163 i = nexti;
164 } else {
165 items[i] = null;
166 putIndex = i;
167 break;
168 }
169 }
170 }
171 --count;
172 notFull.signal();
173 }
174
175 /**
176 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
177 * capacity and default access policy.
178 *
179 * @param capacity the capacity of this queue
180 * @throws IllegalArgumentException if {@code capacity < 1}
181 */
182 public ArrayBlockingQueue(int capacity) {
183 this(capacity, false);
184 }
185
186 /**
187 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
188 * capacity and the specified access policy.
189 *
190 * @param capacity the capacity of this queue
191 * @param fair if {@code true} then queue accesses for threads blocked
192 * on insertion or removal, are processed in FIFO order;
193 * if {@code false} the access order is unspecified.
194 * @throws IllegalArgumentException if {@code capacity < 1}
195 */
196 public ArrayBlockingQueue(int capacity, boolean fair) {
197 if (capacity <= 0)
198 throw new IllegalArgumentException();
199 this.items = new Object[capacity];
200 lock = new ReentrantLock(fair);
201 notEmpty = lock.newCondition();
202 notFull = lock.newCondition();
203 }
204
205 /**
206 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
207 * capacity, the specified access policy and initially containing the
208 * elements of the given collection,
209 * added in traversal order of the collection's iterator.
210 *
211 * @param capacity the capacity of this queue
212 * @param fair if {@code true} then queue accesses for threads blocked
213 * on insertion or removal, are processed in FIFO order;
214 * if {@code false} the access order is unspecified.
215 * @param c the collection of elements to initially contain
216 * @throws IllegalArgumentException if {@code capacity} is less than
217 * {@code c.size()}, or less than 1.
218 * @throws NullPointerException if the specified collection or any
219 * of its elements are null
220 */
221 public ArrayBlockingQueue(int capacity, boolean fair,
222 Collection<? extends E> c) {
223 this(capacity, fair);
224
225 final ReentrantLock lock = this.lock;
226 lock.lock(); // Lock only for visibility, not mutual exclusion
227 try {
228 int i = 0;
229 try {
230 for (E e : c) {
231 checkNotNull(e);
232 items[i++] = e;
233 }
234 } catch (ArrayIndexOutOfBoundsException ex) {
235 throw new IllegalArgumentException();
236 }
237 count = i;
238 putIndex = (i == capacity) ? 0 : i;
239 } finally {
240 lock.unlock();
241 }
242 }
243
244 /**
245 * Inserts the specified element at the tail of this queue if it is
246 * possible to do so immediately without exceeding the queue's capacity,
247 * returning {@code true} upon success and throwing an
248 * {@code IllegalStateException} if this queue is full.
249 *
250 * @param e the element to add
251 * @return {@code true} (as specified by {@link Collection#add})
252 * @throws IllegalStateException if this queue is full
253 * @throws NullPointerException if the specified element is null
254 */
255 public boolean add(E e) {
256 return super.add(e);
257 }
258
259 /**
260 * Inserts the specified element at the tail of this queue if it is
261 * possible to do so immediately without exceeding the queue's capacity,
262 * returning {@code true} upon success and {@code false} if this queue
263 * is full. This method is generally preferable to method {@link #add},
264 * which can fail to insert an element only by throwing an exception.
265 *
266 * @throws NullPointerException if the specified element is null
267 */
268 public boolean offer(E e) {
269 checkNotNull(e);
270 final ReentrantLock lock = this.lock;
271 lock.lock();
272 try {
273 if (count == items.length)
274 return false;
275 else {
276 enqueue(e);
277 return true;
278 }
279 } finally {
280 lock.unlock();
281 }
282 }
283
284 /**
285 * Inserts the specified element at the tail of this queue, waiting
286 * for space to become available if the queue is full.
287 *
288 * @throws InterruptedException {@inheritDoc}
289 * @throws NullPointerException {@inheritDoc}
290 */
291 public void put(E e) throws InterruptedException {
292 checkNotNull(e);
293 final ReentrantLock lock = this.lock;
294 lock.lockInterruptibly();
295 try {
296 while (count == items.length)
297 notFull.await();
298 enqueue(e);
299 } finally {
300 lock.unlock();
301 }
302 }
303
304 /**
305 * Inserts the specified element at the tail of this queue, waiting
306 * up to the specified wait time for space to become available if
307 * the queue is full.
308 *
309 * @throws InterruptedException {@inheritDoc}
310 * @throws NullPointerException {@inheritDoc}
311 */
312 public boolean offer(E e, long timeout, TimeUnit unit)
313 throws InterruptedException {
314
315 checkNotNull(e);
316 long nanos = unit.toNanos(timeout);
317 final ReentrantLock lock = this.lock;
318 lock.lockInterruptibly();
319 try {
320 while (count == items.length) {
321 if (nanos <= 0)
322 return false;
323 nanos = notFull.awaitNanos(nanos);
324 }
325 enqueue(e);
326 return true;
327 } finally {
328 lock.unlock();
329 }
330 }
331
332 public E poll() {
333 final ReentrantLock lock = this.lock;
334 lock.lock();
335 try {
336 return (count == 0) ? null : dequeue();
337 } finally {
338 lock.unlock();
339 }
340 }
341
342 public E take() throws InterruptedException {
343 final ReentrantLock lock = this.lock;
344 lock.lockInterruptibly();
345 try {
346 while (count == 0)
347 notEmpty.await();
348 return dequeue();
349 } finally {
350 lock.unlock();
351 }
352 }
353
354 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
355 long nanos = unit.toNanos(timeout);
356 final ReentrantLock lock = this.lock;
357 lock.lockInterruptibly();
358 try {
359 while (count == 0) {
360 if (nanos <= 0)
361 return null;
362 nanos = notEmpty.awaitNanos(nanos);
363 }
364 return dequeue();
365 } finally {
366 lock.unlock();
367 }
368 }
369
370 public E peek() {
371 final ReentrantLock lock = this.lock;
372 lock.lock();
373 try {
374 return (count == 0) ? null : itemAt(takeIndex);
375 } finally {
376 lock.unlock();
377 }
378 }
379
380 // this doc comment is overridden to remove the reference to collections
381 // greater in size than Integer.MAX_VALUE
382 /**
383 * Returns the number of elements in this queue.
384 *
385 * @return the number of elements in this queue
386 */
387 public int size() {
388 final ReentrantLock lock = this.lock;
389 lock.lock();
390 try {
391 return count;
392 } finally {
393 lock.unlock();
394 }
395 }
396
397 // this doc comment is a modified copy of the inherited doc comment,
398 // without the reference to unlimited queues.
399 /**
400 * Returns the number of additional elements that this queue can ideally
401 * (in the absence of memory or resource constraints) accept without
402 * blocking. This is always equal to the initial capacity of this queue
403 * less the current {@code size} of this queue.
404 *
405 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
406 * an element will succeed by inspecting {@code remainingCapacity}
407 * because it may be the case that another thread is about to
408 * insert or remove an element.
409 */
410 public int remainingCapacity() {
411 final ReentrantLock lock = this.lock;
412 lock.lock();
413 try {
414 return items.length - count;
415 } finally {
416 lock.unlock();
417 }
418 }
419
420 /**
421 * Removes a single instance of the specified element from this queue,
422 * if it is present. More formally, removes an element {@code e} such
423 * that {@code o.equals(e)}, if this queue contains one or more such
424 * elements.
425 * Returns {@code true} if this queue contained the specified element
426 * (or equivalently, if this queue changed as a result of the call).
427 *
428 * <p>Removal of interior elements in circular array based queues
429 * is an intrinsically slow and disruptive operation, so should
430 * be undertaken only in exceptional circumstances, ideally
431 * only when the queue is known not to be accessible by other
432 * threads.
433 *
434 * @param o element to be removed from this queue, if present
435 * @return {@code true} if this queue changed as a result of the call
436 */
437 public boolean remove(Object o) {
438 if (o == null) return false;
439 final Object[] items = this.items;
440 final ReentrantLock lock = this.lock;
441 lock.lock();
442 try {
443 if (count > 0) {
444 final int putIndex = this.putIndex;
445 int i = takeIndex;
446 do {
447 if (o.equals(items[i])) {
448 removeAt(i);
449 return true;
450 }
451 } while ((i = inc(i)) != putIndex);
452 }
453 return false;
454 } finally {
455 lock.unlock();
456 }
457 }
458
459 /**
460 * Returns {@code true} if this queue contains the specified element.
461 * More formally, returns {@code true} if and only if this queue contains
462 * at least one element {@code e} such that {@code o.equals(e)}.
463 *
464 * @param o object to be checked for containment in this queue
465 * @return {@code true} if this queue contains the specified element
466 */
467 public boolean contains(Object o) {
468 if (o == null) return false;
469 final Object[] items = this.items;
470 final ReentrantLock lock = this.lock;
471 lock.lock();
472 try {
473 if (count > 0) {
474 final int putIndex = this.putIndex;
475 int i = takeIndex;
476 do {
477 if (o.equals(items[i]))
478 return true;
479 } while ((i = inc(i)) != putIndex);
480 }
481 return false;
482 } finally {
483 lock.unlock();
484 }
485 }
486
487 /**
488 * Returns an array containing all of the elements in this queue, in
489 * proper sequence.
490 *
491 * <p>The returned array will be "safe" in that no references to it are
492 * maintained by this queue. (In other words, this method must allocate
493 * a new array). The caller is thus free to modify the returned array.
494 *
495 * <p>This method acts as bridge between array-based and collection-based
496 * APIs.
497 *
498 * @return an array containing all of the elements in this queue
499 */
500 public Object[] toArray() {
501 final Object[] items = this.items;
502 final ReentrantLock lock = this.lock;
503 lock.lock();
504 try {
505 final int count = this.count;
506 Object[] a = new Object[count];
507 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
508 a[k] = items[i];
509 return a;
510 } finally {
511 lock.unlock();
512 }
513 }
514
515 /**
516 * Returns an array containing all of the elements in this queue, in
517 * proper sequence; the runtime type of the returned array is that of
518 * the specified array. If the queue fits in the specified array, it
519 * is returned therein. Otherwise, a new array is allocated with the
520 * runtime type of the specified array and the size of this queue.
521 *
522 * <p>If this queue fits in the specified array with room to spare
523 * (i.e., the array has more elements than this queue), the element in
524 * the array immediately following the end of the queue is set to
525 * {@code null}.
526 *
527 * <p>Like the {@link #toArray()} method, this method acts as bridge between
528 * array-based and collection-based APIs. Further, this method allows
529 * precise control over the runtime type of the output array, and may,
530 * under certain circumstances, be used to save allocation costs.
531 *
532 * <p>Suppose {@code x} is a queue known to contain only strings.
533 * The following code can be used to dump the queue into a newly
534 * allocated array of {@code String}:
535 *
536 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
537 *
538 * Note that {@code toArray(new Object[0])} is identical in function to
539 * {@code toArray()}.
540 *
541 * @param a the array into which the elements of the queue are to
542 * be stored, if it is big enough; otherwise, a new array of the
543 * same runtime type is allocated for this purpose
544 * @return an array containing all of the elements in this queue
545 * @throws ArrayStoreException if the runtime type of the specified array
546 * is not a supertype of the runtime type of every element in
547 * this queue
548 * @throws NullPointerException if the specified array is null
549 */
550 @SuppressWarnings("unchecked")
551 public <T> T[] toArray(T[] a) {
552 final Object[] items = this.items;
553 final ReentrantLock lock = this.lock;
554 lock.lock();
555 try {
556 final int count = this.count;
557 final int len = a.length;
558 if (len < count)
559 a = (T[])java.lang.reflect.Array.newInstance(
560 a.getClass().getComponentType(), count);
561 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
562 a[k] = (T) items[i];
563 if (len > count)
564 a[count] = null;
565 return a;
566 } finally {
567 lock.unlock();
568 }
569 }
570
571 public String toString() {
572 final ReentrantLock lock = this.lock;
573 lock.lock();
574 try {
575 int k = count;
576 if (k == 0)
577 return "[]";
578
579 StringBuilder sb = new StringBuilder();
580 sb.append('[');
581 for (int i = takeIndex; ; i = inc(i)) {
582 Object e = items[i];
583 sb.append(e == this ? "(this Collection)" : e);
584 if (--k == 0)
585 return sb.append(']').toString();
586 sb.append(',').append(' ');
587 }
588 } finally {
589 lock.unlock();
590 }
591 }
592
593 /**
594 * Atomically removes all of the elements from this queue.
595 * The queue will be empty after this call returns.
596 */
597 public void clear() {
598 final Object[] items = this.items;
599 final ReentrantLock lock = this.lock;
600 lock.lock();
601 try {
602 int k = count;
603 if (k > 0) {
604 final int putIndex = this.putIndex;
605 int i = takeIndex;
606 do {
607 items[i] = null;
608 } while ((i = inc(i)) != putIndex);
609 takeIndex = putIndex;
610 count = 0;
611 for (; k > 0 && lock.hasWaiters(notFull); k--)
612 notFull.signal();
613 }
614 } finally {
615 lock.unlock();
616 }
617 }
618
619 /**
620 * @throws UnsupportedOperationException {@inheritDoc}
621 * @throws ClassCastException {@inheritDoc}
622 * @throws NullPointerException {@inheritDoc}
623 * @throws IllegalArgumentException {@inheritDoc}
624 */
625 public int drainTo(Collection<? super E> c) {
626 return drainTo(c, Integer.MAX_VALUE);
627 }
628
629 /**
630 * @throws UnsupportedOperationException {@inheritDoc}
631 * @throws ClassCastException {@inheritDoc}
632 * @throws NullPointerException {@inheritDoc}
633 * @throws IllegalArgumentException {@inheritDoc}
634 */
635 public int drainTo(Collection<? super E> c, int maxElements) {
636 checkNotNull(c);
637 if (c == this)
638 throw new IllegalArgumentException();
639 if (maxElements <= 0)
640 return 0;
641 final Object[] items = this.items;
642 final ReentrantLock lock = this.lock;
643 lock.lock();
644 try {
645 int n = Math.min(maxElements, count);
646 int take = takeIndex;
647 int i = 0;
648 try {
649 while (i < n) {
650 @SuppressWarnings("unchecked") E x = (E) items[take];
651 c.add(x);
652 items[take] = null;
653 take = inc(take);
654 i++;
655 }
656 return n;
657 } finally {
658 // Restore invariants even if c.add() threw
659 count -= i;
660 takeIndex = take;
661 for (; i > 0 && lock.hasWaiters(notFull); i--)
662 notFull.signal();
663 }
664 } finally {
665 lock.unlock();
666 }
667 }
668
669 /**
670 * Returns an iterator over the elements in this queue in proper sequence.
671 * The elements will be returned in order from first (head) to last (tail).
672 *
673 * <p>The returned iterator is a "weakly consistent" iterator that
674 * will never throw {@link java.util.ConcurrentModificationException
675 * ConcurrentModificationException}, and guarantees to traverse
676 * elements as they existed upon construction of the iterator, and
677 * may (but is not guaranteed to) reflect any modifications
678 * subsequent to construction.
679 *
680 * @return an iterator over the elements in this queue in proper sequence
681 */
682 public Iterator<E> iterator() {
683 return new Itr();
684 }
685
686 /**
687 * Iterator for ArrayBlockingQueue. To maintain weak consistency
688 * with respect to puts and takes, we (1) read ahead one slot, so
689 * as to not report hasNext true but then not have an element to
690 * return -- however we later recheck this slot to use the most
691 * current value; (2) ensure that each array slot is traversed at
692 * most once (by tracking "remaining" elements); (3) skip over
693 * null slots, which can occur if takes race ahead of iterators.
694 * However, for circular array-based queues, we cannot rely on any
695 * well established definition of what it means to be weakly
696 * consistent with respect to interior removes since these may
697 * require slot overwrites in the process of sliding elements to
698 * cover gaps. So we settle for resiliency, operating on
699 * established apparent nexts, which may miss some elements that
700 * have moved between calls to next.
701 */
702 private class Itr implements Iterator<E> {
703 private int remaining; // Number of elements yet to be returned
704 private int nextIndex; // Index of element to be returned by next
705 private E nextItem; // Element to be returned by next call to next
706 private E lastItem; // Element returned by last call to next
707 private int lastRet; // Index of last element returned, or -1 if none
708
709 Itr() {
710 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
711 lock.lock();
712 try {
713 lastRet = -1;
714 if ((remaining = count) > 0)
715 nextItem = itemAt(nextIndex = takeIndex);
716 } finally {
717 lock.unlock();
718 }
719 }
720
721 public boolean hasNext() {
722 return remaining > 0;
723 }
724
725 public E next() {
726 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
727 lock.lock();
728 try {
729 if (remaining <= 0)
730 throw new NoSuchElementException();
731 lastRet = nextIndex;
732 E x = itemAt(nextIndex); // check for fresher value
733 if (x == null) {
734 x = nextItem; // we are forced to report old value
735 lastItem = null; // but ensure remove fails
736 }
737 else
738 lastItem = x;
739 while (--remaining > 0 && // skip over nulls
740 (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
741 ;
742 return x;
743 } finally {
744 lock.unlock();
745 }
746 }
747
748 public void remove() {
749 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
750 lock.lock();
751 try {
752 int i = lastRet;
753 if (i == -1)
754 throw new IllegalStateException();
755 lastRet = -1;
756 E x = lastItem;
757 lastItem = null;
758 // only remove if item still at index
759 if (x != null && x == items[i]) {
760 boolean removingHead = (i == takeIndex);
761 removeAt(i);
762 if (!removingHead)
763 nextIndex = dec(nextIndex);
764 }
765 } finally {
766 lock.unlock();
767 }
768 }
769 }
770
771 }