ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.2
Committed: Mon Mar 6 01:00:45 2017 UTC (7 years, 2 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +314 -59 lines
Log Message:
sync from src/main

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.ref.WeakReference;
10 import java.util.AbstractQueue;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.Objects;
16 import java.util.Spliterator;
17 import java.util.Spliterators;
18 import java.util.concurrent.locks.Condition;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.Consumer;
21 import java.util.function.Predicate;
22
23 /**
24 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
25 * array. This queue orders elements FIFO (first-in-first-out). The
26 * <em>head</em> of the queue is that element that has been on the
27 * queue the longest time. The <em>tail</em> of the queue is that
28 * element that has been on the queue the shortest time. New elements
29 * are inserted at the tail of the queue, and the queue retrieval
30 * operations obtain elements at the head of the queue.
31 *
32 * <p>This is a classic &quot;bounded buffer&quot;, in which a
33 * fixed-sized array holds elements inserted by producers and
34 * extracted by consumers. Once created, the capacity cannot be
35 * changed. Attempts to {@code put} an element into a full queue
36 * will result in the operation blocking; attempts to {@code take} an
37 * element from an empty queue will similarly block.
38 *
39 * <p>This class supports an optional fairness policy for ordering
40 * waiting producer and consumer threads. By default, this ordering
41 * is not guaranteed. However, a queue constructed with fairness set
42 * to {@code true} grants threads access in FIFO order. Fairness
43 * generally decreases throughput but reduces variability and avoids
44 * starvation.
45 *
46 * <p>This class and its iterator implement all of the <em>optional</em>
47 * methods of the {@link Collection} and {@link Iterator} interfaces.
48 *
49 * <p>This class is a member of the
50 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
51 * Java Collections Framework</a>.
52 *
53 * @since 1.5
54 * @author Doug Lea
55 * @param <E> the type of elements held in this queue
56 */
57 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
58 implements BlockingQueue<E>, java.io.Serializable {
59
60 /*
61 * Much of the implementation mechanics, especially the unusual
62 * nested loops, are shared and co-maintained with ArrayDeque.
63 */
64
65 /**
66 * Serialization ID. This class relies on default serialization
67 * even for the items array, which is default-serialized, even if
68 * it is empty. Otherwise it could not be declared final, which is
69 * necessary here.
70 */
71 private static final long serialVersionUID = -817911632652898426L;
72
73 /** The queued items */
74 final Object[] items;
75
76 /** items index for next take, poll, peek or remove */
77 int takeIndex;
78
79 /** items index for next put, offer, or add */
80 int putIndex;
81
82 /** Number of elements in the queue */
83 int count;
84
85 /*
86 * Concurrency control uses the classic two-condition algorithm
87 * found in any textbook.
88 */
89
90 /** Main lock guarding all access */
91 final ReentrantLock lock;
92
93 /** Condition for waiting takes */
94 private final Condition notEmpty;
95
96 /** Condition for waiting puts */
97 private final Condition notFull;
98
99 /**
100 * Shared state for currently active iterators, or null if there
101 * are known not to be any. Allows queue operations to update
102 * iterator state.
103 */
104 transient Itrs itrs;
105
106 // Internal helper methods
107
108 /**
109 * Increments i, mod modulus.
110 * Precondition and postcondition: 0 <= i < modulus.
111 */
112 static final int inc(int i, int modulus) {
113 if (++i >= modulus) i = 0;
114 return i;
115 }
116
117 /**
118 * Decrements i, mod modulus.
119 * Precondition and postcondition: 0 <= i < modulus.
120 */
121 static final int dec(int i, int modulus) {
122 if (--i < 0) i = modulus - 1;
123 return i;
124 }
125
126 /**
127 * Returns item at index i.
128 */
129 @SuppressWarnings("unchecked")
130 final E itemAt(int i) {
131 return (E) items[i];
132 }
133
134 /**
135 * Returns element at array index i.
136 * This is a slight abuse of generics, accepted by javac.
137 */
138 @SuppressWarnings("unchecked")
139 static <E> E itemAt(Object[] items, int i) {
140 return (E) items[i];
141 }
142
143 /**
144 * Inserts element at current put position, advances, and signals.
145 * Call only when holding lock.
146 */
147 private void enqueue(E e) {
148 // assert lock.isHeldByCurrentThread();
149 // assert lock.getHoldCount() == 1;
150 // assert items[putIndex] == null;
151 final Object[] items = this.items;
152 items[putIndex] = e;
153 if (++putIndex == items.length) putIndex = 0;
154 count++;
155 notEmpty.signal();
156 // checkInvariants();
157 }
158
159 /**
160 * Extracts element at current take position, advances, and signals.
161 * Call only when holding lock.
162 */
163 private E dequeue() {
164 // assert lock.isHeldByCurrentThread();
165 // assert lock.getHoldCount() == 1;
166 // assert items[takeIndex] != null;
167 final Object[] items = this.items;
168 @SuppressWarnings("unchecked")
169 E e = (E) items[takeIndex];
170 items[takeIndex] = null;
171 if (++takeIndex == items.length) takeIndex = 0;
172 count--;
173 if (itrs != null)
174 itrs.elementDequeued();
175 notFull.signal();
176 // checkInvariants();
177 return e;
178 }
179
180 /**
181 * Deletes item at array index removeIndex.
182 * Utility for remove(Object) and iterator.remove.
183 * Call only when holding lock.
184 */
185 void removeAt(final int removeIndex) {
186 // assert lock.isHeldByCurrentThread();
187 // assert lock.getHoldCount() == 1;
188 // assert items[removeIndex] != null;
189 // assert removeIndex >= 0 && removeIndex < items.length;
190 final Object[] items = this.items;
191 if (removeIndex == takeIndex) {
192 // removing front item; just advance
193 items[takeIndex] = null;
194 if (++takeIndex == items.length) takeIndex = 0;
195 count--;
196 if (itrs != null)
197 itrs.elementDequeued();
198 } else {
199 // an "interior" remove
200
201 // slide over all others up through putIndex.
202 for (int i = removeIndex, putIndex = this.putIndex;;) {
203 int pred = i;
204 if (++i == items.length) i = 0;
205 if (i == putIndex) {
206 items[pred] = null;
207 this.putIndex = pred;
208 break;
209 }
210 items[pred] = items[i];
211 }
212 count--;
213 if (itrs != null)
214 itrs.removedAt(removeIndex);
215 }
216 notFull.signal();
217 // checkInvariants();
218 }
219
220 /**
221 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
222 * capacity and default access policy.
223 *
224 * @param capacity the capacity of this queue
225 * @throws IllegalArgumentException if {@code capacity < 1}
226 */
227 public ArrayBlockingQueue(int capacity) {
228 this(capacity, false);
229 }
230
231 /**
232 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
233 * capacity and the specified access policy.
234 *
235 * @param capacity the capacity of this queue
236 * @param fair if {@code true} then queue accesses for threads blocked
237 * on insertion or removal, are processed in FIFO order;
238 * if {@code false} the access order is unspecified.
239 * @throws IllegalArgumentException if {@code capacity < 1}
240 */
241 public ArrayBlockingQueue(int capacity, boolean fair) {
242 if (capacity <= 0)
243 throw new IllegalArgumentException();
244 this.items = new Object[capacity];
245 lock = new ReentrantLock(fair);
246 notEmpty = lock.newCondition();
247 notFull = lock.newCondition();
248 }
249
250 /**
251 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
252 * capacity, the specified access policy and initially containing the
253 * elements of the given collection,
254 * added in traversal order of the collection's iterator.
255 *
256 * @param capacity the capacity of this queue
257 * @param fair if {@code true} then queue accesses for threads blocked
258 * on insertion or removal, are processed in FIFO order;
259 * if {@code false} the access order is unspecified.
260 * @param c the collection of elements to initially contain
261 * @throws IllegalArgumentException if {@code capacity} is less than
262 * {@code c.size()}, or less than 1.
263 * @throws NullPointerException if the specified collection or any
264 * of its elements are null
265 */
266 public ArrayBlockingQueue(int capacity, boolean fair,
267 Collection<? extends E> c) {
268 this(capacity, fair);
269
270 final ReentrantLock lock = this.lock;
271 lock.lock(); // Lock only for visibility, not mutual exclusion
272 try {
273 final Object[] items = this.items;
274 int i = 0;
275 try {
276 for (E e : c)
277 items[i++] = Objects.requireNonNull(e);
278 } catch (ArrayIndexOutOfBoundsException ex) {
279 throw new IllegalArgumentException();
280 }
281 count = i;
282 putIndex = (i == capacity) ? 0 : i;
283 // checkInvariants();
284 } finally {
285 lock.unlock();
286 }
287 }
288
289 /**
290 * Inserts the specified element at the tail of this queue if it is
291 * possible to do so immediately without exceeding the queue's capacity,
292 * returning {@code true} upon success and throwing an
293 * {@code IllegalStateException} if this queue is full.
294 *
295 * @param e the element to add
296 * @return {@code true} (as specified by {@link Collection#add})
297 * @throws IllegalStateException if this queue is full
298 * @throws NullPointerException if the specified element is null
299 */
300 public boolean add(E e) {
301 return super.add(e);
302 }
303
304 /**
305 * Inserts the specified element at the tail of this queue if it is
306 * possible to do so immediately without exceeding the queue's capacity,
307 * returning {@code true} upon success and {@code false} if this queue
308 * is full. This method is generally preferable to method {@link #add},
309 * which can fail to insert an element only by throwing an exception.
310 *
311 * @throws NullPointerException if the specified element is null
312 */
313 public boolean offer(E e) {
314 Objects.requireNonNull(e);
315 final ReentrantLock lock = this.lock;
316 lock.lock();
317 try {
318 if (count == items.length)
319 return false;
320 else {
321 enqueue(e);
322 return true;
323 }
324 } finally {
325 lock.unlock();
326 }
327 }
328
329 /**
330 * Inserts the specified element at the tail of this queue, waiting
331 * for space to become available if the queue is full.
332 *
333 * @throws InterruptedException {@inheritDoc}
334 * @throws NullPointerException {@inheritDoc}
335 */
336 public void put(E e) throws InterruptedException {
337 Objects.requireNonNull(e);
338 final ReentrantLock lock = this.lock;
339 lock.lockInterruptibly();
340 try {
341 while (count == items.length)
342 notFull.await();
343 enqueue(e);
344 } finally {
345 lock.unlock();
346 }
347 }
348
349 /**
350 * Inserts the specified element at the tail of this queue, waiting
351 * up to the specified wait time for space to become available if
352 * the queue is full.
353 *
354 * @throws InterruptedException {@inheritDoc}
355 * @throws NullPointerException {@inheritDoc}
356 */
357 public boolean offer(E e, long timeout, TimeUnit unit)
358 throws InterruptedException {
359
360 Objects.requireNonNull(e);
361 long nanos = unit.toNanos(timeout);
362 final ReentrantLock lock = this.lock;
363 lock.lockInterruptibly();
364 try {
365 while (count == items.length) {
366 if (nanos <= 0L)
367 return false;
368 nanos = notFull.awaitNanos(nanos);
369 }
370 enqueue(e);
371 return true;
372 } finally {
373 // checkInvariants();
374 lock.unlock();
375 }
376 }
377
378 public E poll() {
379 final ReentrantLock lock = this.lock;
380 lock.lock();
381 try {
382 return (count == 0) ? null : dequeue();
383 } finally {
384 lock.unlock();
385 }
386 }
387
388 public E take() throws InterruptedException {
389 final ReentrantLock lock = this.lock;
390 lock.lockInterruptibly();
391 try {
392 while (count == 0)
393 notEmpty.await();
394 return dequeue();
395 } finally {
396 lock.unlock();
397 }
398 }
399
400 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
401 long nanos = unit.toNanos(timeout);
402 final ReentrantLock lock = this.lock;
403 lock.lockInterruptibly();
404 try {
405 while (count == 0) {
406 if (nanos <= 0L)
407 return null;
408 nanos = notEmpty.awaitNanos(nanos);
409 }
410 return dequeue();
411 } finally {
412 // checkInvariants();
413 lock.unlock();
414 }
415 }
416
417 public E peek() {
418 final ReentrantLock lock = this.lock;
419 lock.lock();
420 try {
421 return itemAt(takeIndex); // null when queue is empty
422 } finally {
423 lock.unlock();
424 }
425 }
426
427 // this doc comment is overridden to remove the reference to collections
428 // greater in size than Integer.MAX_VALUE
429 /**
430 * Returns the number of elements in this queue.
431 *
432 * @return the number of elements in this queue
433 */
434 public int size() {
435 final ReentrantLock lock = this.lock;
436 lock.lock();
437 try {
438 return count;
439 } finally {
440 lock.unlock();
441 }
442 }
443
444 // this doc comment is a modified copy of the inherited doc comment,
445 // without the reference to unlimited queues.
446 /**
447 * Returns the number of additional elements that this queue can ideally
448 * (in the absence of memory or resource constraints) accept without
449 * blocking. This is always equal to the initial capacity of this queue
450 * less the current {@code size} of this queue.
451 *
452 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
453 * an element will succeed by inspecting {@code remainingCapacity}
454 * because it may be the case that another thread is about to
455 * insert or remove an element.
456 */
457 public int remainingCapacity() {
458 final ReentrantLock lock = this.lock;
459 lock.lock();
460 try {
461 return items.length - count;
462 } finally {
463 lock.unlock();
464 }
465 }
466
467 /**
468 * Removes a single instance of the specified element from this queue,
469 * if it is present. More formally, removes an element {@code e} such
470 * that {@code o.equals(e)}, if this queue contains one or more such
471 * elements.
472 * Returns {@code true} if this queue contained the specified element
473 * (or equivalently, if this queue changed as a result of the call).
474 *
475 * <p>Removal of interior elements in circular array based queues
476 * is an intrinsically slow and disruptive operation, so should
477 * be undertaken only in exceptional circumstances, ideally
478 * only when the queue is known not to be accessible by other
479 * threads.
480 *
481 * @param o element to be removed from this queue, if present
482 * @return {@code true} if this queue changed as a result of the call
483 */
484 public boolean remove(Object o) {
485 if (o == null) return false;
486 final ReentrantLock lock = this.lock;
487 lock.lock();
488 try {
489 if (count > 0) {
490 final Object[] items = this.items;
491 for (int i = takeIndex, end = putIndex,
492 to = (i < end) ? end : items.length;
493 ; i = 0, to = end) {
494 for (; i < to; i++)
495 if (o.equals(items[i])) {
496 removeAt(i);
497 return true;
498 }
499 if (to == end) break;
500 }
501 }
502 return false;
503 } finally {
504 // checkInvariants();
505 lock.unlock();
506 }
507 }
508
509 /**
510 * Returns {@code true} if this queue contains the specified element.
511 * More formally, returns {@code true} if and only if this queue contains
512 * at least one element {@code e} such that {@code o.equals(e)}.
513 *
514 * @param o object to be checked for containment in this queue
515 * @return {@code true} if this queue contains the specified element
516 */
517 public boolean contains(Object o) {
518 if (o == null) return false;
519 final ReentrantLock lock = this.lock;
520 lock.lock();
521 try {
522 if (count > 0) {
523 final Object[] items = this.items;
524 for (int i = takeIndex, end = putIndex,
525 to = (i < end) ? end : items.length;
526 ; i = 0, to = end) {
527 for (; i < to; i++)
528 if (o.equals(items[i]))
529 return true;
530 if (to == end) break;
531 }
532 }
533 return false;
534 } finally {
535 // checkInvariants();
536 lock.unlock();
537 }
538 }
539
540 /**
541 * Returns an array containing all of the elements in this queue, in
542 * proper sequence.
543 *
544 * <p>The returned array will be "safe" in that no references to it are
545 * maintained by this queue. (In other words, this method must allocate
546 * a new array). The caller is thus free to modify the returned array.
547 *
548 * <p>This method acts as bridge between array-based and collection-based
549 * APIs.
550 *
551 * @return an array containing all of the elements in this queue
552 */
553 public Object[] toArray() {
554 final ReentrantLock lock = this.lock;
555 lock.lock();
556 try {
557 final Object[] items = this.items;
558 final int end = takeIndex + count;
559 final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
560 if (end != putIndex)
561 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
562 return a;
563 } finally {
564 lock.unlock();
565 }
566 }
567
568 /**
569 * Returns an array containing all of the elements in this queue, in
570 * proper sequence; the runtime type of the returned array is that of
571 * the specified array. If the queue fits in the specified array, it
572 * is returned therein. Otherwise, a new array is allocated with the
573 * runtime type of the specified array and the size of this queue.
574 *
575 * <p>If this queue fits in the specified array with room to spare
576 * (i.e., the array has more elements than this queue), the element in
577 * the array immediately following the end of the queue is set to
578 * {@code null}.
579 *
580 * <p>Like the {@link #toArray()} method, this method acts as bridge between
581 * array-based and collection-based APIs. Further, this method allows
582 * precise control over the runtime type of the output array, and may,
583 * under certain circumstances, be used to save allocation costs.
584 *
585 * <p>Suppose {@code x} is a queue known to contain only strings.
586 * The following code can be used to dump the queue into a newly
587 * allocated array of {@code String}:
588 *
589 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
590 *
591 * Note that {@code toArray(new Object[0])} is identical in function to
592 * {@code toArray()}.
593 *
594 * @param a the array into which the elements of the queue are to
595 * be stored, if it is big enough; otherwise, a new array of the
596 * same runtime type is allocated for this purpose
597 * @return an array containing all of the elements in this queue
598 * @throws ArrayStoreException if the runtime type of the specified array
599 * is not a supertype of the runtime type of every element in
600 * this queue
601 * @throws NullPointerException if the specified array is null
602 */
603 @SuppressWarnings("unchecked")
604 public <T> T[] toArray(T[] a) {
605 final ReentrantLock lock = this.lock;
606 lock.lock();
607 try {
608 final Object[] items = this.items;
609 final int count = this.count;
610 final int firstLeg = Math.min(items.length - takeIndex, count);
611 if (a.length < count) {
612 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
613 a.getClass());
614 } else {
615 System.arraycopy(items, takeIndex, a, 0, firstLeg);
616 if (a.length > count)
617 a[count] = null;
618 }
619 if (firstLeg < count)
620 System.arraycopy(items, 0, a, firstLeg, putIndex);
621 return a;
622 } finally {
623 lock.unlock();
624 }
625 }
626
627 public String toString() {
628 return Helpers.collectionToString(this);
629 }
630
631 /**
632 * Atomically removes all of the elements from this queue.
633 * The queue will be empty after this call returns.
634 */
635 public void clear() {
636 final ReentrantLock lock = this.lock;
637 lock.lock();
638 try {
639 int k;
640 if ((k = count) > 0) {
641 circularClear(items, takeIndex, putIndex);
642 takeIndex = putIndex;
643 count = 0;
644 if (itrs != null)
645 itrs.queueIsEmpty();
646 for (; k > 0 && lock.hasWaiters(notFull); k--)
647 notFull.signal();
648 }
649 } finally {
650 // checkInvariants();
651 lock.unlock();
652 }
653 }
654
655 /**
656 * Nulls out slots starting at array index i, upto index end.
657 * Condition i == end means "full" - the entire array is cleared.
658 */
659 private static void circularClear(Object[] items, int i, int end) {
660 // assert 0 <= i && i < items.length;
661 // assert 0 <= end && end < items.length;
662 for (int to = (i < end) ? end : items.length;
663 ; i = 0, to = end) {
664 for (; i < to; i++) items[i] = null;
665 if (to == end) break;
666 }
667 }
668
669 /**
670 * @throws UnsupportedOperationException {@inheritDoc}
671 * @throws ClassCastException {@inheritDoc}
672 * @throws NullPointerException {@inheritDoc}
673 * @throws IllegalArgumentException {@inheritDoc}
674 */
675 public int drainTo(Collection<? super E> c) {
676 return drainTo(c, Integer.MAX_VALUE);
677 }
678
679 /**
680 * @throws UnsupportedOperationException {@inheritDoc}
681 * @throws ClassCastException {@inheritDoc}
682 * @throws NullPointerException {@inheritDoc}
683 * @throws IllegalArgumentException {@inheritDoc}
684 */
685 public int drainTo(Collection<? super E> c, int maxElements) {
686 Objects.requireNonNull(c);
687 if (c == this)
688 throw new IllegalArgumentException();
689 if (maxElements <= 0)
690 return 0;
691 final Object[] items = this.items;
692 final ReentrantLock lock = this.lock;
693 lock.lock();
694 try {
695 int n = Math.min(maxElements, count);
696 int take = takeIndex;
697 int i = 0;
698 try {
699 while (i < n) {
700 @SuppressWarnings("unchecked")
701 E e = (E) items[take];
702 c.add(e);
703 items[take] = null;
704 if (++take == items.length) take = 0;
705 i++;
706 }
707 return n;
708 } finally {
709 // Restore invariants even if c.add() threw
710 if (i > 0) {
711 count -= i;
712 takeIndex = take;
713 if (itrs != null) {
714 if (count == 0)
715 itrs.queueIsEmpty();
716 else if (i > take)
717 itrs.takeIndexWrapped();
718 }
719 for (; i > 0 && lock.hasWaiters(notFull); i--)
720 notFull.signal();
721 }
722 }
723 } finally {
724 // checkInvariants();
725 lock.unlock();
726 }
727 }
728
729 /**
730 * Returns an iterator over the elements in this queue in proper sequence.
731 * The elements will be returned in order from first (head) to last (tail).
732 *
733 * <p>The returned iterator is
734 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
735 *
736 * @return an iterator over the elements in this queue in proper sequence
737 */
738 public Iterator<E> iterator() {
739 return new Itr();
740 }
741
742 /**
743 * Shared data between iterators and their queue, allowing queue
744 * modifications to update iterators when elements are removed.
745 *
746 * This adds a lot of complexity for the sake of correctly
747 * handling some uncommon operations, but the combination of
748 * circular-arrays and supporting interior removes (i.e., those
749 * not at head) would cause iterators to sometimes lose their
750 * places and/or (re)report elements they shouldn't. To avoid
751 * this, when a queue has one or more iterators, it keeps iterator
752 * state consistent by:
753 *
754 * (1) keeping track of the number of "cycles", that is, the
755 * number of times takeIndex has wrapped around to 0.
756 * (2) notifying all iterators via the callback removedAt whenever
757 * an interior element is removed (and thus other elements may
758 * be shifted).
759 *
760 * These suffice to eliminate iterator inconsistencies, but
761 * unfortunately add the secondary responsibility of maintaining
762 * the list of iterators. We track all active iterators in a
763 * simple linked list (accessed only when the queue's lock is
764 * held) of weak references to Itr. The list is cleaned up using
765 * 3 different mechanisms:
766 *
767 * (1) Whenever a new iterator is created, do some O(1) checking for
768 * stale list elements.
769 *
770 * (2) Whenever takeIndex wraps around to 0, check for iterators
771 * that have been unused for more than one wrap-around cycle.
772 *
773 * (3) Whenever the queue becomes empty, all iterators are notified
774 * and this entire data structure is discarded.
775 *
776 * So in addition to the removedAt callback that is necessary for
777 * correctness, iterators have the shutdown and takeIndexWrapped
778 * callbacks that help remove stale iterators from the list.
779 *
780 * Whenever a list element is examined, it is expunged if either
781 * the GC has determined that the iterator is discarded, or if the
782 * iterator reports that it is "detached" (does not need any
783 * further state updates). Overhead is maximal when takeIndex
784 * never advances, iterators are discarded before they are
785 * exhausted, and all removals are interior removes, in which case
786 * all stale iterators are discovered by the GC. But even in this
787 * case we don't increase the amortized complexity.
788 *
789 * Care must be taken to keep list sweeping methods from
790 * reentrantly invoking another such method, causing subtle
791 * corruption bugs.
792 */
793 class Itrs {
794
795 /**
796 * Node in a linked list of weak iterator references.
797 */
798 private class Node extends WeakReference<Itr> {
799 Node next;
800
801 Node(Itr iterator, Node next) {
802 super(iterator);
803 this.next = next;
804 }
805 }
806
807 /** Incremented whenever takeIndex wraps around to 0 */
808 int cycles;
809
810 /** Linked list of weak iterator references */
811 private Node head;
812
813 /** Used to expunge stale iterators */
814 private Node sweeper;
815
816 private static final int SHORT_SWEEP_PROBES = 4;
817 private static final int LONG_SWEEP_PROBES = 16;
818
819 Itrs(Itr initial) {
820 register(initial);
821 }
822
823 /**
824 * Sweeps itrs, looking for and expunging stale iterators.
825 * If at least one was found, tries harder to find more.
826 * Called only from iterating thread.
827 *
828 * @param tryHarder whether to start in try-harder mode, because
829 * there is known to be at least one iterator to collect
830 */
831 void doSomeSweeping(boolean tryHarder) {
832 // assert lock.isHeldByCurrentThread();
833 // assert head != null;
834 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
835 Node o, p;
836 final Node sweeper = this.sweeper;
837 boolean passedGo; // to limit search to one full sweep
838
839 if (sweeper == null) {
840 o = null;
841 p = head;
842 passedGo = true;
843 } else {
844 o = sweeper;
845 p = o.next;
846 passedGo = false;
847 }
848
849 for (; probes > 0; probes--) {
850 if (p == null) {
851 if (passedGo)
852 break;
853 o = null;
854 p = head;
855 passedGo = true;
856 }
857 final Itr it = p.get();
858 final Node next = p.next;
859 if (it == null || it.isDetached()) {
860 // found a discarded/exhausted iterator
861 probes = LONG_SWEEP_PROBES; // "try harder"
862 // unlink p
863 p.clear();
864 p.next = null;
865 if (o == null) {
866 head = next;
867 if (next == null) {
868 // We've run out of iterators to track; retire
869 itrs = null;
870 return;
871 }
872 }
873 else
874 o.next = next;
875 } else {
876 o = p;
877 }
878 p = next;
879 }
880
881 this.sweeper = (p == null) ? null : o;
882 }
883
884 /**
885 * Adds a new iterator to the linked list of tracked iterators.
886 */
887 void register(Itr itr) {
888 // assert lock.isHeldByCurrentThread();
889 head = new Node(itr, head);
890 }
891
892 /**
893 * Called whenever takeIndex wraps around to 0.
894 *
895 * Notifies all iterators, and expunges any that are now stale.
896 */
897 void takeIndexWrapped() {
898 // assert lock.isHeldByCurrentThread();
899 cycles++;
900 for (Node o = null, p = head; p != null;) {
901 final Itr it = p.get();
902 final Node next = p.next;
903 if (it == null || it.takeIndexWrapped()) {
904 // unlink p
905 // assert it == null || it.isDetached();
906 p.clear();
907 p.next = null;
908 if (o == null)
909 head = next;
910 else
911 o.next = next;
912 } else {
913 o = p;
914 }
915 p = next;
916 }
917 if (head == null) // no more iterators to track
918 itrs = null;
919 }
920
921 /**
922 * Called whenever an interior remove (not at takeIndex) occurred.
923 *
924 * Notifies all iterators, and expunges any that are now stale.
925 */
926 void removedAt(int removedIndex) {
927 for (Node o = null, p = head; p != null;) {
928 final Itr it = p.get();
929 final Node next = p.next;
930 if (it == null || it.removedAt(removedIndex)) {
931 // unlink p
932 // assert it == null || it.isDetached();
933 p.clear();
934 p.next = null;
935 if (o == null)
936 head = next;
937 else
938 o.next = next;
939 } else {
940 o = p;
941 }
942 p = next;
943 }
944 if (head == null) // no more iterators to track
945 itrs = null;
946 }
947
948 /**
949 * Called whenever the queue becomes empty.
950 *
951 * Notifies all active iterators that the queue is empty,
952 * clears all weak refs, and unlinks the itrs datastructure.
953 */
954 void queueIsEmpty() {
955 // assert lock.isHeldByCurrentThread();
956 for (Node p = head; p != null; p = p.next) {
957 Itr it = p.get();
958 if (it != null) {
959 p.clear();
960 it.shutdown();
961 }
962 }
963 head = null;
964 itrs = null;
965 }
966
967 /**
968 * Called whenever an element has been dequeued (at takeIndex).
969 */
970 void elementDequeued() {
971 // assert lock.isHeldByCurrentThread();
972 if (count == 0)
973 queueIsEmpty();
974 else if (takeIndex == 0)
975 takeIndexWrapped();
976 }
977 }
978
979 /**
980 * Iterator for ArrayBlockingQueue.
981 *
982 * To maintain weak consistency with respect to puts and takes, we
983 * read ahead one slot, so as to not report hasNext true but then
984 * not have an element to return.
985 *
986 * We switch into "detached" mode (allowing prompt unlinking from
987 * itrs without help from the GC) when all indices are negative, or
988 * when hasNext returns false for the first time. This allows the
989 * iterator to track concurrent updates completely accurately,
990 * except for the corner case of the user calling Iterator.remove()
991 * after hasNext() returned false. Even in this case, we ensure
992 * that we don't remove the wrong element by keeping track of the
993 * expected element to remove, in lastItem. Yes, we may fail to
994 * remove lastItem from the queue if it moved due to an interleaved
995 * interior remove while in detached mode.
996 *
997 * Method forEachRemaining, added in Java 8, is treated similarly
998 * to hasNext returning false, in that we switch to detached mode,
999 * but we regard it as an even stronger request to "close" this
1000 * iteration, and don't bother supporting subsequent remove().
1001 */
1002 private class Itr implements Iterator<E> {
1003 /** Index to look for new nextItem; NONE at end */
1004 private int cursor;
1005
1006 /** Element to be returned by next call to next(); null if none */
1007 private E nextItem;
1008
1009 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
1010 private int nextIndex;
1011
1012 /** Last element returned; null if none or not detached. */
1013 private E lastItem;
1014
1015 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1016 private int lastRet;
1017
1018 /** Previous value of takeIndex, or DETACHED when detached */
1019 private int prevTakeIndex;
1020
1021 /** Previous value of iters.cycles */
1022 private int prevCycles;
1023
1024 /** Special index value indicating "not available" or "undefined" */
1025 private static final int NONE = -1;
1026
1027 /**
1028 * Special index value indicating "removed elsewhere", that is,
1029 * removed by some operation other than a call to this.remove().
1030 */
1031 private static final int REMOVED = -2;
1032
1033 /** Special value for prevTakeIndex indicating "detached mode" */
1034 private static final int DETACHED = -3;
1035
1036 Itr() {
1037 lastRet = NONE;
1038 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1039 lock.lock();
1040 try {
1041 if (count == 0) {
1042 // assert itrs == null;
1043 cursor = NONE;
1044 nextIndex = NONE;
1045 prevTakeIndex = DETACHED;
1046 } else {
1047 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1048 prevTakeIndex = takeIndex;
1049 nextItem = itemAt(nextIndex = takeIndex);
1050 cursor = incCursor(takeIndex);
1051 if (itrs == null) {
1052 itrs = new Itrs(this);
1053 } else {
1054 itrs.register(this); // in this order
1055 itrs.doSomeSweeping(false);
1056 }
1057 prevCycles = itrs.cycles;
1058 // assert takeIndex >= 0;
1059 // assert prevTakeIndex == takeIndex;
1060 // assert nextIndex >= 0;
1061 // assert nextItem != null;
1062 }
1063 } finally {
1064 lock.unlock();
1065 }
1066 }
1067
1068 boolean isDetached() {
1069 // assert lock.isHeldByCurrentThread();
1070 return prevTakeIndex < 0;
1071 }
1072
1073 private int incCursor(int index) {
1074 // assert lock.isHeldByCurrentThread();
1075 if (++index == items.length) index = 0;
1076 if (index == putIndex) index = NONE;
1077 return index;
1078 }
1079
1080 /**
1081 * Returns true if index is invalidated by the given number of
1082 * dequeues, starting from prevTakeIndex.
1083 */
1084 private boolean invalidated(int index, int prevTakeIndex,
1085 long dequeues, int length) {
1086 if (index < 0)
1087 return false;
1088 int distance = index - prevTakeIndex;
1089 if (distance < 0)
1090 distance += length;
1091 return dequeues > distance;
1092 }
1093
1094 /**
1095 * Adjusts indices to incorporate all dequeues since the last
1096 * operation on this iterator. Call only from iterating thread.
1097 */
1098 private void incorporateDequeues() {
1099 // assert lock.isHeldByCurrentThread();
1100 // assert itrs != null;
1101 // assert !isDetached();
1102 // assert count > 0;
1103
1104 final int cycles = itrs.cycles;
1105 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1106 final int prevCycles = this.prevCycles;
1107 final int prevTakeIndex = this.prevTakeIndex;
1108
1109 if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1110 final int len = items.length;
1111 // how far takeIndex has advanced since the previous
1112 // operation of this iterator
1113 long dequeues = (cycles - prevCycles) * len
1114 + (takeIndex - prevTakeIndex);
1115
1116 // Check indices for invalidation
1117 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1118 lastRet = REMOVED;
1119 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1120 nextIndex = REMOVED;
1121 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1122 cursor = takeIndex;
1123
1124 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1125 detach();
1126 else {
1127 this.prevCycles = cycles;
1128 this.prevTakeIndex = takeIndex;
1129 }
1130 }
1131 }
1132
1133 /**
1134 * Called when itrs should stop tracking this iterator, either
1135 * because there are no more indices to update (cursor < 0 &&
1136 * nextIndex < 0 && lastRet < 0) or as a special exception, when
1137 * lastRet >= 0, because hasNext() is about to return false for the
1138 * first time. Call only from iterating thread.
1139 */
1140 private void detach() {
1141 // Switch to detached mode
1142 // assert lock.isHeldByCurrentThread();
1143 // assert cursor == NONE;
1144 // assert nextIndex < 0;
1145 // assert lastRet < 0 || nextItem == null;
1146 // assert lastRet < 0 ^ lastItem != null;
1147 if (prevTakeIndex >= 0) {
1148 // assert itrs != null;
1149 prevTakeIndex = DETACHED;
1150 // try to unlink from itrs (but not too hard)
1151 itrs.doSomeSweeping(true);
1152 }
1153 }
1154
1155 /**
1156 * For performance reasons, we would like not to acquire a lock in
1157 * hasNext in the common case. To allow for this, we only access
1158 * fields (i.e. nextItem) that are not modified by update operations
1159 * triggered by queue modifications.
1160 */
1161 public boolean hasNext() {
1162 if (nextItem != null)
1163 return true;
1164 noNext();
1165 return false;
1166 }
1167
1168 private void noNext() {
1169 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1170 lock.lock();
1171 try {
1172 // assert cursor == NONE;
1173 // assert nextIndex == NONE;
1174 if (!isDetached()) {
1175 // assert lastRet >= 0;
1176 incorporateDequeues(); // might update lastRet
1177 if (lastRet >= 0) {
1178 lastItem = itemAt(lastRet);
1179 // assert lastItem != null;
1180 detach();
1181 }
1182 }
1183 // assert isDetached();
1184 // assert lastRet < 0 ^ lastItem != null;
1185 } finally {
1186 lock.unlock();
1187 }
1188 }
1189
1190 public E next() {
1191 final E e = nextItem;
1192 if (e == null)
1193 throw new NoSuchElementException();
1194 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1195 lock.lock();
1196 try {
1197 if (!isDetached())
1198 incorporateDequeues();
1199 // assert nextIndex != NONE;
1200 // assert lastItem == null;
1201 lastRet = nextIndex;
1202 final int cursor = this.cursor;
1203 if (cursor >= 0) {
1204 nextItem = itemAt(nextIndex = cursor);
1205 // assert nextItem != null;
1206 this.cursor = incCursor(cursor);
1207 } else {
1208 nextIndex = NONE;
1209 nextItem = null;
1210 if (lastRet == REMOVED) detach();
1211 }
1212 } finally {
1213 lock.unlock();
1214 }
1215 return e;
1216 }
1217
1218 public void forEachRemaining(Consumer<? super E> action) {
1219 Objects.requireNonNull(action);
1220 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1221 lock.lock();
1222 try {
1223 final E e = nextItem;
1224 if (e == null) return;
1225 if (!isDetached())
1226 incorporateDequeues();
1227 action.accept(e);
1228 if (isDetached() || cursor < 0) return;
1229 final Object[] items = ArrayBlockingQueue.this.items;
1230 for (int i = cursor, end = putIndex,
1231 to = (i < end) ? end : items.length;
1232 ; i = 0, to = end) {
1233 for (; i < to; i++)
1234 action.accept(itemAt(items, i));
1235 if (to == end) break;
1236 }
1237 } finally {
1238 // Calling forEachRemaining is a strong hint that this
1239 // iteration is surely over; supporting remove() after
1240 // forEachRemaining() is more trouble than it's worth
1241 cursor = nextIndex = lastRet = NONE;
1242 nextItem = lastItem = null;
1243 detach();
1244 lock.unlock();
1245 }
1246 }
1247
1248 public void remove() {
1249 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1250 lock.lock();
1251 // assert lock.getHoldCount() == 1;
1252 try {
1253 if (!isDetached())
1254 incorporateDequeues(); // might update lastRet or detach
1255 final int lastRet = this.lastRet;
1256 this.lastRet = NONE;
1257 if (lastRet >= 0) {
1258 if (!isDetached())
1259 removeAt(lastRet);
1260 else {
1261 final E lastItem = this.lastItem;
1262 // assert lastItem != null;
1263 this.lastItem = null;
1264 if (itemAt(lastRet) == lastItem)
1265 removeAt(lastRet);
1266 }
1267 } else if (lastRet == NONE)
1268 throw new IllegalStateException();
1269 // else lastRet == REMOVED and the last returned element was
1270 // previously asynchronously removed via an operation other
1271 // than this.remove(), so nothing to do.
1272
1273 if (cursor < 0 && nextIndex < 0)
1274 detach();
1275 } finally {
1276 lock.unlock();
1277 // assert lastRet == NONE;
1278 // assert lastItem == null;
1279 }
1280 }
1281
1282 /**
1283 * Called to notify the iterator that the queue is empty, or that it
1284 * has fallen hopelessly behind, so that it should abandon any
1285 * further iteration, except possibly to return one more element
1286 * from next(), as promised by returning true from hasNext().
1287 */
1288 void shutdown() {
1289 // assert lock.isHeldByCurrentThread();
1290 cursor = NONE;
1291 if (nextIndex >= 0)
1292 nextIndex = REMOVED;
1293 if (lastRet >= 0) {
1294 lastRet = REMOVED;
1295 lastItem = null;
1296 }
1297 prevTakeIndex = DETACHED;
1298 // Don't set nextItem to null because we must continue to be
1299 // able to return it on next().
1300 //
1301 // Caller will unlink from itrs when convenient.
1302 }
1303
1304 private int distance(int index, int prevTakeIndex, int length) {
1305 int distance = index - prevTakeIndex;
1306 if (distance < 0)
1307 distance += length;
1308 return distance;
1309 }
1310
1311 /**
1312 * Called whenever an interior remove (not at takeIndex) occurred.
1313 *
1314 * @return true if this iterator should be unlinked from itrs
1315 */
1316 boolean removedAt(int removedIndex) {
1317 // assert lock.isHeldByCurrentThread();
1318 if (isDetached())
1319 return true;
1320
1321 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1322 final int prevTakeIndex = this.prevTakeIndex;
1323 final int len = items.length;
1324 // distance from prevTakeIndex to removedIndex
1325 final int removedDistance =
1326 len * (itrs.cycles - this.prevCycles
1327 + ((removedIndex < takeIndex) ? 1 : 0))
1328 + (removedIndex - prevTakeIndex);
1329 // assert itrs.cycles - this.prevCycles >= 0;
1330 // assert itrs.cycles - this.prevCycles <= 1;
1331 // assert removedDistance > 0;
1332 // assert removedIndex != takeIndex;
1333 int cursor = this.cursor;
1334 if (cursor >= 0) {
1335 int x = distance(cursor, prevTakeIndex, len);
1336 if (x == removedDistance) {
1337 if (cursor == putIndex)
1338 this.cursor = cursor = NONE;
1339 }
1340 else if (x > removedDistance) {
1341 // assert cursor != prevTakeIndex;
1342 this.cursor = cursor = dec(cursor, len);
1343 }
1344 }
1345 int lastRet = this.lastRet;
1346 if (lastRet >= 0) {
1347 int x = distance(lastRet, prevTakeIndex, len);
1348 if (x == removedDistance)
1349 this.lastRet = lastRet = REMOVED;
1350 else if (x > removedDistance)
1351 this.lastRet = lastRet = dec(lastRet, len);
1352 }
1353 int nextIndex = this.nextIndex;
1354 if (nextIndex >= 0) {
1355 int x = distance(nextIndex, prevTakeIndex, len);
1356 if (x == removedDistance)
1357 this.nextIndex = nextIndex = REMOVED;
1358 else if (x > removedDistance)
1359 this.nextIndex = nextIndex = dec(nextIndex, len);
1360 }
1361 if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1362 this.prevTakeIndex = DETACHED;
1363 return true;
1364 }
1365 return false;
1366 }
1367
1368 /**
1369 * Called whenever takeIndex wraps around to zero.
1370 *
1371 * @return true if this iterator should be unlinked from itrs
1372 */
1373 boolean takeIndexWrapped() {
1374 // assert lock.isHeldByCurrentThread();
1375 if (isDetached())
1376 return true;
1377 if (itrs.cycles - prevCycles > 1) {
1378 // All the elements that existed at the time of the last
1379 // operation are gone, so abandon further iteration.
1380 shutdown();
1381 return true;
1382 }
1383 return false;
1384 }
1385
1386 // /** Uncomment for debugging. */
1387 // public String toString() {
1388 // return ("cursor=" + cursor + " " +
1389 // "nextIndex=" + nextIndex + " " +
1390 // "lastRet=" + lastRet + " " +
1391 // "nextItem=" + nextItem + " " +
1392 // "lastItem=" + lastItem + " " +
1393 // "prevCycles=" + prevCycles + " " +
1394 // "prevTakeIndex=" + prevTakeIndex + " " +
1395 // "size()=" + size() + " " +
1396 // "remainingCapacity()=" + remainingCapacity());
1397 // }
1398 }
1399
1400 /**
1401 * Returns a {@link Spliterator} over the elements in this queue.
1402 *
1403 * <p>The returned spliterator is
1404 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1405 *
1406 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1407 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1408 *
1409 * @implNote
1410 * The {@code Spliterator} implements {@code trySplit} to permit limited
1411 * parallelism.
1412 *
1413 * @return a {@code Spliterator} over the elements in this queue
1414 * @since 1.8
1415 */
1416 public Spliterator<E> spliterator() {
1417 return Spliterators.spliterator
1418 (this, (Spliterator.ORDERED |
1419 Spliterator.NONNULL |
1420 Spliterator.CONCURRENT));
1421 }
1422
1423 /**
1424 * @throws NullPointerException {@inheritDoc}
1425 */
1426 public void forEach(Consumer<? super E> action) {
1427 Objects.requireNonNull(action);
1428 final ReentrantLock lock = this.lock;
1429 lock.lock();
1430 try {
1431 if (count > 0) {
1432 final Object[] items = this.items;
1433 for (int i = takeIndex, end = putIndex,
1434 to = (i < end) ? end : items.length;
1435 ; i = 0, to = end) {
1436 for (; i < to; i++)
1437 action.accept(itemAt(items, i));
1438 if (to == end) break;
1439 }
1440 }
1441 } finally {
1442 // checkInvariants();
1443 lock.unlock();
1444 }
1445 }
1446
1447 /**
1448 * @throws NullPointerException {@inheritDoc}
1449 */
1450 public boolean removeIf(Predicate<? super E> filter) {
1451 Objects.requireNonNull(filter);
1452 return bulkRemove(filter);
1453 }
1454
1455 /**
1456 * @throws NullPointerException {@inheritDoc}
1457 */
1458 public boolean removeAll(Collection<?> c) {
1459 Objects.requireNonNull(c);
1460 return bulkRemove(e -> c.contains(e));
1461 }
1462
1463 /**
1464 * @throws NullPointerException {@inheritDoc}
1465 */
1466 public boolean retainAll(Collection<?> c) {
1467 Objects.requireNonNull(c);
1468 return bulkRemove(e -> !c.contains(e));
1469 }
1470
1471 /** Implementation of bulk remove methods. */
1472 private boolean bulkRemove(Predicate<? super E> filter) {
1473 final ReentrantLock lock = this.lock;
1474 lock.lock();
1475 try {
1476 if (itrs == null) { // check for active iterators
1477 if (count > 0) {
1478 final Object[] items = this.items;
1479 // Optimize for initial run of survivors
1480 for (int i = takeIndex, end = putIndex,
1481 to = (i < end) ? end : items.length;
1482 ; i = 0, to = end) {
1483 for (; i < to; i++)
1484 if (filter.test(itemAt(items, i)))
1485 return bulkRemoveModified(filter, i);
1486 if (to == end) break;
1487 }
1488 }
1489 return false;
1490 }
1491 } finally {
1492 // checkInvariants();
1493 lock.unlock();
1494 }
1495 // Active iterators are too hairy!
1496 // Punting (for now) to the slow n^2 algorithm ...
1497 return super.removeIf(filter);
1498 }
1499
1500 // A tiny bit set implementation
1501
1502 private static long[] nBits(int n) {
1503 return new long[((n - 1) >> 6) + 1];
1504 }
1505 private static void setBit(long[] bits, int i) {
1506 bits[i >> 6] |= 1L << i;
1507 }
1508 private static boolean isClear(long[] bits, int i) {
1509 return (bits[i >> 6] & (1L << i)) == 0;
1510 }
1511
1512 /**
1513 * Returns circular distance from i to j, disambiguating i == j to
1514 * items.length; never returns 0.
1515 */
1516 private int distanceNonEmpty(int i, int j) {
1517 if ((j -= i) <= 0) j += items.length;
1518 return j;
1519 }
1520
1521 /**
1522 * Helper for bulkRemove, in case of at least one deletion.
1523 * Tolerate predicates that reentrantly access the collection for
1524 * read (but not write), so traverse once to find elements to
1525 * delete, a second pass to physically expunge.
1526 *
1527 * @param beg valid index of first element to be deleted
1528 */
1529 private boolean bulkRemoveModified(
1530 Predicate<? super E> filter, final int beg) {
1531 final Object[] es = items;
1532 final int capacity = items.length;
1533 final int end = putIndex;
1534 final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
1535 deathRow[0] = 1L; // set bit 0
1536 for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1537 ; i = 0, to = end, k -= capacity) {
1538 for (; i < to; i++)
1539 if (filter.test(itemAt(es, i)))
1540 setBit(deathRow, i - k);
1541 if (to == end) break;
1542 }
1543 // a two-finger traversal, with hare i reading, tortoise w writing
1544 int w = beg;
1545 for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1546 ; w = 0) { // w rejoins i on second leg
1547 // In this loop, i and w are on the same leg, with i > w
1548 for (; i < to; i++)
1549 if (isClear(deathRow, i - k))
1550 es[w++] = es[i];
1551 if (to == end) break;
1552 // In this loop, w is on the first leg, i on the second
1553 for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
1554 if (isClear(deathRow, i - k))
1555 es[w++] = es[i];
1556 if (i >= to) {
1557 if (w == capacity) w = 0; // "corner" case
1558 break;
1559 }
1560 }
1561 count -= distanceNonEmpty(w, end);
1562 circularClear(es, putIndex = w, end);
1563 // checkInvariants();
1564 return true;
1565 }
1566
1567 /** debugging */
1568 void checkInvariants() {
1569 // meta-assertions
1570 // assert lock.isHeldByCurrentThread();
1571 try {
1572 // Unlike ArrayDeque, we have a count field but no spare slot.
1573 // We prefer ArrayDeque's strategy (and the names of its fields!),
1574 // but our field layout is baked into the serial form, and so is
1575 // too annoying to change.
1576 //
1577 // putIndex == takeIndex must be disambiguated by checking count.
1578 int capacity = items.length;
1579 // assert capacity > 0;
1580 // assert takeIndex >= 0 && takeIndex < capacity;
1581 // assert putIndex >= 0 && putIndex < capacity;
1582 // assert count <= capacity;
1583 // assert takeIndex == putIndex || items[takeIndex] != null;
1584 // assert count == capacity || items[putIndex] == null;
1585 // assert takeIndex == putIndex || items[dec(putIndex, capacity)] != null;
1586 } catch (Throwable t) {
1587 System.err.printf("takeIndex=%d putIndex=%d count=%d capacity=%d%n",
1588 takeIndex, putIndex, count, items.length);
1589 System.err.printf("items=%s%n",
1590 Arrays.toString(items));
1591 throw t;
1592 }
1593 }
1594
1595 }