ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.138
Committed: Fri Nov 18 03:20:35 2016 UTC (7 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.137: +9 -0 lines
Log Message:
explain relationship with ArrayDeque

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.ref.WeakReference;
10 import java.util.AbstractQueue;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.Objects;
16 import java.util.Spliterator;
17 import java.util.Spliterators;
18 import java.util.concurrent.locks.Condition;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.Consumer;
21 import java.util.function.Predicate;
22
23 /**
24 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
25 * array. This queue orders elements FIFO (first-in-first-out). The
26 * <em>head</em> of the queue is that element that has been on the
27 * queue the longest time. The <em>tail</em> of the queue is that
28 * element that has been on the queue the shortest time. New elements
29 * are inserted at the tail of the queue, and the queue retrieval
30 * operations obtain elements at the head of the queue.
31 *
32 * <p>This is a classic &quot;bounded buffer&quot;, in which a
33 * fixed-sized array holds elements inserted by producers and
34 * extracted by consumers. Once created, the capacity cannot be
35 * changed. Attempts to {@code put} an element into a full queue
36 * will result in the operation blocking; attempts to {@code take} an
37 * element from an empty queue will similarly block.
38 *
39 * <p>This class supports an optional fairness policy for ordering
40 * waiting producer and consumer threads. By default, this ordering
41 * is not guaranteed. However, a queue constructed with fairness set
42 * to {@code true} grants threads access in FIFO order. Fairness
43 * generally decreases throughput but reduces variability and avoids
44 * starvation.
45 *
46 * <p>This class and its iterator implement all of the
47 * <em>optional</em> methods of the {@link Collection} and {@link
48 * Iterator} interfaces.
49 *
50 * <p>This class is a member of the
51 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
52 * Java Collections Framework</a>.
53 *
54 * @since 1.5
55 * @author Doug Lea
56 * @param <E> the type of elements held in this queue
57 */
58 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
59 implements BlockingQueue<E>, java.io.Serializable {
60
61 /*
62 * Much of the implementation mechanics, especially the unusual
63 * nested loops, are shared and co-maintained with ArrayDeque.
64 */
65
66 /**
67 * Serialization ID. This class relies on default serialization
68 * even for the items array, which is default-serialized, even if
69 * it is empty. Otherwise it could not be declared final, which is
70 * necessary here.
71 */
72 private static final long serialVersionUID = -817911632652898426L;
73
74 /** The queued items */
75 final Object[] items;
76
77 /** items index for next take, poll, peek or remove */
78 int takeIndex;
79
80 /** items index for next put, offer, or add */
81 int putIndex;
82
83 /** Number of elements in the queue */
84 int count;
85
86 /*
87 * Concurrency control uses the classic two-condition algorithm
88 * found in any textbook.
89 */
90
91 /** Main lock guarding all access */
92 final ReentrantLock lock;
93
94 /** Condition for waiting takes */
95 private final Condition notEmpty;
96
97 /** Condition for waiting puts */
98 private final Condition notFull;
99
100 /**
101 * Shared state for currently active iterators, or null if there
102 * are known not to be any. Allows queue operations to update
103 * iterator state.
104 */
105 transient Itrs itrs;
106
107 // Internal helper methods
108
109 /**
110 * Increments i, mod modulus.
111 * Precondition and postcondition: 0 <= i < modulus.
112 */
113 static final int inc(int i, int modulus) {
114 if (++i >= modulus) i = 0;
115 return i;
116 }
117
118 /**
119 * Decrements i, mod modulus.
120 * Precondition and postcondition: 0 <= i < modulus.
121 */
122 static final int dec(int i, int modulus) {
123 if (--i < 0) i = modulus - 1;
124 return i;
125 }
126
127 /**
128 * Returns item at index i.
129 */
130 @SuppressWarnings("unchecked")
131 final E itemAt(int i) {
132 return (E) items[i];
133 }
134
135 /**
136 * Returns element at array index i.
137 * This is a slight abuse of generics, accepted by javac.
138 */
139 @SuppressWarnings("unchecked")
140 static <E> E itemAt(Object[] items, int i) {
141 return (E) items[i];
142 }
143
144 /**
145 * Inserts element at current put position, advances, and signals.
146 * Call only when holding lock.
147 */
148 private void enqueue(E x) {
149 // assert lock.isHeldByCurrentThread();
150 // assert lock.getHoldCount() == 1;
151 // assert items[putIndex] == null;
152 final Object[] items = this.items;
153 items[putIndex] = x;
154 if (++putIndex == items.length) putIndex = 0;
155 count++;
156 notEmpty.signal();
157 // checkInvariants();
158 }
159
160 /**
161 * Extracts element at current take position, advances, and signals.
162 * Call only when holding lock.
163 */
164 private E dequeue() {
165 // assert lock.isHeldByCurrentThread();
166 // assert lock.getHoldCount() == 1;
167 // assert items[takeIndex] != null;
168 final Object[] items = this.items;
169 @SuppressWarnings("unchecked")
170 E x = (E) items[takeIndex];
171 items[takeIndex] = null;
172 if (++takeIndex == items.length) takeIndex = 0;
173 count--;
174 if (itrs != null)
175 itrs.elementDequeued();
176 notFull.signal();
177 // checkInvariants();
178 return x;
179 }
180
181 /**
182 * Deletes item at array index removeIndex.
183 * Utility for remove(Object) and iterator.remove.
184 * Call only when holding lock.
185 */
186 void removeAt(final int removeIndex) {
187 // assert lock.isHeldByCurrentThread();
188 // assert lock.getHoldCount() == 1;
189 // assert items[removeIndex] != null;
190 // assert removeIndex >= 0 && removeIndex < items.length;
191 final Object[] items = this.items;
192 if (removeIndex == takeIndex) {
193 // removing front item; just advance
194 items[takeIndex] = null;
195 if (++takeIndex == items.length) takeIndex = 0;
196 count--;
197 if (itrs != null)
198 itrs.elementDequeued();
199 } else {
200 // an "interior" remove
201
202 // slide over all others up through putIndex.
203 for (int i = removeIndex, putIndex = this.putIndex;;) {
204 int pred = i;
205 if (++i == items.length) i = 0;
206 if (i == putIndex) {
207 items[pred] = null;
208 this.putIndex = pred;
209 break;
210 }
211 items[pred] = items[i];
212 }
213 count--;
214 if (itrs != null)
215 itrs.removedAt(removeIndex);
216 }
217 notFull.signal();
218 // checkInvariants();
219 }
220
221 /**
222 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
223 * capacity and default access policy.
224 *
225 * @param capacity the capacity of this queue
226 * @throws IllegalArgumentException if {@code capacity < 1}
227 */
228 public ArrayBlockingQueue(int capacity) {
229 this(capacity, false);
230 }
231
232 /**
233 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
234 * capacity and the specified access policy.
235 *
236 * @param capacity the capacity of this queue
237 * @param fair if {@code true} then queue accesses for threads blocked
238 * on insertion or removal, are processed in FIFO order;
239 * if {@code false} the access order is unspecified.
240 * @throws IllegalArgumentException if {@code capacity < 1}
241 */
242 public ArrayBlockingQueue(int capacity, boolean fair) {
243 if (capacity <= 0)
244 throw new IllegalArgumentException();
245 this.items = new Object[capacity];
246 lock = new ReentrantLock(fair);
247 notEmpty = lock.newCondition();
248 notFull = lock.newCondition();
249 }
250
251 /**
252 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
253 * capacity, the specified access policy and initially containing the
254 * elements of the given collection,
255 * added in traversal order of the collection's iterator.
256 *
257 * @param capacity the capacity of this queue
258 * @param fair if {@code true} then queue accesses for threads blocked
259 * on insertion or removal, are processed in FIFO order;
260 * if {@code false} the access order is unspecified.
261 * @param c the collection of elements to initially contain
262 * @throws IllegalArgumentException if {@code capacity} is less than
263 * {@code c.size()}, or less than 1.
264 * @throws NullPointerException if the specified collection or any
265 * of its elements are null
266 */
267 public ArrayBlockingQueue(int capacity, boolean fair,
268 Collection<? extends E> c) {
269 this(capacity, fair);
270
271 final ReentrantLock lock = this.lock;
272 lock.lock(); // Lock only for visibility, not mutual exclusion
273 try {
274 final Object[] items = this.items;
275 int i = 0;
276 try {
277 for (E e : c)
278 items[i++] = Objects.requireNonNull(e);
279 } catch (ArrayIndexOutOfBoundsException ex) {
280 throw new IllegalArgumentException();
281 }
282 count = i;
283 putIndex = (i == capacity) ? 0 : i;
284 // checkInvariants();
285 } finally {
286 lock.unlock();
287 }
288 }
289
290 /**
291 * Inserts the specified element at the tail of this queue if it is
292 * possible to do so immediately without exceeding the queue's capacity,
293 * returning {@code true} upon success and throwing an
294 * {@code IllegalStateException} if this queue is full.
295 *
296 * @param e the element to add
297 * @return {@code true} (as specified by {@link Collection#add})
298 * @throws IllegalStateException if this queue is full
299 * @throws NullPointerException if the specified element is null
300 */
301 public boolean add(E e) {
302 return super.add(e);
303 }
304
305 /**
306 * Inserts the specified element at the tail of this queue if it is
307 * possible to do so immediately without exceeding the queue's capacity,
308 * returning {@code true} upon success and {@code false} if this queue
309 * is full. This method is generally preferable to method {@link #add},
310 * which can fail to insert an element only by throwing an exception.
311 *
312 * @throws NullPointerException if the specified element is null
313 */
314 public boolean offer(E e) {
315 Objects.requireNonNull(e);
316 final ReentrantLock lock = this.lock;
317 lock.lock();
318 try {
319 if (count == items.length)
320 return false;
321 else {
322 enqueue(e);
323 return true;
324 }
325 } finally {
326 lock.unlock();
327 }
328 }
329
330 /**
331 * Inserts the specified element at the tail of this queue, waiting
332 * for space to become available if the queue is full.
333 *
334 * @throws InterruptedException {@inheritDoc}
335 * @throws NullPointerException {@inheritDoc}
336 */
337 public void put(E e) throws InterruptedException {
338 Objects.requireNonNull(e);
339 final ReentrantLock lock = this.lock;
340 lock.lockInterruptibly();
341 try {
342 while (count == items.length)
343 notFull.await();
344 enqueue(e);
345 } finally {
346 lock.unlock();
347 }
348 }
349
350 /**
351 * Inserts the specified element at the tail of this queue, waiting
352 * up to the specified wait time for space to become available if
353 * the queue is full.
354 *
355 * @throws InterruptedException {@inheritDoc}
356 * @throws NullPointerException {@inheritDoc}
357 */
358 public boolean offer(E e, long timeout, TimeUnit unit)
359 throws InterruptedException {
360
361 Objects.requireNonNull(e);
362 long nanos = unit.toNanos(timeout);
363 final ReentrantLock lock = this.lock;
364 lock.lockInterruptibly();
365 try {
366 while (count == items.length) {
367 if (nanos <= 0L)
368 return false;
369 nanos = notFull.awaitNanos(nanos);
370 }
371 enqueue(e);
372 return true;
373 } finally {
374 // checkInvariants();
375 lock.unlock();
376 }
377 }
378
379 public E poll() {
380 final ReentrantLock lock = this.lock;
381 lock.lock();
382 try {
383 return (count == 0) ? null : dequeue();
384 } finally {
385 lock.unlock();
386 }
387 }
388
389 public E take() throws InterruptedException {
390 final ReentrantLock lock = this.lock;
391 lock.lockInterruptibly();
392 try {
393 while (count == 0)
394 notEmpty.await();
395 return dequeue();
396 } finally {
397 lock.unlock();
398 }
399 }
400
401 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
402 long nanos = unit.toNanos(timeout);
403 final ReentrantLock lock = this.lock;
404 lock.lockInterruptibly();
405 try {
406 while (count == 0) {
407 if (nanos <= 0L)
408 return null;
409 nanos = notEmpty.awaitNanos(nanos);
410 }
411 return dequeue();
412 } finally {
413 // checkInvariants();
414 lock.unlock();
415 }
416 }
417
418 public E peek() {
419 final ReentrantLock lock = this.lock;
420 lock.lock();
421 try {
422 return itemAt(takeIndex); // null when queue is empty
423 } finally {
424 lock.unlock();
425 }
426 }
427
428 // this doc comment is overridden to remove the reference to collections
429 // greater in size than Integer.MAX_VALUE
430 /**
431 * Returns the number of elements in this queue.
432 *
433 * @return the number of elements in this queue
434 */
435 public int size() {
436 final ReentrantLock lock = this.lock;
437 lock.lock();
438 try {
439 return count;
440 } finally {
441 lock.unlock();
442 }
443 }
444
445 // this doc comment is a modified copy of the inherited doc comment,
446 // without the reference to unlimited queues.
447 /**
448 * Returns the number of additional elements that this queue can ideally
449 * (in the absence of memory or resource constraints) accept without
450 * blocking. This is always equal to the initial capacity of this queue
451 * less the current {@code size} of this queue.
452 *
453 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
454 * an element will succeed by inspecting {@code remainingCapacity}
455 * because it may be the case that another thread is about to
456 * insert or remove an element.
457 */
458 public int remainingCapacity() {
459 final ReentrantLock lock = this.lock;
460 lock.lock();
461 try {
462 return items.length - count;
463 } finally {
464 lock.unlock();
465 }
466 }
467
468 /**
469 * Removes a single instance of the specified element from this queue,
470 * if it is present. More formally, removes an element {@code e} such
471 * that {@code o.equals(e)}, if this queue contains one or more such
472 * elements.
473 * Returns {@code true} if this queue contained the specified element
474 * (or equivalently, if this queue changed as a result of the call).
475 *
476 * <p>Removal of interior elements in circular array based queues
477 * is an intrinsically slow and disruptive operation, so should
478 * be undertaken only in exceptional circumstances, ideally
479 * only when the queue is known not to be accessible by other
480 * threads.
481 *
482 * @param o element to be removed from this queue, if present
483 * @return {@code true} if this queue changed as a result of the call
484 */
485 public boolean remove(Object o) {
486 if (o == null) return false;
487 final ReentrantLock lock = this.lock;
488 lock.lock();
489 try {
490 if (count > 0) {
491 final Object[] items = this.items;
492 for (int i = takeIndex, end = putIndex,
493 to = (i < end) ? end : items.length;
494 ; i = 0, to = end) {
495 for (; i < to; i++)
496 if (o.equals(items[i])) {
497 removeAt(i);
498 return true;
499 }
500 if (to == end) break;
501 }
502 }
503 return false;
504 } finally {
505 // checkInvariants();
506 lock.unlock();
507 }
508 }
509
510 /**
511 * Returns {@code true} if this queue contains the specified element.
512 * More formally, returns {@code true} if and only if this queue contains
513 * at least one element {@code e} such that {@code o.equals(e)}.
514 *
515 * @param o object to be checked for containment in this queue
516 * @return {@code true} if this queue contains the specified element
517 */
518 public boolean contains(Object o) {
519 if (o == null) return false;
520 final ReentrantLock lock = this.lock;
521 lock.lock();
522 try {
523 if (count > 0) {
524 final Object[] items = this.items;
525 for (int i = takeIndex, end = putIndex,
526 to = (i < end) ? end : items.length;
527 ; i = 0, to = end) {
528 for (; i < to; i++)
529 if (o.equals(items[i]))
530 return true;
531 if (to == end) break;
532 }
533 }
534 return false;
535 } finally {
536 // checkInvariants();
537 lock.unlock();
538 }
539 }
540
541 /**
542 * Returns an array containing all of the elements in this queue, in
543 * proper sequence.
544 *
545 * <p>The returned array will be "safe" in that no references to it are
546 * maintained by this queue. (In other words, this method must allocate
547 * a new array). The caller is thus free to modify the returned array.
548 *
549 * <p>This method acts as bridge between array-based and collection-based
550 * APIs.
551 *
552 * @return an array containing all of the elements in this queue
553 */
554 public Object[] toArray() {
555 final ReentrantLock lock = this.lock;
556 lock.lock();
557 try {
558 final Object[] items = this.items;
559 final int end = takeIndex + count;
560 final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
561 if (end != putIndex)
562 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
563 return a;
564 } finally {
565 lock.unlock();
566 }
567 }
568
569 /**
570 * Returns an array containing all of the elements in this queue, in
571 * proper sequence; the runtime type of the returned array is that of
572 * the specified array. If the queue fits in the specified array, it
573 * is returned therein. Otherwise, a new array is allocated with the
574 * runtime type of the specified array and the size of this queue.
575 *
576 * <p>If this queue fits in the specified array with room to spare
577 * (i.e., the array has more elements than this queue), the element in
578 * the array immediately following the end of the queue is set to
579 * {@code null}.
580 *
581 * <p>Like the {@link #toArray()} method, this method acts as bridge between
582 * array-based and collection-based APIs. Further, this method allows
583 * precise control over the runtime type of the output array, and may,
584 * under certain circumstances, be used to save allocation costs.
585 *
586 * <p>Suppose {@code x} is a queue known to contain only strings.
587 * The following code can be used to dump the queue into a newly
588 * allocated array of {@code String}:
589 *
590 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
591 *
592 * Note that {@code toArray(new Object[0])} is identical in function to
593 * {@code toArray()}.
594 *
595 * @param a the array into which the elements of the queue are to
596 * be stored, if it is big enough; otherwise, a new array of the
597 * same runtime type is allocated for this purpose
598 * @return an array containing all of the elements in this queue
599 * @throws ArrayStoreException if the runtime type of the specified array
600 * is not a supertype of the runtime type of every element in
601 * this queue
602 * @throws NullPointerException if the specified array is null
603 */
604 @SuppressWarnings("unchecked")
605 public <T> T[] toArray(T[] a) {
606 final ReentrantLock lock = this.lock;
607 lock.lock();
608 try {
609 final Object[] items = this.items;
610 final int count = this.count;
611 final int firstLeg = Math.min(items.length - takeIndex, count);
612 if (a.length < count) {
613 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
614 a.getClass());
615 } else {
616 System.arraycopy(items, takeIndex, a, 0, firstLeg);
617 if (a.length > count)
618 a[count] = null;
619 }
620 if (firstLeg < count)
621 System.arraycopy(items, 0, a, firstLeg, putIndex);
622 return a;
623 } finally {
624 lock.unlock();
625 }
626 }
627
628 public String toString() {
629 return Helpers.collectionToString(this);
630 }
631
632 /**
633 * Atomically removes all of the elements from this queue.
634 * The queue will be empty after this call returns.
635 */
636 public void clear() {
637 final ReentrantLock lock = this.lock;
638 lock.lock();
639 try {
640 int k;
641 if ((k = count) > 0) {
642 circularClear(items, takeIndex, putIndex);
643 takeIndex = putIndex;
644 count = 0;
645 if (itrs != null)
646 itrs.queueIsEmpty();
647 for (; k > 0 && lock.hasWaiters(notFull); k--)
648 notFull.signal();
649 }
650 } finally {
651 // checkInvariants();
652 lock.unlock();
653 }
654 }
655
656 /**
657 * Nulls out slots starting at array index i, upto index end.
658 * If i == end, the entire array is cleared!
659 */
660 private static void circularClear(Object[] items, int i, int end) {
661 for (int to = (i < end) ? end : items.length;
662 ; i = 0, to = end) {
663 Arrays.fill(items, i, to, null);
664 if (to == end) break;
665 }
666 }
667
668 /**
669 * @throws UnsupportedOperationException {@inheritDoc}
670 * @throws ClassCastException {@inheritDoc}
671 * @throws NullPointerException {@inheritDoc}
672 * @throws IllegalArgumentException {@inheritDoc}
673 */
674 public int drainTo(Collection<? super E> c) {
675 return drainTo(c, Integer.MAX_VALUE);
676 }
677
678 /**
679 * @throws UnsupportedOperationException {@inheritDoc}
680 * @throws ClassCastException {@inheritDoc}
681 * @throws NullPointerException {@inheritDoc}
682 * @throws IllegalArgumentException {@inheritDoc}
683 */
684 public int drainTo(Collection<? super E> c, int maxElements) {
685 Objects.requireNonNull(c);
686 if (c == this)
687 throw new IllegalArgumentException();
688 if (maxElements <= 0)
689 return 0;
690 final Object[] items = this.items;
691 final ReentrantLock lock = this.lock;
692 lock.lock();
693 try {
694 int n = Math.min(maxElements, count);
695 int take = takeIndex;
696 int i = 0;
697 try {
698 while (i < n) {
699 @SuppressWarnings("unchecked")
700 E x = (E) items[take];
701 c.add(x);
702 items[take] = null;
703 if (++take == items.length) take = 0;
704 i++;
705 }
706 return n;
707 } finally {
708 // Restore invariants even if c.add() threw
709 if (i > 0) {
710 count -= i;
711 takeIndex = take;
712 if (itrs != null) {
713 if (count == 0)
714 itrs.queueIsEmpty();
715 else if (i > take)
716 itrs.takeIndexWrapped();
717 }
718 for (; i > 0 && lock.hasWaiters(notFull); i--)
719 notFull.signal();
720 }
721 }
722 } finally {
723 // checkInvariants();
724 lock.unlock();
725 }
726 }
727
728 /**
729 * Returns an iterator over the elements in this queue in proper sequence.
730 * The elements will be returned in order from first (head) to last (tail).
731 *
732 * <p>The returned iterator is
733 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
734 *
735 * @return an iterator over the elements in this queue in proper sequence
736 */
737 public Iterator<E> iterator() {
738 return new Itr();
739 }
740
741 /**
742 * Shared data between iterators and their queue, allowing queue
743 * modifications to update iterators when elements are removed.
744 *
745 * This adds a lot of complexity for the sake of correctly
746 * handling some uncommon operations, but the combination of
747 * circular-arrays and supporting interior removes (i.e., those
748 * not at head) would cause iterators to sometimes lose their
749 * places and/or (re)report elements they shouldn't. To avoid
750 * this, when a queue has one or more iterators, it keeps iterator
751 * state consistent by:
752 *
753 * (1) keeping track of the number of "cycles", that is, the
754 * number of times takeIndex has wrapped around to 0.
755 * (2) notifying all iterators via the callback removedAt whenever
756 * an interior element is removed (and thus other elements may
757 * be shifted).
758 *
759 * These suffice to eliminate iterator inconsistencies, but
760 * unfortunately add the secondary responsibility of maintaining
761 * the list of iterators. We track all active iterators in a
762 * simple linked list (accessed only when the queue's lock is
763 * held) of weak references to Itr. The list is cleaned up using
764 * 3 different mechanisms:
765 *
766 * (1) Whenever a new iterator is created, do some O(1) checking for
767 * stale list elements.
768 *
769 * (2) Whenever takeIndex wraps around to 0, check for iterators
770 * that have been unused for more than one wrap-around cycle.
771 *
772 * (3) Whenever the queue becomes empty, all iterators are notified
773 * and this entire data structure is discarded.
774 *
775 * So in addition to the removedAt callback that is necessary for
776 * correctness, iterators have the shutdown and takeIndexWrapped
777 * callbacks that help remove stale iterators from the list.
778 *
779 * Whenever a list element is examined, it is expunged if either
780 * the GC has determined that the iterator is discarded, or if the
781 * iterator reports that it is "detached" (does not need any
782 * further state updates). Overhead is maximal when takeIndex
783 * never advances, iterators are discarded before they are
784 * exhausted, and all removals are interior removes, in which case
785 * all stale iterators are discovered by the GC. But even in this
786 * case we don't increase the amortized complexity.
787 *
788 * Care must be taken to keep list sweeping methods from
789 * reentrantly invoking another such method, causing subtle
790 * corruption bugs.
791 */
792 class Itrs {
793
794 /**
795 * Node in a linked list of weak iterator references.
796 */
797 private class Node extends WeakReference<Itr> {
798 Node next;
799
800 Node(Itr iterator, Node next) {
801 super(iterator);
802 this.next = next;
803 }
804 }
805
806 /** Incremented whenever takeIndex wraps around to 0 */
807 int cycles;
808
809 /** Linked list of weak iterator references */
810 private Node head;
811
812 /** Used to expunge stale iterators */
813 private Node sweeper;
814
815 private static final int SHORT_SWEEP_PROBES = 4;
816 private static final int LONG_SWEEP_PROBES = 16;
817
818 Itrs(Itr initial) {
819 register(initial);
820 }
821
822 /**
823 * Sweeps itrs, looking for and expunging stale iterators.
824 * If at least one was found, tries harder to find more.
825 * Called only from iterating thread.
826 *
827 * @param tryHarder whether to start in try-harder mode, because
828 * there is known to be at least one iterator to collect
829 */
830 void doSomeSweeping(boolean tryHarder) {
831 // assert lock.isHeldByCurrentThread();
832 // assert head != null;
833 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
834 Node o, p;
835 final Node sweeper = this.sweeper;
836 boolean passedGo; // to limit search to one full sweep
837
838 if (sweeper == null) {
839 o = null;
840 p = head;
841 passedGo = true;
842 } else {
843 o = sweeper;
844 p = o.next;
845 passedGo = false;
846 }
847
848 for (; probes > 0; probes--) {
849 if (p == null) {
850 if (passedGo)
851 break;
852 o = null;
853 p = head;
854 passedGo = true;
855 }
856 final Itr it = p.get();
857 final Node next = p.next;
858 if (it == null || it.isDetached()) {
859 // found a discarded/exhausted iterator
860 probes = LONG_SWEEP_PROBES; // "try harder"
861 // unlink p
862 p.clear();
863 p.next = null;
864 if (o == null) {
865 head = next;
866 if (next == null) {
867 // We've run out of iterators to track; retire
868 itrs = null;
869 return;
870 }
871 }
872 else
873 o.next = next;
874 } else {
875 o = p;
876 }
877 p = next;
878 }
879
880 this.sweeper = (p == null) ? null : o;
881 }
882
883 /**
884 * Adds a new iterator to the linked list of tracked iterators.
885 */
886 void register(Itr itr) {
887 // assert lock.isHeldByCurrentThread();
888 head = new Node(itr, head);
889 }
890
891 /**
892 * Called whenever takeIndex wraps around to 0.
893 *
894 * Notifies all iterators, and expunges any that are now stale.
895 */
896 void takeIndexWrapped() {
897 // assert lock.isHeldByCurrentThread();
898 cycles++;
899 for (Node o = null, p = head; p != null;) {
900 final Itr it = p.get();
901 final Node next = p.next;
902 if (it == null || it.takeIndexWrapped()) {
903 // unlink p
904 // assert it == null || it.isDetached();
905 p.clear();
906 p.next = null;
907 if (o == null)
908 head = next;
909 else
910 o.next = next;
911 } else {
912 o = p;
913 }
914 p = next;
915 }
916 if (head == null) // no more iterators to track
917 itrs = null;
918 }
919
920 /**
921 * Called whenever an interior remove (not at takeIndex) occurred.
922 *
923 * Notifies all iterators, and expunges any that are now stale.
924 */
925 void removedAt(int removedIndex) {
926 for (Node o = null, p = head; p != null;) {
927 final Itr it = p.get();
928 final Node next = p.next;
929 if (it == null || it.removedAt(removedIndex)) {
930 // unlink p
931 // assert it == null || it.isDetached();
932 p.clear();
933 p.next = null;
934 if (o == null)
935 head = next;
936 else
937 o.next = next;
938 } else {
939 o = p;
940 }
941 p = next;
942 }
943 if (head == null) // no more iterators to track
944 itrs = null;
945 }
946
947 /**
948 * Called whenever the queue becomes empty.
949 *
950 * Notifies all active iterators that the queue is empty,
951 * clears all weak refs, and unlinks the itrs datastructure.
952 */
953 void queueIsEmpty() {
954 // assert lock.isHeldByCurrentThread();
955 for (Node p = head; p != null; p = p.next) {
956 Itr it = p.get();
957 if (it != null) {
958 p.clear();
959 it.shutdown();
960 }
961 }
962 head = null;
963 itrs = null;
964 }
965
966 /**
967 * Called whenever an element has been dequeued (at takeIndex).
968 */
969 void elementDequeued() {
970 // assert lock.isHeldByCurrentThread();
971 if (count == 0)
972 queueIsEmpty();
973 else if (takeIndex == 0)
974 takeIndexWrapped();
975 }
976 }
977
978 /**
979 * Iterator for ArrayBlockingQueue.
980 *
981 * To maintain weak consistency with respect to puts and takes, we
982 * read ahead one slot, so as to not report hasNext true but then
983 * not have an element to return.
984 *
985 * We switch into "detached" mode (allowing prompt unlinking from
986 * itrs without help from the GC) when all indices are negative, or
987 * when hasNext returns false for the first time. This allows the
988 * iterator to track concurrent updates completely accurately,
989 * except for the corner case of the user calling Iterator.remove()
990 * after hasNext() returned false. Even in this case, we ensure
991 * that we don't remove the wrong element by keeping track of the
992 * expected element to remove, in lastItem. Yes, we may fail to
993 * remove lastItem from the queue if it moved due to an interleaved
994 * interior remove while in detached mode.
995 */
996 private class Itr implements Iterator<E> {
997 /** Index to look for new nextItem; NONE at end */
998 private int cursor;
999
1000 /** Element to be returned by next call to next(); null if none */
1001 private E nextItem;
1002
1003 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
1004 private int nextIndex;
1005
1006 /** Last element returned; null if none or not detached. */
1007 private E lastItem;
1008
1009 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1010 private int lastRet;
1011
1012 /** Previous value of takeIndex, or DETACHED when detached */
1013 private int prevTakeIndex;
1014
1015 /** Previous value of iters.cycles */
1016 private int prevCycles;
1017
1018 /** Special index value indicating "not available" or "undefined" */
1019 private static final int NONE = -1;
1020
1021 /**
1022 * Special index value indicating "removed elsewhere", that is,
1023 * removed by some operation other than a call to this.remove().
1024 */
1025 private static final int REMOVED = -2;
1026
1027 /** Special value for prevTakeIndex indicating "detached mode" */
1028 private static final int DETACHED = -3;
1029
1030 Itr() {
1031 lastRet = NONE;
1032 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1033 lock.lock();
1034 try {
1035 if (count == 0) {
1036 // assert itrs == null;
1037 cursor = NONE;
1038 nextIndex = NONE;
1039 prevTakeIndex = DETACHED;
1040 } else {
1041 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1042 prevTakeIndex = takeIndex;
1043 nextItem = itemAt(nextIndex = takeIndex);
1044 cursor = incCursor(takeIndex);
1045 if (itrs == null) {
1046 itrs = new Itrs(this);
1047 } else {
1048 itrs.register(this); // in this order
1049 itrs.doSomeSweeping(false);
1050 }
1051 prevCycles = itrs.cycles;
1052 // assert takeIndex >= 0;
1053 // assert prevTakeIndex == takeIndex;
1054 // assert nextIndex >= 0;
1055 // assert nextItem != null;
1056 }
1057 } finally {
1058 lock.unlock();
1059 }
1060 }
1061
1062 boolean isDetached() {
1063 // assert lock.isHeldByCurrentThread();
1064 return prevTakeIndex < 0;
1065 }
1066
1067 private int incCursor(int index) {
1068 // assert lock.isHeldByCurrentThread();
1069 if (++index == items.length) index = 0;
1070 if (index == putIndex) index = NONE;
1071 return index;
1072 }
1073
1074 /**
1075 * Returns true if index is invalidated by the given number of
1076 * dequeues, starting from prevTakeIndex.
1077 */
1078 private boolean invalidated(int index, int prevTakeIndex,
1079 long dequeues, int length) {
1080 if (index < 0)
1081 return false;
1082 int distance = index - prevTakeIndex;
1083 if (distance < 0)
1084 distance += length;
1085 return dequeues > distance;
1086 }
1087
1088 /**
1089 * Adjusts indices to incorporate all dequeues since the last
1090 * operation on this iterator. Call only from iterating thread.
1091 */
1092 private void incorporateDequeues() {
1093 // assert lock.isHeldByCurrentThread();
1094 // assert itrs != null;
1095 // assert !isDetached();
1096 // assert count > 0;
1097
1098 final int cycles = itrs.cycles;
1099 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1100 final int prevCycles = this.prevCycles;
1101 final int prevTakeIndex = this.prevTakeIndex;
1102
1103 if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1104 final int len = items.length;
1105 // how far takeIndex has advanced since the previous
1106 // operation of this iterator
1107 long dequeues = (cycles - prevCycles) * len
1108 + (takeIndex - prevTakeIndex);
1109
1110 // Check indices for invalidation
1111 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1112 lastRet = REMOVED;
1113 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1114 nextIndex = REMOVED;
1115 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1116 cursor = takeIndex;
1117
1118 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1119 detach();
1120 else {
1121 this.prevCycles = cycles;
1122 this.prevTakeIndex = takeIndex;
1123 }
1124 }
1125 }
1126
1127 /**
1128 * Called when itrs should stop tracking this iterator, either
1129 * because there are no more indices to update (cursor < 0 &&
1130 * nextIndex < 0 && lastRet < 0) or as a special exception, when
1131 * lastRet >= 0, because hasNext() is about to return false for the
1132 * first time. Call only from iterating thread.
1133 */
1134 private void detach() {
1135 // Switch to detached mode
1136 // assert lock.isHeldByCurrentThread();
1137 // assert cursor == NONE;
1138 // assert nextIndex < 0;
1139 // assert lastRet < 0 || nextItem == null;
1140 // assert lastRet < 0 ^ lastItem != null;
1141 if (prevTakeIndex >= 0) {
1142 // assert itrs != null;
1143 prevTakeIndex = DETACHED;
1144 // try to unlink from itrs (but not too hard)
1145 itrs.doSomeSweeping(true);
1146 }
1147 }
1148
1149 /**
1150 * For performance reasons, we would like not to acquire a lock in
1151 * hasNext in the common case. To allow for this, we only access
1152 * fields (i.e. nextItem) that are not modified by update operations
1153 * triggered by queue modifications.
1154 */
1155 public boolean hasNext() {
1156 if (nextItem != null)
1157 return true;
1158 noNext();
1159 return false;
1160 }
1161
1162 private void noNext() {
1163 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1164 lock.lock();
1165 try {
1166 // assert cursor == NONE;
1167 // assert nextIndex == NONE;
1168 if (!isDetached()) {
1169 // assert lastRet >= 0;
1170 incorporateDequeues(); // might update lastRet
1171 if (lastRet >= 0) {
1172 lastItem = itemAt(lastRet);
1173 // assert lastItem != null;
1174 detach();
1175 }
1176 }
1177 // assert isDetached();
1178 // assert lastRet < 0 ^ lastItem != null;
1179 } finally {
1180 lock.unlock();
1181 }
1182 }
1183
1184 public E next() {
1185 final E x = nextItem;
1186 if (x == null)
1187 throw new NoSuchElementException();
1188 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1189 lock.lock();
1190 try {
1191 if (!isDetached())
1192 incorporateDequeues();
1193 // assert nextIndex != NONE;
1194 // assert lastItem == null;
1195 lastRet = nextIndex;
1196 final int cursor = this.cursor;
1197 if (cursor >= 0) {
1198 nextItem = itemAt(nextIndex = cursor);
1199 // assert nextItem != null;
1200 this.cursor = incCursor(cursor);
1201 } else {
1202 nextIndex = NONE;
1203 nextItem = null;
1204 }
1205 } finally {
1206 lock.unlock();
1207 }
1208 return x;
1209 }
1210
1211 public void forEachRemaining(Consumer<? super E> action) {
1212 Objects.requireNonNull(action);
1213 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1214 lock.lock();
1215 try {
1216 final E x = nextItem;
1217 if (x == null) return;
1218 if (!isDetached())
1219 incorporateDequeues();
1220 action.accept(nextItem);
1221 if (isDetached() || cursor < 0) return;
1222 final Object[] items = ArrayBlockingQueue.this.items;
1223 for (int i = cursor, end = putIndex,
1224 to = (i < end) ? end : items.length;
1225 ; i = 0, to = end) {
1226 for (; i < to; i++)
1227 action.accept(itemAt(items, i));
1228 if (to == end) break;
1229 }
1230 } finally {
1231 // Calling forEachRemaining is a strong hint that this
1232 // iteration is surely over; supporting remove() after
1233 // forEachRemaining() is more trouble than it's worth
1234 cursor = NONE;
1235 nextIndex = lastRet = NONE;
1236 nextItem = lastItem = null;
1237 detach();
1238 lock.unlock();
1239 }
1240 }
1241
1242 public void remove() {
1243 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1244 lock.lock();
1245 // assert lock.getHoldCount() == 1;
1246 try {
1247 if (!isDetached())
1248 incorporateDequeues(); // might update lastRet or detach
1249 final int lastRet = this.lastRet;
1250 this.lastRet = NONE;
1251 if (lastRet >= 0) {
1252 if (!isDetached())
1253 removeAt(lastRet);
1254 else {
1255 final E lastItem = this.lastItem;
1256 // assert lastItem != null;
1257 this.lastItem = null;
1258 if (itemAt(lastRet) == lastItem)
1259 removeAt(lastRet);
1260 }
1261 } else if (lastRet == NONE)
1262 throw new IllegalStateException();
1263 // else lastRet == REMOVED and the last returned element was
1264 // previously asynchronously removed via an operation other
1265 // than this.remove(), so nothing to do.
1266
1267 if (cursor < 0 && nextIndex < 0)
1268 detach();
1269 } finally {
1270 lock.unlock();
1271 // assert lastRet == NONE;
1272 // assert lastItem == null;
1273 }
1274 }
1275
1276 /**
1277 * Called to notify the iterator that the queue is empty, or that it
1278 * has fallen hopelessly behind, so that it should abandon any
1279 * further iteration, except possibly to return one more element
1280 * from next(), as promised by returning true from hasNext().
1281 */
1282 void shutdown() {
1283 // assert lock.isHeldByCurrentThread();
1284 cursor = NONE;
1285 if (nextIndex >= 0)
1286 nextIndex = REMOVED;
1287 if (lastRet >= 0) {
1288 lastRet = REMOVED;
1289 lastItem = null;
1290 }
1291 prevTakeIndex = DETACHED;
1292 // Don't set nextItem to null because we must continue to be
1293 // able to return it on next().
1294 //
1295 // Caller will unlink from itrs when convenient.
1296 }
1297
1298 private int distance(int index, int prevTakeIndex, int length) {
1299 int distance = index - prevTakeIndex;
1300 if (distance < 0)
1301 distance += length;
1302 return distance;
1303 }
1304
1305 /**
1306 * Called whenever an interior remove (not at takeIndex) occurred.
1307 *
1308 * @return true if this iterator should be unlinked from itrs
1309 */
1310 boolean removedAt(int removedIndex) {
1311 // assert lock.isHeldByCurrentThread();
1312 if (isDetached())
1313 return true;
1314
1315 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1316 final int prevTakeIndex = this.prevTakeIndex;
1317 final int len = items.length;
1318 // distance from prevTakeIndex to removedIndex
1319 final int removedDistance =
1320 len * (itrs.cycles - this.prevCycles
1321 + ((removedIndex < takeIndex) ? 1 : 0))
1322 + (removedIndex - prevTakeIndex);
1323 // assert itrs.cycles - this.prevCycles >= 0;
1324 // assert itrs.cycles - this.prevCycles <= 1;
1325 // assert removedDistance > 0;
1326 // assert removedIndex != takeIndex;
1327 int cursor = this.cursor;
1328 if (cursor >= 0) {
1329 int x = distance(cursor, prevTakeIndex, len);
1330 if (x == removedDistance) {
1331 if (cursor == putIndex)
1332 this.cursor = cursor = NONE;
1333 }
1334 else if (x > removedDistance) {
1335 // assert cursor != prevTakeIndex;
1336 this.cursor = cursor = dec(cursor, len);
1337 }
1338 }
1339 int lastRet = this.lastRet;
1340 if (lastRet >= 0) {
1341 int x = distance(lastRet, prevTakeIndex, len);
1342 if (x == removedDistance)
1343 this.lastRet = lastRet = REMOVED;
1344 else if (x > removedDistance)
1345 this.lastRet = lastRet = dec(lastRet, len);
1346 }
1347 int nextIndex = this.nextIndex;
1348 if (nextIndex >= 0) {
1349 int x = distance(nextIndex, prevTakeIndex, len);
1350 if (x == removedDistance)
1351 this.nextIndex = nextIndex = REMOVED;
1352 else if (x > removedDistance)
1353 this.nextIndex = nextIndex = dec(nextIndex, len);
1354 }
1355 if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1356 this.prevTakeIndex = DETACHED;
1357 return true;
1358 }
1359 return false;
1360 }
1361
1362 /**
1363 * Called whenever takeIndex wraps around to zero.
1364 *
1365 * @return true if this iterator should be unlinked from itrs
1366 */
1367 boolean takeIndexWrapped() {
1368 // assert lock.isHeldByCurrentThread();
1369 if (isDetached())
1370 return true;
1371 if (itrs.cycles - prevCycles > 1) {
1372 // All the elements that existed at the time of the last
1373 // operation are gone, so abandon further iteration.
1374 shutdown();
1375 return true;
1376 }
1377 return false;
1378 }
1379
1380 // /** Uncomment for debugging. */
1381 // public String toString() {
1382 // return ("cursor=" + cursor + " " +
1383 // "nextIndex=" + nextIndex + " " +
1384 // "lastRet=" + lastRet + " " +
1385 // "nextItem=" + nextItem + " " +
1386 // "lastItem=" + lastItem + " " +
1387 // "prevCycles=" + prevCycles + " " +
1388 // "prevTakeIndex=" + prevTakeIndex + " " +
1389 // "size()=" + size() + " " +
1390 // "remainingCapacity()=" + remainingCapacity());
1391 // }
1392 }
1393
1394 /**
1395 * Returns a {@link Spliterator} over the elements in this queue.
1396 *
1397 * <p>The returned spliterator is
1398 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1399 *
1400 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1401 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1402 *
1403 * @implNote
1404 * The {@code Spliterator} implements {@code trySplit} to permit limited
1405 * parallelism.
1406 *
1407 * @return a {@code Spliterator} over the elements in this queue
1408 * @since 1.8
1409 */
1410 public Spliterator<E> spliterator() {
1411 return Spliterators.spliterator
1412 (this, (Spliterator.ORDERED |
1413 Spliterator.NONNULL |
1414 Spliterator.CONCURRENT));
1415 }
1416
1417 public void forEach(Consumer<? super E> action) {
1418 Objects.requireNonNull(action);
1419 final ReentrantLock lock = this.lock;
1420 lock.lock();
1421 try {
1422 if (count > 0) {
1423 final Object[] items = this.items;
1424 for (int i = takeIndex, end = putIndex,
1425 to = (i < end) ? end : items.length;
1426 ; i = 0, to = end) {
1427 for (; i < to; i++)
1428 action.accept(itemAt(items, i));
1429 if (to == end) break;
1430 }
1431 }
1432 } finally {
1433 // checkInvariants();
1434 lock.unlock();
1435 }
1436 }
1437
1438 /**
1439 * @throws NullPointerException {@inheritDoc}
1440 */
1441 public boolean removeIf(Predicate<? super E> filter) {
1442 Objects.requireNonNull(filter);
1443 return bulkRemove(filter);
1444 }
1445
1446 /**
1447 * @throws NullPointerException {@inheritDoc}
1448 */
1449 public boolean removeAll(Collection<?> c) {
1450 Objects.requireNonNull(c);
1451 return bulkRemove(e -> c.contains(e));
1452 }
1453
1454 /**
1455 * @throws NullPointerException {@inheritDoc}
1456 */
1457 public boolean retainAll(Collection<?> c) {
1458 Objects.requireNonNull(c);
1459 return bulkRemove(e -> !c.contains(e));
1460 }
1461
1462 /** Implementation of bulk remove methods. */
1463 private boolean bulkRemove(Predicate<? super E> filter) {
1464 final ReentrantLock lock = this.lock;
1465 lock.lock();
1466 try {
1467 if (itrs == null) { // check for active iterators
1468 if (count > 0) {
1469 final Object[] items = this.items;
1470 // Optimize for initial run of survivors
1471 for (int i = takeIndex, end = putIndex,
1472 to = (i < end) ? end : items.length;
1473 ; i = 0, to = end) {
1474 for (; i < to; i++)
1475 if (filter.test(itemAt(items, i)))
1476 return bulkRemoveModified(filter, i);
1477 if (to == end) break;
1478 }
1479 }
1480 return false;
1481 }
1482 } finally {
1483 // checkInvariants();
1484 lock.unlock();
1485 }
1486 // Active iterators are too hairy!
1487 // Punting (for now) to the slow n^2 algorithm ...
1488 return super.removeIf(filter);
1489 }
1490
1491 // A tiny bit set implementation
1492
1493 private static long[] nBits(int n) {
1494 return new long[((n - 1) >> 6) + 1];
1495 }
1496 private static void setBit(long[] bits, int i) {
1497 bits[i >> 6] |= 1L << i;
1498 }
1499 private static boolean isClear(long[] bits, int i) {
1500 return (bits[i >> 6] & (1L << i)) == 0;
1501 }
1502
1503 /**
1504 * Returns circular distance from i to j, disambiguating i == j to
1505 * items.length; never returns 0.
1506 */
1507 private int distanceNonEmpty(int i, int j) {
1508 if ((j -= i) <= 0) j += items.length;
1509 return j;
1510 }
1511
1512 /**
1513 * Helper for bulkRemove, in case of at least one deletion.
1514 * Tolerate predicates that reentrantly access the collection for
1515 * read (but not write), so traverse once to find elements to
1516 * delete, a second pass to physically expunge.
1517 *
1518 * @param beg valid index of first element to be deleted
1519 */
1520 private boolean bulkRemoveModified(
1521 Predicate<? super E> filter, final int beg) {
1522 final Object[] es = items;
1523 final int capacity = items.length;
1524 final int end = putIndex;
1525 final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
1526 deathRow[0] = 1L; // set bit 0
1527 for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1528 ; i = 0, to = end, k -= capacity) {
1529 for (; i < to; i++)
1530 if (filter.test(itemAt(es, i)))
1531 setBit(deathRow, i - k);
1532 if (to == end) break;
1533 }
1534 // a two-finger traversal, with hare i reading, tortoise w writing
1535 int w = beg;
1536 for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1537 ; w = 0) { // w rejoins i on second leg
1538 // In this loop, i and w are on the same leg, with i > w
1539 for (; i < to; i++)
1540 if (isClear(deathRow, i - k))
1541 es[w++] = es[i];
1542 if (to == end) break;
1543 // In this loop, w is on the first leg, i on the second
1544 for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
1545 if (isClear(deathRow, i - k))
1546 es[w++] = es[i];
1547 if (i >= to) {
1548 if (w == capacity) w = 0; // "corner" case
1549 break;
1550 }
1551 }
1552 count -= distanceNonEmpty(w, end);
1553 circularClear(es, putIndex = w, end);
1554 // checkInvariants();
1555 return true;
1556 }
1557
1558 /** debugging */
1559 void checkInvariants() {
1560 // meta-assertions
1561 // assert lock.isHeldByCurrentThread();
1562 try {
1563 // Unlike ArrayDeque, we have a count field but no spare slot.
1564 // We prefer ArrayDeque's strategy, but our field layout is
1565 // baked into the serial form, and so is annoying to change.
1566 // putIndex == takeIndex must be disambiguated by checking count.
1567 int capacity = items.length;
1568 // assert capacity > 0;
1569 // assert takeIndex >= 0 && takeIndex < capacity;
1570 // assert putIndex >= 0 && putIndex < capacity;
1571 // assert count <= capacity;
1572 // assert takeIndex == putIndex || items[takeIndex] != null;
1573 // assert count == capacity || items[putIndex] == null;
1574 // assert takeIndex == putIndex || items[dec(putIndex, capacity)] != null;
1575 } catch (Throwable t) {
1576 System.err.printf("takeIndex=%d putIndex=%d count=%d capacity=%d%n",
1577 takeIndex, putIndex, count, items.length);
1578 System.err.printf("items=%s%n",
1579 Arrays.toString(items));
1580 throw t;
1581 }
1582 }
1583
1584 }