ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.1
Committed: Sat Mar 26 06:22:49 2016 UTC (8 years, 1 month ago) by jsr166
Branch: MAIN
Log Message:
fork jdk8 maintenance branch for source and jtreg tests

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     import java.lang.ref.WeakReference;
10     import java.util.AbstractQueue;
11     import java.util.Arrays;
12     import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15     import java.util.Objects;
16     import java.util.Spliterator;
17     import java.util.Spliterators;
18     import java.util.concurrent.locks.Condition;
19     import java.util.concurrent.locks.ReentrantLock;
20    
21     /**
22     * A bounded {@linkplain BlockingQueue blocking queue} backed by an
23     * array. This queue orders elements FIFO (first-in-first-out). The
24     * <em>head</em> of the queue is that element that has been on the
25     * queue the longest time. The <em>tail</em> of the queue is that
26     * element that has been on the queue the shortest time. New elements
27     * are inserted at the tail of the queue, and the queue retrieval
28     * operations obtain elements at the head of the queue.
29     *
30     * <p>This is a classic &quot;bounded buffer&quot;, in which a
31     * fixed-sized array holds elements inserted by producers and
32     * extracted by consumers. Once created, the capacity cannot be
33     * changed. Attempts to {@code put} an element into a full queue
34     * will result in the operation blocking; attempts to {@code take} an
35     * element from an empty queue will similarly block.
36     *
37     * <p>This class supports an optional fairness policy for ordering
38     * waiting producer and consumer threads. By default, this ordering
39     * is not guaranteed. However, a queue constructed with fairness set
40     * to {@code true} grants threads access in FIFO order. Fairness
41     * generally decreases throughput but reduces variability and avoids
42     * starvation.
43     *
44     * <p>This class and its iterator implement all of the
45     * <em>optional</em> methods of the {@link Collection} and {@link
46     * Iterator} interfaces.
47     *
48     * <p>This class is a member of the
49     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
50     * Java Collections Framework</a>.
51     *
52     * @since 1.5
53     * @author Doug Lea
54     * @param <E> the type of elements held in this queue
55     */
56     public class ArrayBlockingQueue<E> extends AbstractQueue<E>
57     implements BlockingQueue<E>, java.io.Serializable {
58    
59     /**
60     * Serialization ID. This class relies on default serialization
61     * even for the items array, which is default-serialized, even if
62     * it is empty. Otherwise it could not be declared final, which is
63     * necessary here.
64     */
65     private static final long serialVersionUID = -817911632652898426L;
66    
67     /** The queued items */
68     final Object[] items;
69    
70     /** items index for next take, poll, peek or remove */
71     int takeIndex;
72    
73     /** items index for next put, offer, or add */
74     int putIndex;
75    
76     /** Number of elements in the queue */
77     int count;
78    
79     /*
80     * Concurrency control uses the classic two-condition algorithm
81     * found in any textbook.
82     */
83    
84     /** Main lock guarding all access */
85     final ReentrantLock lock;
86    
87     /** Condition for waiting takes */
88     private final Condition notEmpty;
89    
90     /** Condition for waiting puts */
91     private final Condition notFull;
92    
93     /**
94     * Shared state for currently active iterators, or null if there
95     * are known not to be any. Allows queue operations to update
96     * iterator state.
97     */
98     transient Itrs itrs;
99    
100     // Internal helper methods
101    
102     /**
103     * Circularly decrements array index i.
104     */
105     final int dec(int i) {
106     return ((i == 0) ? items.length : i) - 1;
107     }
108    
109     /**
110     * Returns item at index i.
111     */
112     @SuppressWarnings("unchecked")
113     final E itemAt(int i) {
114     return (E) items[i];
115     }
116    
117     /**
118     * Inserts element at current put position, advances, and signals.
119     * Call only when holding lock.
120     */
121     private void enqueue(E x) {
122     // assert lock.getHoldCount() == 1;
123     // assert items[putIndex] == null;
124     final Object[] items = this.items;
125     items[putIndex] = x;
126     if (++putIndex == items.length) putIndex = 0;
127     count++;
128     notEmpty.signal();
129     }
130    
131     /**
132     * Extracts element at current take position, advances, and signals.
133     * Call only when holding lock.
134     */
135     private E dequeue() {
136     // assert lock.getHoldCount() == 1;
137     // assert items[takeIndex] != null;
138     final Object[] items = this.items;
139     @SuppressWarnings("unchecked")
140     E x = (E) items[takeIndex];
141     items[takeIndex] = null;
142     if (++takeIndex == items.length) takeIndex = 0;
143     count--;
144     if (itrs != null)
145     itrs.elementDequeued();
146     notFull.signal();
147     return x;
148     }
149    
150     /**
151     * Deletes item at array index removeIndex.
152     * Utility for remove(Object) and iterator.remove.
153     * Call only when holding lock.
154     */
155     void removeAt(final int removeIndex) {
156     // assert lock.getHoldCount() == 1;
157     // assert items[removeIndex] != null;
158     // assert removeIndex >= 0 && removeIndex < items.length;
159     final Object[] items = this.items;
160     if (removeIndex == takeIndex) {
161     // removing front item; just advance
162     items[takeIndex] = null;
163     if (++takeIndex == items.length) takeIndex = 0;
164     count--;
165     if (itrs != null)
166     itrs.elementDequeued();
167     } else {
168     // an "interior" remove
169    
170     // slide over all others up through putIndex.
171     for (int i = removeIndex, putIndex = this.putIndex;;) {
172     int pred = i;
173     if (++i == items.length) i = 0;
174     if (i == putIndex) {
175     items[pred] = null;
176     this.putIndex = pred;
177     break;
178     }
179     items[pred] = items[i];
180     }
181     count--;
182     if (itrs != null)
183     itrs.removedAt(removeIndex);
184     }
185     notFull.signal();
186     }
187    
188     /**
189     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
190     * capacity and default access policy.
191     *
192     * @param capacity the capacity of this queue
193     * @throws IllegalArgumentException if {@code capacity < 1}
194     */
195     public ArrayBlockingQueue(int capacity) {
196     this(capacity, false);
197     }
198    
199     /**
200     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
201     * capacity and the specified access policy.
202     *
203     * @param capacity the capacity of this queue
204     * @param fair if {@code true} then queue accesses for threads blocked
205     * on insertion or removal, are processed in FIFO order;
206     * if {@code false} the access order is unspecified.
207     * @throws IllegalArgumentException if {@code capacity < 1}
208     */
209     public ArrayBlockingQueue(int capacity, boolean fair) {
210     if (capacity <= 0)
211     throw new IllegalArgumentException();
212     this.items = new Object[capacity];
213     lock = new ReentrantLock(fair);
214     notEmpty = lock.newCondition();
215     notFull = lock.newCondition();
216     }
217    
218     /**
219     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
220     * capacity, the specified access policy and initially containing the
221     * elements of the given collection,
222     * added in traversal order of the collection's iterator.
223     *
224     * @param capacity the capacity of this queue
225     * @param fair if {@code true} then queue accesses for threads blocked
226     * on insertion or removal, are processed in FIFO order;
227     * if {@code false} the access order is unspecified.
228     * @param c the collection of elements to initially contain
229     * @throws IllegalArgumentException if {@code capacity} is less than
230     * {@code c.size()}, or less than 1.
231     * @throws NullPointerException if the specified collection or any
232     * of its elements are null
233     */
234     public ArrayBlockingQueue(int capacity, boolean fair,
235     Collection<? extends E> c) {
236     this(capacity, fair);
237    
238     final ReentrantLock lock = this.lock;
239     lock.lock(); // Lock only for visibility, not mutual exclusion
240     try {
241     int i = 0;
242     try {
243     for (E e : c)
244     items[i++] = Objects.requireNonNull(e);
245     } catch (ArrayIndexOutOfBoundsException ex) {
246     throw new IllegalArgumentException();
247     }
248     count = i;
249     putIndex = (i == capacity) ? 0 : i;
250     } finally {
251     lock.unlock();
252     }
253     }
254    
255     /**
256     * Inserts the specified element at the tail of this queue if it is
257     * possible to do so immediately without exceeding the queue's capacity,
258     * returning {@code true} upon success and throwing an
259     * {@code IllegalStateException} if this queue is full.
260     *
261     * @param e the element to add
262     * @return {@code true} (as specified by {@link Collection#add})
263     * @throws IllegalStateException if this queue is full
264     * @throws NullPointerException if the specified element is null
265     */
266     public boolean add(E e) {
267     return super.add(e);
268     }
269    
270     /**
271     * Inserts the specified element at the tail of this queue if it is
272     * possible to do so immediately without exceeding the queue's capacity,
273     * returning {@code true} upon success and {@code false} if this queue
274     * is full. This method is generally preferable to method {@link #add},
275     * which can fail to insert an element only by throwing an exception.
276     *
277     * @throws NullPointerException if the specified element is null
278     */
279     public boolean offer(E e) {
280     Objects.requireNonNull(e);
281     final ReentrantLock lock = this.lock;
282     lock.lock();
283     try {
284     if (count == items.length)
285     return false;
286     else {
287     enqueue(e);
288     return true;
289     }
290     } finally {
291     lock.unlock();
292     }
293     }
294    
295     /**
296     * Inserts the specified element at the tail of this queue, waiting
297     * for space to become available if the queue is full.
298     *
299     * @throws InterruptedException {@inheritDoc}
300     * @throws NullPointerException {@inheritDoc}
301     */
302     public void put(E e) throws InterruptedException {
303     Objects.requireNonNull(e);
304     final ReentrantLock lock = this.lock;
305     lock.lockInterruptibly();
306     try {
307     while (count == items.length)
308     notFull.await();
309     enqueue(e);
310     } finally {
311     lock.unlock();
312     }
313     }
314    
315     /**
316     * Inserts the specified element at the tail of this queue, waiting
317     * up to the specified wait time for space to become available if
318     * the queue is full.
319     *
320     * @throws InterruptedException {@inheritDoc}
321     * @throws NullPointerException {@inheritDoc}
322     */
323     public boolean offer(E e, long timeout, TimeUnit unit)
324     throws InterruptedException {
325    
326     Objects.requireNonNull(e);
327     long nanos = unit.toNanos(timeout);
328     final ReentrantLock lock = this.lock;
329     lock.lockInterruptibly();
330     try {
331     while (count == items.length) {
332     if (nanos <= 0L)
333     return false;
334     nanos = notFull.awaitNanos(nanos);
335     }
336     enqueue(e);
337     return true;
338     } finally {
339     lock.unlock();
340     }
341     }
342    
343     public E poll() {
344     final ReentrantLock lock = this.lock;
345     lock.lock();
346     try {
347     return (count == 0) ? null : dequeue();
348     } finally {
349     lock.unlock();
350     }
351     }
352    
353     public E take() throws InterruptedException {
354     final ReentrantLock lock = this.lock;
355     lock.lockInterruptibly();
356     try {
357     while (count == 0)
358     notEmpty.await();
359     return dequeue();
360     } finally {
361     lock.unlock();
362     }
363     }
364    
365     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
366     long nanos = unit.toNanos(timeout);
367     final ReentrantLock lock = this.lock;
368     lock.lockInterruptibly();
369     try {
370     while (count == 0) {
371     if (nanos <= 0L)
372     return null;
373     nanos = notEmpty.awaitNanos(nanos);
374     }
375     return dequeue();
376     } finally {
377     lock.unlock();
378     }
379     }
380    
381     public E peek() {
382     final ReentrantLock lock = this.lock;
383     lock.lock();
384     try {
385     return itemAt(takeIndex); // null when queue is empty
386     } finally {
387     lock.unlock();
388     }
389     }
390    
391     // this doc comment is overridden to remove the reference to collections
392     // greater in size than Integer.MAX_VALUE
393     /**
394     * Returns the number of elements in this queue.
395     *
396     * @return the number of elements in this queue
397     */
398     public int size() {
399     final ReentrantLock lock = this.lock;
400     lock.lock();
401     try {
402     return count;
403     } finally {
404     lock.unlock();
405     }
406     }
407    
408     // this doc comment is a modified copy of the inherited doc comment,
409     // without the reference to unlimited queues.
410     /**
411     * Returns the number of additional elements that this queue can ideally
412     * (in the absence of memory or resource constraints) accept without
413     * blocking. This is always equal to the initial capacity of this queue
414     * less the current {@code size} of this queue.
415     *
416     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
417     * an element will succeed by inspecting {@code remainingCapacity}
418     * because it may be the case that another thread is about to
419     * insert or remove an element.
420     */
421     public int remainingCapacity() {
422     final ReentrantLock lock = this.lock;
423     lock.lock();
424     try {
425     return items.length - count;
426     } finally {
427     lock.unlock();
428     }
429     }
430    
431     /**
432     * Removes a single instance of the specified element from this queue,
433     * if it is present. More formally, removes an element {@code e} such
434     * that {@code o.equals(e)}, if this queue contains one or more such
435     * elements.
436     * Returns {@code true} if this queue contained the specified element
437     * (or equivalently, if this queue changed as a result of the call).
438     *
439     * <p>Removal of interior elements in circular array based queues
440     * is an intrinsically slow and disruptive operation, so should
441     * be undertaken only in exceptional circumstances, ideally
442     * only when the queue is known not to be accessible by other
443     * threads.
444     *
445     * @param o element to be removed from this queue, if present
446     * @return {@code true} if this queue changed as a result of the call
447     */
448     public boolean remove(Object o) {
449     if (o == null) return false;
450     final ReentrantLock lock = this.lock;
451     lock.lock();
452     try {
453     if (count > 0) {
454     final Object[] items = this.items;
455     final int putIndex = this.putIndex;
456     int i = takeIndex;
457     do {
458     if (o.equals(items[i])) {
459     removeAt(i);
460     return true;
461     }
462     if (++i == items.length) i = 0;
463     } while (i != putIndex);
464     }
465     return false;
466     } finally {
467     lock.unlock();
468     }
469     }
470    
471     /**
472     * Returns {@code true} if this queue contains the specified element.
473     * More formally, returns {@code true} if and only if this queue contains
474     * at least one element {@code e} such that {@code o.equals(e)}.
475     *
476     * @param o object to be checked for containment in this queue
477     * @return {@code true} if this queue contains the specified element
478     */
479     public boolean contains(Object o) {
480     if (o == null) return false;
481     final ReentrantLock lock = this.lock;
482     lock.lock();
483     try {
484     if (count > 0) {
485     final Object[] items = this.items;
486     final int putIndex = this.putIndex;
487     int i = takeIndex;
488     do {
489     if (o.equals(items[i]))
490     return true;
491     if (++i == items.length) i = 0;
492     } while (i != putIndex);
493     }
494     return false;
495     } finally {
496     lock.unlock();
497     }
498     }
499    
500     /**
501     * Returns an array containing all of the elements in this queue, in
502     * proper sequence.
503     *
504     * <p>The returned array will be "safe" in that no references to it are
505     * maintained by this queue. (In other words, this method must allocate
506     * a new array). The caller is thus free to modify the returned array.
507     *
508     * <p>This method acts as bridge between array-based and collection-based
509     * APIs.
510     *
511     * @return an array containing all of the elements in this queue
512     */
513     public Object[] toArray() {
514     final ReentrantLock lock = this.lock;
515     lock.lock();
516     try {
517     final Object[] items = this.items;
518     final int end = takeIndex + count;
519     final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
520     if (end != putIndex)
521     System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
522     return a;
523     } finally {
524     lock.unlock();
525     }
526     }
527    
528     /**
529     * Returns an array containing all of the elements in this queue, in
530     * proper sequence; the runtime type of the returned array is that of
531     * the specified array. If the queue fits in the specified array, it
532     * is returned therein. Otherwise, a new array is allocated with the
533     * runtime type of the specified array and the size of this queue.
534     *
535     * <p>If this queue fits in the specified array with room to spare
536     * (i.e., the array has more elements than this queue), the element in
537     * the array immediately following the end of the queue is set to
538     * {@code null}.
539     *
540     * <p>Like the {@link #toArray()} method, this method acts as bridge between
541     * array-based and collection-based APIs. Further, this method allows
542     * precise control over the runtime type of the output array, and may,
543     * under certain circumstances, be used to save allocation costs.
544     *
545     * <p>Suppose {@code x} is a queue known to contain only strings.
546     * The following code can be used to dump the queue into a newly
547     * allocated array of {@code String}:
548     *
549     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
550     *
551     * Note that {@code toArray(new Object[0])} is identical in function to
552     * {@code toArray()}.
553     *
554     * @param a the array into which the elements of the queue are to
555     * be stored, if it is big enough; otherwise, a new array of the
556     * same runtime type is allocated for this purpose
557     * @return an array containing all of the elements in this queue
558     * @throws ArrayStoreException if the runtime type of the specified array
559     * is not a supertype of the runtime type of every element in
560     * this queue
561     * @throws NullPointerException if the specified array is null
562     */
563     @SuppressWarnings("unchecked")
564     public <T> T[] toArray(T[] a) {
565     final ReentrantLock lock = this.lock;
566     lock.lock();
567     try {
568     final Object[] items = this.items;
569     final int count = this.count;
570     final int firstLeg = Math.min(items.length - takeIndex, count);
571     if (a.length < count) {
572     a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
573     a.getClass());
574     } else {
575     System.arraycopy(items, takeIndex, a, 0, firstLeg);
576     if (a.length > count)
577     a[count] = null;
578     }
579     if (firstLeg < count)
580     System.arraycopy(items, 0, a, firstLeg, putIndex);
581     return a;
582     } finally {
583     lock.unlock();
584     }
585     }
586    
587     public String toString() {
588     return Helpers.collectionToString(this);
589     }
590    
591     /**
592     * Atomically removes all of the elements from this queue.
593     * The queue will be empty after this call returns.
594     */
595     public void clear() {
596     final ReentrantLock lock = this.lock;
597     lock.lock();
598     try {
599     int k = count;
600     if (k > 0) {
601     final Object[] items = this.items;
602     final int putIndex = this.putIndex;
603     int i = takeIndex;
604     do {
605     items[i] = null;
606     if (++i == items.length) i = 0;
607     } while (i != putIndex);
608     takeIndex = putIndex;
609     count = 0;
610     if (itrs != null)
611     itrs.queueIsEmpty();
612     for (; k > 0 && lock.hasWaiters(notFull); k--)
613     notFull.signal();
614     }
615     } finally {
616     lock.unlock();
617     }
618     }
619    
620     /**
621     * @throws UnsupportedOperationException {@inheritDoc}
622     * @throws ClassCastException {@inheritDoc}
623     * @throws NullPointerException {@inheritDoc}
624     * @throws IllegalArgumentException {@inheritDoc}
625     */
626     public int drainTo(Collection<? super E> c) {
627     return drainTo(c, Integer.MAX_VALUE);
628     }
629    
630     /**
631     * @throws UnsupportedOperationException {@inheritDoc}
632     * @throws ClassCastException {@inheritDoc}
633     * @throws NullPointerException {@inheritDoc}
634     * @throws IllegalArgumentException {@inheritDoc}
635     */
636     public int drainTo(Collection<? super E> c, int maxElements) {
637     Objects.requireNonNull(c);
638     if (c == this)
639     throw new IllegalArgumentException();
640     if (maxElements <= 0)
641     return 0;
642     final Object[] items = this.items;
643     final ReentrantLock lock = this.lock;
644     lock.lock();
645     try {
646     int n = Math.min(maxElements, count);
647     int take = takeIndex;
648     int i = 0;
649     try {
650     while (i < n) {
651     @SuppressWarnings("unchecked")
652     E x = (E) items[take];
653     c.add(x);
654     items[take] = null;
655     if (++take == items.length) take = 0;
656     i++;
657     }
658     return n;
659     } finally {
660     // Restore invariants even if c.add() threw
661     if (i > 0) {
662     count -= i;
663     takeIndex = take;
664     if (itrs != null) {
665     if (count == 0)
666     itrs.queueIsEmpty();
667     else if (i > take)
668     itrs.takeIndexWrapped();
669     }
670     for (; i > 0 && lock.hasWaiters(notFull); i--)
671     notFull.signal();
672     }
673     }
674     } finally {
675     lock.unlock();
676     }
677     }
678    
679     /**
680     * Returns an iterator over the elements in this queue in proper sequence.
681     * The elements will be returned in order from first (head) to last (tail).
682     *
683     * <p>The returned iterator is
684     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
685     *
686     * @return an iterator over the elements in this queue in proper sequence
687     */
688     public Iterator<E> iterator() {
689     return new Itr();
690     }
691    
692     /**
693     * Shared data between iterators and their queue, allowing queue
694     * modifications to update iterators when elements are removed.
695     *
696     * This adds a lot of complexity for the sake of correctly
697     * handling some uncommon operations, but the combination of
698     * circular-arrays and supporting interior removes (i.e., those
699     * not at head) would cause iterators to sometimes lose their
700     * places and/or (re)report elements they shouldn't. To avoid
701     * this, when a queue has one or more iterators, it keeps iterator
702     * state consistent by:
703     *
704     * (1) keeping track of the number of "cycles", that is, the
705     * number of times takeIndex has wrapped around to 0.
706     * (2) notifying all iterators via the callback removedAt whenever
707     * an interior element is removed (and thus other elements may
708     * be shifted).
709     *
710     * These suffice to eliminate iterator inconsistencies, but
711     * unfortunately add the secondary responsibility of maintaining
712     * the list of iterators. We track all active iterators in a
713     * simple linked list (accessed only when the queue's lock is
714     * held) of weak references to Itr. The list is cleaned up using
715     * 3 different mechanisms:
716     *
717     * (1) Whenever a new iterator is created, do some O(1) checking for
718     * stale list elements.
719     *
720     * (2) Whenever takeIndex wraps around to 0, check for iterators
721     * that have been unused for more than one wrap-around cycle.
722     *
723     * (3) Whenever the queue becomes empty, all iterators are notified
724     * and this entire data structure is discarded.
725     *
726     * So in addition to the removedAt callback that is necessary for
727     * correctness, iterators have the shutdown and takeIndexWrapped
728     * callbacks that help remove stale iterators from the list.
729     *
730     * Whenever a list element is examined, it is expunged if either
731     * the GC has determined that the iterator is discarded, or if the
732     * iterator reports that it is "detached" (does not need any
733     * further state updates). Overhead is maximal when takeIndex
734     * never advances, iterators are discarded before they are
735     * exhausted, and all removals are interior removes, in which case
736     * all stale iterators are discovered by the GC. But even in this
737     * case we don't increase the amortized complexity.
738     *
739     * Care must be taken to keep list sweeping methods from
740     * reentrantly invoking another such method, causing subtle
741     * corruption bugs.
742     */
743     class Itrs {
744    
745     /**
746     * Node in a linked list of weak iterator references.
747     */
748     private class Node extends WeakReference<Itr> {
749     Node next;
750    
751     Node(Itr iterator, Node next) {
752     super(iterator);
753     this.next = next;
754     }
755     }
756    
757     /** Incremented whenever takeIndex wraps around to 0 */
758     int cycles;
759    
760     /** Linked list of weak iterator references */
761     private Node head;
762    
763     /** Used to expunge stale iterators */
764     private Node sweeper;
765    
766     private static final int SHORT_SWEEP_PROBES = 4;
767     private static final int LONG_SWEEP_PROBES = 16;
768    
769     Itrs(Itr initial) {
770     register(initial);
771     }
772    
773     /**
774     * Sweeps itrs, looking for and expunging stale iterators.
775     * If at least one was found, tries harder to find more.
776     * Called only from iterating thread.
777     *
778     * @param tryHarder whether to start in try-harder mode, because
779     * there is known to be at least one iterator to collect
780     */
781     void doSomeSweeping(boolean tryHarder) {
782     // assert lock.getHoldCount() == 1;
783     // assert head != null;
784     int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
785     Node o, p;
786     final Node sweeper = this.sweeper;
787     boolean passedGo; // to limit search to one full sweep
788    
789     if (sweeper == null) {
790     o = null;
791     p = head;
792     passedGo = true;
793     } else {
794     o = sweeper;
795     p = o.next;
796     passedGo = false;
797     }
798    
799     for (; probes > 0; probes--) {
800     if (p == null) {
801     if (passedGo)
802     break;
803     o = null;
804     p = head;
805     passedGo = true;
806     }
807     final Itr it = p.get();
808     final Node next = p.next;
809     if (it == null || it.isDetached()) {
810     // found a discarded/exhausted iterator
811     probes = LONG_SWEEP_PROBES; // "try harder"
812     // unlink p
813     p.clear();
814     p.next = null;
815     if (o == null) {
816     head = next;
817     if (next == null) {
818     // We've run out of iterators to track; retire
819     itrs = null;
820     return;
821     }
822     }
823     else
824     o.next = next;
825     } else {
826     o = p;
827     }
828     p = next;
829     }
830    
831     this.sweeper = (p == null) ? null : o;
832     }
833    
834     /**
835     * Adds a new iterator to the linked list of tracked iterators.
836     */
837     void register(Itr itr) {
838     // assert lock.getHoldCount() == 1;
839     head = new Node(itr, head);
840     }
841    
842     /**
843     * Called whenever takeIndex wraps around to 0.
844     *
845     * Notifies all iterators, and expunges any that are now stale.
846     */
847     void takeIndexWrapped() {
848     // assert lock.getHoldCount() == 1;
849     cycles++;
850     for (Node o = null, p = head; p != null;) {
851     final Itr it = p.get();
852     final Node next = p.next;
853     if (it == null || it.takeIndexWrapped()) {
854     // unlink p
855     // assert it == null || it.isDetached();
856     p.clear();
857     p.next = null;
858     if (o == null)
859     head = next;
860     else
861     o.next = next;
862     } else {
863     o = p;
864     }
865     p = next;
866     }
867     if (head == null) // no more iterators to track
868     itrs = null;
869     }
870    
871     /**
872     * Called whenever an interior remove (not at takeIndex) occurred.
873     *
874     * Notifies all iterators, and expunges any that are now stale.
875     */
876     void removedAt(int removedIndex) {
877     for (Node o = null, p = head; p != null;) {
878     final Itr it = p.get();
879     final Node next = p.next;
880     if (it == null || it.removedAt(removedIndex)) {
881     // unlink p
882     // assert it == null || it.isDetached();
883     p.clear();
884     p.next = null;
885     if (o == null)
886     head = next;
887     else
888     o.next = next;
889     } else {
890     o = p;
891     }
892     p = next;
893     }
894     if (head == null) // no more iterators to track
895     itrs = null;
896     }
897    
898     /**
899     * Called whenever the queue becomes empty.
900     *
901     * Notifies all active iterators that the queue is empty,
902     * clears all weak refs, and unlinks the itrs datastructure.
903     */
904     void queueIsEmpty() {
905     // assert lock.getHoldCount() == 1;
906     for (Node p = head; p != null; p = p.next) {
907     Itr it = p.get();
908     if (it != null) {
909     p.clear();
910     it.shutdown();
911     }
912     }
913     head = null;
914     itrs = null;
915     }
916    
917     /**
918     * Called whenever an element has been dequeued (at takeIndex).
919     */
920     void elementDequeued() {
921     // assert lock.getHoldCount() == 1;
922     if (count == 0)
923     queueIsEmpty();
924     else if (takeIndex == 0)
925     takeIndexWrapped();
926     }
927     }
928    
929     /**
930     * Iterator for ArrayBlockingQueue.
931     *
932     * To maintain weak consistency with respect to puts and takes, we
933     * read ahead one slot, so as to not report hasNext true but then
934     * not have an element to return.
935     *
936     * We switch into "detached" mode (allowing prompt unlinking from
937     * itrs without help from the GC) when all indices are negative, or
938     * when hasNext returns false for the first time. This allows the
939     * iterator to track concurrent updates completely accurately,
940     * except for the corner case of the user calling Iterator.remove()
941     * after hasNext() returned false. Even in this case, we ensure
942     * that we don't remove the wrong element by keeping track of the
943     * expected element to remove, in lastItem. Yes, we may fail to
944     * remove lastItem from the queue if it moved due to an interleaved
945     * interior remove while in detached mode.
946     */
947     private class Itr implements Iterator<E> {
948     /** Index to look for new nextItem; NONE at end */
949     private int cursor;
950    
951     /** Element to be returned by next call to next(); null if none */
952     private E nextItem;
953    
954     /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
955     private int nextIndex;
956    
957     /** Last element returned; null if none or not detached. */
958     private E lastItem;
959    
960     /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
961     private int lastRet;
962    
963     /** Previous value of takeIndex, or DETACHED when detached */
964     private int prevTakeIndex;
965    
966     /** Previous value of iters.cycles */
967     private int prevCycles;
968    
969     /** Special index value indicating "not available" or "undefined" */
970     private static final int NONE = -1;
971    
972     /**
973     * Special index value indicating "removed elsewhere", that is,
974     * removed by some operation other than a call to this.remove().
975     */
976     private static final int REMOVED = -2;
977    
978     /** Special value for prevTakeIndex indicating "detached mode" */
979     private static final int DETACHED = -3;
980    
981     Itr() {
982     // assert lock.getHoldCount() == 0;
983     lastRet = NONE;
984     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
985     lock.lock();
986     try {
987     if (count == 0) {
988     // assert itrs == null;
989     cursor = NONE;
990     nextIndex = NONE;
991     prevTakeIndex = DETACHED;
992     } else {
993     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
994     prevTakeIndex = takeIndex;
995     nextItem = itemAt(nextIndex = takeIndex);
996     cursor = incCursor(takeIndex);
997     if (itrs == null) {
998     itrs = new Itrs(this);
999     } else {
1000     itrs.register(this); // in this order
1001     itrs.doSomeSweeping(false);
1002     }
1003     prevCycles = itrs.cycles;
1004     // assert takeIndex >= 0;
1005     // assert prevTakeIndex == takeIndex;
1006     // assert nextIndex >= 0;
1007     // assert nextItem != null;
1008     }
1009     } finally {
1010     lock.unlock();
1011     }
1012     }
1013    
1014     boolean isDetached() {
1015     // assert lock.getHoldCount() == 1;
1016     return prevTakeIndex < 0;
1017     }
1018    
1019     private int incCursor(int index) {
1020     // assert lock.getHoldCount() == 1;
1021     if (++index == items.length) index = 0;
1022     if (index == putIndex) index = NONE;
1023     return index;
1024     }
1025    
1026     /**
1027     * Returns true if index is invalidated by the given number of
1028     * dequeues, starting from prevTakeIndex.
1029     */
1030     private boolean invalidated(int index, int prevTakeIndex,
1031     long dequeues, int length) {
1032     if (index < 0)
1033     return false;
1034     int distance = index - prevTakeIndex;
1035     if (distance < 0)
1036     distance += length;
1037     return dequeues > distance;
1038     }
1039    
1040     /**
1041     * Adjusts indices to incorporate all dequeues since the last
1042     * operation on this iterator. Call only from iterating thread.
1043     */
1044     private void incorporateDequeues() {
1045     // assert lock.getHoldCount() == 1;
1046     // assert itrs != null;
1047     // assert !isDetached();
1048     // assert count > 0;
1049    
1050     final int cycles = itrs.cycles;
1051     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1052     final int prevCycles = this.prevCycles;
1053     final int prevTakeIndex = this.prevTakeIndex;
1054    
1055     if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1056     final int len = items.length;
1057     // how far takeIndex has advanced since the previous
1058     // operation of this iterator
1059     long dequeues = (cycles - prevCycles) * len
1060     + (takeIndex - prevTakeIndex);
1061    
1062     // Check indices for invalidation
1063     if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1064     lastRet = REMOVED;
1065     if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1066     nextIndex = REMOVED;
1067     if (invalidated(cursor, prevTakeIndex, dequeues, len))
1068     cursor = takeIndex;
1069    
1070     if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1071     detach();
1072     else {
1073     this.prevCycles = cycles;
1074     this.prevTakeIndex = takeIndex;
1075     }
1076     }
1077     }
1078    
1079     /**
1080     * Called when itrs should stop tracking this iterator, either
1081     * because there are no more indices to update (cursor < 0 &&
1082     * nextIndex < 0 && lastRet < 0) or as a special exception, when
1083     * lastRet >= 0, because hasNext() is about to return false for the
1084     * first time. Call only from iterating thread.
1085     */
1086     private void detach() {
1087     // Switch to detached mode
1088     // assert lock.getHoldCount() == 1;
1089     // assert cursor == NONE;
1090     // assert nextIndex < 0;
1091     // assert lastRet < 0 || nextItem == null;
1092     // assert lastRet < 0 ^ lastItem != null;
1093     if (prevTakeIndex >= 0) {
1094     // assert itrs != null;
1095     prevTakeIndex = DETACHED;
1096     // try to unlink from itrs (but not too hard)
1097     itrs.doSomeSweeping(true);
1098     }
1099     }
1100    
1101     /**
1102     * For performance reasons, we would like not to acquire a lock in
1103     * hasNext in the common case. To allow for this, we only access
1104     * fields (i.e. nextItem) that are not modified by update operations
1105     * triggered by queue modifications.
1106     */
1107     public boolean hasNext() {
1108     // assert lock.getHoldCount() == 0;
1109     if (nextItem != null)
1110     return true;
1111     noNext();
1112     return false;
1113     }
1114    
1115     private void noNext() {
1116     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1117     lock.lock();
1118     try {
1119     // assert cursor == NONE;
1120     // assert nextIndex == NONE;
1121     if (!isDetached()) {
1122     // assert lastRet >= 0;
1123     incorporateDequeues(); // might update lastRet
1124     if (lastRet >= 0) {
1125     lastItem = itemAt(lastRet);
1126     // assert lastItem != null;
1127     detach();
1128     }
1129     }
1130     // assert isDetached();
1131     // assert lastRet < 0 ^ lastItem != null;
1132     } finally {
1133     lock.unlock();
1134     }
1135     }
1136    
1137     public E next() {
1138     // assert lock.getHoldCount() == 0;
1139     final E x = nextItem;
1140     if (x == null)
1141     throw new NoSuchElementException();
1142     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1143     lock.lock();
1144     try {
1145     if (!isDetached())
1146     incorporateDequeues();
1147     // assert nextIndex != NONE;
1148     // assert lastItem == null;
1149     lastRet = nextIndex;
1150     final int cursor = this.cursor;
1151     if (cursor >= 0) {
1152     nextItem = itemAt(nextIndex = cursor);
1153     // assert nextItem != null;
1154     this.cursor = incCursor(cursor);
1155     } else {
1156     nextIndex = NONE;
1157     nextItem = null;
1158     }
1159     } finally {
1160     lock.unlock();
1161     }
1162     return x;
1163     }
1164    
1165     public void remove() {
1166     // assert lock.getHoldCount() == 0;
1167     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1168     lock.lock();
1169     try {
1170     if (!isDetached())
1171     incorporateDequeues(); // might update lastRet or detach
1172     final int lastRet = this.lastRet;
1173     this.lastRet = NONE;
1174     if (lastRet >= 0) {
1175     if (!isDetached())
1176     removeAt(lastRet);
1177     else {
1178     final E lastItem = this.lastItem;
1179     // assert lastItem != null;
1180     this.lastItem = null;
1181     if (itemAt(lastRet) == lastItem)
1182     removeAt(lastRet);
1183     }
1184     } else if (lastRet == NONE)
1185     throw new IllegalStateException();
1186     // else lastRet == REMOVED and the last returned element was
1187     // previously asynchronously removed via an operation other
1188     // than this.remove(), so nothing to do.
1189    
1190     if (cursor < 0 && nextIndex < 0)
1191     detach();
1192     } finally {
1193     lock.unlock();
1194     // assert lastRet == NONE;
1195     // assert lastItem == null;
1196     }
1197     }
1198    
1199     /**
1200     * Called to notify the iterator that the queue is empty, or that it
1201     * has fallen hopelessly behind, so that it should abandon any
1202     * further iteration, except possibly to return one more element
1203     * from next(), as promised by returning true from hasNext().
1204     */
1205     void shutdown() {
1206     // assert lock.getHoldCount() == 1;
1207     cursor = NONE;
1208     if (nextIndex >= 0)
1209     nextIndex = REMOVED;
1210     if (lastRet >= 0) {
1211     lastRet = REMOVED;
1212     lastItem = null;
1213     }
1214     prevTakeIndex = DETACHED;
1215     // Don't set nextItem to null because we must continue to be
1216     // able to return it on next().
1217     //
1218     // Caller will unlink from itrs when convenient.
1219     }
1220    
1221     private int distance(int index, int prevTakeIndex, int length) {
1222     int distance = index - prevTakeIndex;
1223     if (distance < 0)
1224     distance += length;
1225     return distance;
1226     }
1227    
1228     /**
1229     * Called whenever an interior remove (not at takeIndex) occurred.
1230     *
1231     * @return true if this iterator should be unlinked from itrs
1232     */
1233     boolean removedAt(int removedIndex) {
1234     // assert lock.getHoldCount() == 1;
1235     if (isDetached())
1236     return true;
1237    
1238     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1239     final int prevTakeIndex = this.prevTakeIndex;
1240     final int len = items.length;
1241     // distance from prevTakeIndex to removedIndex
1242     final int removedDistance =
1243     len * (itrs.cycles - this.prevCycles
1244     + ((removedIndex < takeIndex) ? 1 : 0))
1245     + (removedIndex - prevTakeIndex);
1246     // assert itrs.cycles - this.prevCycles >= 0;
1247     // assert itrs.cycles - this.prevCycles <= 1;
1248     // assert removedDistance > 0;
1249     // assert removedIndex != takeIndex;
1250     int cursor = this.cursor;
1251     if (cursor >= 0) {
1252     int x = distance(cursor, prevTakeIndex, len);
1253     if (x == removedDistance) {
1254     if (cursor == putIndex)
1255     this.cursor = cursor = NONE;
1256     }
1257     else if (x > removedDistance) {
1258     // assert cursor != prevTakeIndex;
1259     this.cursor = cursor = dec(cursor);
1260     }
1261     }
1262     int lastRet = this.lastRet;
1263     if (lastRet >= 0) {
1264     int x = distance(lastRet, prevTakeIndex, len);
1265     if (x == removedDistance)
1266     this.lastRet = lastRet = REMOVED;
1267     else if (x > removedDistance)
1268     this.lastRet = lastRet = dec(lastRet);
1269     }
1270     int nextIndex = this.nextIndex;
1271     if (nextIndex >= 0) {
1272     int x = distance(nextIndex, prevTakeIndex, len);
1273     if (x == removedDistance)
1274     this.nextIndex = nextIndex = REMOVED;
1275     else if (x > removedDistance)
1276     this.nextIndex = nextIndex = dec(nextIndex);
1277     }
1278     if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1279     this.prevTakeIndex = DETACHED;
1280     return true;
1281     }
1282     return false;
1283     }
1284    
1285     /**
1286     * Called whenever takeIndex wraps around to zero.
1287     *
1288     * @return true if this iterator should be unlinked from itrs
1289     */
1290     boolean takeIndexWrapped() {
1291     // assert lock.getHoldCount() == 1;
1292     if (isDetached())
1293     return true;
1294     if (itrs.cycles - prevCycles > 1) {
1295     // All the elements that existed at the time of the last
1296     // operation are gone, so abandon further iteration.
1297     shutdown();
1298     return true;
1299     }
1300     return false;
1301     }
1302    
1303     // /** Uncomment for debugging. */
1304     // public String toString() {
1305     // return ("cursor=" + cursor + " " +
1306     // "nextIndex=" + nextIndex + " " +
1307     // "lastRet=" + lastRet + " " +
1308     // "nextItem=" + nextItem + " " +
1309     // "lastItem=" + lastItem + " " +
1310     // "prevCycles=" + prevCycles + " " +
1311     // "prevTakeIndex=" + prevTakeIndex + " " +
1312     // "size()=" + size() + " " +
1313     // "remainingCapacity()=" + remainingCapacity());
1314     // }
1315     }
1316    
1317     /**
1318     * Returns a {@link Spliterator} over the elements in this queue.
1319     *
1320     * <p>The returned spliterator is
1321     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1322     *
1323     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1324     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1325     *
1326     * @implNote
1327     * The {@code Spliterator} implements {@code trySplit} to permit limited
1328     * parallelism.
1329     *
1330     * @return a {@code Spliterator} over the elements in this queue
1331     * @since 1.8
1332     */
1333     public Spliterator<E> spliterator() {
1334     return Spliterators.spliterator
1335     (this, (Spliterator.ORDERED |
1336     Spliterator.NONNULL |
1337     Spliterator.CONCURRENT));
1338     }
1339    
1340     }