ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.4
Committed: Tue Oct 22 15:21:30 2013 UTC (10 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.3: +2 -2 lines
Log Message:
javadoc typo fixes from Sergey Malenkov; https://bugs.openjdk.java.net/browse/JDK-8022746

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.concurrent.locks.Condition;
9     import java.util.concurrent.locks.ReentrantLock;
10     import java.util.AbstractQueue;
11     import java.util.Collection;
12     import java.util.Iterator;
13     import java.util.NoSuchElementException;
14     import java.lang.ref.WeakReference;
15    
16     /**
17     * A bounded {@linkplain BlockingQueue blocking queue} backed by an
18     * array. This queue orders elements FIFO (first-in-first-out). The
19     * <em>head</em> of the queue is that element that has been on the
20     * queue the longest time. The <em>tail</em> of the queue is that
21     * element that has been on the queue the shortest time. New elements
22     * are inserted at the tail of the queue, and the queue retrieval
23     * operations obtain elements at the head of the queue.
24     *
25     * <p>This is a classic &quot;bounded buffer&quot;, in which a
26     * fixed-sized array holds elements inserted by producers and
27     * extracted by consumers. Once created, the capacity cannot be
28     * changed. Attempts to {@code put} an element into a full queue
29     * will result in the operation blocking; attempts to {@code take} an
30     * element from an empty queue will similarly block.
31     *
32     * <p>This class supports an optional fairness policy for ordering
33     * waiting producer and consumer threads. By default, this ordering
34     * is not guaranteed. However, a queue constructed with fairness set
35     * to {@code true} grants threads access in FIFO order. Fairness
36     * generally decreases throughput but reduces variability and avoids
37     * starvation.
38     *
39     * <p>This class and its iterator implement all of the
40     * <em>optional</em> methods of the {@link Collection} and {@link
41     * Iterator} interfaces.
42     *
43     * <p>This class is a member of the
44     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
45     * Java Collections Framework</a>.
46     *
47     * @since 1.5
48     * @author Doug Lea
49     * @param <E> the type of elements held in this collection
50     */
51     public class ArrayBlockingQueue<E> extends AbstractQueue<E>
52     implements BlockingQueue<E>, java.io.Serializable {
53    
54     /**
55     * Serialization ID. This class relies on default serialization
56     * even for the items array, which is default-serialized, even if
57     * it is empty. Otherwise it could not be declared final, which is
58     * necessary here.
59     */
60     private static final long serialVersionUID = -817911632652898426L;
61    
62     /** The queued items */
63     final Object[] items;
64    
65     /** items index for next take, poll, peek or remove */
66     int takeIndex;
67    
68     /** items index for next put, offer, or add */
69     int putIndex;
70    
71     /** Number of elements in the queue */
72     int count;
73    
74     /*
75     * Concurrency control uses the classic two-condition algorithm
76     * found in any textbook.
77     */
78    
79     /** Main lock guarding all access */
80     final ReentrantLock lock;
81    
82     /** Condition for waiting takes */
83     private final Condition notEmpty;
84    
85     /** Condition for waiting puts */
86     private final Condition notFull;
87    
88     /**
89     * Shared state for currently active iterators, or null if there
90     * are known not to be any. Allows queue operations to update
91     * iterator state.
92     */
93     transient Itrs itrs = null;
94    
95     // Internal helper methods
96    
97     /**
98     * Circularly increment i.
99     */
100     final int inc(int i) {
101     return (++i == items.length) ? 0 : i;
102     }
103    
104     /**
105     * Circularly decrement i.
106     */
107     final int dec(int i) {
108     return ((i == 0) ? items.length : i) - 1;
109     }
110    
111     /**
112     * Returns item at index i.
113     */
114     @SuppressWarnings("unchecked")
115     final E itemAt(int i) {
116     return (E) items[i];
117     }
118    
119     /**
120     * Throws NullPointerException if argument is null.
121     *
122     * @param v the element
123     */
124     private static void checkNotNull(Object v) {
125     if (v == null)
126     throw new NullPointerException();
127     }
128    
129     /**
130     * Inserts element at current put position, advances, and signals.
131     * Call only when holding lock.
132     */
133     private void enqueue(E x) {
134     // assert lock.getHoldCount() == 1;
135     // assert items[putIndex] == null;
136     items[putIndex] = x;
137     putIndex = inc(putIndex);
138     count++;
139     notEmpty.signal();
140     }
141    
142     /**
143     * Extracts element at current take position, advances, and signals.
144     * Call only when holding lock.
145     */
146     private E dequeue() {
147     // assert lock.getHoldCount() == 1;
148     // assert items[takeIndex] != null;
149     final Object[] items = this.items;
150     @SuppressWarnings("unchecked")
151     E x = (E) items[takeIndex];
152     items[takeIndex] = null;
153     takeIndex = inc(takeIndex);
154     count--;
155     if (itrs != null)
156     itrs.elementDequeued();
157     notFull.signal();
158     return x;
159     }
160    
161     /**
162     * Deletes item at array index removeIndex.
163     * Utility for remove(Object) and iterator.remove.
164     * Call only when holding lock.
165     */
166     void removeAt(final int removeIndex) {
167     // assert lock.getHoldCount() == 1;
168     // assert items[removeIndex] != null;
169     // assert removeIndex >= 0 && removeIndex < items.length;
170     final Object[] items = this.items;
171     if (removeIndex == takeIndex) {
172     // removing front item; just advance
173     items[takeIndex] = null;
174     takeIndex = inc(takeIndex);
175     count--;
176     if (itrs != null)
177     itrs.elementDequeued();
178     } else {
179     // an "interior" remove
180    
181     // slide over all others up through putIndex.
182     final int putIndex = this.putIndex;
183     for (int i = removeIndex;;) {
184     int next = inc(i);
185     if (next != putIndex) {
186     items[i] = items[next];
187     i = next;
188     } else {
189     items[i] = null;
190     this.putIndex = i;
191     break;
192     }
193     }
194     count--;
195     if (itrs != null)
196     itrs.removedAt(removeIndex);
197     }
198     notFull.signal();
199     }
200    
/**
 * Creates an {@code ArrayBlockingQueue} of the given (fixed) capacity
 * using the default (non-fair) access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
211    
212     /**
213     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
214     * capacity and the specified access policy.
215     *
216     * @param capacity the capacity of this queue
217     * @param fair if {@code true} then queue accesses for threads blocked
218     * on insertion or removal, are processed in FIFO order;
219     * if {@code false} the access order is unspecified.
220     * @throws IllegalArgumentException if {@code capacity < 1}
221     */
222     public ArrayBlockingQueue(int capacity, boolean fair) {
223     if (capacity <= 0)
224     throw new IllegalArgumentException();
225     this.items = new Object[capacity];
226     lock = new ReentrantLock(fair);
227     notEmpty = lock.newCondition();
228     notFull = lock.newCondition();
229     }
230    
/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity, the specified access policy and initially containing the
 * elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        final Object[] items = this.items;
        int i = 0;
        for (E e : c) {
            checkNotNull(e);
            // Check the bound explicitly instead of catching
            // ArrayIndexOutOfBoundsException: an AIOOBE raised by the
            // collection's own iterator must not be reported as a
            // capacity problem.
            if (i == capacity)
                throw new IllegalArgumentException(
                    "collection has more than " + capacity + " elements");
            items[i++] = e;
        }
        count = i;
        // A completely filled array wraps the insertion point to slot 0.
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
269    
270     /**
271     * Inserts the specified element at the tail of this queue if it is
272     * possible to do so immediately without exceeding the queue's capacity,
273     * returning {@code true} upon success and throwing an
274     * {@code IllegalStateException} if this queue is full.
275     *
276     * @param e the element to add
277     * @return {@code true} (as specified by {@link Collection#add})
278     * @throws IllegalStateException if this queue is full
279     * @throws NullPointerException if the specified element is null
280     */
281     public boolean add(E e) {
282     return super.add(e);
283     }
284    
285     /**
286     * Inserts the specified element at the tail of this queue if it is
287     * possible to do so immediately without exceeding the queue's capacity,
288     * returning {@code true} upon success and {@code false} if this queue
289     * is full. This method is generally preferable to method {@link #add},
290     * which can fail to insert an element only by throwing an exception.
291     *
292     * @throws NullPointerException if the specified element is null
293     */
294     public boolean offer(E e) {
295     checkNotNull(e);
296     final ReentrantLock lock = this.lock;
297     lock.lock();
298     try {
299     if (count == items.length)
300     return false;
301     else {
302     enqueue(e);
303     return true;
304     }
305     } finally {
306     lock.unlock();
307     }
308     }
309    
310     /**
311     * Inserts the specified element at the tail of this queue, waiting
312     * for space to become available if the queue is full.
313     *
314     * @throws InterruptedException {@inheritDoc}
315     * @throws NullPointerException {@inheritDoc}
316     */
317     public void put(E e) throws InterruptedException {
318     checkNotNull(e);
319     final ReentrantLock lock = this.lock;
320     lock.lockInterruptibly();
321     try {
322     while (count == items.length)
323     notFull.await();
324     enqueue(e);
325     } finally {
326     lock.unlock();
327     }
328     }
329    
330     /**
331     * Inserts the specified element at the tail of this queue, waiting
332     * up to the specified wait time for space to become available if
333     * the queue is full.
334     *
335     * @throws InterruptedException {@inheritDoc}
336     * @throws NullPointerException {@inheritDoc}
337     */
338     public boolean offer(E e, long timeout, TimeUnit unit)
339     throws InterruptedException {
340    
341     checkNotNull(e);
342     long nanos = unit.toNanos(timeout);
343     final ReentrantLock lock = this.lock;
344     lock.lockInterruptibly();
345     try {
346     while (count == items.length) {
347     if (nanos <= 0)
348     return false;
349     nanos = notFull.awaitNanos(nanos);
350     }
351     enqueue(e);
352     return true;
353     } finally {
354     lock.unlock();
355     }
356     }
357    
358     public E poll() {
359     final ReentrantLock lock = this.lock;
360     lock.lock();
361     try {
362     return (count == 0) ? null : dequeue();
363     } finally {
364     lock.unlock();
365     }
366     }
367    
368     public E take() throws InterruptedException {
369     final ReentrantLock lock = this.lock;
370     lock.lockInterruptibly();
371     try {
372     while (count == 0)
373     notEmpty.await();
374     return dequeue();
375     } finally {
376     lock.unlock();
377     }
378     }
379    
380     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
381     long nanos = unit.toNanos(timeout);
382     final ReentrantLock lock = this.lock;
383     lock.lockInterruptibly();
384     try {
385     while (count == 0) {
386     if (nanos <= 0)
387     return null;
388     nanos = notEmpty.awaitNanos(nanos);
389     }
390     return dequeue();
391     } finally {
392     lock.unlock();
393     }
394     }
395    
396     public E peek() {
397     final ReentrantLock lock = this.lock;
398     lock.lock();
399     try {
400 jsr166 1.3 return itemAt(takeIndex); // null when queue is empty
401 dl 1.1 } finally {
402     lock.unlock();
403     }
404     }
405    
406     // this doc comment is overridden to remove the reference to collections
407     // greater in size than Integer.MAX_VALUE
408     /**
409     * Returns the number of elements in this queue.
410     *
411     * @return the number of elements in this queue
412     */
413     public int size() {
414     final ReentrantLock lock = this.lock;
415     lock.lock();
416     try {
417     return count;
418     } finally {
419     lock.unlock();
420     }
421     }
422    
423     // this doc comment is a modified copy of the inherited doc comment,
424     // without the reference to unlimited queues.
425     /**
426     * Returns the number of additional elements that this queue can ideally
427     * (in the absence of memory or resource constraints) accept without
428     * blocking. This is always equal to the initial capacity of this queue
429     * less the current {@code size} of this queue.
430     *
431     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
432     * an element will succeed by inspecting {@code remainingCapacity}
433     * because it may be the case that another thread is about to
434     * insert or remove an element.
435     */
436     public int remainingCapacity() {
437     final ReentrantLock lock = this.lock;
438     lock.lock();
439     try {
440     return items.length - count;
441     } finally {
442     lock.unlock();
443     }
444     }
445    
446     /**
447     * Removes a single instance of the specified element from this queue,
448     * if it is present. More formally, removes an element {@code e} such
449     * that {@code o.equals(e)}, if this queue contains one or more such
450     * elements.
451     * Returns {@code true} if this queue contained the specified element
452     * (or equivalently, if this queue changed as a result of the call).
453     *
454     * <p>Removal of interior elements in circular array based queues
455     * is an intrinsically slow and disruptive operation, so should
456     * be undertaken only in exceptional circumstances, ideally
457     * only when the queue is known not to be accessible by other
458     * threads.
459     *
460     * @param o element to be removed from this queue, if present
461     * @return {@code true} if this queue changed as a result of the call
462     */
463     public boolean remove(Object o) {
464     if (o == null) return false;
465     final Object[] items = this.items;
466     final ReentrantLock lock = this.lock;
467     lock.lock();
468     try {
469     if (count > 0) {
470     final int putIndex = this.putIndex;
471     int i = takeIndex;
472     do {
473     if (o.equals(items[i])) {
474     removeAt(i);
475     return true;
476     }
477     } while ((i = inc(i)) != putIndex);
478     }
479     return false;
480     } finally {
481     lock.unlock();
482     }
483     }
484    
485     /**
486     * Returns {@code true} if this queue contains the specified element.
487     * More formally, returns {@code true} if and only if this queue contains
488     * at least one element {@code e} such that {@code o.equals(e)}.
489     *
490     * @param o object to be checked for containment in this queue
491     * @return {@code true} if this queue contains the specified element
492     */
493     public boolean contains(Object o) {
494     if (o == null) return false;
495     final Object[] items = this.items;
496     final ReentrantLock lock = this.lock;
497     lock.lock();
498     try {
499     if (count > 0) {
500     final int putIndex = this.putIndex;
501     int i = takeIndex;
502     do {
503     if (o.equals(items[i]))
504     return true;
505     } while ((i = inc(i)) != putIndex);
506     }
507     return false;
508     } finally {
509     lock.unlock();
510     }
511     }
512    
513     /**
514     * Returns an array containing all of the elements in this queue, in
515     * proper sequence.
516     *
517     * <p>The returned array will be "safe" in that no references to it are
518     * maintained by this queue. (In other words, this method must allocate
519     * a new array). The caller is thus free to modify the returned array.
520     *
521     * <p>This method acts as bridge between array-based and collection-based
522     * APIs.
523     *
524     * @return an array containing all of the elements in this queue
525     */
526     public Object[] toArray() {
527     final Object[] items = this.items;
528     final ReentrantLock lock = this.lock;
529     lock.lock();
530     try {
531     final int count = this.count;
532     Object[] a = new Object[count];
533 jsr166 1.2 int n = items.length - takeIndex;
534     if (count <= n) {
535     System.arraycopy(items, takeIndex, a, 0, count);
536     } else {
537     System.arraycopy(items, takeIndex, a, 0, n);
538     System.arraycopy(items, 0, a, n, count - n);
539     }
540 dl 1.1 return a;
541     } finally {
542     lock.unlock();
543     }
544     }
545    
546     /**
547     * Returns an array containing all of the elements in this queue, in
548     * proper sequence; the runtime type of the returned array is that of
549     * the specified array. If the queue fits in the specified array, it
550     * is returned therein. Otherwise, a new array is allocated with the
551     * runtime type of the specified array and the size of this queue.
552     *
553     * <p>If this queue fits in the specified array with room to spare
554     * (i.e., the array has more elements than this queue), the element in
555     * the array immediately following the end of the queue is set to
556     * {@code null}.
557     *
558     * <p>Like the {@link #toArray()} method, this method acts as bridge between
559     * array-based and collection-based APIs. Further, this method allows
560     * precise control over the runtime type of the output array, and may,
561     * under certain circumstances, be used to save allocation costs.
562     *
563     * <p>Suppose {@code x} is a queue known to contain only strings.
564     * The following code can be used to dump the queue into a newly
565     * allocated array of {@code String}:
566     *
567     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
568     *
569     * Note that {@code toArray(new Object[0])} is identical in function to
570     * {@code toArray()}.
571     *
572     * @param a the array into which the elements of the queue are to
573     * be stored, if it is big enough; otherwise, a new array of the
574     * same runtime type is allocated for this purpose
575     * @return an array containing all of the elements in this queue
576     * @throws ArrayStoreException if the runtime type of the specified array
577     * is not a supertype of the runtime type of every element in
578     * this queue
579     * @throws NullPointerException if the specified array is null
580     */
581     @SuppressWarnings("unchecked")
582     public <T> T[] toArray(T[] a) {
583     final Object[] items = this.items;
584     final ReentrantLock lock = this.lock;
585     lock.lock();
586     try {
587     final int count = this.count;
588     final int len = a.length;
589     if (len < count)
590     a = (T[])java.lang.reflect.Array.newInstance(
591     a.getClass().getComponentType(), count);
592 jsr166 1.2 int n = items.length - takeIndex;
593     if (count <= n)
594     System.arraycopy(items, takeIndex, a, 0, count);
595     else {
596     System.arraycopy(items, takeIndex, a, 0, n);
597     System.arraycopy(items, 0, a, n, count - n);
598     }
599 dl 1.1 if (len > count)
600     a[count] = null;
601     return a;
602     } finally {
603     lock.unlock();
604     }
605     }
606    
607     public String toString() {
608     final ReentrantLock lock = this.lock;
609     lock.lock();
610     try {
611     int k = count;
612     if (k == 0)
613     return "[]";
614    
615     StringBuilder sb = new StringBuilder();
616     sb.append('[');
617     for (int i = takeIndex; ; i = inc(i)) {
618     Object e = items[i];
619     sb.append(e == this ? "(this Collection)" : e);
620     if (--k == 0)
621     return sb.append(']').toString();
622     sb.append(',').append(' ');
623     }
624     } finally {
625     lock.unlock();
626     }
627     }
628    
629     /**
630     * Atomically removes all of the elements from this queue.
631     * The queue will be empty after this call returns.
632     */
633     public void clear() {
634     final Object[] items = this.items;
635     final ReentrantLock lock = this.lock;
636     lock.lock();
637     try {
638     int k = count;
639     if (k > 0) {
640     final int putIndex = this.putIndex;
641     int i = takeIndex;
642     do {
643     items[i] = null;
644     } while ((i = inc(i)) != putIndex);
645     takeIndex = putIndex;
646     count = 0;
647     if (itrs != null)
648     itrs.queueIsEmpty();
649     for (; k > 0 && lock.hasWaiters(notFull); k--)
650     notFull.signal();
651     }
652     } finally {
653     lock.unlock();
654     }
655     }
656    
657     /**
658     * @throws UnsupportedOperationException {@inheritDoc}
659     * @throws ClassCastException {@inheritDoc}
660     * @throws NullPointerException {@inheritDoc}
661     * @throws IllegalArgumentException {@inheritDoc}
662     */
663     public int drainTo(Collection<? super E> c) {
664     return drainTo(c, Integer.MAX_VALUE);
665     }
666    
667     /**
668     * @throws UnsupportedOperationException {@inheritDoc}
669     * @throws ClassCastException {@inheritDoc}
670     * @throws NullPointerException {@inheritDoc}
671     * @throws IllegalArgumentException {@inheritDoc}
672     */
673     public int drainTo(Collection<? super E> c, int maxElements) {
674     checkNotNull(c);
675     if (c == this)
676     throw new IllegalArgumentException();
677     if (maxElements <= 0)
678     return 0;
679     final Object[] items = this.items;
680     final ReentrantLock lock = this.lock;
681     lock.lock();
682     try {
683     int n = Math.min(maxElements, count);
684     int take = takeIndex;
685     int i = 0;
686     try {
687     while (i < n) {
688     @SuppressWarnings("unchecked")
689     E x = (E) items[take];
690     c.add(x);
691     items[take] = null;
692     take = inc(take);
693     i++;
694     }
695     return n;
696     } finally {
697     // Restore invariants even if c.add() threw
698     if (i > 0) {
699     count -= i;
700     takeIndex = take;
701     if (itrs != null) {
702     if (count == 0)
703     itrs.queueIsEmpty();
704     else if (i > take)
705     itrs.takeIndexWrapped();
706     }
707     for (; i > 0 && lock.hasWaiters(notFull); i--)
708     notFull.signal();
709     }
710     }
711     } finally {
712     lock.unlock();
713     }
714     }
715    
716     /**
717     * Returns an iterator over the elements in this queue in proper sequence.
718     * The elements will be returned in order from first (head) to last (tail).
719     *
720     * <p>The returned iterator is a "weakly consistent" iterator that
721     * will never throw {@link java.util.ConcurrentModificationException
722     * ConcurrentModificationException}, and guarantees to traverse
723     * elements as they existed upon construction of the iterator, and
724     * may (but is not guaranteed to) reflect any modifications
725     * subsequent to construction.
726     *
727     * @return an iterator over the elements in this queue in proper sequence
728     */
729     public Iterator<E> iterator() {
730     return new Itr();
731     }
732    
733     /**
734     * Shared data between iterators and their queue, allowing queue
735     * modifications to update iterators when elements are removed.
736     *
737     * This adds a lot of complexity for the sake of correctly
738     * handling some uncommon operations, but the combination of
739     * circular-arrays and supporting interior removes (i.e., those
740     * not at head) would cause iterators to sometimes lose their
741     * places and/or (re)report elements they shouldn't. To avoid
742     * this, when a queue has one or more iterators, it keeps iterator
743     * state consistent by:
744     *
745     * (1) keeping track of the number of "cycles", that is, the
746     * number of times takeIndex has wrapped around to 0.
747     * (2) notifying all iterators via the callback removedAt whenever
748     * an interior element is removed (and thus other elements may
749     * be shifted).
750     *
751     * These suffice to eliminate iterator inconsistencies, but
752     * unfortunately add the secondary responsibility of maintaining
753     * the list of iterators. We track all active iterators in a
754     * simple linked list (accessed only when the queue's lock is
755     * held) of weak references to Itr. The list is cleaned up using
756     * 3 different mechanisms:
757     *
758     * (1) Whenever a new iterator is created, do some O(1) checking for
759     * stale list elements.
760     *
761     * (2) Whenever takeIndex wraps around to 0, check for iterators
762     * that have been unused for more than one wrap-around cycle.
763     *
764     * (3) Whenever the queue becomes empty, all iterators are notified
765     * and this entire data structure is discarded.
766     *
767     * So in addition to the removedAt callback that is necessary for
768     * correctness, iterators have the shutdown and takeIndexWrapped
769     * callbacks that help remove stale iterators from the list.
770     *
771     * Whenever a list element is examined, it is expunged if either
772     * the GC has determined that the iterator is discarded, or if the
773     * iterator reports that it is "detached" (does not need any
774     * further state updates). Overhead is maximal when takeIndex
775     * never advances, iterators are discarded before they are
776     * exhausted, and all removals are interior removes, in which case
777     * all stale iterators are discovered by the GC. But even in this
778     * case we don't increase the amortized complexity.
779     *
780     * Care must be taken to keep list sweeping methods from
781     * reentrantly invoking another such method, causing subtle
782     * corruption bugs.
783     */
784     class Itrs {
785    
786     /**
787     * Node in a linked list of weak iterator references.
788     */
789     private class Node extends WeakReference<Itr> {
790     Node next;
791    
792     Node(Itr iterator, Node next) {
793     super(iterator);
794     this.next = next;
795     }
796     }
797    
798     /** Incremented whenever takeIndex wraps around to 0 */
799     int cycles = 0;
800    
801     /** Linked list of weak iterator references */
802     private Node head;
803    
804     /** Used to expunge stale iterators */
805     private Node sweeper = null;
806    
807     private static final int SHORT_SWEEP_PROBES = 4;
808     private static final int LONG_SWEEP_PROBES = 16;
809    
810     Itrs(Itr initial) {
811     register(initial);
812     }
813    
814     /**
815     * Sweeps itrs, looking for and expunging stale iterators.
816     * If at least one was found, tries harder to find more.
817     * Called only from iterating thread.
818     *
819     * @param tryHarder whether to start in try-harder mode, because
820     * there is known to be at least one iterator to collect
821     */
822     void doSomeSweeping(boolean tryHarder) {
823     // assert lock.getHoldCount() == 1;
824     // assert head != null;
825     int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
826     Node o, p;
827     final Node sweeper = this.sweeper;
828     boolean passedGo; // to limit search to one full sweep
829    
830     if (sweeper == null) {
831     o = null;
832     p = head;
833     passedGo = true;
834     } else {
835     o = sweeper;
836     p = o.next;
837     passedGo = false;
838     }
839    
840     for (; probes > 0; probes--) {
841     if (p == null) {
842     if (passedGo)
843     break;
844     o = null;
845     p = head;
846     passedGo = true;
847     }
848     final Itr it = p.get();
849     final Node next = p.next;
850     if (it == null || it.isDetached()) {
851     // found a discarded/exhausted iterator
852     probes = LONG_SWEEP_PROBES; // "try harder"
853     // unlink p
854     p.clear();
855     p.next = null;
856     if (o == null) {
857     head = next;
858     if (next == null) {
859     // We've run out of iterators to track; retire
860     itrs = null;
861     return;
862     }
863     }
864     else
865     o.next = next;
866     } else {
867     o = p;
868     }
869     p = next;
870     }
871    
872     this.sweeper = (p == null) ? null : o;
873     }
874    
875     /**
876     * Adds a new iterator to the linked list of tracked iterators.
877     */
878     void register(Itr itr) {
879     // assert lock.getHoldCount() == 1;
880     head = new Node(itr, head);
881     }
882    
883     /**
884     * Called whenever takeIndex wraps around to 0.
885     *
886     * Notifies all iterators, and expunges any that are now stale.
887     */
888     void takeIndexWrapped() {
889     // assert lock.getHoldCount() == 1;
890     cycles++;
891     for (Node o = null, p = head; p != null;) {
892     final Itr it = p.get();
893     final Node next = p.next;
894     if (it == null || it.takeIndexWrapped()) {
895     // unlink p
896     // assert it == null || it.isDetached();
897     p.clear();
898     p.next = null;
899     if (o == null)
900     head = next;
901     else
902     o.next = next;
903     } else {
904     o = p;
905     }
906     p = next;
907     }
908     if (head == null) // no more iterators to track
909     itrs = null;
910     }
911    
912     /**
913 jsr166 1.4 * Called whenever an interior remove (not at takeIndex) occurred.
914 dl 1.1 *
915     * Notifies all iterators, and expunges any that are now stale.
916     */
917     void removedAt(int removedIndex) {
918     for (Node o = null, p = head; p != null;) {
919     final Itr it = p.get();
920     final Node next = p.next;
921     if (it == null || it.removedAt(removedIndex)) {
922     // unlink p
923     // assert it == null || it.isDetached();
924     p.clear();
925     p.next = null;
926     if (o == null)
927     head = next;
928     else
929     o.next = next;
930     } else {
931     o = p;
932     }
933     p = next;
934     }
935     if (head == null) // no more iterators to track
936     itrs = null;
937     }
938    
939     /**
940     * Called whenever the queue becomes empty.
941     *
942     * Notifies all active iterators that the queue is empty,
943     * clears all weak refs, and unlinks the itrs datastructure.
944     */
945     void queueIsEmpty() {
946     // assert lock.getHoldCount() == 1;
947     for (Node p = head; p != null; p = p.next) {
948     Itr it = p.get();
949     if (it != null) {
950     p.clear();
951     it.shutdown();
952     }
953     }
954     head = null;
955     itrs = null;
956     }
957    
958     /**
959     * Called whenever an element has been dequeued (at takeIndex).
960     */
961     void elementDequeued() {
962     // assert lock.getHoldCount() == 1;
963     if (count == 0)
964     queueIsEmpty();
965     else if (takeIndex == 0)
966     takeIndexWrapped();
967     }
968     }
969    
970     /**
971     * Iterator for ArrayBlockingQueue.
972     *
973     * To maintain weak consistency with respect to puts and takes, we
974     * read ahead one slot, so as to not report hasNext true but then
975     * not have an element to return.
976     *
977     * We switch into "detached" mode (allowing prompt unlinking from
978     * itrs without help from the GC) when all indices are negative, or
979     * when hasNext returns false for the first time. This allows the
980     * iterator to track concurrent updates completely accurately,
981     * except for the corner case of the user calling Iterator.remove()
982     * after hasNext() returned false. Even in this case, we ensure
983     * that we don't remove the wrong element by keeping track of the
984     * expected element to remove, in lastItem. Yes, we may fail to
985     * remove lastItem from the queue if it moved due to an interleaved
986     * interior remove while in detached mode.
987     */
988     private class Itr implements Iterator<E> {
989     /** Index to look for new nextItem; NONE at end */
990     private int cursor;
991    
992     /** Element to be returned by next call to next(); null if none */
993     private E nextItem;
994    
995     /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
996     private int nextIndex;
997    
998     /** Last element returned; null if none or not detached. */
999     private E lastItem;
1000    
1001     /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1002     private int lastRet;
1003    
1004     /** Previous value of takeIndex, or DETACHED when detached */
1005     private int prevTakeIndex;
1006    
1007     /** Previous value of iters.cycles */
1008     private int prevCycles;
1009    
1010     /** Special index value indicating "not available" or "undefined" */
1011     private static final int NONE = -1;
1012    
1013     /**
1014     * Special index value indicating "removed elsewhere", that is,
1015     * removed by some operation other than a call to this.remove().
1016     */
1017     private static final int REMOVED = -2;
1018    
1019     /** Special value for prevTakeIndex indicating "detached mode" */
1020     private static final int DETACHED = -3;
1021    
1022     Itr() {
1023     // assert lock.getHoldCount() == 0;
1024     lastRet = NONE;
1025     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1026     lock.lock();
1027     try {
1028     if (count == 0) {
1029     // assert itrs == null;
1030     cursor = NONE;
1031     nextIndex = NONE;
1032     prevTakeIndex = DETACHED;
1033     } else {
1034     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1035     prevTakeIndex = takeIndex;
1036     nextItem = itemAt(nextIndex = takeIndex);
1037     cursor = incCursor(takeIndex);
1038     if (itrs == null) {
1039     itrs = new Itrs(this);
1040     } else {
1041     itrs.register(this); // in this order
1042     itrs.doSomeSweeping(false);
1043     }
1044     prevCycles = itrs.cycles;
1045     // assert takeIndex >= 0;
1046     // assert prevTakeIndex == takeIndex;
1047     // assert nextIndex >= 0;
1048     // assert nextItem != null;
1049     }
1050     } finally {
1051     lock.unlock();
1052     }
1053     }
1054    
1055     boolean isDetached() {
1056     // assert lock.getHoldCount() == 1;
1057     return prevTakeIndex < 0;
1058     }
1059    
1060     private int incCursor(int index) {
1061     // assert lock.getHoldCount() == 1;
1062     index = inc(index);
1063     if (index == putIndex)
1064     index = NONE;
1065     return index;
1066     }
1067    
1068     /**
1069     * Returns true if index is invalidated by the given number of
1070     * dequeues, starting from prevTakeIndex.
1071     */
1072     private boolean invalidated(int index, int prevTakeIndex,
1073     long dequeues, int length) {
1074     if (index < 0)
1075     return false;
1076     int distance = index - prevTakeIndex;
1077     if (distance < 0)
1078     distance += length;
1079     return dequeues > distance;
1080     }
1081    
1082     /**
1083     * Adjusts indices to incorporate all dequeues since the last
1084     * operation on this iterator. Call only from iterating thread.
1085     */
1086     private void incorporateDequeues() {
1087     // assert lock.getHoldCount() == 1;
1088     // assert itrs != null;
1089     // assert !isDetached();
1090     // assert count > 0;
1091    
1092     final int cycles = itrs.cycles;
1093     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1094     final int prevCycles = this.prevCycles;
1095     final int prevTakeIndex = this.prevTakeIndex;
1096    
1097     if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1098     final int len = items.length;
1099     // how far takeIndex has advanced since the previous
1100     // operation of this iterator
1101     long dequeues = (cycles - prevCycles) * len
1102     + (takeIndex - prevTakeIndex);
1103    
1104     // Check indices for invalidation
1105     if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1106     lastRet = REMOVED;
1107     if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1108     nextIndex = REMOVED;
1109     if (invalidated(cursor, prevTakeIndex, dequeues, len))
1110     cursor = takeIndex;
1111    
1112     if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1113     detach();
1114     else {
1115     this.prevCycles = cycles;
1116     this.prevTakeIndex = takeIndex;
1117     }
1118     }
1119     }
1120    
1121     /**
1122     * Called when itrs should stop tracking this iterator, either
1123     * because there are no more indices to update (cursor < 0 &&
1124     * nextIndex < 0 && lastRet < 0) or as a special exception, when
1125     * lastRet >= 0, because hasNext() is about to return false for the
1126     * first time. Call only from iterating thread.
1127     */
1128     private void detach() {
1129     // Switch to detached mode
1130     // assert lock.getHoldCount() == 1;
1131     // assert cursor == NONE;
1132     // assert nextIndex < 0;
1133     // assert lastRet < 0 || nextItem == null;
1134     // assert lastRet < 0 ^ lastItem != null;
1135     if (prevTakeIndex >= 0) {
1136     // assert itrs != null;
1137     prevTakeIndex = DETACHED;
1138     // try to unlink from itrs (but not too hard)
1139     itrs.doSomeSweeping(true);
1140     }
1141     }
1142    
1143     /**
1144     * For performance reasons, we would like not to acquire a lock in
1145     * hasNext in the common case. To allow for this, we only access
1146     * fields (i.e. nextItem) that are not modified by update operations
1147     * triggered by queue modifications.
1148     */
1149     public boolean hasNext() {
1150     // assert lock.getHoldCount() == 0;
1151     if (nextItem != null)
1152     return true;
1153     noNext();
1154     return false;
1155     }
1156    
1157     private void noNext() {
1158     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1159     lock.lock();
1160     try {
1161     // assert cursor == NONE;
1162     // assert nextIndex == NONE;
1163     if (!isDetached()) {
1164     // assert lastRet >= 0;
1165     incorporateDequeues(); // might update lastRet
1166     if (lastRet >= 0) {
1167     lastItem = itemAt(lastRet);
1168     // assert lastItem != null;
1169     detach();
1170     }
1171     }
1172     // assert isDetached();
1173     // assert lastRet < 0 ^ lastItem != null;
1174     } finally {
1175     lock.unlock();
1176     }
1177     }
1178    
1179     public E next() {
1180     // assert lock.getHoldCount() == 0;
1181     final E x = nextItem;
1182     if (x == null)
1183     throw new NoSuchElementException();
1184     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1185     lock.lock();
1186     try {
1187     if (!isDetached())
1188     incorporateDequeues();
1189     // assert nextIndex != NONE;
1190     // assert lastItem == null;
1191     lastRet = nextIndex;
1192     final int cursor = this.cursor;
1193     if (cursor >= 0) {
1194     nextItem = itemAt(nextIndex = cursor);
1195     // assert nextItem != null;
1196     this.cursor = incCursor(cursor);
1197     } else {
1198     nextIndex = NONE;
1199     nextItem = null;
1200     }
1201     } finally {
1202     lock.unlock();
1203     }
1204     return x;
1205     }
1206    
1207     public void remove() {
1208     // assert lock.getHoldCount() == 0;
1209     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1210     lock.lock();
1211     try {
1212     if (!isDetached())
1213     incorporateDequeues(); // might update lastRet or detach
1214     final int lastRet = this.lastRet;
1215     this.lastRet = NONE;
1216     if (lastRet >= 0) {
1217     if (!isDetached())
1218     removeAt(lastRet);
1219     else {
1220     final E lastItem = this.lastItem;
1221     // assert lastItem != null;
1222     this.lastItem = null;
1223     if (itemAt(lastRet) == lastItem)
1224     removeAt(lastRet);
1225     }
1226     } else if (lastRet == NONE)
1227     throw new IllegalStateException();
1228     // else lastRet == REMOVED and the last returned element was
1229     // previously asynchronously removed via an operation other
1230     // than this.remove(), so nothing to do.
1231    
1232     if (cursor < 0 && nextIndex < 0)
1233     detach();
1234     } finally {
1235     lock.unlock();
1236     // assert lastRet == NONE;
1237     // assert lastItem == null;
1238     }
1239     }
1240    
1241     /**
1242     * Called to notify the iterator that the queue is empty, or that it
1243     * has fallen hopelessly behind, so that it should abandon any
1244     * further iteration, except possibly to return one more element
1245     * from next(), as promised by returning true from hasNext().
1246     */
1247     void shutdown() {
1248     // assert lock.getHoldCount() == 1;
1249     cursor = NONE;
1250     if (nextIndex >= 0)
1251     nextIndex = REMOVED;
1252     if (lastRet >= 0) {
1253     lastRet = REMOVED;
1254     lastItem = null;
1255     }
1256     prevTakeIndex = DETACHED;
1257     // Don't set nextItem to null because we must continue to be
1258     // able to return it on next().
1259     //
1260     // Caller will unlink from itrs when convenient.
1261     }
1262    
1263     private int distance(int index, int prevTakeIndex, int length) {
1264     int distance = index - prevTakeIndex;
1265     if (distance < 0)
1266     distance += length;
1267     return distance;
1268     }
1269    
1270     /**
1271 jsr166 1.4 * Called whenever an interior remove (not at takeIndex) occurred.
1272 dl 1.1 *
1273     * @return true if this iterator should be unlinked from itrs
1274     */
1275     boolean removedAt(int removedIndex) {
1276     // assert lock.getHoldCount() == 1;
1277     if (isDetached())
1278     return true;
1279    
1280     final int cycles = itrs.cycles;
1281     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1282     final int prevCycles = this.prevCycles;
1283     final int prevTakeIndex = this.prevTakeIndex;
1284     final int len = items.length;
1285     int cycleDiff = cycles - prevCycles;
1286     if (removedIndex < takeIndex)
1287     cycleDiff++;
1288     final int removedDistance =
1289     (cycleDiff * len) + (removedIndex - prevTakeIndex);
1290     // assert removedDistance >= 0;
1291     int cursor = this.cursor;
1292     if (cursor >= 0) {
1293     int x = distance(cursor, prevTakeIndex, len);
1294     if (x == removedDistance) {
1295     if (cursor == putIndex)
1296     this.cursor = cursor = NONE;
1297     }
1298     else if (x > removedDistance) {
1299     // assert cursor != prevTakeIndex;
1300     this.cursor = cursor = dec(cursor);
1301     }
1302     }
1303     int lastRet = this.lastRet;
1304     if (lastRet >= 0) {
1305     int x = distance(lastRet, prevTakeIndex, len);
1306     if (x == removedDistance)
1307     this.lastRet = lastRet = REMOVED;
1308     else if (x > removedDistance)
1309     this.lastRet = lastRet = dec(lastRet);
1310     }
1311     int nextIndex = this.nextIndex;
1312     if (nextIndex >= 0) {
1313     int x = distance(nextIndex, prevTakeIndex, len);
1314     if (x == removedDistance)
1315     this.nextIndex = nextIndex = REMOVED;
1316     else if (x > removedDistance)
1317     this.nextIndex = nextIndex = dec(nextIndex);
1318     }
1319     else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1320     this.prevTakeIndex = DETACHED;
1321     return true;
1322     }
1323     return false;
1324     }
1325    
1326     /**
1327     * Called whenever takeIndex wraps around to zero.
1328     *
1329     * @return true if this iterator should be unlinked from itrs
1330     */
1331     boolean takeIndexWrapped() {
1332     // assert lock.getHoldCount() == 1;
1333     if (isDetached())
1334     return true;
1335     if (itrs.cycles - prevCycles > 1) {
1336     // All the elements that existed at the time of the last
1337     // operation are gone, so abandon further iteration.
1338     shutdown();
1339     return true;
1340     }
1341     return false;
1342     }
1343    
1344     // /** Uncomment for debugging. */
1345     // public String toString() {
1346     // return ("cursor=" + cursor + " " +
1347     // "nextIndex=" + nextIndex + " " +
1348     // "lastRet=" + lastRet + " " +
1349     // "nextItem=" + nextItem + " " +
1350     // "lastItem=" + lastItem + " " +
1351     // "prevCycles=" + prevCycles + " " +
1352     // "prevTakeIndex=" + prevTakeIndex + " " +
1353     // "size()=" + size() + " " +
1354     // "remainingCapacity()=" + remainingCapacity());
1355     // }
1356     }
1357     }