ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.152
Committed: Thu Oct 17 01:51:37 2019 UTC (4 years, 7 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.151: +3 -0 lines
Log Message:
8232230: Suppress warnings on non-serializable non-transient instance fields in java.util.concurrent

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.ref.WeakReference;
10 import java.util.AbstractQueue;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.Objects;
16 import java.util.Spliterator;
17 import java.util.Spliterators;
18 import java.util.concurrent.locks.Condition;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.function.Consumer;
21 import java.util.function.Predicate;
22
23 /**
24 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
25 * array. This queue orders elements FIFO (first-in-first-out). The
26 * <em>head</em> of the queue is that element that has been on the
27 * queue the longest time. The <em>tail</em> of the queue is that
28 * element that has been on the queue the shortest time. New elements
29 * are inserted at the tail of the queue, and the queue retrieval
30 * operations obtain elements at the head of the queue.
31 *
32 * <p>This is a classic &quot;bounded buffer&quot;, in which a
33 * fixed-sized array holds elements inserted by producers and
34 * extracted by consumers. Once created, the capacity cannot be
35 * changed. Attempts to {@code put} an element into a full queue
36 * will result in the operation blocking; attempts to {@code take} an
37 * element from an empty queue will similarly block.
38 *
39 * <p>This class supports an optional fairness policy for ordering
40 * waiting producer and consumer threads. By default, this ordering
41 * is not guaranteed. However, a queue constructed with fairness set
42 * to {@code true} grants threads access in FIFO order. Fairness
43 * generally decreases throughput but reduces variability and avoids
44 * starvation.
45 *
46 * <p>This class and its iterator implement all of the <em>optional</em>
47 * methods of the {@link Collection} and {@link Iterator} interfaces.
48 *
49 * <p>This class is a member of the
50 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
51 * Java Collections Framework</a>.
52 *
53 * @since 1.5
54 * @author Doug Lea
55 * @param <E> the type of elements held in this queue
56 */
57 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
58 implements BlockingQueue<E>, java.io.Serializable {
59
60 /*
61 * Much of the implementation mechanics, especially the unusual
62 * nested loops, are shared and co-maintained with ArrayDeque.
63 */
64
65 /**
66 * Serialization ID. This class relies on default serialization
67 * even for the items array, which is default-serialized, even if
68 * it is empty. Otherwise it could not be declared final, which is
69 * necessary here.
70 */
71 private static final long serialVersionUID = -817911632652898426L;
72
73 /** The queued items */
74 @SuppressWarnings("serial") // Conditionally serializable
75 final Object[] items;
76
77 /** items index for next take, poll, peek or remove */
78 int takeIndex;
79
80 /** items index for next put, offer, or add */
81 int putIndex;
82
83 /** Number of elements in the queue */
84 int count;
85
86 /*
87 * Concurrency control uses the classic two-condition algorithm
88 * found in any textbook.
89 */
90
91 /** Main lock guarding all access */
92 final ReentrantLock lock;
93
94 /** Condition for waiting takes */
95 @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
96 private final Condition notEmpty;
97
98 /** Condition for waiting puts */
99 @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
100 private final Condition notFull;
101
102 /**
103 * Shared state for currently active iterators, or null if there
104 * are known not to be any. Allows queue operations to update
105 * iterator state.
106 */
107 transient Itrs itrs;
108
109 // Internal helper methods
110
111 /**
112 * Increments i, mod modulus.
113 * Precondition and postcondition: 0 <= i < modulus.
114 */
115 static final int inc(int i, int modulus) {
116 if (++i >= modulus) i = 0;
117 return i;
118 }
119
120 /**
121 * Decrements i, mod modulus.
122 * Precondition and postcondition: 0 <= i < modulus.
123 */
124 static final int dec(int i, int modulus) {
125 if (--i < 0) i = modulus - 1;
126 return i;
127 }
128
129 /**
130 * Returns item at index i.
131 */
132 @SuppressWarnings("unchecked")
133 final E itemAt(int i) {
134 return (E) items[i];
135 }
136
137 /**
138 * Returns element at array index i.
139 * This is a slight abuse of generics, accepted by javac.
140 */
141 @SuppressWarnings("unchecked")
142 static <E> E itemAt(Object[] items, int i) {
143 return (E) items[i];
144 }
145
146 /**
147 * Inserts element at current put position, advances, and signals.
148 * Call only when holding lock.
149 */
150 private void enqueue(E e) {
151 // assert lock.isHeldByCurrentThread();
152 // assert lock.getHoldCount() == 1;
153 // assert items[putIndex] == null;
154 final Object[] items = this.items;
155 items[putIndex] = e;
156 if (++putIndex == items.length) putIndex = 0;
157 count++;
158 notEmpty.signal();
159 // checkInvariants();
160 }
161
162 /**
163 * Extracts element at current take position, advances, and signals.
164 * Call only when holding lock.
165 */
166 private E dequeue() {
167 // assert lock.isHeldByCurrentThread();
168 // assert lock.getHoldCount() == 1;
169 // assert items[takeIndex] != null;
170 final Object[] items = this.items;
171 @SuppressWarnings("unchecked")
172 E e = (E) items[takeIndex];
173 items[takeIndex] = null;
174 if (++takeIndex == items.length) takeIndex = 0;
175 count--;
176 if (itrs != null)
177 itrs.elementDequeued();
178 notFull.signal();
179 // checkInvariants();
180 return e;
181 }
182
183 /**
184 * Deletes item at array index removeIndex.
185 * Utility for remove(Object) and iterator.remove.
186 * Call only when holding lock.
187 */
188 void removeAt(final int removeIndex) {
189 // assert lock.isHeldByCurrentThread();
190 // assert lock.getHoldCount() == 1;
191 // assert items[removeIndex] != null;
192 // assert removeIndex >= 0 && removeIndex < items.length;
193 final Object[] items = this.items;
194 if (removeIndex == takeIndex) {
195 // removing front item; just advance
196 items[takeIndex] = null;
197 if (++takeIndex == items.length) takeIndex = 0;
198 count--;
199 if (itrs != null)
200 itrs.elementDequeued();
201 } else {
202 // an "interior" remove
203
204 // slide over all others up through putIndex.
205 for (int i = removeIndex, putIndex = this.putIndex;;) {
206 int pred = i;
207 if (++i == items.length) i = 0;
208 if (i == putIndex) {
209 items[pred] = null;
210 this.putIndex = pred;
211 break;
212 }
213 items[pred] = items[i];
214 }
215 count--;
216 if (itrs != null)
217 itrs.removedAt(removeIndex);
218 }
219 notFull.signal();
220 // checkInvariants();
221 }
222
223 /**
224 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
225 * capacity and default access policy.
226 *
227 * @param capacity the capacity of this queue
228 * @throws IllegalArgumentException if {@code capacity < 1}
229 */
230 public ArrayBlockingQueue(int capacity) {
231 this(capacity, false);
232 }
233
234 /**
235 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
236 * capacity and the specified access policy.
237 *
238 * @param capacity the capacity of this queue
239 * @param fair if {@code true} then queue accesses for threads blocked
240 * on insertion or removal, are processed in FIFO order;
241 * if {@code false} the access order is unspecified.
242 * @throws IllegalArgumentException if {@code capacity < 1}
243 */
244 public ArrayBlockingQueue(int capacity, boolean fair) {
245 if (capacity <= 0)
246 throw new IllegalArgumentException();
247 this.items = new Object[capacity];
248 lock = new ReentrantLock(fair);
249 notEmpty = lock.newCondition();
250 notFull = lock.newCondition();
251 }
252
253 /**
254 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
255 * capacity, the specified access policy and initially containing the
256 * elements of the given collection,
257 * added in traversal order of the collection's iterator.
258 *
259 * @param capacity the capacity of this queue
260 * @param fair if {@code true} then queue accesses for threads blocked
261 * on insertion or removal, are processed in FIFO order;
262 * if {@code false} the access order is unspecified.
263 * @param c the collection of elements to initially contain
264 * @throws IllegalArgumentException if {@code capacity} is less than
265 * {@code c.size()}, or less than 1.
266 * @throws NullPointerException if the specified collection or any
267 * of its elements are null
268 */
269 public ArrayBlockingQueue(int capacity, boolean fair,
270 Collection<? extends E> c) {
271 this(capacity, fair);
272
273 final ReentrantLock lock = this.lock;
274 lock.lock(); // Lock only for visibility, not mutual exclusion
275 try {
276 final Object[] items = this.items;
277 int i = 0;
278 try {
279 for (E e : c)
280 items[i++] = Objects.requireNonNull(e);
281 } catch (ArrayIndexOutOfBoundsException ex) {
282 throw new IllegalArgumentException();
283 }
284 count = i;
285 putIndex = (i == capacity) ? 0 : i;
286 // checkInvariants();
287 } finally {
288 lock.unlock();
289 }
290 }
291
292 /**
293 * Inserts the specified element at the tail of this queue if it is
294 * possible to do so immediately without exceeding the queue's capacity,
295 * returning {@code true} upon success and throwing an
296 * {@code IllegalStateException} if this queue is full.
297 *
298 * @param e the element to add
299 * @return {@code true} (as specified by {@link Collection#add})
300 * @throws IllegalStateException if this queue is full
301 * @throws NullPointerException if the specified element is null
302 */
303 public boolean add(E e) {
304 return super.add(e);
305 }
306
307 /**
308 * Inserts the specified element at the tail of this queue if it is
309 * possible to do so immediately without exceeding the queue's capacity,
310 * returning {@code true} upon success and {@code false} if this queue
311 * is full. This method is generally preferable to method {@link #add},
312 * which can fail to insert an element only by throwing an exception.
313 *
314 * @throws NullPointerException if the specified element is null
315 */
316 public boolean offer(E e) {
317 Objects.requireNonNull(e);
318 final ReentrantLock lock = this.lock;
319 lock.lock();
320 try {
321 if (count == items.length)
322 return false;
323 else {
324 enqueue(e);
325 return true;
326 }
327 } finally {
328 lock.unlock();
329 }
330 }
331
332 /**
333 * Inserts the specified element at the tail of this queue, waiting
334 * for space to become available if the queue is full.
335 *
336 * @throws InterruptedException {@inheritDoc}
337 * @throws NullPointerException {@inheritDoc}
338 */
339 public void put(E e) throws InterruptedException {
340 Objects.requireNonNull(e);
341 final ReentrantLock lock = this.lock;
342 lock.lockInterruptibly();
343 try {
344 while (count == items.length)
345 notFull.await();
346 enqueue(e);
347 } finally {
348 lock.unlock();
349 }
350 }
351
352 /**
353 * Inserts the specified element at the tail of this queue, waiting
354 * up to the specified wait time for space to become available if
355 * the queue is full.
356 *
357 * @throws InterruptedException {@inheritDoc}
358 * @throws NullPointerException {@inheritDoc}
359 */
360 public boolean offer(E e, long timeout, TimeUnit unit)
361 throws InterruptedException {
362
363 Objects.requireNonNull(e);
364 long nanos = unit.toNanos(timeout);
365 final ReentrantLock lock = this.lock;
366 lock.lockInterruptibly();
367 try {
368 while (count == items.length) {
369 if (nanos <= 0L)
370 return false;
371 nanos = notFull.awaitNanos(nanos);
372 }
373 enqueue(e);
374 return true;
375 } finally {
376 // checkInvariants();
377 lock.unlock();
378 }
379 }
380
381 public E poll() {
382 final ReentrantLock lock = this.lock;
383 lock.lock();
384 try {
385 return (count == 0) ? null : dequeue();
386 } finally {
387 lock.unlock();
388 }
389 }
390
391 public E take() throws InterruptedException {
392 final ReentrantLock lock = this.lock;
393 lock.lockInterruptibly();
394 try {
395 while (count == 0)
396 notEmpty.await();
397 return dequeue();
398 } finally {
399 lock.unlock();
400 }
401 }
402
403 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
404 long nanos = unit.toNanos(timeout);
405 final ReentrantLock lock = this.lock;
406 lock.lockInterruptibly();
407 try {
408 while (count == 0) {
409 if (nanos <= 0L)
410 return null;
411 nanos = notEmpty.awaitNanos(nanos);
412 }
413 return dequeue();
414 } finally {
415 // checkInvariants();
416 lock.unlock();
417 }
418 }
419
420 public E peek() {
421 final ReentrantLock lock = this.lock;
422 lock.lock();
423 try {
424 return itemAt(takeIndex); // null when queue is empty
425 } finally {
426 lock.unlock();
427 }
428 }
429
430 // this doc comment is overridden to remove the reference to collections
431 // greater in size than Integer.MAX_VALUE
432 /**
433 * Returns the number of elements in this queue.
434 *
435 * @return the number of elements in this queue
436 */
437 public int size() {
438 final ReentrantLock lock = this.lock;
439 lock.lock();
440 try {
441 return count;
442 } finally {
443 lock.unlock();
444 }
445 }
446
447 // this doc comment is a modified copy of the inherited doc comment,
448 // without the reference to unlimited queues.
449 /**
450 * Returns the number of additional elements that this queue can ideally
451 * (in the absence of memory or resource constraints) accept without
452 * blocking. This is always equal to the initial capacity of this queue
453 * less the current {@code size} of this queue.
454 *
455 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
456 * an element will succeed by inspecting {@code remainingCapacity}
457 * because it may be the case that another thread is about to
458 * insert or remove an element.
459 */
460 public int remainingCapacity() {
461 final ReentrantLock lock = this.lock;
462 lock.lock();
463 try {
464 return items.length - count;
465 } finally {
466 lock.unlock();
467 }
468 }
469
470 /**
471 * Removes a single instance of the specified element from this queue,
472 * if it is present. More formally, removes an element {@code e} such
473 * that {@code o.equals(e)}, if this queue contains one or more such
474 * elements.
475 * Returns {@code true} if this queue contained the specified element
476 * (or equivalently, if this queue changed as a result of the call).
477 *
478 * <p>Removal of interior elements in circular array based queues
479 * is an intrinsically slow and disruptive operation, so should
480 * be undertaken only in exceptional circumstances, ideally
481 * only when the queue is known not to be accessible by other
482 * threads.
483 *
484 * @param o element to be removed from this queue, if present
485 * @return {@code true} if this queue changed as a result of the call
486 */
487 public boolean remove(Object o) {
488 if (o == null) return false;
489 final ReentrantLock lock = this.lock;
490 lock.lock();
491 try {
492 if (count > 0) {
493 final Object[] items = this.items;
494 for (int i = takeIndex, end = putIndex,
495 to = (i < end) ? end : items.length;
496 ; i = 0, to = end) {
497 for (; i < to; i++)
498 if (o.equals(items[i])) {
499 removeAt(i);
500 return true;
501 }
502 if (to == end) break;
503 }
504 }
505 return false;
506 } finally {
507 // checkInvariants();
508 lock.unlock();
509 }
510 }
511
512 /**
513 * Returns {@code true} if this queue contains the specified element.
514 * More formally, returns {@code true} if and only if this queue contains
515 * at least one element {@code e} such that {@code o.equals(e)}.
516 *
517 * @param o object to be checked for containment in this queue
518 * @return {@code true} if this queue contains the specified element
519 */
520 public boolean contains(Object o) {
521 if (o == null) return false;
522 final ReentrantLock lock = this.lock;
523 lock.lock();
524 try {
525 if (count > 0) {
526 final Object[] items = this.items;
527 for (int i = takeIndex, end = putIndex,
528 to = (i < end) ? end : items.length;
529 ; i = 0, to = end) {
530 for (; i < to; i++)
531 if (o.equals(items[i]))
532 return true;
533 if (to == end) break;
534 }
535 }
536 return false;
537 } finally {
538 // checkInvariants();
539 lock.unlock();
540 }
541 }
542
543 /**
544 * Returns an array containing all of the elements in this queue, in
545 * proper sequence.
546 *
547 * <p>The returned array will be "safe" in that no references to it are
548 * maintained by this queue. (In other words, this method must allocate
549 * a new array). The caller is thus free to modify the returned array.
550 *
551 * <p>This method acts as bridge between array-based and collection-based
552 * APIs.
553 *
554 * @return an array containing all of the elements in this queue
555 */
556 public Object[] toArray() {
557 final ReentrantLock lock = this.lock;
558 lock.lock();
559 try {
560 final Object[] items = this.items;
561 final int end = takeIndex + count;
562 final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
563 if (end != putIndex)
564 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
565 return a;
566 } finally {
567 lock.unlock();
568 }
569 }
570
571 /**
572 * Returns an array containing all of the elements in this queue, in
573 * proper sequence; the runtime type of the returned array is that of
574 * the specified array. If the queue fits in the specified array, it
575 * is returned therein. Otherwise, a new array is allocated with the
576 * runtime type of the specified array and the size of this queue.
577 *
578 * <p>If this queue fits in the specified array with room to spare
579 * (i.e., the array has more elements than this queue), the element in
580 * the array immediately following the end of the queue is set to
581 * {@code null}.
582 *
583 * <p>Like the {@link #toArray()} method, this method acts as bridge between
584 * array-based and collection-based APIs. Further, this method allows
585 * precise control over the runtime type of the output array, and may,
586 * under certain circumstances, be used to save allocation costs.
587 *
588 * <p>Suppose {@code x} is a queue known to contain only strings.
589 * The following code can be used to dump the queue into a newly
590 * allocated array of {@code String}:
591 *
592 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
593 *
594 * Note that {@code toArray(new Object[0])} is identical in function to
595 * {@code toArray()}.
596 *
597 * @param a the array into which the elements of the queue are to
598 * be stored, if it is big enough; otherwise, a new array of the
599 * same runtime type is allocated for this purpose
600 * @return an array containing all of the elements in this queue
601 * @throws ArrayStoreException if the runtime type of the specified array
602 * is not a supertype of the runtime type of every element in
603 * this queue
604 * @throws NullPointerException if the specified array is null
605 */
606 @SuppressWarnings("unchecked")
607 public <T> T[] toArray(T[] a) {
608 final ReentrantLock lock = this.lock;
609 lock.lock();
610 try {
611 final Object[] items = this.items;
612 final int count = this.count;
613 final int firstLeg = Math.min(items.length - takeIndex, count);
614 if (a.length < count) {
615 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
616 a.getClass());
617 } else {
618 System.arraycopy(items, takeIndex, a, 0, firstLeg);
619 if (a.length > count)
620 a[count] = null;
621 }
622 if (firstLeg < count)
623 System.arraycopy(items, 0, a, firstLeg, putIndex);
624 return a;
625 } finally {
626 lock.unlock();
627 }
628 }
629
630 public String toString() {
631 return Helpers.collectionToString(this);
632 }
633
634 /**
635 * Atomically removes all of the elements from this queue.
636 * The queue will be empty after this call returns.
637 */
638 public void clear() {
639 final ReentrantLock lock = this.lock;
640 lock.lock();
641 try {
642 int k;
643 if ((k = count) > 0) {
644 circularClear(items, takeIndex, putIndex);
645 takeIndex = putIndex;
646 count = 0;
647 if (itrs != null)
648 itrs.queueIsEmpty();
649 for (; k > 0 && lock.hasWaiters(notFull); k--)
650 notFull.signal();
651 }
652 } finally {
653 // checkInvariants();
654 lock.unlock();
655 }
656 }
657
658 /**
659 * Nulls out slots starting at array index i, upto index end.
660 * Condition i == end means "full" - the entire array is cleared.
661 */
662 private static void circularClear(Object[] items, int i, int end) {
663 // assert 0 <= i && i < items.length;
664 // assert 0 <= end && end < items.length;
665 for (int to = (i < end) ? end : items.length;
666 ; i = 0, to = end) {
667 for (; i < to; i++) items[i] = null;
668 if (to == end) break;
669 }
670 }
671
672 /**
673 * @throws UnsupportedOperationException {@inheritDoc}
674 * @throws ClassCastException {@inheritDoc}
675 * @throws NullPointerException {@inheritDoc}
676 * @throws IllegalArgumentException {@inheritDoc}
677 */
678 public int drainTo(Collection<? super E> c) {
679 return drainTo(c, Integer.MAX_VALUE);
680 }
681
682 /**
683 * @throws UnsupportedOperationException {@inheritDoc}
684 * @throws ClassCastException {@inheritDoc}
685 * @throws NullPointerException {@inheritDoc}
686 * @throws IllegalArgumentException {@inheritDoc}
687 */
688 public int drainTo(Collection<? super E> c, int maxElements) {
689 Objects.requireNonNull(c);
690 if (c == this)
691 throw new IllegalArgumentException();
692 if (maxElements <= 0)
693 return 0;
694 final Object[] items = this.items;
695 final ReentrantLock lock = this.lock;
696 lock.lock();
697 try {
698 int n = Math.min(maxElements, count);
699 int take = takeIndex;
700 int i = 0;
701 try {
702 while (i < n) {
703 @SuppressWarnings("unchecked")
704 E e = (E) items[take];
705 c.add(e);
706 items[take] = null;
707 if (++take == items.length) take = 0;
708 i++;
709 }
710 return n;
711 } finally {
712 // Restore invariants even if c.add() threw
713 if (i > 0) {
714 count -= i;
715 takeIndex = take;
716 if (itrs != null) {
717 if (count == 0)
718 itrs.queueIsEmpty();
719 else if (i > take)
720 itrs.takeIndexWrapped();
721 }
722 for (; i > 0 && lock.hasWaiters(notFull); i--)
723 notFull.signal();
724 }
725 }
726 } finally {
727 // checkInvariants();
728 lock.unlock();
729 }
730 }
731
732 /**
733 * Returns an iterator over the elements in this queue in proper sequence.
734 * The elements will be returned in order from first (head) to last (tail).
735 *
736 * <p>The returned iterator is
737 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
738 *
739 * @return an iterator over the elements in this queue in proper sequence
740 */
741 public Iterator<E> iterator() {
742 return new Itr();
743 }
744
745 /**
746 * Shared data between iterators and their queue, allowing queue
747 * modifications to update iterators when elements are removed.
748 *
749 * This adds a lot of complexity for the sake of correctly
750 * handling some uncommon operations, but the combination of
751 * circular-arrays and supporting interior removes (i.e., those
752 * not at head) would cause iterators to sometimes lose their
753 * places and/or (re)report elements they shouldn't. To avoid
754 * this, when a queue has one or more iterators, it keeps iterator
755 * state consistent by:
756 *
757 * (1) keeping track of the number of "cycles", that is, the
758 * number of times takeIndex has wrapped around to 0.
759 * (2) notifying all iterators via the callback removedAt whenever
760 * an interior element is removed (and thus other elements may
761 * be shifted).
762 *
763 * These suffice to eliminate iterator inconsistencies, but
764 * unfortunately add the secondary responsibility of maintaining
765 * the list of iterators. We track all active iterators in a
766 * simple linked list (accessed only when the queue's lock is
767 * held) of weak references to Itr. The list is cleaned up using
768 * 3 different mechanisms:
769 *
770 * (1) Whenever a new iterator is created, do some O(1) checking for
771 * stale list elements.
772 *
773 * (2) Whenever takeIndex wraps around to 0, check for iterators
774 * that have been unused for more than one wrap-around cycle.
775 *
776 * (3) Whenever the queue becomes empty, all iterators are notified
777 * and this entire data structure is discarded.
778 *
779 * So in addition to the removedAt callback that is necessary for
780 * correctness, iterators have the shutdown and takeIndexWrapped
781 * callbacks that help remove stale iterators from the list.
782 *
783 * Whenever a list element is examined, it is expunged if either
784 * the GC has determined that the iterator is discarded, or if the
785 * iterator reports that it is "detached" (does not need any
786 * further state updates). Overhead is maximal when takeIndex
787 * never advances, iterators are discarded before they are
788 * exhausted, and all removals are interior removes, in which case
789 * all stale iterators are discovered by the GC. But even in this
790 * case we don't increase the amortized complexity.
791 *
792 * Care must be taken to keep list sweeping methods from
793 * reentrantly invoking another such method, causing subtle
794 * corruption bugs.
795 */
796 class Itrs {
797
798 /**
799 * Node in a linked list of weak iterator references.
800 */
801 private class Node extends WeakReference<Itr> {
802 Node next;
803
804 Node(Itr iterator, Node next) {
805 super(iterator);
806 this.next = next;
807 }
808 }
809
810 /** Incremented whenever takeIndex wraps around to 0 */
811 int cycles;
812
813 /** Linked list of weak iterator references */
814 private Node head;
815
816 /** Used to expunge stale iterators */
817 private Node sweeper;
818
819 private static final int SHORT_SWEEP_PROBES = 4;
820 private static final int LONG_SWEEP_PROBES = 16;
821
822 Itrs(Itr initial) {
823 register(initial);
824 }
825
826 /**
827 * Sweeps itrs, looking for and expunging stale iterators.
828 * If at least one was found, tries harder to find more.
829 * Called only from iterating thread.
830 *
831 * @param tryHarder whether to start in try-harder mode, because
832 * there is known to be at least one iterator to collect
833 */
834 void doSomeSweeping(boolean tryHarder) {
835 // assert lock.isHeldByCurrentThread();
836 // assert head != null;
837 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
838 Node o, p;
839 final Node sweeper = this.sweeper;
840 boolean passedGo; // to limit search to one full sweep
841
842 if (sweeper == null) {
843 o = null;
844 p = head;
845 passedGo = true;
846 } else {
847 o = sweeper;
848 p = o.next;
849 passedGo = false;
850 }
851
852 for (; probes > 0; probes--) {
853 if (p == null) {
854 if (passedGo)
855 break;
856 o = null;
857 p = head;
858 passedGo = true;
859 }
860 final Itr it = p.get();
861 final Node next = p.next;
862 if (it == null || it.isDetached()) {
863 // found a discarded/exhausted iterator
864 probes = LONG_SWEEP_PROBES; // "try harder"
865 // unlink p
866 p.clear();
867 p.next = null;
868 if (o == null) {
869 head = next;
870 if (next == null) {
871 // We've run out of iterators to track; retire
872 itrs = null;
873 return;
874 }
875 }
876 else
877 o.next = next;
878 } else {
879 o = p;
880 }
881 p = next;
882 }
883
884 this.sweeper = (p == null) ? null : o;
885 }
886
887 /**
888 * Adds a new iterator to the linked list of tracked iterators.
889 */
890 void register(Itr itr) {
891 // assert lock.isHeldByCurrentThread();
892 head = new Node(itr, head);
893 }
894
895 /**
896 * Called whenever takeIndex wraps around to 0.
897 *
898 * Notifies all iterators, and expunges any that are now stale.
899 */
900 void takeIndexWrapped() {
901 // assert lock.isHeldByCurrentThread();
902 cycles++;
903 for (Node o = null, p = head; p != null;) {
904 final Itr it = p.get();
905 final Node next = p.next;
906 if (it == null || it.takeIndexWrapped()) {
907 // unlink p
908 // assert it == null || it.isDetached();
909 p.clear();
910 p.next = null;
911 if (o == null)
912 head = next;
913 else
914 o.next = next;
915 } else {
916 o = p;
917 }
918 p = next;
919 }
920 if (head == null) // no more iterators to track
921 itrs = null;
922 }
923
924 /**
925 * Called whenever an interior remove (not at takeIndex) occurred.
926 *
927 * Notifies all iterators, and expunges any that are now stale.
928 */
929 void removedAt(int removedIndex) {
930 for (Node o = null, p = head; p != null;) {
931 final Itr it = p.get();
932 final Node next = p.next;
933 if (it == null || it.removedAt(removedIndex)) {
934 // unlink p
935 // assert it == null || it.isDetached();
936 p.clear();
937 p.next = null;
938 if (o == null)
939 head = next;
940 else
941 o.next = next;
942 } else {
943 o = p;
944 }
945 p = next;
946 }
947 if (head == null) // no more iterators to track
948 itrs = null;
949 }
950
951 /**
952 * Called whenever the queue becomes empty.
953 *
954 * Notifies all active iterators that the queue is empty,
955 * clears all weak refs, and unlinks the itrs datastructure.
956 */
957 void queueIsEmpty() {
958 // assert lock.isHeldByCurrentThread();
959 for (Node p = head; p != null; p = p.next) {
960 Itr it = p.get();
961 if (it != null) {
962 p.clear();
963 it.shutdown();
964 }
965 }
966 head = null;
967 itrs = null;
968 }
969
970 /**
971 * Called whenever an element has been dequeued (at takeIndex).
972 */
973 void elementDequeued() {
974 // assert lock.isHeldByCurrentThread();
975 if (count == 0)
976 queueIsEmpty();
977 else if (takeIndex == 0)
978 takeIndexWrapped();
979 }
980 }
981
982 /**
983 * Iterator for ArrayBlockingQueue.
984 *
985 * To maintain weak consistency with respect to puts and takes, we
986 * read ahead one slot, so as to not report hasNext true but then
987 * not have an element to return.
988 *
989 * We switch into "detached" mode (allowing prompt unlinking from
990 * itrs without help from the GC) when all indices are negative, or
991 * when hasNext returns false for the first time. This allows the
992 * iterator to track concurrent updates completely accurately,
993 * except for the corner case of the user calling Iterator.remove()
994 * after hasNext() returned false. Even in this case, we ensure
995 * that we don't remove the wrong element by keeping track of the
996 * expected element to remove, in lastItem. Yes, we may fail to
997 * remove lastItem from the queue if it moved due to an interleaved
998 * interior remove while in detached mode.
999 *
1000 * Method forEachRemaining, added in Java 8, is treated similarly
1001 * to hasNext returning false, in that we switch to detached mode,
1002 * but we regard it as an even stronger request to "close" this
1003 * iteration, and don't bother supporting subsequent remove().
1004 */
1005 private class Itr implements Iterator<E> {
1006 /** Index to look for new nextItem; NONE at end */
1007 private int cursor;
1008
1009 /** Element to be returned by next call to next(); null if none */
1010 private E nextItem;
1011
1012 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
1013 private int nextIndex;
1014
1015 /** Last element returned; null if none or not detached. */
1016 private E lastItem;
1017
1018 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1019 private int lastRet;
1020
1021 /** Previous value of takeIndex, or DETACHED when detached */
1022 private int prevTakeIndex;
1023
1024 /** Previous value of iters.cycles */
1025 private int prevCycles;
1026
1027 /** Special index value indicating "not available" or "undefined" */
1028 private static final int NONE = -1;
1029
1030 /**
1031 * Special index value indicating "removed elsewhere", that is,
1032 * removed by some operation other than a call to this.remove().
1033 */
1034 private static final int REMOVED = -2;
1035
1036 /** Special value for prevTakeIndex indicating "detached mode" */
1037 private static final int DETACHED = -3;
1038
1039 Itr() {
1040 lastRet = NONE;
1041 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1042 lock.lock();
1043 try {
1044 if (count == 0) {
1045 // assert itrs == null;
1046 cursor = NONE;
1047 nextIndex = NONE;
1048 prevTakeIndex = DETACHED;
1049 } else {
1050 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1051 prevTakeIndex = takeIndex;
1052 nextItem = itemAt(nextIndex = takeIndex);
1053 cursor = incCursor(takeIndex);
1054 if (itrs == null) {
1055 itrs = new Itrs(this);
1056 } else {
1057 itrs.register(this); // in this order
1058 itrs.doSomeSweeping(false);
1059 }
1060 prevCycles = itrs.cycles;
1061 // assert takeIndex >= 0;
1062 // assert prevTakeIndex == takeIndex;
1063 // assert nextIndex >= 0;
1064 // assert nextItem != null;
1065 }
1066 } finally {
1067 lock.unlock();
1068 }
1069 }
1070
1071 boolean isDetached() {
1072 // assert lock.isHeldByCurrentThread();
1073 return prevTakeIndex < 0;
1074 }
1075
1076 private int incCursor(int index) {
1077 // assert lock.isHeldByCurrentThread();
1078 if (++index == items.length) index = 0;
1079 if (index == putIndex) index = NONE;
1080 return index;
1081 }
1082
1083 /**
1084 * Returns true if index is invalidated by the given number of
1085 * dequeues, starting from prevTakeIndex.
1086 */
1087 private boolean invalidated(int index, int prevTakeIndex,
1088 long dequeues, int length) {
1089 if (index < 0)
1090 return false;
1091 int distance = index - prevTakeIndex;
1092 if (distance < 0)
1093 distance += length;
1094 return dequeues > distance;
1095 }
1096
1097 /**
1098 * Adjusts indices to incorporate all dequeues since the last
1099 * operation on this iterator. Call only from iterating thread.
1100 */
1101 private void incorporateDequeues() {
1102 // assert lock.isHeldByCurrentThread();
1103 // assert itrs != null;
1104 // assert !isDetached();
1105 // assert count > 0;
1106
1107 final int cycles = itrs.cycles;
1108 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1109 final int prevCycles = this.prevCycles;
1110 final int prevTakeIndex = this.prevTakeIndex;
1111
1112 if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1113 final int len = items.length;
1114 // how far takeIndex has advanced since the previous
1115 // operation of this iterator
1116 long dequeues = (long) (cycles - prevCycles) * len
1117 + (takeIndex - prevTakeIndex);
1118
1119 // Check indices for invalidation
1120 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1121 lastRet = REMOVED;
1122 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1123 nextIndex = REMOVED;
1124 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1125 cursor = takeIndex;
1126
1127 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1128 detach();
1129 else {
1130 this.prevCycles = cycles;
1131 this.prevTakeIndex = takeIndex;
1132 }
1133 }
1134 }
1135
1136 /**
1137 * Called when itrs should stop tracking this iterator, either
1138 * because there are no more indices to update (cursor < 0 &&
1139 * nextIndex < 0 && lastRet < 0) or as a special exception, when
1140 * lastRet >= 0, because hasNext() is about to return false for the
1141 * first time. Call only from iterating thread.
1142 */
1143 private void detach() {
1144 // Switch to detached mode
1145 // assert lock.isHeldByCurrentThread();
1146 // assert cursor == NONE;
1147 // assert nextIndex < 0;
1148 // assert lastRet < 0 || nextItem == null;
1149 // assert lastRet < 0 ^ lastItem != null;
1150 if (prevTakeIndex >= 0) {
1151 // assert itrs != null;
1152 prevTakeIndex = DETACHED;
1153 // try to unlink from itrs (but not too hard)
1154 itrs.doSomeSweeping(true);
1155 }
1156 }
1157
1158 /**
1159 * For performance reasons, we would like not to acquire a lock in
1160 * hasNext in the common case. To allow for this, we only access
1161 * fields (i.e. nextItem) that are not modified by update operations
1162 * triggered by queue modifications.
1163 */
1164 public boolean hasNext() {
1165 if (nextItem != null)
1166 return true;
1167 noNext();
1168 return false;
1169 }
1170
1171 private void noNext() {
1172 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1173 lock.lock();
1174 try {
1175 // assert cursor == NONE;
1176 // assert nextIndex == NONE;
1177 if (!isDetached()) {
1178 // assert lastRet >= 0;
1179 incorporateDequeues(); // might update lastRet
1180 if (lastRet >= 0) {
1181 lastItem = itemAt(lastRet);
1182 // assert lastItem != null;
1183 detach();
1184 }
1185 }
1186 // assert isDetached();
1187 // assert lastRet < 0 ^ lastItem != null;
1188 } finally {
1189 lock.unlock();
1190 }
1191 }
1192
1193 public E next() {
1194 final E e = nextItem;
1195 if (e == null)
1196 throw new NoSuchElementException();
1197 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1198 lock.lock();
1199 try {
1200 if (!isDetached())
1201 incorporateDequeues();
1202 // assert nextIndex != NONE;
1203 // assert lastItem == null;
1204 lastRet = nextIndex;
1205 final int cursor = this.cursor;
1206 if (cursor >= 0) {
1207 nextItem = itemAt(nextIndex = cursor);
1208 // assert nextItem != null;
1209 this.cursor = incCursor(cursor);
1210 } else {
1211 nextIndex = NONE;
1212 nextItem = null;
1213 if (lastRet == REMOVED) detach();
1214 }
1215 } finally {
1216 lock.unlock();
1217 }
1218 return e;
1219 }
1220
1221 public void forEachRemaining(Consumer<? super E> action) {
1222 Objects.requireNonNull(action);
1223 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1224 lock.lock();
1225 try {
1226 final E e = nextItem;
1227 if (e == null) return;
1228 if (!isDetached())
1229 incorporateDequeues();
1230 action.accept(e);
1231 if (isDetached() || cursor < 0) return;
1232 final Object[] items = ArrayBlockingQueue.this.items;
1233 for (int i = cursor, end = putIndex,
1234 to = (i < end) ? end : items.length;
1235 ; i = 0, to = end) {
1236 for (; i < to; i++)
1237 action.accept(itemAt(items, i));
1238 if (to == end) break;
1239 }
1240 } finally {
1241 // Calling forEachRemaining is a strong hint that this
1242 // iteration is surely over; supporting remove() after
1243 // forEachRemaining() is more trouble than it's worth
1244 cursor = nextIndex = lastRet = NONE;
1245 nextItem = lastItem = null;
1246 detach();
1247 lock.unlock();
1248 }
1249 }
1250
1251 public void remove() {
1252 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1253 lock.lock();
1254 // assert lock.getHoldCount() == 1;
1255 try {
1256 if (!isDetached())
1257 incorporateDequeues(); // might update lastRet or detach
1258 final int lastRet = this.lastRet;
1259 this.lastRet = NONE;
1260 if (lastRet >= 0) {
1261 if (!isDetached())
1262 removeAt(lastRet);
1263 else {
1264 final E lastItem = this.lastItem;
1265 // assert lastItem != null;
1266 this.lastItem = null;
1267 if (itemAt(lastRet) == lastItem)
1268 removeAt(lastRet);
1269 }
1270 } else if (lastRet == NONE)
1271 throw new IllegalStateException();
1272 // else lastRet == REMOVED and the last returned element was
1273 // previously asynchronously removed via an operation other
1274 // than this.remove(), so nothing to do.
1275
1276 if (cursor < 0 && nextIndex < 0)
1277 detach();
1278 } finally {
1279 lock.unlock();
1280 // assert lastRet == NONE;
1281 // assert lastItem == null;
1282 }
1283 }
1284
1285 /**
1286 * Called to notify the iterator that the queue is empty, or that it
1287 * has fallen hopelessly behind, so that it should abandon any
1288 * further iteration, except possibly to return one more element
1289 * from next(), as promised by returning true from hasNext().
1290 */
1291 void shutdown() {
1292 // assert lock.isHeldByCurrentThread();
1293 cursor = NONE;
1294 if (nextIndex >= 0)
1295 nextIndex = REMOVED;
1296 if (lastRet >= 0) {
1297 lastRet = REMOVED;
1298 lastItem = null;
1299 }
1300 prevTakeIndex = DETACHED;
1301 // Don't set nextItem to null because we must continue to be
1302 // able to return it on next().
1303 //
1304 // Caller will unlink from itrs when convenient.
1305 }
1306
1307 private int distance(int index, int prevTakeIndex, int length) {
1308 int distance = index - prevTakeIndex;
1309 if (distance < 0)
1310 distance += length;
1311 return distance;
1312 }
1313
1314 /**
1315 * Called whenever an interior remove (not at takeIndex) occurred.
1316 *
1317 * @return true if this iterator should be unlinked from itrs
1318 */
1319 boolean removedAt(int removedIndex) {
1320 // assert lock.isHeldByCurrentThread();
1321 if (isDetached())
1322 return true;
1323
1324 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1325 final int prevTakeIndex = this.prevTakeIndex;
1326 final int len = items.length;
1327 // distance from prevTakeIndex to removedIndex
1328 final int removedDistance =
1329 len * (itrs.cycles - this.prevCycles
1330 + ((removedIndex < takeIndex) ? 1 : 0))
1331 + (removedIndex - prevTakeIndex);
1332 // assert itrs.cycles - this.prevCycles >= 0;
1333 // assert itrs.cycles - this.prevCycles <= 1;
1334 // assert removedDistance > 0;
1335 // assert removedIndex != takeIndex;
1336 int cursor = this.cursor;
1337 if (cursor >= 0) {
1338 int x = distance(cursor, prevTakeIndex, len);
1339 if (x == removedDistance) {
1340 if (cursor == putIndex)
1341 this.cursor = cursor = NONE;
1342 }
1343 else if (x > removedDistance) {
1344 // assert cursor != prevTakeIndex;
1345 this.cursor = cursor = dec(cursor, len);
1346 }
1347 }
1348 int lastRet = this.lastRet;
1349 if (lastRet >= 0) {
1350 int x = distance(lastRet, prevTakeIndex, len);
1351 if (x == removedDistance)
1352 this.lastRet = lastRet = REMOVED;
1353 else if (x > removedDistance)
1354 this.lastRet = lastRet = dec(lastRet, len);
1355 }
1356 int nextIndex = this.nextIndex;
1357 if (nextIndex >= 0) {
1358 int x = distance(nextIndex, prevTakeIndex, len);
1359 if (x == removedDistance)
1360 this.nextIndex = nextIndex = REMOVED;
1361 else if (x > removedDistance)
1362 this.nextIndex = nextIndex = dec(nextIndex, len);
1363 }
1364 if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1365 this.prevTakeIndex = DETACHED;
1366 return true;
1367 }
1368 return false;
1369 }
1370
1371 /**
1372 * Called whenever takeIndex wraps around to zero.
1373 *
1374 * @return true if this iterator should be unlinked from itrs
1375 */
1376 boolean takeIndexWrapped() {
1377 // assert lock.isHeldByCurrentThread();
1378 if (isDetached())
1379 return true;
1380 if (itrs.cycles - prevCycles > 1) {
1381 // All the elements that existed at the time of the last
1382 // operation are gone, so abandon further iteration.
1383 shutdown();
1384 return true;
1385 }
1386 return false;
1387 }
1388
1389 // /** Uncomment for debugging. */
1390 // public String toString() {
1391 // return ("cursor=" + cursor + " " +
1392 // "nextIndex=" + nextIndex + " " +
1393 // "lastRet=" + lastRet + " " +
1394 // "nextItem=" + nextItem + " " +
1395 // "lastItem=" + lastItem + " " +
1396 // "prevCycles=" + prevCycles + " " +
1397 // "prevTakeIndex=" + prevTakeIndex + " " +
1398 // "size()=" + size() + " " +
1399 // "remainingCapacity()=" + remainingCapacity());
1400 // }
1401 }
1402
1403 /**
1404 * Returns a {@link Spliterator} over the elements in this queue.
1405 *
1406 * <p>The returned spliterator is
1407 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1408 *
1409 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1410 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1411 *
1412 * @implNote
1413 * The {@code Spliterator} implements {@code trySplit} to permit limited
1414 * parallelism.
1415 *
1416 * @return a {@code Spliterator} over the elements in this queue
1417 * @since 1.8
1418 */
1419 public Spliterator<E> spliterator() {
1420 return Spliterators.spliterator
1421 (this, (Spliterator.ORDERED |
1422 Spliterator.NONNULL |
1423 Spliterator.CONCURRENT));
1424 }
1425
1426 /**
1427 * @throws NullPointerException {@inheritDoc}
1428 */
1429 public void forEach(Consumer<? super E> action) {
1430 Objects.requireNonNull(action);
1431 final ReentrantLock lock = this.lock;
1432 lock.lock();
1433 try {
1434 if (count > 0) {
1435 final Object[] items = this.items;
1436 for (int i = takeIndex, end = putIndex,
1437 to = (i < end) ? end : items.length;
1438 ; i = 0, to = end) {
1439 for (; i < to; i++)
1440 action.accept(itemAt(items, i));
1441 if (to == end) break;
1442 }
1443 }
1444 } finally {
1445 // checkInvariants();
1446 lock.unlock();
1447 }
1448 }
1449
1450 /**
1451 * @throws NullPointerException {@inheritDoc}
1452 */
1453 public boolean removeIf(Predicate<? super E> filter) {
1454 Objects.requireNonNull(filter);
1455 return bulkRemove(filter);
1456 }
1457
1458 /**
1459 * @throws NullPointerException {@inheritDoc}
1460 */
1461 public boolean removeAll(Collection<?> c) {
1462 Objects.requireNonNull(c);
1463 return bulkRemove(e -> c.contains(e));
1464 }
1465
1466 /**
1467 * @throws NullPointerException {@inheritDoc}
1468 */
1469 public boolean retainAll(Collection<?> c) {
1470 Objects.requireNonNull(c);
1471 return bulkRemove(e -> !c.contains(e));
1472 }
1473
1474 /** Implementation of bulk remove methods. */
1475 private boolean bulkRemove(Predicate<? super E> filter) {
1476 final ReentrantLock lock = this.lock;
1477 lock.lock();
1478 try {
1479 if (itrs == null) { // check for active iterators
1480 if (count > 0) {
1481 final Object[] items = this.items;
1482 // Optimize for initial run of survivors
1483 for (int i = takeIndex, end = putIndex,
1484 to = (i < end) ? end : items.length;
1485 ; i = 0, to = end) {
1486 for (; i < to; i++)
1487 if (filter.test(itemAt(items, i)))
1488 return bulkRemoveModified(filter, i);
1489 if (to == end) break;
1490 }
1491 }
1492 return false;
1493 }
1494 } finally {
1495 // checkInvariants();
1496 lock.unlock();
1497 }
1498 // Active iterators are too hairy!
1499 // Punting (for now) to the slow n^2 algorithm ...
1500 return super.removeIf(filter);
1501 }
1502
1503 // A tiny bit set implementation
1504
1505 private static long[] nBits(int n) {
1506 return new long[((n - 1) >> 6) + 1];
1507 }
1508 private static void setBit(long[] bits, int i) {
1509 bits[i >> 6] |= 1L << i;
1510 }
1511 private static boolean isClear(long[] bits, int i) {
1512 return (bits[i >> 6] & (1L << i)) == 0;
1513 }
1514
1515 /**
1516 * Returns circular distance from i to j, disambiguating i == j to
1517 * items.length; never returns 0.
1518 */
1519 private int distanceNonEmpty(int i, int j) {
1520 if ((j -= i) <= 0) j += items.length;
1521 return j;
1522 }
1523
1524 /**
1525 * Helper for bulkRemove, in case of at least one deletion.
1526 * Tolerate predicates that reentrantly access the collection for
1527 * read (but not write), so traverse once to find elements to
1528 * delete, a second pass to physically expunge.
1529 *
1530 * @param beg valid index of first element to be deleted
1531 */
1532 private boolean bulkRemoveModified(
1533 Predicate<? super E> filter, final int beg) {
1534 final Object[] es = items;
1535 final int capacity = items.length;
1536 final int end = putIndex;
1537 final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
1538 deathRow[0] = 1L; // set bit 0
1539 for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1540 ; i = 0, to = end, k -= capacity) {
1541 for (; i < to; i++)
1542 if (filter.test(itemAt(es, i)))
1543 setBit(deathRow, i - k);
1544 if (to == end) break;
1545 }
1546 // a two-finger traversal, with hare i reading, tortoise w writing
1547 int w = beg;
1548 for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1549 ; w = 0) { // w rejoins i on second leg
1550 // In this loop, i and w are on the same leg, with i > w
1551 for (; i < to; i++)
1552 if (isClear(deathRow, i - k))
1553 es[w++] = es[i];
1554 if (to == end) break;
1555 // In this loop, w is on the first leg, i on the second
1556 for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
1557 if (isClear(deathRow, i - k))
1558 es[w++] = es[i];
1559 if (i >= to) {
1560 if (w == capacity) w = 0; // "corner" case
1561 break;
1562 }
1563 }
1564 count -= distanceNonEmpty(w, end);
1565 circularClear(es, putIndex = w, end);
1566 // checkInvariants();
1567 return true;
1568 }
1569
1570 /** debugging */
1571 void checkInvariants() {
1572 // meta-assertions
1573 // assert lock.isHeldByCurrentThread();
1574 if (!invariantsSatisfied()) {
1575 String detail = String.format(
1576 "takeIndex=%d putIndex=%d count=%d capacity=%d items=%s",
1577 takeIndex, putIndex, count, items.length,
1578 Arrays.toString(items));
1579 System.err.println(detail);
1580 throw new AssertionError(detail);
1581 }
1582 }
1583
1584 private boolean invariantsSatisfied() {
1585 // Unlike ArrayDeque, we have a count field but no spare slot.
1586 // We prefer ArrayDeque's strategy (and the names of its fields!),
1587 // but our field layout is baked into the serial form, and so is
1588 // too annoying to change.
1589 //
1590 // putIndex == takeIndex must be disambiguated by checking count.
1591 int capacity = items.length;
1592 return capacity > 0
1593 && items.getClass() == Object[].class
1594 && (takeIndex | putIndex | count) >= 0
1595 && takeIndex < capacity
1596 && putIndex < capacity
1597 && count <= capacity
1598 && (putIndex - takeIndex - count) % capacity == 0
1599 && (count == 0 || items[takeIndex] != null)
1600 && (count == capacity || items[putIndex] == null)
1601 && (count == 0 || items[dec(putIndex, capacity)] != null);
1602 }
1603
1604 /**
1605 * Reconstitutes this queue from a stream (that is, deserializes it).
1606 *
1607 * @param s the stream
1608 * @throws ClassNotFoundException if the class of a serialized object
1609 * could not be found
1610 * @throws java.io.InvalidObjectException if invariants are violated
1611 * @throws java.io.IOException if an I/O error occurs
1612 */
1613 private void readObject(java.io.ObjectInputStream s)
1614 throws java.io.IOException, ClassNotFoundException {
1615
1616 // Read in items array and various fields
1617 s.defaultReadObject();
1618
1619 if (!invariantsSatisfied())
1620 throw new java.io.InvalidObjectException("invariants violated");
1621 }
1622 }