ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.2
Committed: Mon Mar 6 01:00:45 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +314 -59 lines
Log Message:
sync from src/main

File Contents

# User Rev Content
1 jsr166 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8    
9     import java.lang.ref.WeakReference;
10     import java.util.AbstractQueue;
11     import java.util.Arrays;
12     import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15     import java.util.Objects;
16     import java.util.Spliterator;
17     import java.util.Spliterators;
18     import java.util.concurrent.locks.Condition;
19     import java.util.concurrent.locks.ReentrantLock;
20 jsr166 1.2 import java.util.function.Consumer;
21     import java.util.function.Predicate;
22 jsr166 1.1
23     /**
24     * A bounded {@linkplain BlockingQueue blocking queue} backed by an
25     * array. This queue orders elements FIFO (first-in-first-out). The
26     * <em>head</em> of the queue is that element that has been on the
27     * queue the longest time. The <em>tail</em> of the queue is that
28     * element that has been on the queue the shortest time. New elements
29     * are inserted at the tail of the queue, and the queue retrieval
30     * operations obtain elements at the head of the queue.
31     *
32     * <p>This is a classic &quot;bounded buffer&quot;, in which a
33     * fixed-sized array holds elements inserted by producers and
34     * extracted by consumers. Once created, the capacity cannot be
35     * changed. Attempts to {@code put} an element into a full queue
36     * will result in the operation blocking; attempts to {@code take} an
37     * element from an empty queue will similarly block.
38     *
39     * <p>This class supports an optional fairness policy for ordering
40     * waiting producer and consumer threads. By default, this ordering
41     * is not guaranteed. However, a queue constructed with fairness set
42     * to {@code true} grants threads access in FIFO order. Fairness
43     * generally decreases throughput but reduces variability and avoids
44     * starvation.
45     *
46 jsr166 1.2 * <p>This class and its iterator implement all of the <em>optional</em>
47     * methods of the {@link Collection} and {@link Iterator} interfaces.
48 jsr166 1.1 *
49     * <p>This class is a member of the
50     * <a href="{@docRoot}/../technotes/guides/collections/index.html">
51     * Java Collections Framework</a>.
52     *
53     * @since 1.5
54     * @author Doug Lea
55     * @param <E> the type of elements held in this queue
56     */
57     public class ArrayBlockingQueue<E> extends AbstractQueue<E>
58     implements BlockingQueue<E>, java.io.Serializable {
59    
60 jsr166 1.2 /*
61     * Much of the implementation mechanics, especially the unusual
62     * nested loops, are shared and co-maintained with ArrayDeque.
63     */
64    
65 jsr166 1.1 /**
66     * Serialization ID. This class relies on default serialization
67     * even for the items array, which is default-serialized, even if
68     * it is empty. Otherwise it could not be declared final, which is
69     * necessary here.
70     */
71     private static final long serialVersionUID = -817911632652898426L;
72    
73     /** The queued items */
74     final Object[] items;
75    
76     /** items index for next take, poll, peek or remove */
77     int takeIndex;
78    
79     /** items index for next put, offer, or add */
80     int putIndex;
81    
82     /** Number of elements in the queue */
83     int count;
84    
85     /*
86     * Concurrency control uses the classic two-condition algorithm
87     * found in any textbook.
88     */
89    
90     /** Main lock guarding all access */
91     final ReentrantLock lock;
92    
93     /** Condition for waiting takes */
94     private final Condition notEmpty;
95    
96     /** Condition for waiting puts */
97     private final Condition notFull;
98    
99     /**
100     * Shared state for currently active iterators, or null if there
101     * are known not to be any. Allows queue operations to update
102     * iterator state.
103     */
104     transient Itrs itrs;
105    
106     // Internal helper methods
107    
108     /**
109 jsr166 1.2 * Increments i, mod modulus.
110     * Precondition and postcondition: 0 <= i < modulus.
111     */
112     static final int inc(int i, int modulus) {
113     if (++i >= modulus) i = 0;
114     return i;
115     }
116    
117     /**
118     * Decrements i, mod modulus.
119     * Precondition and postcondition: 0 <= i < modulus.
120 jsr166 1.1 */
121 jsr166 1.2 static final int dec(int i, int modulus) {
122     if (--i < 0) i = modulus - 1;
123     return i;
124 jsr166 1.1 }
125    
126     /**
127     * Returns item at index i.
128     */
129     @SuppressWarnings("unchecked")
130     final E itemAt(int i) {
131     return (E) items[i];
132     }
133    
134     /**
135 jsr166 1.2 * Returns element at array index i.
136     * This is a slight abuse of generics, accepted by javac.
137     */
138     @SuppressWarnings("unchecked")
139     static <E> E itemAt(Object[] items, int i) {
140     return (E) items[i];
141     }
142    
143     /**
144 jsr166 1.1 * Inserts element at current put position, advances, and signals.
145     * Call only when holding lock.
146     */
147 jsr166 1.2 private void enqueue(E e) {
148     // assert lock.isHeldByCurrentThread();
149 jsr166 1.1 // assert lock.getHoldCount() == 1;
150     // assert items[putIndex] == null;
151     final Object[] items = this.items;
152 jsr166 1.2 items[putIndex] = e;
153 jsr166 1.1 if (++putIndex == items.length) putIndex = 0;
154     count++;
155     notEmpty.signal();
156 jsr166 1.2 // checkInvariants();
157 jsr166 1.1 }
158    
159     /**
160     * Extracts element at current take position, advances, and signals.
161     * Call only when holding lock.
162     */
163     private E dequeue() {
164 jsr166 1.2 // assert lock.isHeldByCurrentThread();
165 jsr166 1.1 // assert lock.getHoldCount() == 1;
166     // assert items[takeIndex] != null;
167     final Object[] items = this.items;
168     @SuppressWarnings("unchecked")
169 jsr166 1.2 E e = (E) items[takeIndex];
170 jsr166 1.1 items[takeIndex] = null;
171     if (++takeIndex == items.length) takeIndex = 0;
172     count--;
173     if (itrs != null)
174     itrs.elementDequeued();
175     notFull.signal();
176 jsr166 1.2 // checkInvariants();
177     return e;
178 jsr166 1.1 }
179    
180     /**
181     * Deletes item at array index removeIndex.
182     * Utility for remove(Object) and iterator.remove.
183     * Call only when holding lock.
184     */
185     void removeAt(final int removeIndex) {
186 jsr166 1.2 // assert lock.isHeldByCurrentThread();
187 jsr166 1.1 // assert lock.getHoldCount() == 1;
188     // assert items[removeIndex] != null;
189     // assert removeIndex >= 0 && removeIndex < items.length;
190     final Object[] items = this.items;
191     if (removeIndex == takeIndex) {
192     // removing front item; just advance
193     items[takeIndex] = null;
194     if (++takeIndex == items.length) takeIndex = 0;
195     count--;
196     if (itrs != null)
197     itrs.elementDequeued();
198     } else {
199     // an "interior" remove
200    
201     // slide over all others up through putIndex.
202     for (int i = removeIndex, putIndex = this.putIndex;;) {
203     int pred = i;
204     if (++i == items.length) i = 0;
205     if (i == putIndex) {
206     items[pred] = null;
207     this.putIndex = pred;
208     break;
209     }
210     items[pred] = items[i];
211     }
212     count--;
213     if (itrs != null)
214     itrs.removedAt(removeIndex);
215     }
216     notFull.signal();
217 jsr166 1.2 // checkInvariants();
218 jsr166 1.1 }
219    
220     /**
221     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
222     * capacity and default access policy.
223     *
224     * @param capacity the capacity of this queue
225     * @throws IllegalArgumentException if {@code capacity < 1}
226     */
227     public ArrayBlockingQueue(int capacity) {
228     this(capacity, false);
229     }
230    
231     /**
232     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
233     * capacity and the specified access policy.
234     *
235     * @param capacity the capacity of this queue
236     * @param fair if {@code true} then queue accesses for threads blocked
237     * on insertion or removal, are processed in FIFO order;
238     * if {@code false} the access order is unspecified.
239     * @throws IllegalArgumentException if {@code capacity < 1}
240     */
241     public ArrayBlockingQueue(int capacity, boolean fair) {
242     if (capacity <= 0)
243     throw new IllegalArgumentException();
244     this.items = new Object[capacity];
245     lock = new ReentrantLock(fair);
246     notEmpty = lock.newCondition();
247     notFull = lock.newCondition();
248     }
249    
250     /**
251     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
252     * capacity, the specified access policy and initially containing the
253     * elements of the given collection,
254     * added in traversal order of the collection's iterator.
255     *
256     * @param capacity the capacity of this queue
257     * @param fair if {@code true} then queue accesses for threads blocked
258     * on insertion or removal, are processed in FIFO order;
259     * if {@code false} the access order is unspecified.
260     * @param c the collection of elements to initially contain
261     * @throws IllegalArgumentException if {@code capacity} is less than
262     * {@code c.size()}, or less than 1.
263     * @throws NullPointerException if the specified collection or any
264     * of its elements are null
265     */
266     public ArrayBlockingQueue(int capacity, boolean fair,
267     Collection<? extends E> c) {
268     this(capacity, fair);
269    
270     final ReentrantLock lock = this.lock;
271     lock.lock(); // Lock only for visibility, not mutual exclusion
272     try {
273 jsr166 1.2 final Object[] items = this.items;
274 jsr166 1.1 int i = 0;
275     try {
276     for (E e : c)
277     items[i++] = Objects.requireNonNull(e);
278     } catch (ArrayIndexOutOfBoundsException ex) {
279     throw new IllegalArgumentException();
280     }
281     count = i;
282     putIndex = (i == capacity) ? 0 : i;
283 jsr166 1.2 // checkInvariants();
284 jsr166 1.1 } finally {
285     lock.unlock();
286     }
287     }
288    
289     /**
290     * Inserts the specified element at the tail of this queue if it is
291     * possible to do so immediately without exceeding the queue's capacity,
292     * returning {@code true} upon success and throwing an
293     * {@code IllegalStateException} if this queue is full.
294     *
295     * @param e the element to add
296     * @return {@code true} (as specified by {@link Collection#add})
297     * @throws IllegalStateException if this queue is full
298     * @throws NullPointerException if the specified element is null
299     */
300     public boolean add(E e) {
301     return super.add(e);
302     }
303    
304     /**
305     * Inserts the specified element at the tail of this queue if it is
306     * possible to do so immediately without exceeding the queue's capacity,
307     * returning {@code true} upon success and {@code false} if this queue
308     * is full. This method is generally preferable to method {@link #add},
309     * which can fail to insert an element only by throwing an exception.
310     *
311     * @throws NullPointerException if the specified element is null
312     */
313     public boolean offer(E e) {
314     Objects.requireNonNull(e);
315     final ReentrantLock lock = this.lock;
316     lock.lock();
317     try {
318     if (count == items.length)
319     return false;
320     else {
321     enqueue(e);
322     return true;
323     }
324     } finally {
325     lock.unlock();
326     }
327     }
328    
329     /**
330     * Inserts the specified element at the tail of this queue, waiting
331     * for space to become available if the queue is full.
332     *
333     * @throws InterruptedException {@inheritDoc}
334     * @throws NullPointerException {@inheritDoc}
335     */
336     public void put(E e) throws InterruptedException {
337     Objects.requireNonNull(e);
338     final ReentrantLock lock = this.lock;
339     lock.lockInterruptibly();
340     try {
341     while (count == items.length)
342     notFull.await();
343     enqueue(e);
344     } finally {
345     lock.unlock();
346     }
347     }
348    
349     /**
350     * Inserts the specified element at the tail of this queue, waiting
351     * up to the specified wait time for space to become available if
352     * the queue is full.
353     *
354     * @throws InterruptedException {@inheritDoc}
355     * @throws NullPointerException {@inheritDoc}
356     */
357     public boolean offer(E e, long timeout, TimeUnit unit)
358     throws InterruptedException {
359    
360     Objects.requireNonNull(e);
361     long nanos = unit.toNanos(timeout);
362     final ReentrantLock lock = this.lock;
363     lock.lockInterruptibly();
364     try {
365     while (count == items.length) {
366     if (nanos <= 0L)
367     return false;
368     nanos = notFull.awaitNanos(nanos);
369     }
370     enqueue(e);
371     return true;
372     } finally {
373 jsr166 1.2 // checkInvariants();
374 jsr166 1.1 lock.unlock();
375     }
376     }
377    
378     public E poll() {
379     final ReentrantLock lock = this.lock;
380     lock.lock();
381     try {
382     return (count == 0) ? null : dequeue();
383     } finally {
384     lock.unlock();
385     }
386     }
387    
388     public E take() throws InterruptedException {
389     final ReentrantLock lock = this.lock;
390     lock.lockInterruptibly();
391     try {
392     while (count == 0)
393     notEmpty.await();
394     return dequeue();
395     } finally {
396     lock.unlock();
397     }
398     }
399    
400     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
401     long nanos = unit.toNanos(timeout);
402     final ReentrantLock lock = this.lock;
403     lock.lockInterruptibly();
404     try {
405     while (count == 0) {
406     if (nanos <= 0L)
407     return null;
408     nanos = notEmpty.awaitNanos(nanos);
409     }
410     return dequeue();
411     } finally {
412 jsr166 1.2 // checkInvariants();
413 jsr166 1.1 lock.unlock();
414     }
415     }
416    
417     public E peek() {
418     final ReentrantLock lock = this.lock;
419     lock.lock();
420     try {
421     return itemAt(takeIndex); // null when queue is empty
422     } finally {
423     lock.unlock();
424     }
425     }
426    
427     // this doc comment is overridden to remove the reference to collections
428     // greater in size than Integer.MAX_VALUE
429     /**
430     * Returns the number of elements in this queue.
431     *
432     * @return the number of elements in this queue
433     */
434     public int size() {
435     final ReentrantLock lock = this.lock;
436     lock.lock();
437     try {
438     return count;
439     } finally {
440     lock.unlock();
441     }
442     }
443    
444     // this doc comment is a modified copy of the inherited doc comment,
445     // without the reference to unlimited queues.
446     /**
447     * Returns the number of additional elements that this queue can ideally
448     * (in the absence of memory or resource constraints) accept without
449     * blocking. This is always equal to the initial capacity of this queue
450     * less the current {@code size} of this queue.
451     *
452     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
453     * an element will succeed by inspecting {@code remainingCapacity}
454     * because it may be the case that another thread is about to
455     * insert or remove an element.
456     */
457     public int remainingCapacity() {
458     final ReentrantLock lock = this.lock;
459     lock.lock();
460     try {
461     return items.length - count;
462     } finally {
463     lock.unlock();
464     }
465     }
466    
467     /**
468     * Removes a single instance of the specified element from this queue,
469     * if it is present. More formally, removes an element {@code e} such
470     * that {@code o.equals(e)}, if this queue contains one or more such
471     * elements.
472     * Returns {@code true} if this queue contained the specified element
473     * (or equivalently, if this queue changed as a result of the call).
474     *
475     * <p>Removal of interior elements in circular array based queues
476     * is an intrinsically slow and disruptive operation, so should
477     * be undertaken only in exceptional circumstances, ideally
478     * only when the queue is known not to be accessible by other
479     * threads.
480     *
481     * @param o element to be removed from this queue, if present
482     * @return {@code true} if this queue changed as a result of the call
483     */
484     public boolean remove(Object o) {
485     if (o == null) return false;
486     final ReentrantLock lock = this.lock;
487     lock.lock();
488     try {
489     if (count > 0) {
490     final Object[] items = this.items;
491 jsr166 1.2 for (int i = takeIndex, end = putIndex,
492     to = (i < end) ? end : items.length;
493     ; i = 0, to = end) {
494     for (; i < to; i++)
495     if (o.equals(items[i])) {
496     removeAt(i);
497     return true;
498     }
499     if (to == end) break;
500     }
501 jsr166 1.1 }
502     return false;
503     } finally {
504 jsr166 1.2 // checkInvariants();
505 jsr166 1.1 lock.unlock();
506     }
507     }
508    
509     /**
510     * Returns {@code true} if this queue contains the specified element.
511     * More formally, returns {@code true} if and only if this queue contains
512     * at least one element {@code e} such that {@code o.equals(e)}.
513     *
514     * @param o object to be checked for containment in this queue
515     * @return {@code true} if this queue contains the specified element
516     */
517     public boolean contains(Object o) {
518     if (o == null) return false;
519     final ReentrantLock lock = this.lock;
520     lock.lock();
521     try {
522     if (count > 0) {
523     final Object[] items = this.items;
524 jsr166 1.2 for (int i = takeIndex, end = putIndex,
525     to = (i < end) ? end : items.length;
526     ; i = 0, to = end) {
527     for (; i < to; i++)
528     if (o.equals(items[i]))
529     return true;
530     if (to == end) break;
531     }
532 jsr166 1.1 }
533     return false;
534     } finally {
535 jsr166 1.2 // checkInvariants();
536 jsr166 1.1 lock.unlock();
537     }
538     }
539    
540     /**
541     * Returns an array containing all of the elements in this queue, in
542     * proper sequence.
543     *
544     * <p>The returned array will be "safe" in that no references to it are
545     * maintained by this queue. (In other words, this method must allocate
546     * a new array). The caller is thus free to modify the returned array.
547     *
548     * <p>This method acts as bridge between array-based and collection-based
549     * APIs.
550     *
551     * @return an array containing all of the elements in this queue
552     */
553     public Object[] toArray() {
554     final ReentrantLock lock = this.lock;
555     lock.lock();
556     try {
557     final Object[] items = this.items;
558     final int end = takeIndex + count;
559     final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
560     if (end != putIndex)
561     System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
562     return a;
563     } finally {
564     lock.unlock();
565     }
566     }
567    
568     /**
569     * Returns an array containing all of the elements in this queue, in
570     * proper sequence; the runtime type of the returned array is that of
571     * the specified array. If the queue fits in the specified array, it
572     * is returned therein. Otherwise, a new array is allocated with the
573     * runtime type of the specified array and the size of this queue.
574     *
575     * <p>If this queue fits in the specified array with room to spare
576     * (i.e., the array has more elements than this queue), the element in
577     * the array immediately following the end of the queue is set to
578     * {@code null}.
579     *
580     * <p>Like the {@link #toArray()} method, this method acts as bridge between
581     * array-based and collection-based APIs. Further, this method allows
582     * precise control over the runtime type of the output array, and may,
583     * under certain circumstances, be used to save allocation costs.
584     *
585     * <p>Suppose {@code x} is a queue known to contain only strings.
586     * The following code can be used to dump the queue into a newly
587     * allocated array of {@code String}:
588     *
589     * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
590     *
591     * Note that {@code toArray(new Object[0])} is identical in function to
592     * {@code toArray()}.
593     *
594     * @param a the array into which the elements of the queue are to
595     * be stored, if it is big enough; otherwise, a new array of the
596     * same runtime type is allocated for this purpose
597     * @return an array containing all of the elements in this queue
598     * @throws ArrayStoreException if the runtime type of the specified array
599     * is not a supertype of the runtime type of every element in
600     * this queue
601     * @throws NullPointerException if the specified array is null
602     */
603     @SuppressWarnings("unchecked")
604     public <T> T[] toArray(T[] a) {
605     final ReentrantLock lock = this.lock;
606     lock.lock();
607     try {
608     final Object[] items = this.items;
609     final int count = this.count;
610     final int firstLeg = Math.min(items.length - takeIndex, count);
611     if (a.length < count) {
612     a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
613     a.getClass());
614     } else {
615     System.arraycopy(items, takeIndex, a, 0, firstLeg);
616     if (a.length > count)
617     a[count] = null;
618     }
619     if (firstLeg < count)
620     System.arraycopy(items, 0, a, firstLeg, putIndex);
621     return a;
622     } finally {
623     lock.unlock();
624     }
625     }
626    
627     public String toString() {
628     return Helpers.collectionToString(this);
629     }
630    
631     /**
632     * Atomically removes all of the elements from this queue.
633     * The queue will be empty after this call returns.
634     */
635     public void clear() {
636     final ReentrantLock lock = this.lock;
637     lock.lock();
638     try {
639 jsr166 1.2 int k;
640     if ((k = count) > 0) {
641     circularClear(items, takeIndex, putIndex);
642 jsr166 1.1 takeIndex = putIndex;
643     count = 0;
644     if (itrs != null)
645     itrs.queueIsEmpty();
646     for (; k > 0 && lock.hasWaiters(notFull); k--)
647     notFull.signal();
648     }
649     } finally {
650 jsr166 1.2 // checkInvariants();
651 jsr166 1.1 lock.unlock();
652     }
653     }
654    
655     /**
656 jsr166 1.2 * Nulls out slots starting at array index i, upto index end.
657     * Condition i == end means "full" - the entire array is cleared.
658     */
659     private static void circularClear(Object[] items, int i, int end) {
660     // assert 0 <= i && i < items.length;
661     // assert 0 <= end && end < items.length;
662     for (int to = (i < end) ? end : items.length;
663     ; i = 0, to = end) {
664     for (; i < to; i++) items[i] = null;
665     if (to == end) break;
666     }
667     }
668    
669     /**
670 jsr166 1.1 * @throws UnsupportedOperationException {@inheritDoc}
671     * @throws ClassCastException {@inheritDoc}
672     * @throws NullPointerException {@inheritDoc}
673     * @throws IllegalArgumentException {@inheritDoc}
674     */
675     public int drainTo(Collection<? super E> c) {
676     return drainTo(c, Integer.MAX_VALUE);
677     }
678    
679     /**
680     * @throws UnsupportedOperationException {@inheritDoc}
681     * @throws ClassCastException {@inheritDoc}
682     * @throws NullPointerException {@inheritDoc}
683     * @throws IllegalArgumentException {@inheritDoc}
684     */
685     public int drainTo(Collection<? super E> c, int maxElements) {
686     Objects.requireNonNull(c);
687     if (c == this)
688     throw new IllegalArgumentException();
689     if (maxElements <= 0)
690     return 0;
691     final Object[] items = this.items;
692     final ReentrantLock lock = this.lock;
693     lock.lock();
694     try {
695     int n = Math.min(maxElements, count);
696     int take = takeIndex;
697     int i = 0;
698     try {
699     while (i < n) {
700     @SuppressWarnings("unchecked")
701 jsr166 1.2 E e = (E) items[take];
702     c.add(e);
703 jsr166 1.1 items[take] = null;
704     if (++take == items.length) take = 0;
705     i++;
706     }
707     return n;
708     } finally {
709     // Restore invariants even if c.add() threw
710     if (i > 0) {
711     count -= i;
712     takeIndex = take;
713     if (itrs != null) {
714     if (count == 0)
715     itrs.queueIsEmpty();
716     else if (i > take)
717     itrs.takeIndexWrapped();
718     }
719     for (; i > 0 && lock.hasWaiters(notFull); i--)
720     notFull.signal();
721     }
722     }
723     } finally {
724 jsr166 1.2 // checkInvariants();
725 jsr166 1.1 lock.unlock();
726     }
727     }
728    
729     /**
730     * Returns an iterator over the elements in this queue in proper sequence.
731     * The elements will be returned in order from first (head) to last (tail).
732     *
733     * <p>The returned iterator is
734     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
735     *
736     * @return an iterator over the elements in this queue in proper sequence
737     */
738     public Iterator<E> iterator() {
739     return new Itr();
740     }
741    
742     /**
743     * Shared data between iterators and their queue, allowing queue
744     * modifications to update iterators when elements are removed.
745     *
746     * This adds a lot of complexity for the sake of correctly
747     * handling some uncommon operations, but the combination of
748     * circular-arrays and supporting interior removes (i.e., those
749     * not at head) would cause iterators to sometimes lose their
750     * places and/or (re)report elements they shouldn't. To avoid
751     * this, when a queue has one or more iterators, it keeps iterator
752     * state consistent by:
753     *
754     * (1) keeping track of the number of "cycles", that is, the
755     * number of times takeIndex has wrapped around to 0.
756     * (2) notifying all iterators via the callback removedAt whenever
757     * an interior element is removed (and thus other elements may
758     * be shifted).
759     *
760     * These suffice to eliminate iterator inconsistencies, but
761     * unfortunately add the secondary responsibility of maintaining
762     * the list of iterators. We track all active iterators in a
763     * simple linked list (accessed only when the queue's lock is
764     * held) of weak references to Itr. The list is cleaned up using
765     * 3 different mechanisms:
766     *
767     * (1) Whenever a new iterator is created, do some O(1) checking for
768     * stale list elements.
769     *
770     * (2) Whenever takeIndex wraps around to 0, check for iterators
771     * that have been unused for more than one wrap-around cycle.
772     *
773     * (3) Whenever the queue becomes empty, all iterators are notified
774     * and this entire data structure is discarded.
775     *
776     * So in addition to the removedAt callback that is necessary for
777     * correctness, iterators have the shutdown and takeIndexWrapped
778     * callbacks that help remove stale iterators from the list.
779     *
780     * Whenever a list element is examined, it is expunged if either
781     * the GC has determined that the iterator is discarded, or if the
782     * iterator reports that it is "detached" (does not need any
783     * further state updates). Overhead is maximal when takeIndex
784     * never advances, iterators are discarded before they are
785     * exhausted, and all removals are interior removes, in which case
786     * all stale iterators are discovered by the GC. But even in this
787     * case we don't increase the amortized complexity.
788     *
789     * Care must be taken to keep list sweeping methods from
790     * reentrantly invoking another such method, causing subtle
791     * corruption bugs.
792     */
793     class Itrs {
794    
795     /**
796     * Node in a linked list of weak iterator references.
797     */
798     private class Node extends WeakReference<Itr> {
799     Node next;
800    
801     Node(Itr iterator, Node next) {
802     super(iterator);
803     this.next = next;
804     }
805     }
806    
807     /** Incremented whenever takeIndex wraps around to 0 */
808     int cycles;
809    
810     /** Linked list of weak iterator references */
811     private Node head;
812    
813     /** Used to expunge stale iterators */
814     private Node sweeper;
815    
816     private static final int SHORT_SWEEP_PROBES = 4;
817     private static final int LONG_SWEEP_PROBES = 16;
818    
819     Itrs(Itr initial) {
820     register(initial);
821     }
822    
823     /**
824     * Sweeps itrs, looking for and expunging stale iterators.
825     * If at least one was found, tries harder to find more.
826     * Called only from iterating thread.
827     *
828     * @param tryHarder whether to start in try-harder mode, because
829     * there is known to be at least one iterator to collect
830     */
831     void doSomeSweeping(boolean tryHarder) {
832 jsr166 1.2 // assert lock.isHeldByCurrentThread();
833 jsr166 1.1 // assert head != null;
834     int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
835     Node o, p;
836     final Node sweeper = this.sweeper;
837     boolean passedGo; // to limit search to one full sweep
838    
839     if (sweeper == null) {
840     o = null;
841     p = head;
842     passedGo = true;
843     } else {
844     o = sweeper;
845     p = o.next;
846     passedGo = false;
847     }
848    
849     for (; probes > 0; probes--) {
850     if (p == null) {
851     if (passedGo)
852     break;
853     o = null;
854     p = head;
855     passedGo = true;
856     }
857     final Itr it = p.get();
858     final Node next = p.next;
859     if (it == null || it.isDetached()) {
860     // found a discarded/exhausted iterator
861     probes = LONG_SWEEP_PROBES; // "try harder"
862     // unlink p
863     p.clear();
864     p.next = null;
865     if (o == null) {
866     head = next;
867     if (next == null) {
868     // We've run out of iterators to track; retire
869     itrs = null;
870     return;
871     }
872     }
873     else
874     o.next = next;
875     } else {
876     o = p;
877     }
878     p = next;
879     }
880    
881     this.sweeper = (p == null) ? null : o;
882     }
883    
884     /**
885     * Adds a new iterator to the linked list of tracked iterators.
886     */
887     void register(Itr itr) {
888 jsr166 1.2 // assert lock.isHeldByCurrentThread();
889 jsr166 1.1 head = new Node(itr, head);
890     }
891    
892     /**
893     * Called whenever takeIndex wraps around to 0.
894     *
895     * Notifies all iterators, and expunges any that are now stale.
896     */
897     void takeIndexWrapped() {
898 jsr166 1.2 // assert lock.isHeldByCurrentThread();
899 jsr166 1.1 cycles++;
900     for (Node o = null, p = head; p != null;) {
901     final Itr it = p.get();
902     final Node next = p.next;
903     if (it == null || it.takeIndexWrapped()) {
904     // unlink p
905     // assert it == null || it.isDetached();
906     p.clear();
907     p.next = null;
908     if (o == null)
909     head = next;
910     else
911     o.next = next;
912     } else {
913     o = p;
914     }
915     p = next;
916     }
917     if (head == null) // no more iterators to track
918     itrs = null;
919     }
920    
921     /**
922     * Called whenever an interior remove (not at takeIndex) occurred.
923     *
924     * Notifies all iterators, and expunges any that are now stale.
925     */
926     void removedAt(int removedIndex) {
927     for (Node o = null, p = head; p != null;) {
928     final Itr it = p.get();
929     final Node next = p.next;
930     if (it == null || it.removedAt(removedIndex)) {
931     // unlink p
932     // assert it == null || it.isDetached();
933     p.clear();
934     p.next = null;
935     if (o == null)
936     head = next;
937     else
938     o.next = next;
939     } else {
940     o = p;
941     }
942     p = next;
943     }
944     if (head == null) // no more iterators to track
945     itrs = null;
946     }
947    
948     /**
949     * Called whenever the queue becomes empty.
950     *
951     * Notifies all active iterators that the queue is empty,
952     * clears all weak refs, and unlinks the itrs datastructure.
953     */
954     void queueIsEmpty() {
955 jsr166 1.2 // assert lock.isHeldByCurrentThread();
956 jsr166 1.1 for (Node p = head; p != null; p = p.next) {
957     Itr it = p.get();
958     if (it != null) {
959     p.clear();
960     it.shutdown();
961     }
962     }
963     head = null;
964     itrs = null;
965     }
966    
967     /**
968     * Called whenever an element has been dequeued (at takeIndex).
969     */
970     void elementDequeued() {
971 jsr166 1.2 // assert lock.isHeldByCurrentThread();
972 jsr166 1.1 if (count == 0)
973     queueIsEmpty();
974     else if (takeIndex == 0)
975     takeIndexWrapped();
976     }
977     }
978    
979     /**
980     * Iterator for ArrayBlockingQueue.
981     *
982     * To maintain weak consistency with respect to puts and takes, we
983     * read ahead one slot, so as to not report hasNext true but then
984     * not have an element to return.
985     *
986     * We switch into "detached" mode (allowing prompt unlinking from
987     * itrs without help from the GC) when all indices are negative, or
988     * when hasNext returns false for the first time. This allows the
989     * iterator to track concurrent updates completely accurately,
990     * except for the corner case of the user calling Iterator.remove()
991     * after hasNext() returned false. Even in this case, we ensure
992     * that we don't remove the wrong element by keeping track of the
993     * expected element to remove, in lastItem. Yes, we may fail to
994     * remove lastItem from the queue if it moved due to an interleaved
995     * interior remove while in detached mode.
996 jsr166 1.2 *
997     * Method forEachRemaining, added in Java 8, is treated similarly
998     * to hasNext returning false, in that we switch to detached mode,
999     * but we regard it as an even stronger request to "close" this
1000     * iteration, and don't bother supporting subsequent remove().
1001 jsr166 1.1 */
1002     private class Itr implements Iterator<E> {
1003     /** Index to look for new nextItem; NONE at end */
1004     private int cursor;
1005    
1006     /** Element to be returned by next call to next(); null if none */
1007     private E nextItem;
1008    
1009     /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
1010     private int nextIndex;
1011    
1012     /** Last element returned; null if none or not detached. */
1013     private E lastItem;
1014    
1015     /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1016     private int lastRet;
1017    
1018     /** Previous value of takeIndex, or DETACHED when detached */
1019     private int prevTakeIndex;
1020    
1021     /** Previous value of iters.cycles */
1022     private int prevCycles;
1023    
1024     /** Special index value indicating "not available" or "undefined" */
1025     private static final int NONE = -1;
1026    
1027     /**
1028     * Special index value indicating "removed elsewhere", that is,
1029     * removed by some operation other than a call to this.remove().
1030     */
1031     private static final int REMOVED = -2;
1032    
1033     /** Special value for prevTakeIndex indicating "detached mode" */
1034     private static final int DETACHED = -3;
1035    
1036     Itr() {
1037     lastRet = NONE;
1038     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1039     lock.lock();
1040     try {
1041     if (count == 0) {
1042     // assert itrs == null;
1043     cursor = NONE;
1044     nextIndex = NONE;
1045     prevTakeIndex = DETACHED;
1046     } else {
1047     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1048     prevTakeIndex = takeIndex;
1049     nextItem = itemAt(nextIndex = takeIndex);
1050     cursor = incCursor(takeIndex);
1051     if (itrs == null) {
1052     itrs = new Itrs(this);
1053     } else {
1054     itrs.register(this); // in this order
1055     itrs.doSomeSweeping(false);
1056     }
1057     prevCycles = itrs.cycles;
1058     // assert takeIndex >= 0;
1059     // assert prevTakeIndex == takeIndex;
1060     // assert nextIndex >= 0;
1061     // assert nextItem != null;
1062     }
1063     } finally {
1064     lock.unlock();
1065     }
1066     }
1067    
1068     boolean isDetached() {
1069 jsr166 1.2 // assert lock.isHeldByCurrentThread();
1070 jsr166 1.1 return prevTakeIndex < 0;
1071     }
1072    
1073     private int incCursor(int index) {
1074 jsr166 1.2 // assert lock.isHeldByCurrentThread();
1075 jsr166 1.1 if (++index == items.length) index = 0;
1076     if (index == putIndex) index = NONE;
1077     return index;
1078     }
1079    
1080     /**
1081     * Returns true if index is invalidated by the given number of
1082     * dequeues, starting from prevTakeIndex.
1083     */
1084     private boolean invalidated(int index, int prevTakeIndex,
1085     long dequeues, int length) {
1086     if (index < 0)
1087     return false;
1088     int distance = index - prevTakeIndex;
1089     if (distance < 0)
1090     distance += length;
1091     return dequeues > distance;
1092     }
1093    
1094     /**
1095     * Adjusts indices to incorporate all dequeues since the last
1096     * operation on this iterator. Call only from iterating thread.
1097     */
1098     private void incorporateDequeues() {
1099 jsr166 1.2 // assert lock.isHeldByCurrentThread();
1100 jsr166 1.1 // assert itrs != null;
1101     // assert !isDetached();
1102     // assert count > 0;
1103    
1104     final int cycles = itrs.cycles;
1105     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1106     final int prevCycles = this.prevCycles;
1107     final int prevTakeIndex = this.prevTakeIndex;
1108    
1109     if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1110     final int len = items.length;
1111     // how far takeIndex has advanced since the previous
1112     // operation of this iterator
1113     long dequeues = (cycles - prevCycles) * len
1114     + (takeIndex - prevTakeIndex);
1115    
1116     // Check indices for invalidation
1117     if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1118     lastRet = REMOVED;
1119     if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1120     nextIndex = REMOVED;
1121     if (invalidated(cursor, prevTakeIndex, dequeues, len))
1122     cursor = takeIndex;
1123    
1124     if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1125     detach();
1126     else {
1127     this.prevCycles = cycles;
1128     this.prevTakeIndex = takeIndex;
1129     }
1130     }
1131     }
1132    
1133     /**
1134     * Called when itrs should stop tracking this iterator, either
1135     * because there are no more indices to update (cursor < 0 &&
1136     * nextIndex < 0 && lastRet < 0) or as a special exception, when
1137     * lastRet >= 0, because hasNext() is about to return false for the
1138     * first time. Call only from iterating thread.
1139     */
1140     private void detach() {
1141     // Switch to detached mode
1142 jsr166 1.2 // assert lock.isHeldByCurrentThread();
1143 jsr166 1.1 // assert cursor == NONE;
1144     // assert nextIndex < 0;
1145     // assert lastRet < 0 || nextItem == null;
1146     // assert lastRet < 0 ^ lastItem != null;
1147     if (prevTakeIndex >= 0) {
1148     // assert itrs != null;
1149     prevTakeIndex = DETACHED;
1150     // try to unlink from itrs (but not too hard)
1151     itrs.doSomeSweeping(true);
1152     }
1153     }
1154    
1155     /**
1156     * For performance reasons, we would like not to acquire a lock in
1157     * hasNext in the common case. To allow for this, we only access
1158     * fields (i.e. nextItem) that are not modified by update operations
1159     * triggered by queue modifications.
1160     */
1161     public boolean hasNext() {
1162     if (nextItem != null)
1163     return true;
1164     noNext();
1165     return false;
1166     }
1167    
1168     private void noNext() {
1169     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1170     lock.lock();
1171     try {
1172     // assert cursor == NONE;
1173     // assert nextIndex == NONE;
1174     if (!isDetached()) {
1175     // assert lastRet >= 0;
1176     incorporateDequeues(); // might update lastRet
1177     if (lastRet >= 0) {
1178     lastItem = itemAt(lastRet);
1179     // assert lastItem != null;
1180     detach();
1181     }
1182     }
1183     // assert isDetached();
1184     // assert lastRet < 0 ^ lastItem != null;
1185     } finally {
1186     lock.unlock();
1187     }
1188     }
1189    
1190     public E next() {
1191 jsr166 1.2 final E e = nextItem;
1192     if (e == null)
1193 jsr166 1.1 throw new NoSuchElementException();
1194     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1195     lock.lock();
1196     try {
1197     if (!isDetached())
1198     incorporateDequeues();
1199     // assert nextIndex != NONE;
1200     // assert lastItem == null;
1201     lastRet = nextIndex;
1202     final int cursor = this.cursor;
1203     if (cursor >= 0) {
1204     nextItem = itemAt(nextIndex = cursor);
1205     // assert nextItem != null;
1206     this.cursor = incCursor(cursor);
1207     } else {
1208     nextIndex = NONE;
1209     nextItem = null;
1210 jsr166 1.2 if (lastRet == REMOVED) detach();
1211     }
1212     } finally {
1213     lock.unlock();
1214     }
1215     return e;
1216     }
1217    
1218     public void forEachRemaining(Consumer<? super E> action) {
1219     Objects.requireNonNull(action);
1220     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1221     lock.lock();
1222     try {
1223     final E e = nextItem;
1224     if (e == null) return;
1225     if (!isDetached())
1226     incorporateDequeues();
1227     action.accept(e);
1228     if (isDetached() || cursor < 0) return;
1229     final Object[] items = ArrayBlockingQueue.this.items;
1230     for (int i = cursor, end = putIndex,
1231     to = (i < end) ? end : items.length;
1232     ; i = 0, to = end) {
1233     for (; i < to; i++)
1234     action.accept(itemAt(items, i));
1235     if (to == end) break;
1236 jsr166 1.1 }
1237     } finally {
1238 jsr166 1.2 // Calling forEachRemaining is a strong hint that this
1239     // iteration is surely over; supporting remove() after
1240     // forEachRemaining() is more trouble than it's worth
1241     cursor = nextIndex = lastRet = NONE;
1242     nextItem = lastItem = null;
1243     detach();
1244 jsr166 1.1 lock.unlock();
1245     }
1246     }
1247    
1248     public void remove() {
1249     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1250     lock.lock();
1251 jsr166 1.2 // assert lock.getHoldCount() == 1;
1252 jsr166 1.1 try {
1253     if (!isDetached())
1254     incorporateDequeues(); // might update lastRet or detach
1255     final int lastRet = this.lastRet;
1256     this.lastRet = NONE;
1257     if (lastRet >= 0) {
1258     if (!isDetached())
1259     removeAt(lastRet);
1260     else {
1261     final E lastItem = this.lastItem;
1262     // assert lastItem != null;
1263     this.lastItem = null;
1264     if (itemAt(lastRet) == lastItem)
1265     removeAt(lastRet);
1266     }
1267     } else if (lastRet == NONE)
1268     throw new IllegalStateException();
1269     // else lastRet == REMOVED and the last returned element was
1270     // previously asynchronously removed via an operation other
1271     // than this.remove(), so nothing to do.
1272    
1273     if (cursor < 0 && nextIndex < 0)
1274     detach();
1275     } finally {
1276     lock.unlock();
1277     // assert lastRet == NONE;
1278     // assert lastItem == null;
1279     }
1280     }
1281    
1282     /**
1283     * Called to notify the iterator that the queue is empty, or that it
1284     * has fallen hopelessly behind, so that it should abandon any
1285     * further iteration, except possibly to return one more element
1286     * from next(), as promised by returning true from hasNext().
1287     */
1288     void shutdown() {
1289 jsr166 1.2 // assert lock.isHeldByCurrentThread();
1290 jsr166 1.1 cursor = NONE;
1291     if (nextIndex >= 0)
1292     nextIndex = REMOVED;
1293     if (lastRet >= 0) {
1294     lastRet = REMOVED;
1295     lastItem = null;
1296     }
1297     prevTakeIndex = DETACHED;
1298     // Don't set nextItem to null because we must continue to be
1299     // able to return it on next().
1300     //
1301     // Caller will unlink from itrs when convenient.
1302     }
1303    
1304     private int distance(int index, int prevTakeIndex, int length) {
1305     int distance = index - prevTakeIndex;
1306     if (distance < 0)
1307     distance += length;
1308     return distance;
1309     }
1310    
1311     /**
1312     * Called whenever an interior remove (not at takeIndex) occurred.
1313     *
1314     * @return true if this iterator should be unlinked from itrs
1315     */
1316     boolean removedAt(int removedIndex) {
1317 jsr166 1.2 // assert lock.isHeldByCurrentThread();
1318 jsr166 1.1 if (isDetached())
1319     return true;
1320    
1321     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1322     final int prevTakeIndex = this.prevTakeIndex;
1323     final int len = items.length;
1324     // distance from prevTakeIndex to removedIndex
1325     final int removedDistance =
1326     len * (itrs.cycles - this.prevCycles
1327     + ((removedIndex < takeIndex) ? 1 : 0))
1328     + (removedIndex - prevTakeIndex);
1329     // assert itrs.cycles - this.prevCycles >= 0;
1330     // assert itrs.cycles - this.prevCycles <= 1;
1331     // assert removedDistance > 0;
1332     // assert removedIndex != takeIndex;
1333     int cursor = this.cursor;
1334     if (cursor >= 0) {
1335     int x = distance(cursor, prevTakeIndex, len);
1336     if (x == removedDistance) {
1337     if (cursor == putIndex)
1338     this.cursor = cursor = NONE;
1339     }
1340     else if (x > removedDistance) {
1341     // assert cursor != prevTakeIndex;
1342 jsr166 1.2 this.cursor = cursor = dec(cursor, len);
1343 jsr166 1.1 }
1344     }
1345     int lastRet = this.lastRet;
1346     if (lastRet >= 0) {
1347     int x = distance(lastRet, prevTakeIndex, len);
1348     if (x == removedDistance)
1349     this.lastRet = lastRet = REMOVED;
1350     else if (x > removedDistance)
1351 jsr166 1.2 this.lastRet = lastRet = dec(lastRet, len);
1352 jsr166 1.1 }
1353     int nextIndex = this.nextIndex;
1354     if (nextIndex >= 0) {
1355     int x = distance(nextIndex, prevTakeIndex, len);
1356     if (x == removedDistance)
1357     this.nextIndex = nextIndex = REMOVED;
1358     else if (x > removedDistance)
1359 jsr166 1.2 this.nextIndex = nextIndex = dec(nextIndex, len);
1360 jsr166 1.1 }
1361     if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1362     this.prevTakeIndex = DETACHED;
1363     return true;
1364     }
1365     return false;
1366     }
1367    
1368     /**
1369     * Called whenever takeIndex wraps around to zero.
1370     *
1371     * @return true if this iterator should be unlinked from itrs
1372     */
1373     boolean takeIndexWrapped() {
1374 jsr166 1.2 // assert lock.isHeldByCurrentThread();
1375 jsr166 1.1 if (isDetached())
1376     return true;
1377     if (itrs.cycles - prevCycles > 1) {
1378     // All the elements that existed at the time of the last
1379     // operation are gone, so abandon further iteration.
1380     shutdown();
1381     return true;
1382     }
1383     return false;
1384     }
1385    
1386     // /** Uncomment for debugging. */
1387     // public String toString() {
1388     // return ("cursor=" + cursor + " " +
1389     // "nextIndex=" + nextIndex + " " +
1390     // "lastRet=" + lastRet + " " +
1391     // "nextItem=" + nextItem + " " +
1392     // "lastItem=" + lastItem + " " +
1393     // "prevCycles=" + prevCycles + " " +
1394     // "prevTakeIndex=" + prevTakeIndex + " " +
1395     // "size()=" + size() + " " +
1396     // "remainingCapacity()=" + remainingCapacity());
1397     // }
1398     }
1399    
1400     /**
1401     * Returns a {@link Spliterator} over the elements in this queue.
1402     *
1403     * <p>The returned spliterator is
1404     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1405     *
1406     * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1407     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1408     *
1409     * @implNote
1410     * The {@code Spliterator} implements {@code trySplit} to permit limited
1411     * parallelism.
1412     *
1413     * @return a {@code Spliterator} over the elements in this queue
1414     * @since 1.8
1415     */
1416     public Spliterator<E> spliterator() {
1417     return Spliterators.spliterator
1418     (this, (Spliterator.ORDERED |
1419     Spliterator.NONNULL |
1420     Spliterator.CONCURRENT));
1421     }
1422    
1423 jsr166 1.2 /**
1424     * @throws NullPointerException {@inheritDoc}
1425     */
1426     public void forEach(Consumer<? super E> action) {
1427     Objects.requireNonNull(action);
1428     final ReentrantLock lock = this.lock;
1429     lock.lock();
1430     try {
1431     if (count > 0) {
1432     final Object[] items = this.items;
1433     for (int i = takeIndex, end = putIndex,
1434     to = (i < end) ? end : items.length;
1435     ; i = 0, to = end) {
1436     for (; i < to; i++)
1437     action.accept(itemAt(items, i));
1438     if (to == end) break;
1439     }
1440     }
1441     } finally {
1442     // checkInvariants();
1443     lock.unlock();
1444     }
1445     }
1446    
1447     /**
1448     * @throws NullPointerException {@inheritDoc}
1449     */
1450     public boolean removeIf(Predicate<? super E> filter) {
1451     Objects.requireNonNull(filter);
1452     return bulkRemove(filter);
1453     }
1454    
1455     /**
1456     * @throws NullPointerException {@inheritDoc}
1457     */
1458     public boolean removeAll(Collection<?> c) {
1459     Objects.requireNonNull(c);
1460     return bulkRemove(e -> c.contains(e));
1461     }
1462    
1463     /**
1464     * @throws NullPointerException {@inheritDoc}
1465     */
1466     public boolean retainAll(Collection<?> c) {
1467     Objects.requireNonNull(c);
1468     return bulkRemove(e -> !c.contains(e));
1469     }
1470    
1471     /** Implementation of bulk remove methods. */
1472     private boolean bulkRemove(Predicate<? super E> filter) {
1473     final ReentrantLock lock = this.lock;
1474     lock.lock();
1475     try {
1476     if (itrs == null) { // check for active iterators
1477     if (count > 0) {
1478     final Object[] items = this.items;
1479     // Optimize for initial run of survivors
1480     for (int i = takeIndex, end = putIndex,
1481     to = (i < end) ? end : items.length;
1482     ; i = 0, to = end) {
1483     for (; i < to; i++)
1484     if (filter.test(itemAt(items, i)))
1485     return bulkRemoveModified(filter, i);
1486     if (to == end) break;
1487     }
1488     }
1489     return false;
1490     }
1491     } finally {
1492     // checkInvariants();
1493     lock.unlock();
1494     }
1495     // Active iterators are too hairy!
1496     // Punting (for now) to the slow n^2 algorithm ...
1497     return super.removeIf(filter);
1498     }
1499    
1500     // A tiny bit set implementation
1501    
1502     private static long[] nBits(int n) {
1503     return new long[((n - 1) >> 6) + 1];
1504     }
1505     private static void setBit(long[] bits, int i) {
1506     bits[i >> 6] |= 1L << i;
1507     }
1508     private static boolean isClear(long[] bits, int i) {
1509     return (bits[i >> 6] & (1L << i)) == 0;
1510     }
1511    
1512     /**
1513     * Returns circular distance from i to j, disambiguating i == j to
1514     * items.length; never returns 0.
1515     */
1516     private int distanceNonEmpty(int i, int j) {
1517     if ((j -= i) <= 0) j += items.length;
1518     return j;
1519     }
1520    
1521     /**
1522     * Helper for bulkRemove, in case of at least one deletion.
1523     * Tolerate predicates that reentrantly access the collection for
1524     * read (but not write), so traverse once to find elements to
1525     * delete, a second pass to physically expunge.
1526     *
1527     * @param beg valid index of first element to be deleted
1528     */
1529     private boolean bulkRemoveModified(
1530     Predicate<? super E> filter, final int beg) {
1531     final Object[] es = items;
1532     final int capacity = items.length;
1533     final int end = putIndex;
1534     final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
1535     deathRow[0] = 1L; // set bit 0
1536     for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1537     ; i = 0, to = end, k -= capacity) {
1538     for (; i < to; i++)
1539     if (filter.test(itemAt(es, i)))
1540     setBit(deathRow, i - k);
1541     if (to == end) break;
1542     }
1543     // a two-finger traversal, with hare i reading, tortoise w writing
1544     int w = beg;
1545     for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1546     ; w = 0) { // w rejoins i on second leg
1547     // In this loop, i and w are on the same leg, with i > w
1548     for (; i < to; i++)
1549     if (isClear(deathRow, i - k))
1550     es[w++] = es[i];
1551     if (to == end) break;
1552     // In this loop, w is on the first leg, i on the second
1553     for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
1554     if (isClear(deathRow, i - k))
1555     es[w++] = es[i];
1556     if (i >= to) {
1557     if (w == capacity) w = 0; // "corner" case
1558     break;
1559     }
1560     }
1561     count -= distanceNonEmpty(w, end);
1562     circularClear(es, putIndex = w, end);
1563     // checkInvariants();
1564     return true;
1565     }
1566    
1567     /** debugging */
1568     void checkInvariants() {
1569     // meta-assertions
1570     // assert lock.isHeldByCurrentThread();
1571     try {
1572     // Unlike ArrayDeque, we have a count field but no spare slot.
1573     // We prefer ArrayDeque's strategy (and the names of its fields!),
1574     // but our field layout is baked into the serial form, and so is
1575     // too annoying to change.
1576     //
1577     // putIndex == takeIndex must be disambiguated by checking count.
1578     int capacity = items.length;
1579     // assert capacity > 0;
1580     // assert takeIndex >= 0 && takeIndex < capacity;
1581     // assert putIndex >= 0 && putIndex < capacity;
1582     // assert count <= capacity;
1583     // assert takeIndex == putIndex || items[takeIndex] != null;
1584     // assert count == capacity || items[putIndex] == null;
1585     // assert takeIndex == putIndex || items[dec(putIndex, capacity)] != null;
1586     } catch (Throwable t) {
1587     System.err.printf("takeIndex=%d putIndex=%d count=%d capacity=%d%n",
1588     takeIndex, putIndex, count, items.length);
1589     System.err.printf("items=%s%n",
1590     Arrays.toString(items));
1591     throw t;
1592     }
1593     }
1594    
1595 jsr166 1.1 }