ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.6
Committed: Sun Jan 18 20:17:32 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.5: +1 -0 lines
Log Message:
exactly one blank line before and after package statements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.Condition;
10 import java.util.concurrent.locks.ReentrantLock;
11 import java.util.AbstractQueue;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.lang.ref.WeakReference;
16
17 /**
18 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
19 * array. This queue orders elements FIFO (first-in-first-out). The
20 * <em>head</em> of the queue is that element that has been on the
21 * queue the longest time. The <em>tail</em> of the queue is that
22 * element that has been on the queue the shortest time. New elements
23 * are inserted at the tail of the queue, and the queue retrieval
24 * operations obtain elements at the head of the queue.
25 *
26 * <p>This is a classic &quot;bounded buffer&quot;, in which a
27 * fixed-sized array holds elements inserted by producers and
28 * extracted by consumers. Once created, the capacity cannot be
29 * changed. Attempts to {@code put} an element into a full queue
30 * will result in the operation blocking; attempts to {@code take} an
31 * element from an empty queue will similarly block.
32 *
33 * <p>This class supports an optional fairness policy for ordering
34 * waiting producer and consumer threads. By default, this ordering
35 * is not guaranteed. However, a queue constructed with fairness set
36 * to {@code true} grants threads access in FIFO order. Fairness
37 * generally decreases throughput but reduces variability and avoids
38 * starvation.
39 *
40 * <p>This class and its iterator implement all of the
41 * <em>optional</em> methods of the {@link Collection} and {@link
42 * Iterator} interfaces.
43 *
44 * <p>This class is a member of the
45 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
46 * Java Collections Framework</a>.
47 *
48 * @since 1.5
49 * @author Doug Lea
50 * @param <E> the type of elements held in this collection
51 */
52 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
53 implements BlockingQueue<E>, java.io.Serializable {
54
55 /**
56 * Serialization ID. This class relies on default serialization
57 * even for the items array, which is default-serialized, even if
58 * it is empty. Otherwise it could not be declared final, which is
59 * necessary here.
60 */
61 private static final long serialVersionUID = -817911632652898426L;
62
63 /** The queued items */
64 final Object[] items;
65
66 /** items index for next take, poll, peek or remove */
67 int takeIndex;
68
69 /** items index for next put, offer, or add */
70 int putIndex;
71
72 /** Number of elements in the queue */
73 int count;
74
75 /*
76 * Concurrency control uses the classic two-condition algorithm
77 * found in any textbook.
78 */
79
80 /** Main lock guarding all access */
81 final ReentrantLock lock;
82
83 /** Condition for waiting takes */
84 private final Condition notEmpty;
85
86 /** Condition for waiting puts */
87 private final Condition notFull;
88
89 /**
90 * Shared state for currently active iterators, or null if there
91 * are known not to be any. Allows queue operations to update
92 * iterator state.
93 */
94 transient Itrs itrs = null;
95
96 // Internal helper methods
97
98 /**
99 * Circularly increment i.
100 */
101 final int inc(int i) {
102 return (++i == items.length) ? 0 : i;
103 }
104
105 /**
106 * Circularly decrement i.
107 */
108 final int dec(int i) {
109 return ((i == 0) ? items.length : i) - 1;
110 }
111
112 /**
113 * Returns item at index i.
114 */
115 @SuppressWarnings("unchecked")
116 final E itemAt(int i) {
117 return (E) items[i];
118 }
119
120 /**
121 * Throws NullPointerException if argument is null.
122 *
123 * @param v the element
124 */
125 private static void checkNotNull(Object v) {
126 if (v == null)
127 throw new NullPointerException();
128 }
129
130 /**
131 * Inserts element at current put position, advances, and signals.
132 * Call only when holding lock.
133 */
134 private void enqueue(E x) {
135 // assert lock.getHoldCount() == 1;
136 // assert items[putIndex] == null;
137 items[putIndex] = x;
138 putIndex = inc(putIndex);
139 count++;
140 notEmpty.signal();
141 }
142
143 /**
144 * Extracts element at current take position, advances, and signals.
145 * Call only when holding lock.
146 */
147 private E dequeue() {
148 // assert lock.getHoldCount() == 1;
149 // assert items[takeIndex] != null;
150 final Object[] items = this.items;
151 @SuppressWarnings("unchecked")
152 E x = (E) items[takeIndex];
153 items[takeIndex] = null;
154 takeIndex = inc(takeIndex);
155 count--;
156 if (itrs != null)
157 itrs.elementDequeued();
158 notFull.signal();
159 return x;
160 }
161
162 /**
163 * Deletes item at array index removeIndex.
164 * Utility for remove(Object) and iterator.remove.
165 * Call only when holding lock.
166 */
167 void removeAt(final int removeIndex) {
168 // assert lock.getHoldCount() == 1;
169 // assert items[removeIndex] != null;
170 // assert removeIndex >= 0 && removeIndex < items.length;
171 final Object[] items = this.items;
172 if (removeIndex == takeIndex) {
173 // removing front item; just advance
174 items[takeIndex] = null;
175 takeIndex = inc(takeIndex);
176 count--;
177 if (itrs != null)
178 itrs.elementDequeued();
179 } else {
180 // an "interior" remove
181
182 // slide over all others up through putIndex.
183 final int putIndex = this.putIndex;
184 for (int i = removeIndex;;) {
185 int next = inc(i);
186 if (next != putIndex) {
187 items[i] = items[next];
188 i = next;
189 } else {
190 items[i] = null;
191 this.putIndex = i;
192 break;
193 }
194 }
195 count--;
196 if (itrs != null)
197 itrs.removedAt(removeIndex);
198 }
199 notFull.signal();
200 }
201
202 /**
203 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
204 * capacity and default access policy.
205 *
206 * @param capacity the capacity of this queue
207 * @throws IllegalArgumentException if {@code capacity < 1}
208 */
209 public ArrayBlockingQueue(int capacity) {
210 this(capacity, false);
211 }
212
213 /**
214 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
215 * capacity and the specified access policy.
216 *
217 * @param capacity the capacity of this queue
218 * @param fair if {@code true} then queue accesses for threads blocked
219 * on insertion or removal, are processed in FIFO order;
220 * if {@code false} the access order is unspecified.
221 * @throws IllegalArgumentException if {@code capacity < 1}
222 */
223 public ArrayBlockingQueue(int capacity, boolean fair) {
224 if (capacity <= 0)
225 throw new IllegalArgumentException();
226 this.items = new Object[capacity];
227 lock = new ReentrantLock(fair);
228 notEmpty = lock.newCondition();
229 notFull = lock.newCondition();
230 }
231
232 /**
233 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
234 * capacity, the specified access policy and initially containing the
235 * elements of the given collection,
236 * added in traversal order of the collection's iterator.
237 *
238 * @param capacity the capacity of this queue
239 * @param fair if {@code true} then queue accesses for threads blocked
240 * on insertion or removal, are processed in FIFO order;
241 * if {@code false} the access order is unspecified.
242 * @param c the collection of elements to initially contain
243 * @throws IllegalArgumentException if {@code capacity} is less than
244 * {@code c.size()}, or less than 1.
245 * @throws NullPointerException if the specified collection or any
246 * of its elements are null
247 */
248 public ArrayBlockingQueue(int capacity, boolean fair,
249 Collection<? extends E> c) {
250 this(capacity, fair);
251
252 final ReentrantLock lock = this.lock;
253 lock.lock(); // Lock only for visibility, not mutual exclusion
254 try {
255 int i = 0;
256 try {
257 for (E e : c) {
258 checkNotNull(e);
259 items[i++] = e;
260 }
261 } catch (ArrayIndexOutOfBoundsException ex) {
262 throw new IllegalArgumentException();
263 }
264 count = i;
265 putIndex = (i == capacity) ? 0 : i;
266 } finally {
267 lock.unlock();
268 }
269 }
270
271 /**
272 * Inserts the specified element at the tail of this queue if it is
273 * possible to do so immediately without exceeding the queue's capacity,
274 * returning {@code true} upon success and throwing an
275 * {@code IllegalStateException} if this queue is full.
276 *
277 * @param e the element to add
278 * @return {@code true} (as specified by {@link Collection#add})
279 * @throws IllegalStateException if this queue is full
280 * @throws NullPointerException if the specified element is null
281 */
282 public boolean add(E e) {
283 return super.add(e);
284 }
285
286 /**
287 * Inserts the specified element at the tail of this queue if it is
288 * possible to do so immediately without exceeding the queue's capacity,
289 * returning {@code true} upon success and {@code false} if this queue
290 * is full. This method is generally preferable to method {@link #add},
291 * which can fail to insert an element only by throwing an exception.
292 *
293 * @throws NullPointerException if the specified element is null
294 */
295 public boolean offer(E e) {
296 checkNotNull(e);
297 final ReentrantLock lock = this.lock;
298 lock.lock();
299 try {
300 if (count == items.length)
301 return false;
302 else {
303 enqueue(e);
304 return true;
305 }
306 } finally {
307 lock.unlock();
308 }
309 }
310
311 /**
312 * Inserts the specified element at the tail of this queue, waiting
313 * for space to become available if the queue is full.
314 *
315 * @throws InterruptedException {@inheritDoc}
316 * @throws NullPointerException {@inheritDoc}
317 */
318 public void put(E e) throws InterruptedException {
319 checkNotNull(e);
320 final ReentrantLock lock = this.lock;
321 lock.lockInterruptibly();
322 try {
323 while (count == items.length)
324 notFull.await();
325 enqueue(e);
326 } finally {
327 lock.unlock();
328 }
329 }
330
331 /**
332 * Inserts the specified element at the tail of this queue, waiting
333 * up to the specified wait time for space to become available if
334 * the queue is full.
335 *
336 * @throws InterruptedException {@inheritDoc}
337 * @throws NullPointerException {@inheritDoc}
338 */
339 public boolean offer(E e, long timeout, TimeUnit unit)
340 throws InterruptedException {
341
342 checkNotNull(e);
343 long nanos = unit.toNanos(timeout);
344 final ReentrantLock lock = this.lock;
345 lock.lockInterruptibly();
346 try {
347 while (count == items.length) {
348 if (nanos <= 0)
349 return false;
350 nanos = notFull.awaitNanos(nanos);
351 }
352 enqueue(e);
353 return true;
354 } finally {
355 lock.unlock();
356 }
357 }
358
359 public E poll() {
360 final ReentrantLock lock = this.lock;
361 lock.lock();
362 try {
363 return (count == 0) ? null : dequeue();
364 } finally {
365 lock.unlock();
366 }
367 }
368
369 public E take() throws InterruptedException {
370 final ReentrantLock lock = this.lock;
371 lock.lockInterruptibly();
372 try {
373 while (count == 0)
374 notEmpty.await();
375 return dequeue();
376 } finally {
377 lock.unlock();
378 }
379 }
380
381 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
382 long nanos = unit.toNanos(timeout);
383 final ReentrantLock lock = this.lock;
384 lock.lockInterruptibly();
385 try {
386 while (count == 0) {
387 if (nanos <= 0)
388 return null;
389 nanos = notEmpty.awaitNanos(nanos);
390 }
391 return dequeue();
392 } finally {
393 lock.unlock();
394 }
395 }
396
397 public E peek() {
398 final ReentrantLock lock = this.lock;
399 lock.lock();
400 try {
401 return itemAt(takeIndex); // null when queue is empty
402 } finally {
403 lock.unlock();
404 }
405 }
406
407 // this doc comment is overridden to remove the reference to collections
408 // greater in size than Integer.MAX_VALUE
409 /**
410 * Returns the number of elements in this queue.
411 *
412 * @return the number of elements in this queue
413 */
414 public int size() {
415 final ReentrantLock lock = this.lock;
416 lock.lock();
417 try {
418 return count;
419 } finally {
420 lock.unlock();
421 }
422 }
423
424 // this doc comment is a modified copy of the inherited doc comment,
425 // without the reference to unlimited queues.
426 /**
427 * Returns the number of additional elements that this queue can ideally
428 * (in the absence of memory or resource constraints) accept without
429 * blocking. This is always equal to the initial capacity of this queue
430 * less the current {@code size} of this queue.
431 *
432 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
433 * an element will succeed by inspecting {@code remainingCapacity}
434 * because it may be the case that another thread is about to
435 * insert or remove an element.
436 */
437 public int remainingCapacity() {
438 final ReentrantLock lock = this.lock;
439 lock.lock();
440 try {
441 return items.length - count;
442 } finally {
443 lock.unlock();
444 }
445 }
446
447 /**
448 * Removes a single instance of the specified element from this queue,
449 * if it is present. More formally, removes an element {@code e} such
450 * that {@code o.equals(e)}, if this queue contains one or more such
451 * elements.
452 * Returns {@code true} if this queue contained the specified element
453 * (or equivalently, if this queue changed as a result of the call).
454 *
455 * <p>Removal of interior elements in circular array based queues
456 * is an intrinsically slow and disruptive operation, so should
457 * be undertaken only in exceptional circumstances, ideally
458 * only when the queue is known not to be accessible by other
459 * threads.
460 *
461 * @param o element to be removed from this queue, if present
462 * @return {@code true} if this queue changed as a result of the call
463 */
464 public boolean remove(Object o) {
465 if (o == null) return false;
466 final Object[] items = this.items;
467 final ReentrantLock lock = this.lock;
468 lock.lock();
469 try {
470 if (count > 0) {
471 final int putIndex = this.putIndex;
472 int i = takeIndex;
473 do {
474 if (o.equals(items[i])) {
475 removeAt(i);
476 return true;
477 }
478 } while ((i = inc(i)) != putIndex);
479 }
480 return false;
481 } finally {
482 lock.unlock();
483 }
484 }
485
486 /**
487 * Returns {@code true} if this queue contains the specified element.
488 * More formally, returns {@code true} if and only if this queue contains
489 * at least one element {@code e} such that {@code o.equals(e)}.
490 *
491 * @param o object to be checked for containment in this queue
492 * @return {@code true} if this queue contains the specified element
493 */
494 public boolean contains(Object o) {
495 if (o == null) return false;
496 final Object[] items = this.items;
497 final ReentrantLock lock = this.lock;
498 lock.lock();
499 try {
500 if (count > 0) {
501 final int putIndex = this.putIndex;
502 int i = takeIndex;
503 do {
504 if (o.equals(items[i]))
505 return true;
506 } while ((i = inc(i)) != putIndex);
507 }
508 return false;
509 } finally {
510 lock.unlock();
511 }
512 }
513
514 /**
515 * Returns an array containing all of the elements in this queue, in
516 * proper sequence.
517 *
518 * <p>The returned array will be "safe" in that no references to it are
519 * maintained by this queue. (In other words, this method must allocate
520 * a new array). The caller is thus free to modify the returned array.
521 *
522 * <p>This method acts as bridge between array-based and collection-based
523 * APIs.
524 *
525 * @return an array containing all of the elements in this queue
526 */
527 public Object[] toArray() {
528 final Object[] items = this.items;
529 final ReentrantLock lock = this.lock;
530 lock.lock();
531 try {
532 final int count = this.count;
533 Object[] a = new Object[count];
534 int n = items.length - takeIndex;
535 if (count <= n) {
536 System.arraycopy(items, takeIndex, a, 0, count);
537 } else {
538 System.arraycopy(items, takeIndex, a, 0, n);
539 System.arraycopy(items, 0, a, n, count - n);
540 }
541 return a;
542 } finally {
543 lock.unlock();
544 }
545 }
546
547 /**
548 * Returns an array containing all of the elements in this queue, in
549 * proper sequence; the runtime type of the returned array is that of
550 * the specified array. If the queue fits in the specified array, it
551 * is returned therein. Otherwise, a new array is allocated with the
552 * runtime type of the specified array and the size of this queue.
553 *
554 * <p>If this queue fits in the specified array with room to spare
555 * (i.e., the array has more elements than this queue), the element in
556 * the array immediately following the end of the queue is set to
557 * {@code null}.
558 *
559 * <p>Like the {@link #toArray()} method, this method acts as bridge between
560 * array-based and collection-based APIs. Further, this method allows
561 * precise control over the runtime type of the output array, and may,
562 * under certain circumstances, be used to save allocation costs.
563 *
564 * <p>Suppose {@code x} is a queue known to contain only strings.
565 * The following code can be used to dump the queue into a newly
566 * allocated array of {@code String}:
567 *
568 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
569 *
570 * Note that {@code toArray(new Object[0])} is identical in function to
571 * {@code toArray()}.
572 *
573 * @param a the array into which the elements of the queue are to
574 * be stored, if it is big enough; otherwise, a new array of the
575 * same runtime type is allocated for this purpose
576 * @return an array containing all of the elements in this queue
577 * @throws ArrayStoreException if the runtime type of the specified array
578 * is not a supertype of the runtime type of every element in
579 * this queue
580 * @throws NullPointerException if the specified array is null
581 */
582 @SuppressWarnings("unchecked")
583 public <T> T[] toArray(T[] a) {
584 final Object[] items = this.items;
585 final ReentrantLock lock = this.lock;
586 lock.lock();
587 try {
588 final int count = this.count;
589 final int len = a.length;
590 if (len < count)
591 a = (T[])java.lang.reflect.Array.newInstance(
592 a.getClass().getComponentType(), count);
593 int n = items.length - takeIndex;
594 if (count <= n)
595 System.arraycopy(items, takeIndex, a, 0, count);
596 else {
597 System.arraycopy(items, takeIndex, a, 0, n);
598 System.arraycopy(items, 0, a, n, count - n);
599 }
600 if (len > count)
601 a[count] = null;
602 return a;
603 } finally {
604 lock.unlock();
605 }
606 }
607
608 public String toString() {
609 final ReentrantLock lock = this.lock;
610 lock.lock();
611 try {
612 int k = count;
613 if (k == 0)
614 return "[]";
615
616 StringBuilder sb = new StringBuilder();
617 sb.append('[');
618 for (int i = takeIndex; ; i = inc(i)) {
619 Object e = items[i];
620 sb.append(e == this ? "(this Collection)" : e);
621 if (--k == 0)
622 return sb.append(']').toString();
623 sb.append(',').append(' ');
624 }
625 } finally {
626 lock.unlock();
627 }
628 }
629
630 /**
631 * Atomically removes all of the elements from this queue.
632 * The queue will be empty after this call returns.
633 */
634 public void clear() {
635 final Object[] items = this.items;
636 final ReentrantLock lock = this.lock;
637 lock.lock();
638 try {
639 int k = count;
640 if (k > 0) {
641 final int putIndex = this.putIndex;
642 int i = takeIndex;
643 do {
644 items[i] = null;
645 } while ((i = inc(i)) != putIndex);
646 takeIndex = putIndex;
647 count = 0;
648 if (itrs != null)
649 itrs.queueIsEmpty();
650 for (; k > 0 && lock.hasWaiters(notFull); k--)
651 notFull.signal();
652 }
653 } finally {
654 lock.unlock();
655 }
656 }
657
658 /**
659 * @throws UnsupportedOperationException {@inheritDoc}
660 * @throws ClassCastException {@inheritDoc}
661 * @throws NullPointerException {@inheritDoc}
662 * @throws IllegalArgumentException {@inheritDoc}
663 */
664 public int drainTo(Collection<? super E> c) {
665 return drainTo(c, Integer.MAX_VALUE);
666 }
667
668 /**
669 * @throws UnsupportedOperationException {@inheritDoc}
670 * @throws ClassCastException {@inheritDoc}
671 * @throws NullPointerException {@inheritDoc}
672 * @throws IllegalArgumentException {@inheritDoc}
673 */
674 public int drainTo(Collection<? super E> c, int maxElements) {
675 checkNotNull(c);
676 if (c == this)
677 throw new IllegalArgumentException();
678 if (maxElements <= 0)
679 return 0;
680 final Object[] items = this.items;
681 final ReentrantLock lock = this.lock;
682 lock.lock();
683 try {
684 int n = Math.min(maxElements, count);
685 int take = takeIndex;
686 int i = 0;
687 try {
688 while (i < n) {
689 @SuppressWarnings("unchecked")
690 E x = (E) items[take];
691 c.add(x);
692 items[take] = null;
693 take = inc(take);
694 i++;
695 }
696 return n;
697 } finally {
698 // Restore invariants even if c.add() threw
699 if (i > 0) {
700 count -= i;
701 takeIndex = take;
702 if (itrs != null) {
703 if (count == 0)
704 itrs.queueIsEmpty();
705 else if (i > take)
706 itrs.takeIndexWrapped();
707 }
708 for (; i > 0 && lock.hasWaiters(notFull); i--)
709 notFull.signal();
710 }
711 }
712 } finally {
713 lock.unlock();
714 }
715 }
716
717 /**
718 * Returns an iterator over the elements in this queue in proper sequence.
719 * The elements will be returned in order from first (head) to last (tail).
720 *
721 * <p>The returned iterator is a "weakly consistent" iterator that
722 * will never throw {@link java.util.ConcurrentModificationException
723 * ConcurrentModificationException}, and guarantees to traverse
724 * elements as they existed upon construction of the iterator, and
725 * may (but is not guaranteed to) reflect any modifications
726 * subsequent to construction.
727 *
728 * @return an iterator over the elements in this queue in proper sequence
729 */
730 public Iterator<E> iterator() {
731 return new Itr();
732 }
733
734 /**
735 * Shared data between iterators and their queue, allowing queue
736 * modifications to update iterators when elements are removed.
737 *
738 * This adds a lot of complexity for the sake of correctly
739 * handling some uncommon operations, but the combination of
740 * circular-arrays and supporting interior removes (i.e., those
741 * not at head) would cause iterators to sometimes lose their
742 * places and/or (re)report elements they shouldn't. To avoid
743 * this, when a queue has one or more iterators, it keeps iterator
744 * state consistent by:
745 *
746 * (1) keeping track of the number of "cycles", that is, the
747 * number of times takeIndex has wrapped around to 0.
748 * (2) notifying all iterators via the callback removedAt whenever
749 * an interior element is removed (and thus other elements may
750 * be shifted).
751 *
752 * These suffice to eliminate iterator inconsistencies, but
753 * unfortunately add the secondary responsibility of maintaining
754 * the list of iterators. We track all active iterators in a
755 * simple linked list (accessed only when the queue's lock is
756 * held) of weak references to Itr. The list is cleaned up using
757 * 3 different mechanisms:
758 *
759 * (1) Whenever a new iterator is created, do some O(1) checking for
760 * stale list elements.
761 *
762 * (2) Whenever takeIndex wraps around to 0, check for iterators
763 * that have been unused for more than one wrap-around cycle.
764 *
765 * (3) Whenever the queue becomes empty, all iterators are notified
766 * and this entire data structure is discarded.
767 *
768 * So in addition to the removedAt callback that is necessary for
769 * correctness, iterators have the shutdown and takeIndexWrapped
770 * callbacks that help remove stale iterators from the list.
771 *
772 * Whenever a list element is examined, it is expunged if either
773 * the GC has determined that the iterator is discarded, or if the
774 * iterator reports that it is "detached" (does not need any
775 * further state updates). Overhead is maximal when takeIndex
776 * never advances, iterators are discarded before they are
777 * exhausted, and all removals are interior removes, in which case
778 * all stale iterators are discovered by the GC. But even in this
779 * case we don't increase the amortized complexity.
780 *
781 * Care must be taken to keep list sweeping methods from
782 * reentrantly invoking another such method, causing subtle
783 * corruption bugs.
784 */
785 class Itrs {
786
787 /**
788 * Node in a linked list of weak iterator references.
789 */
790 private class Node extends WeakReference<Itr> {
791 Node next;
792
793 Node(Itr iterator, Node next) {
794 super(iterator);
795 this.next = next;
796 }
797 }
798
799 /** Incremented whenever takeIndex wraps around to 0 */
800 int cycles;
801
802 /** Linked list of weak iterator references */
803 private Node head;
804
805 /** Used to expunge stale iterators */
806 private Node sweeper;
807
808 private static final int SHORT_SWEEP_PROBES = 4;
809 private static final int LONG_SWEEP_PROBES = 16;
810
811 Itrs(Itr initial) {
812 register(initial);
813 }
814
815 /**
816 * Sweeps itrs, looking for and expunging stale iterators.
817 * If at least one was found, tries harder to find more.
818 * Called only from iterating thread.
819 *
820 * @param tryHarder whether to start in try-harder mode, because
821 * there is known to be at least one iterator to collect
822 */
823 void doSomeSweeping(boolean tryHarder) {
824 // assert lock.getHoldCount() == 1;
825 // assert head != null;
826 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
827 Node o, p;
828 final Node sweeper = this.sweeper;
829 boolean passedGo; // to limit search to one full sweep
830
831 if (sweeper == null) {
832 o = null;
833 p = head;
834 passedGo = true;
835 } else {
836 o = sweeper;
837 p = o.next;
838 passedGo = false;
839 }
840
841 for (; probes > 0; probes--) {
842 if (p == null) {
843 if (passedGo)
844 break;
845 o = null;
846 p = head;
847 passedGo = true;
848 }
849 final Itr it = p.get();
850 final Node next = p.next;
851 if (it == null || it.isDetached()) {
852 // found a discarded/exhausted iterator
853 probes = LONG_SWEEP_PROBES; // "try harder"
854 // unlink p
855 p.clear();
856 p.next = null;
857 if (o == null) {
858 head = next;
859 if (next == null) {
860 // We've run out of iterators to track; retire
861 itrs = null;
862 return;
863 }
864 }
865 else
866 o.next = next;
867 } else {
868 o = p;
869 }
870 p = next;
871 }
872
873 this.sweeper = (p == null) ? null : o;
874 }
875
876 /**
877 * Adds a new iterator to the linked list of tracked iterators.
878 */
879 void register(Itr itr) {
880 // assert lock.getHoldCount() == 1;
881 head = new Node(itr, head);
882 }
883
884 /**
885 * Called whenever takeIndex wraps around to 0.
886 *
887 * Notifies all iterators, and expunges any that are now stale.
888 */
889 void takeIndexWrapped() {
890 // assert lock.getHoldCount() == 1;
891 cycles++;
892 for (Node o = null, p = head; p != null;) {
893 final Itr it = p.get();
894 final Node next = p.next;
895 if (it == null || it.takeIndexWrapped()) {
896 // unlink p
897 // assert it == null || it.isDetached();
898 p.clear();
899 p.next = null;
900 if (o == null)
901 head = next;
902 else
903 o.next = next;
904 } else {
905 o = p;
906 }
907 p = next;
908 }
909 if (head == null) // no more iterators to track
910 itrs = null;
911 }
912
913 /**
914 * Called whenever an interior remove (not at takeIndex) occurred.
915 *
916 * Notifies all iterators, and expunges any that are now stale.
917 */
918 void removedAt(int removedIndex) {
919 for (Node o = null, p = head; p != null;) {
920 final Itr it = p.get();
921 final Node next = p.next;
922 if (it == null || it.removedAt(removedIndex)) {
923 // unlink p
924 // assert it == null || it.isDetached();
925 p.clear();
926 p.next = null;
927 if (o == null)
928 head = next;
929 else
930 o.next = next;
931 } else {
932 o = p;
933 }
934 p = next;
935 }
936 if (head == null) // no more iterators to track
937 itrs = null;
938 }
939
940 /**
941 * Called whenever the queue becomes empty.
942 *
943 * Notifies all active iterators that the queue is empty,
944 * clears all weak refs, and unlinks the itrs datastructure.
945 */
946 void queueIsEmpty() {
947 // assert lock.getHoldCount() == 1;
948 for (Node p = head; p != null; p = p.next) {
949 Itr it = p.get();
950 if (it != null) {
951 p.clear();
952 it.shutdown();
953 }
954 }
955 head = null;
956 itrs = null;
957 }
958
959 /**
960 * Called whenever an element has been dequeued (at takeIndex).
961 */
962 void elementDequeued() {
963 // assert lock.getHoldCount() == 1;
964 if (count == 0)
965 queueIsEmpty();
966 else if (takeIndex == 0)
967 takeIndexWrapped();
968 }
969 }
970
971 /**
972 * Iterator for ArrayBlockingQueue.
973 *
974 * To maintain weak consistency with respect to puts and takes, we
975 * read ahead one slot, so as to not report hasNext true but then
976 * not have an element to return.
977 *
978 * We switch into "detached" mode (allowing prompt unlinking from
979 * itrs without help from the GC) when all indices are negative, or
980 * when hasNext returns false for the first time. This allows the
981 * iterator to track concurrent updates completely accurately,
982 * except for the corner case of the user calling Iterator.remove()
983 * after hasNext() returned false. Even in this case, we ensure
984 * that we don't remove the wrong element by keeping track of the
985 * expected element to remove, in lastItem. Yes, we may fail to
986 * remove lastItem from the queue if it moved due to an interleaved
987 * interior remove while in detached mode.
988 */
989 private class Itr implements Iterator<E> {
990 /** Index to look for new nextItem; NONE at end */
991 private int cursor;
992
993 /** Element to be returned by next call to next(); null if none */
994 private E nextItem;
995
996 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
997 private int nextIndex;
998
999 /** Last element returned; null if none or not detached. */
1000 private E lastItem;
1001
1002 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1003 private int lastRet;
1004
1005 /** Previous value of takeIndex, or DETACHED when detached */
1006 private int prevTakeIndex;
1007
1008 /** Previous value of iters.cycles */
1009 private int prevCycles;
1010
1011 /** Special index value indicating "not available" or "undefined" */
1012 private static final int NONE = -1;
1013
1014 /**
1015 * Special index value indicating "removed elsewhere", that is,
1016 * removed by some operation other than a call to this.remove().
1017 */
1018 private static final int REMOVED = -2;
1019
1020 /** Special value for prevTakeIndex indicating "detached mode" */
1021 private static final int DETACHED = -3;
1022
1023 Itr() {
1024 // assert lock.getHoldCount() == 0;
1025 lastRet = NONE;
1026 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1027 lock.lock();
1028 try {
1029 if (count == 0) {
1030 // assert itrs == null;
1031 cursor = NONE;
1032 nextIndex = NONE;
1033 prevTakeIndex = DETACHED;
1034 } else {
1035 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1036 prevTakeIndex = takeIndex;
1037 nextItem = itemAt(nextIndex = takeIndex);
1038 cursor = incCursor(takeIndex);
1039 if (itrs == null) {
1040 itrs = new Itrs(this);
1041 } else {
1042 itrs.register(this); // in this order
1043 itrs.doSomeSweeping(false);
1044 }
1045 prevCycles = itrs.cycles;
1046 // assert takeIndex >= 0;
1047 // assert prevTakeIndex == takeIndex;
1048 // assert nextIndex >= 0;
1049 // assert nextItem != null;
1050 }
1051 } finally {
1052 lock.unlock();
1053 }
1054 }
1055
1056 boolean isDetached() {
1057 // assert lock.getHoldCount() == 1;
1058 return prevTakeIndex < 0;
1059 }
1060
1061 private int incCursor(int index) {
1062 // assert lock.getHoldCount() == 1;
1063 index = inc(index);
1064 if (index == putIndex)
1065 index = NONE;
1066 return index;
1067 }
1068
1069 /**
1070 * Returns true if index is invalidated by the given number of
1071 * dequeues, starting from prevTakeIndex.
1072 */
1073 private boolean invalidated(int index, int prevTakeIndex,
1074 long dequeues, int length) {
1075 if (index < 0)
1076 return false;
1077 int distance = index - prevTakeIndex;
1078 if (distance < 0)
1079 distance += length;
1080 return dequeues > distance;
1081 }
1082
1083 /**
1084 * Adjusts indices to incorporate all dequeues since the last
1085 * operation on this iterator. Call only from iterating thread.
1086 */
1087 private void incorporateDequeues() {
1088 // assert lock.getHoldCount() == 1;
1089 // assert itrs != null;
1090 // assert !isDetached();
1091 // assert count > 0;
1092
1093 final int cycles = itrs.cycles;
1094 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1095 final int prevCycles = this.prevCycles;
1096 final int prevTakeIndex = this.prevTakeIndex;
1097
1098 if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1099 final int len = items.length;
1100 // how far takeIndex has advanced since the previous
1101 // operation of this iterator
1102 long dequeues = (cycles - prevCycles) * len
1103 + (takeIndex - prevTakeIndex);
1104
1105 // Check indices for invalidation
1106 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1107 lastRet = REMOVED;
1108 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1109 nextIndex = REMOVED;
1110 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1111 cursor = takeIndex;
1112
1113 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1114 detach();
1115 else {
1116 this.prevCycles = cycles;
1117 this.prevTakeIndex = takeIndex;
1118 }
1119 }
1120 }
1121
1122 /**
1123 * Called when itrs should stop tracking this iterator, either
1124 * because there are no more indices to update (cursor < 0 &&
1125 * nextIndex < 0 && lastRet < 0) or as a special exception, when
1126 * lastRet >= 0, because hasNext() is about to return false for the
1127 * first time. Call only from iterating thread.
1128 */
1129 private void detach() {
1130 // Switch to detached mode
1131 // assert lock.getHoldCount() == 1;
1132 // assert cursor == NONE;
1133 // assert nextIndex < 0;
1134 // assert lastRet < 0 || nextItem == null;
1135 // assert lastRet < 0 ^ lastItem != null;
1136 if (prevTakeIndex >= 0) {
1137 // assert itrs != null;
1138 prevTakeIndex = DETACHED;
1139 // try to unlink from itrs (but not too hard)
1140 itrs.doSomeSweeping(true);
1141 }
1142 }
1143
1144 /**
1145 * For performance reasons, we would like not to acquire a lock in
1146 * hasNext in the common case. To allow for this, we only access
1147 * fields (i.e. nextItem) that are not modified by update operations
1148 * triggered by queue modifications.
1149 */
1150 public boolean hasNext() {
1151 // assert lock.getHoldCount() == 0;
1152 if (nextItem != null)
1153 return true;
1154 noNext();
1155 return false;
1156 }
1157
1158 private void noNext() {
1159 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1160 lock.lock();
1161 try {
1162 // assert cursor == NONE;
1163 // assert nextIndex == NONE;
1164 if (!isDetached()) {
1165 // assert lastRet >= 0;
1166 incorporateDequeues(); // might update lastRet
1167 if (lastRet >= 0) {
1168 lastItem = itemAt(lastRet);
1169 // assert lastItem != null;
1170 detach();
1171 }
1172 }
1173 // assert isDetached();
1174 // assert lastRet < 0 ^ lastItem != null;
1175 } finally {
1176 lock.unlock();
1177 }
1178 }
1179
1180 public E next() {
1181 // assert lock.getHoldCount() == 0;
1182 final E x = nextItem;
1183 if (x == null)
1184 throw new NoSuchElementException();
1185 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1186 lock.lock();
1187 try {
1188 if (!isDetached())
1189 incorporateDequeues();
1190 // assert nextIndex != NONE;
1191 // assert lastItem == null;
1192 lastRet = nextIndex;
1193 final int cursor = this.cursor;
1194 if (cursor >= 0) {
1195 nextItem = itemAt(nextIndex = cursor);
1196 // assert nextItem != null;
1197 this.cursor = incCursor(cursor);
1198 } else {
1199 nextIndex = NONE;
1200 nextItem = null;
1201 }
1202 } finally {
1203 lock.unlock();
1204 }
1205 return x;
1206 }
1207
1208 public void remove() {
1209 // assert lock.getHoldCount() == 0;
1210 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1211 lock.lock();
1212 try {
1213 if (!isDetached())
1214 incorporateDequeues(); // might update lastRet or detach
1215 final int lastRet = this.lastRet;
1216 this.lastRet = NONE;
1217 if (lastRet >= 0) {
1218 if (!isDetached())
1219 removeAt(lastRet);
1220 else {
1221 final E lastItem = this.lastItem;
1222 // assert lastItem != null;
1223 this.lastItem = null;
1224 if (itemAt(lastRet) == lastItem)
1225 removeAt(lastRet);
1226 }
1227 } else if (lastRet == NONE)
1228 throw new IllegalStateException();
1229 // else lastRet == REMOVED and the last returned element was
1230 // previously asynchronously removed via an operation other
1231 // than this.remove(), so nothing to do.
1232
1233 if (cursor < 0 && nextIndex < 0)
1234 detach();
1235 } finally {
1236 lock.unlock();
1237 // assert lastRet == NONE;
1238 // assert lastItem == null;
1239 }
1240 }
1241
1242 /**
1243 * Called to notify the iterator that the queue is empty, or that it
1244 * has fallen hopelessly behind, so that it should abandon any
1245 * further iteration, except possibly to return one more element
1246 * from next(), as promised by returning true from hasNext().
1247 */
1248 void shutdown() {
1249 // assert lock.getHoldCount() == 1;
1250 cursor = NONE;
1251 if (nextIndex >= 0)
1252 nextIndex = REMOVED;
1253 if (lastRet >= 0) {
1254 lastRet = REMOVED;
1255 lastItem = null;
1256 }
1257 prevTakeIndex = DETACHED;
1258 // Don't set nextItem to null because we must continue to be
1259 // able to return it on next().
1260 //
1261 // Caller will unlink from itrs when convenient.
1262 }
1263
1264 private int distance(int index, int prevTakeIndex, int length) {
1265 int distance = index - prevTakeIndex;
1266 if (distance < 0)
1267 distance += length;
1268 return distance;
1269 }
1270
1271 /**
1272 * Called whenever an interior remove (not at takeIndex) occurred.
1273 *
1274 * @return true if this iterator should be unlinked from itrs
1275 */
1276 boolean removedAt(int removedIndex) {
1277 // assert lock.getHoldCount() == 1;
1278 if (isDetached())
1279 return true;
1280
1281 final int cycles = itrs.cycles;
1282 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1283 final int prevCycles = this.prevCycles;
1284 final int prevTakeIndex = this.prevTakeIndex;
1285 final int len = items.length;
1286 int cycleDiff = cycles - prevCycles;
1287 if (removedIndex < takeIndex)
1288 cycleDiff++;
1289 final int removedDistance =
1290 (cycleDiff * len) + (removedIndex - prevTakeIndex);
1291 // assert removedDistance >= 0;
1292 int cursor = this.cursor;
1293 if (cursor >= 0) {
1294 int x = distance(cursor, prevTakeIndex, len);
1295 if (x == removedDistance) {
1296 if (cursor == putIndex)
1297 this.cursor = cursor = NONE;
1298 }
1299 else if (x > removedDistance) {
1300 // assert cursor != prevTakeIndex;
1301 this.cursor = cursor = dec(cursor);
1302 }
1303 }
1304 int lastRet = this.lastRet;
1305 if (lastRet >= 0) {
1306 int x = distance(lastRet, prevTakeIndex, len);
1307 if (x == removedDistance)
1308 this.lastRet = lastRet = REMOVED;
1309 else if (x > removedDistance)
1310 this.lastRet = lastRet = dec(lastRet);
1311 }
1312 int nextIndex = this.nextIndex;
1313 if (nextIndex >= 0) {
1314 int x = distance(nextIndex, prevTakeIndex, len);
1315 if (x == removedDistance)
1316 this.nextIndex = nextIndex = REMOVED;
1317 else if (x > removedDistance)
1318 this.nextIndex = nextIndex = dec(nextIndex);
1319 }
1320 else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1321 this.prevTakeIndex = DETACHED;
1322 return true;
1323 }
1324 return false;
1325 }
1326
1327 /**
1328 * Called whenever takeIndex wraps around to zero.
1329 *
1330 * @return true if this iterator should be unlinked from itrs
1331 */
1332 boolean takeIndexWrapped() {
1333 // assert lock.getHoldCount() == 1;
1334 if (isDetached())
1335 return true;
1336 if (itrs.cycles - prevCycles > 1) {
1337 // All the elements that existed at the time of the last
1338 // operation are gone, so abandon further iteration.
1339 shutdown();
1340 return true;
1341 }
1342 return false;
1343 }
1344
1345 // /** Uncomment for debugging. */
1346 // public String toString() {
1347 // return ("cursor=" + cursor + " " +
1348 // "nextIndex=" + nextIndex + " " +
1349 // "lastRet=" + lastRet + " " +
1350 // "nextItem=" + nextItem + " " +
1351 // "lastItem=" + lastItem + " " +
1352 // "prevCycles=" + prevCycles + " " +
1353 // "prevTakeIndex=" + prevTakeIndex + " " +
1354 // "size()=" + size() + " " +
1355 // "remainingCapacity()=" + remainingCapacity());
1356 // }
1357 }
1358 }