ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.125
Committed: Wed Jun 17 06:49:07 2015 UTC (8 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.124: +1 -1 lines
Log Message:
import order

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.ref.WeakReference;
10 import java.util.AbstractQueue;
11 import java.util.Arrays;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.Objects;
16 import java.util.Spliterator;
17 import java.util.Spliterators;
18 import java.util.concurrent.locks.Condition;
19 import java.util.concurrent.locks.ReentrantLock;
20
21 /**
22 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
23 * array. This queue orders elements FIFO (first-in-first-out). The
24 * <em>head</em> of the queue is that element that has been on the
25 * queue the longest time. The <em>tail</em> of the queue is that
26 * element that has been on the queue the shortest time. New elements
27 * are inserted at the tail of the queue, and the queue retrieval
28 * operations obtain elements at the head of the queue.
29 *
30 * <p>This is a classic &quot;bounded buffer&quot;, in which a
31 * fixed-sized array holds elements inserted by producers and
32 * extracted by consumers. Once created, the capacity cannot be
33 * changed. Attempts to {@code put} an element into a full queue
34 * will result in the operation blocking; attempts to {@code take} an
35 * element from an empty queue will similarly block.
36 *
37 * <p>This class supports an optional fairness policy for ordering
38 * waiting producer and consumer threads. By default, this ordering
39 * is not guaranteed. However, a queue constructed with fairness set
40 * to {@code true} grants threads access in FIFO order. Fairness
41 * generally decreases throughput but reduces variability and avoids
42 * starvation.
43 *
44 * <p>This class and its iterator implement all of the
45 * <em>optional</em> methods of the {@link Collection} and {@link
46 * Iterator} interfaces.
47 *
48 * <p>This class is a member of the
49 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
50 * Java Collections Framework</a>.
51 *
52 * @since 1.5
53 * @author Doug Lea
54 * @param <E> the type of elements held in this queue
55 */
56 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
57 implements BlockingQueue<E>, java.io.Serializable {
58
/**
 * Serialization ID. This class relies on default serialization
 * even for the items array, which is default-serialized, even if
 * it is empty. Otherwise it could not be declared final, which is
 * necessary here.
 */
private static final long serialVersionUID = -817911632652898426L;

/**
 * The queued items. Used as a circular buffer: indices wrap back
 * to 0 on reaching {@code items.length}.
 */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes; signalled whenever an element is enqueued */
private final Condition notEmpty;

/** Condition for waiting puts; signalled whenever a slot is freed */
private final Condition notFull;

/**
 * Shared state for currently active iterators, or null if there
 * are known not to be any. Allows queue operations to update
 * iterator state. Transient, so it is not part of the serialized form.
 */
transient Itrs itrs;
99
100 // Internal helper methods
101
102 /**
103 * Circularly decrements array index i.
104 */
105 final int dec(int i) {
106 return ((i == 0) ? items.length : i) - 1;
107 }
108
109 /**
110 * Returns item at index i.
111 */
112 @SuppressWarnings("unchecked")
113 final E itemAt(int i) {
114 return (E) items[i];
115 }
116
117 /**
118 * Inserts element at current put position, advances, and signals.
119 * Call only when holding lock.
120 */
121 private void enqueue(E x) {
122 // assert lock.getHoldCount() == 1;
123 // assert items[putIndex] == null;
124 final Object[] items = this.items;
125 items[putIndex] = x;
126 if (++putIndex == items.length) putIndex = 0;
127 count++;
128 notEmpty.signal();
129 }
130
131 /**
132 * Extracts element at current take position, advances, and signals.
133 * Call only when holding lock.
134 */
135 private E dequeue() {
136 // assert lock.getHoldCount() == 1;
137 // assert items[takeIndex] != null;
138 final Object[] items = this.items;
139 @SuppressWarnings("unchecked")
140 E x = (E) items[takeIndex];
141 items[takeIndex] = null;
142 if (++takeIndex == items.length) takeIndex = 0;
143 count--;
144 if (itrs != null)
145 itrs.elementDequeued();
146 notFull.signal();
147 return x;
148 }
149
150 /**
151 * Deletes item at array index removeIndex.
152 * Utility for remove(Object) and iterator.remove.
153 * Call only when holding lock.
154 */
155 void removeAt(final int removeIndex) {
156 // assert lock.getHoldCount() == 1;
157 // assert items[removeIndex] != null;
158 // assert removeIndex >= 0 && removeIndex < items.length;
159 final Object[] items = this.items;
160 if (removeIndex == takeIndex) {
161 // removing front item; just advance
162 items[takeIndex] = null;
163 if (++takeIndex == items.length) takeIndex = 0;
164 count--;
165 if (itrs != null)
166 itrs.elementDequeued();
167 } else {
168 // an "interior" remove
169
170 // slide over all others up through putIndex.
171 for (int i = removeIndex, putIndex = this.putIndex;;) {
172 int pred = i;
173 if (++i == items.length) i = 0;
174 if (i == putIndex) {
175 items[pred] = null;
176 this.putIndex = pred;
177 break;
178 }
179 items[pred] = items[i];
180 }
181 count--;
182 if (itrs != null)
183 itrs.removedAt(removeIndex);
184 }
185 notFull.signal();
186 }
187
/**
 * Creates an {@code ArrayBlockingQueue} of the given fixed capacity
 * using the default (non-fair) access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
198
199 /**
200 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
201 * capacity and the specified access policy.
202 *
203 * @param capacity the capacity of this queue
204 * @param fair if {@code true} then queue accesses for threads blocked
205 * on insertion or removal, are processed in FIFO order;
206 * if {@code false} the access order is unspecified.
207 * @throws IllegalArgumentException if {@code capacity < 1}
208 */
209 public ArrayBlockingQueue(int capacity, boolean fair) {
210 if (capacity <= 0)
211 throw new IllegalArgumentException();
212 this.items = new Object[capacity];
213 lock = new ReentrantLock(fair);
214 notEmpty = lock.newCondition();
215 notFull = lock.newCondition();
216 }
217
/**
 * Creates an {@code ArrayBlockingQueue} of the given fixed capacity
 * and access policy, pre-populated with the elements of the given
 * collection in the order its iterator returns them.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock mainLock = this.lock;
    mainLock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int filled = 0;
        try {
            final Iterator<? extends E> source = c.iterator();
            while (source.hasNext()) {
                final E element = source.next();
                // Running past the end of the array means the
                // collection holds more elements than capacity.
                items[filled++] = Objects.requireNonNull(element);
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = filled;
        if (filled == capacity)
            putIndex = 0;
        else
            putIndex = filled;
    } finally {
        mainLock.unlock();
    }
}
254
255 /**
256 * Inserts the specified element at the tail of this queue if it is
257 * possible to do so immediately without exceeding the queue's capacity,
258 * returning {@code true} upon success and throwing an
259 * {@code IllegalStateException} if this queue is full.
260 *
261 * @param e the element to add
262 * @return {@code true} (as specified by {@link Collection#add})
263 * @throws IllegalStateException if this queue is full
264 * @throws NullPointerException if the specified element is null
265 */
266 public boolean add(E e) {
267 return super.add(e);
268 }
269
270 /**
271 * Inserts the specified element at the tail of this queue if it is
272 * possible to do so immediately without exceeding the queue's capacity,
273 * returning {@code true} upon success and {@code false} if this queue
274 * is full. This method is generally preferable to method {@link #add},
275 * which can fail to insert an element only by throwing an exception.
276 *
277 * @throws NullPointerException if the specified element is null
278 */
279 public boolean offer(E e) {
280 Objects.requireNonNull(e);
281 final ReentrantLock lock = this.lock;
282 lock.lock();
283 try {
284 if (count == items.length)
285 return false;
286 else {
287 enqueue(e);
288 return true;
289 }
290 } finally {
291 lock.unlock();
292 }
293 }
294
295 /**
296 * Inserts the specified element at the tail of this queue, waiting
297 * for space to become available if the queue is full.
298 *
299 * @throws InterruptedException {@inheritDoc}
300 * @throws NullPointerException {@inheritDoc}
301 */
302 public void put(E e) throws InterruptedException {
303 Objects.requireNonNull(e);
304 final ReentrantLock lock = this.lock;
305 lock.lockInterruptibly();
306 try {
307 while (count == items.length)
308 notFull.await();
309 enqueue(e);
310 } finally {
311 lock.unlock();
312 }
313 }
314
315 /**
316 * Inserts the specified element at the tail of this queue, waiting
317 * up to the specified wait time for space to become available if
318 * the queue is full.
319 *
320 * @throws InterruptedException {@inheritDoc}
321 * @throws NullPointerException {@inheritDoc}
322 */
323 public boolean offer(E e, long timeout, TimeUnit unit)
324 throws InterruptedException {
325
326 Objects.requireNonNull(e);
327 long nanos = unit.toNanos(timeout);
328 final ReentrantLock lock = this.lock;
329 lock.lockInterruptibly();
330 try {
331 while (count == items.length) {
332 if (nanos <= 0)
333 return false;
334 nanos = notFull.awaitNanos(nanos);
335 }
336 enqueue(e);
337 return true;
338 } finally {
339 lock.unlock();
340 }
341 }
342
343 public E poll() {
344 final ReentrantLock lock = this.lock;
345 lock.lock();
346 try {
347 return (count == 0) ? null : dequeue();
348 } finally {
349 lock.unlock();
350 }
351 }
352
353 public E take() throws InterruptedException {
354 final ReentrantLock lock = this.lock;
355 lock.lockInterruptibly();
356 try {
357 while (count == 0)
358 notEmpty.await();
359 return dequeue();
360 } finally {
361 lock.unlock();
362 }
363 }
364
365 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
366 long nanos = unit.toNanos(timeout);
367 final ReentrantLock lock = this.lock;
368 lock.lockInterruptibly();
369 try {
370 while (count == 0) {
371 if (nanos <= 0)
372 return null;
373 nanos = notEmpty.awaitNanos(nanos);
374 }
375 return dequeue();
376 } finally {
377 lock.unlock();
378 }
379 }
380
381 public E peek() {
382 final ReentrantLock lock = this.lock;
383 lock.lock();
384 try {
385 return itemAt(takeIndex); // null when queue is empty
386 } finally {
387 lock.unlock();
388 }
389 }
390
391 // this doc comment is overridden to remove the reference to collections
392 // greater in size than Integer.MAX_VALUE
393 /**
394 * Returns the number of elements in this queue.
395 *
396 * @return the number of elements in this queue
397 */
398 public int size() {
399 final ReentrantLock lock = this.lock;
400 lock.lock();
401 try {
402 return count;
403 } finally {
404 lock.unlock();
405 }
406 }
407
408 // this doc comment is a modified copy of the inherited doc comment,
409 // without the reference to unlimited queues.
410 /**
411 * Returns the number of additional elements that this queue can ideally
412 * (in the absence of memory or resource constraints) accept without
413 * blocking. This is always equal to the initial capacity of this queue
414 * less the current {@code size} of this queue.
415 *
416 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
417 * an element will succeed by inspecting {@code remainingCapacity}
418 * because it may be the case that another thread is about to
419 * insert or remove an element.
420 */
421 public int remainingCapacity() {
422 final ReentrantLock lock = this.lock;
423 lock.lock();
424 try {
425 return items.length - count;
426 } finally {
427 lock.unlock();
428 }
429 }
430
431 /**
432 * Removes a single instance of the specified element from this queue,
433 * if it is present. More formally, removes an element {@code e} such
434 * that {@code o.equals(e)}, if this queue contains one or more such
435 * elements.
436 * Returns {@code true} if this queue contained the specified element
437 * (or equivalently, if this queue changed as a result of the call).
438 *
439 * <p>Removal of interior elements in circular array based queues
440 * is an intrinsically slow and disruptive operation, so should
441 * be undertaken only in exceptional circumstances, ideally
442 * only when the queue is known not to be accessible by other
443 * threads.
444 *
445 * @param o element to be removed from this queue, if present
446 * @return {@code true} if this queue changed as a result of the call
447 */
448 public boolean remove(Object o) {
449 if (o == null) return false;
450 final ReentrantLock lock = this.lock;
451 lock.lock();
452 try {
453 if (count > 0) {
454 final Object[] items = this.items;
455 final int putIndex = this.putIndex;
456 int i = takeIndex;
457 do {
458 if (o.equals(items[i])) {
459 removeAt(i);
460 return true;
461 }
462 if (++i == items.length) i = 0;
463 } while (i != putIndex);
464 }
465 return false;
466 } finally {
467 lock.unlock();
468 }
469 }
470
471 /**
472 * Returns {@code true} if this queue contains the specified element.
473 * More formally, returns {@code true} if and only if this queue contains
474 * at least one element {@code e} such that {@code o.equals(e)}.
475 *
476 * @param o object to be checked for containment in this queue
477 * @return {@code true} if this queue contains the specified element
478 */
479 public boolean contains(Object o) {
480 if (o == null) return false;
481 final ReentrantLock lock = this.lock;
482 lock.lock();
483 try {
484 if (count > 0) {
485 final Object[] items = this.items;
486 final int putIndex = this.putIndex;
487 int i = takeIndex;
488 do {
489 if (o.equals(items[i]))
490 return true;
491 if (++i == items.length) i = 0;
492 } while (i != putIndex);
493 }
494 return false;
495 } finally {
496 lock.unlock();
497 }
498 }
499
500 /**
501 * Returns an array containing all of the elements in this queue, in
502 * proper sequence.
503 *
504 * <p>The returned array will be "safe" in that no references to it are
505 * maintained by this queue. (In other words, this method must allocate
506 * a new array). The caller is thus free to modify the returned array.
507 *
508 * <p>This method acts as bridge between array-based and collection-based
509 * APIs.
510 *
511 * @return an array containing all of the elements in this queue
512 */
513 public Object[] toArray() {
514 final ReentrantLock lock = this.lock;
515 lock.lock();
516 try {
517 final Object[] items = this.items;
518 final int end = takeIndex + count;
519 final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
520 if (end != putIndex)
521 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
522 return a;
523 } finally {
524 lock.unlock();
525 }
526 }
527
528 /**
529 * Returns an array containing all of the elements in this queue, in
530 * proper sequence; the runtime type of the returned array is that of
531 * the specified array. If the queue fits in the specified array, it
532 * is returned therein. Otherwise, a new array is allocated with the
533 * runtime type of the specified array and the size of this queue.
534 *
535 * <p>If this queue fits in the specified array with room to spare
536 * (i.e., the array has more elements than this queue), the element in
537 * the array immediately following the end of the queue is set to
538 * {@code null}.
539 *
540 * <p>Like the {@link #toArray()} method, this method acts as bridge between
541 * array-based and collection-based APIs. Further, this method allows
542 * precise control over the runtime type of the output array, and may,
543 * under certain circumstances, be used to save allocation costs.
544 *
545 * <p>Suppose {@code x} is a queue known to contain only strings.
546 * The following code can be used to dump the queue into a newly
547 * allocated array of {@code String}:
548 *
549 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
550 *
551 * Note that {@code toArray(new Object[0])} is identical in function to
552 * {@code toArray()}.
553 *
554 * @param a the array into which the elements of the queue are to
555 * be stored, if it is big enough; otherwise, a new array of the
556 * same runtime type is allocated for this purpose
557 * @return an array containing all of the elements in this queue
558 * @throws ArrayStoreException if the runtime type of the specified array
559 * is not a supertype of the runtime type of every element in
560 * this queue
561 * @throws NullPointerException if the specified array is null
562 */
563 @SuppressWarnings("unchecked")
564 public <T> T[] toArray(T[] a) {
565 final ReentrantLock lock = this.lock;
566 lock.lock();
567 try {
568 final Object[] items = this.items;
569 final int count = this.count;
570 final int firstLeg = Math.min(items.length - takeIndex, count);
571 if (a.length < count) {
572 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
573 a.getClass());
574 } else {
575 System.arraycopy(items, takeIndex, a, 0, firstLeg);
576 if (a.length > count)
577 a[count] = null;
578 }
579 if (firstLeg < count)
580 System.arraycopy(items, 0, a, firstLeg, putIndex);
581 return a;
582 } finally {
583 lock.unlock();
584 }
585 }
586
587 public String toString() {
588 return Helpers.collectionToString(this);
589 }
590
591 /**
592 * Atomically removes all of the elements from this queue.
593 * The queue will be empty after this call returns.
594 */
595 public void clear() {
596 final ReentrantLock lock = this.lock;
597 lock.lock();
598 try {
599 int k = count;
600 if (k > 0) {
601 final Object[] items = this.items;
602 final int putIndex = this.putIndex;
603 int i = takeIndex;
604 do {
605 items[i] = null;
606 if (++i == items.length) i = 0;
607 } while (i != putIndex);
608 takeIndex = putIndex;
609 count = 0;
610 if (itrs != null)
611 itrs.queueIsEmpty();
612 for (; k > 0 && lock.hasWaiters(notFull); k--)
613 notFull.signal();
614 }
615 } finally {
616 lock.unlock();
617 }
618 }
619
620 /**
621 * @throws UnsupportedOperationException {@inheritDoc}
622 * @throws ClassCastException {@inheritDoc}
623 * @throws NullPointerException {@inheritDoc}
624 * @throws IllegalArgumentException {@inheritDoc}
625 */
626 public int drainTo(Collection<? super E> c) {
627 return drainTo(c, Integer.MAX_VALUE);
628 }
629
630 /**
631 * @throws UnsupportedOperationException {@inheritDoc}
632 * @throws ClassCastException {@inheritDoc}
633 * @throws NullPointerException {@inheritDoc}
634 * @throws IllegalArgumentException {@inheritDoc}
635 */
636 public int drainTo(Collection<? super E> c, int maxElements) {
637 Objects.requireNonNull(c);
638 if (c == this)
639 throw new IllegalArgumentException();
640 if (maxElements <= 0)
641 return 0;
642 final Object[] items = this.items;
643 final ReentrantLock lock = this.lock;
644 lock.lock();
645 try {
646 int n = Math.min(maxElements, count);
647 int take = takeIndex;
648 int i = 0;
649 try {
650 while (i < n) {
651 @SuppressWarnings("unchecked")
652 E x = (E) items[take];
653 c.add(x);
654 items[take] = null;
655 if (++take == items.length) take = 0;
656 i++;
657 }
658 return n;
659 } finally {
660 // Restore invariants even if c.add() threw
661 if (i > 0) {
662 count -= i;
663 takeIndex = take;
664 if (itrs != null) {
665 if (count == 0)
666 itrs.queueIsEmpty();
667 else if (i > take)
668 itrs.takeIndexWrapped();
669 }
670 for (; i > 0 && lock.hasWaiters(notFull); i--)
671 notFull.signal();
672 }
673 }
674 } finally {
675 lock.unlock();
676 }
677 }
678
679 /**
680 * Returns an iterator over the elements in this queue in proper sequence.
681 * The elements will be returned in order from first (head) to last (tail).
682 *
683 * <p>The returned iterator is
684 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
685 *
686 * @return an iterator over the elements in this queue in proper sequence
687 */
688 public Iterator<E> iterator() {
689 return new Itr();
690 }
691
/**
 * Shared data between iterators and their queue, allowing queue
 * modifications to update iterators when elements are removed.
 *
 * This adds a lot of complexity for the sake of correctly
 * handling some uncommon operations, but the combination of
 * circular-arrays and supporting interior removes (i.e., those
 * not at head) would cause iterators to sometimes lose their
 * places and/or (re)report elements they shouldn't.  To avoid
 * this, when a queue has one or more iterators, it keeps iterator
 * state consistent by:
 *
 * (1) keeping track of the number of "cycles", that is, the
 *     number of times takeIndex has wrapped around to 0.
 * (2) notifying all iterators via the callback removedAt whenever
 *     an interior element is removed (and thus other elements may
 *     be shifted).
 *
 * These suffice to eliminate iterator inconsistencies, but
 * unfortunately add the secondary responsibility of maintaining
 * the list of iterators.  We track all active iterators in a
 * simple linked list (accessed only when the queue's lock is
 * held) of weak references to Itr.  The list is cleaned up using
 * 3 different mechanisms:
 *
 * (1) Whenever a new iterator is created, do some O(1) checking for
 *     stale list elements.
 *
 * (2) Whenever takeIndex wraps around to 0, check for iterators
 *     that have been unused for more than one wrap-around cycle.
 *
 * (3) Whenever the queue becomes empty, all iterators are notified
 *     and this entire data structure is discarded.
 *
 * So in addition to the removedAt callback that is necessary for
 * correctness, iterators have the shutdown and takeIndexWrapped
 * callbacks that help remove stale iterators from the list.
 *
 * Whenever a list element is examined, it is expunged if either
 * the GC has determined that the iterator is discarded, or if the
 * iterator reports that it is "detached" (does not need any
 * further state updates).  Overhead is maximal when takeIndex
 * never advances, iterators are discarded before they are
 * exhausted, and all removals are interior removes, in which case
 * all stale iterators are discovered by the GC.  But even in this
 * case we don't increase the amortized complexity.
 *
 * Care must be taken to keep list sweeping methods from
 * reentrantly invoking another such method, causing subtle
 * corruption bugs.
 */
class Itrs {

    /**
     * Node in a linked list of weak iterator references.
     */
    private class Node extends WeakReference<Itr> {
        /** Successor in the singly linked list; null at the tail. */
        Node next;

        /**
         * Creates a node weakly referring to {@code iterator} and
         * linked in front of {@code next}.
         */
        Node(Itr iterator, Node next) {
            super(iterator);
            this.next = next;
        }
    }

    /** Incremented whenever takeIndex wraps around to 0 */
    int cycles;

    /** Linked list of weak iterator references */
    private Node head;

    /**
     * Used to expunge stale iterators: the node after which the next
     * call to doSomeSweeping resumes, or null to start from head.
     */
    private Node sweeper;

    /** Initial probe budget of a sweep not started in try-harder mode. */
    private static final int SHORT_SWEEP_PROBES = 4;
    /**
     * Probe budget in try-harder mode; the budget is also reset to
     * this value each time a sweep finds a stale node.
     */
    private static final int LONG_SWEEP_PROBES = 16;

    /**
     * Creates the tracking structure with {@code initial} as its
     * only registered iterator.
     */
    Itrs(Itr initial) {
        register(initial);
    }

    /**
     * Sweeps itrs, looking for and expunging stale iterators.
     * If at least one was found, tries harder to find more.
     * Called only from iterating thread.
     *
     * @param tryHarder whether to start in try-harder mode, because
     * there is known to be at least one iterator to collect
     */
    void doSomeSweeping(boolean tryHarder) {
        // assert lock.getHoldCount() == 1;
        // assert head != null;
        int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
        // p is the node being examined; o is its predecessor in the
        // list (null when p is the head).
        Node o, p;
        final Node sweeper = this.sweeper;
        boolean passedGo;   // to limit search to one full sweep

        if (sweeper == null) {
            // No saved position: start at the head of the list.
            o = null;
            p = head;
            passedGo = true;
        } else {
            // Resume just after the node saved by the previous sweep.
            o = sweeper;
            p = o.next;
            passedGo = false;
        }

        for (; probes > 0; probes--) {
            if (p == null) {
                // Reached the end of the list: wrap to the head once,
                // or stop if the head was already visited.
                if (passedGo)
                    break;
                o = null;
                p = head;
                passedGo = true;
            }
            final Itr it = p.get();
            final Node next = p.next;
            if (it == null || it.isDetached()) {
                // found a discarded/exhausted iterator
                probes = LONG_SWEEP_PROBES; // "try harder"
                // unlink p
                p.clear();
                p.next = null;
                if (o == null) {
                    head = next;
                    if (next == null) {
                        // We've run out of iterators to track; retire
                        itrs = null;
                        return;
                    }
                }
                else
                    o.next = next;
            } else {
                o = p;
            }
            p = next;
        }

        // Remember where to resume; null means "start from head".
        this.sweeper = (p == null) ? null : o;
    }

    /**
     * Adds a new iterator to the linked list of tracked iterators.
     * The new node is pushed at the front of the list.
     */
    void register(Itr itr) {
        // assert lock.getHoldCount() == 1;
        head = new Node(itr, head);
    }

    /**
     * Called whenever takeIndex wraps around to 0.
     *
     * Notifies all iterators, and expunges any that are now stale.
     */
    void takeIndexWrapped() {
        // assert lock.getHoldCount() == 1;
        cycles++;
        // p is the node being examined; o is the last node kept.
        for (Node o = null, p = head; p != null;) {
            final Itr it = p.get();
            final Node next = p.next;
            // A cleared weak reference, or a callback returning true,
            // means the node can be dropped from the list.
            if (it == null || it.takeIndexWrapped()) {
                // unlink p
                // assert it == null || it.isDetached();
                p.clear();
                p.next = null;
                if (o == null)
                    head = next;
                else
                    o.next = next;
            } else {
                o = p;
            }
            p = next;
        }
        if (head == null)   // no more iterators to track
            itrs = null;
    }

    /**
     * Called whenever an interior remove (not at takeIndex) occurred.
     *
     * Notifies all iterators, and expunges any that are now stale.
     *
     * @param removedIndex the array index that was removed, passed
     *        on to each iterator's removedAt callback
     */
    void removedAt(int removedIndex) {
        // p is the node being examined; o is the last node kept.
        for (Node o = null, p = head; p != null;) {
            final Itr it = p.get();
            final Node next = p.next;
            // A cleared weak reference, or a callback returning true,
            // means the node can be dropped from the list.
            if (it == null || it.removedAt(removedIndex)) {
                // unlink p
                // assert it == null || it.isDetached();
                p.clear();
                p.next = null;
                if (o == null)
                    head = next;
                else
                    o.next = next;
            } else {
                o = p;
            }
            p = next;
        }
        if (head == null)   // no more iterators to track
            itrs = null;
    }

    /**
     * Called whenever the queue becomes empty.
     *
     * Notifies all active iterators that the queue is empty,
     * clears all weak refs, and unlinks the itrs data structure.
     */
    void queueIsEmpty() {
        // assert lock.getHoldCount() == 1;
        for (Node p = head; p != null; p = p.next) {
            Itr it = p.get();
            if (it != null) {
                p.clear();
                it.shutdown();
            }
        }
        // Drop the whole list and retire this tracker from the queue.
        head = null;
        itrs = null;
    }

    /**
     * Called whenever an element has been dequeued (at takeIndex).
     * Dispatches to queueIsEmpty when the queue has become empty,
     * otherwise to takeIndexWrapped when takeIndex is now 0.
     */
    void elementDequeued() {
        // assert lock.getHoldCount() == 1;
        if (count == 0)
            queueIsEmpty();
        else if (takeIndex == 0)
            takeIndexWrapped();
    }
}
928
929 /**
930 * Iterator for ArrayBlockingQueue.
931 *
932 * To maintain weak consistency with respect to puts and takes, we
933 * read ahead one slot, so as to not report hasNext true but then
934 * not have an element to return.
935 *
936 * We switch into "detached" mode (allowing prompt unlinking from
937 * itrs without help from the GC) when all indices are negative, or
938 * when hasNext returns false for the first time. This allows the
939 * iterator to track concurrent updates completely accurately,
940 * except for the corner case of the user calling Iterator.remove()
941 * after hasNext() returned false. Even in this case, we ensure
942 * that we don't remove the wrong element by keeping track of the
943 * expected element to remove, in lastItem. Yes, we may fail to
944 * remove lastItem from the queue if it moved due to an interleaved
945 * interior remove while in detached mode.
946 */
947 private class Itr implements Iterator<E> {
948 /** Index to look for new nextItem; NONE at end */
949 private int cursor;
950
951 /** Element to be returned by next call to next(); null if none */
952 private E nextItem;
953
954 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
955 private int nextIndex;
956
957 /** Last element returned; null if none or not detached. */
958 private E lastItem;
959
960 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
961 private int lastRet;
962
963 /** Previous value of takeIndex, or DETACHED when detached */
964 private int prevTakeIndex;
965
966 /** Previous value of itrs.cycles */
967 private int prevCycles;
968
969 /** Special index value indicating "not available" or "undefined" */
970 private static final int NONE = -1;
971
972 /**
973 * Special index value indicating "removed elsewhere", that is,
974 * removed by some operation other than a call to this.remove().
975 */
976 private static final int REMOVED = -2;
977
978 /** Special value for prevTakeIndex indicating "detached mode" */
979 private static final int DETACHED = -3;
980
/**
 * Creates an iterator positioned at the current head of the queue.
 * For an empty queue the iterator starts out detached; otherwise it
 * reads ahead the first element and registers itself with itrs.
 */
Itr() {
    // assert lock.getHoldCount() == 0;
    lastRet = NONE;
    final ReentrantLock mainLock = ArrayBlockingQueue.this.lock;
    mainLock.lock();
    try {
        if (count != 0) {
            final int head = ArrayBlockingQueue.this.takeIndex;
            prevTakeIndex = head;
            nextIndex = head;
            nextItem = itemAt(head);
            cursor = incCursor(head);
            if (itrs != null) {
                itrs.register(this); // in this order
                itrs.doSomeSweeping(false);
            } else {
                itrs = new Itrs(this);
            }
            prevCycles = itrs.cycles;
            // assert takeIndex >= 0;
            // assert prevTakeIndex == takeIndex;
            // assert nextIndex >= 0;
            // assert nextItem != null;
        } else {
            // Nothing to iterate over: no tracking is needed.
            // assert itrs == null;
            cursor = NONE;
            nextIndex = NONE;
            prevTakeIndex = DETACHED;
        }
    } finally {
        mainLock.unlock();
    }
}
1013
1014 boolean isDetached() {
1015 // assert lock.getHoldCount() == 1;
1016 return prevTakeIndex < 0;
1017 }
1018
1019 private int incCursor(int index) {
1020 // assert lock.getHoldCount() == 1;
1021 if (++index == items.length) index = 0;
1022 if (index == putIndex) index = NONE;
1023 return index;
1024 }
1025
1026 /**
1027 * Returns true if index is invalidated by the given number of
1028 * dequeues, starting from prevTakeIndex.
1029 */
1030 private boolean invalidated(int index, int prevTakeIndex,
1031 long dequeues, int length) {
1032 if (index < 0)
1033 return false;
1034 int distance = index - prevTakeIndex;
1035 if (distance < 0)
1036 distance += length;
1037 return dequeues > distance;
1038 }
1039
1040 /**
1041 * Adjusts indices to incorporate all dequeues since the last
1042 * operation on this iterator. Call only from iterating thread.
1043 */
1044 private void incorporateDequeues() {
1045 // assert lock.getHoldCount() == 1;
1046 // assert itrs != null;
1047 // assert !isDetached();
1048 // assert count > 0;
1049
1050 final int cycles = itrs.cycles;
1051 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1052 final int prevCycles = this.prevCycles;
1053 final int prevTakeIndex = this.prevTakeIndex;
1054
1055 if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1056 final int len = items.length;
1057 // how far takeIndex has advanced since the previous
1058 // operation of this iterator
1059 long dequeues = (cycles - prevCycles) * len
1060 + (takeIndex - prevTakeIndex);
1061
1062 // Check indices for invalidation
1063 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1064 lastRet = REMOVED;
1065 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1066 nextIndex = REMOVED;
1067 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1068 cursor = takeIndex;
1069
1070 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1071 detach();
1072 else {
1073 this.prevCycles = cycles;
1074 this.prevTakeIndex = takeIndex;
1075 }
1076 }
1077 }
1078
1079 /**
1080 * Called when itrs should stop tracking this iterator, either
1081 * because there are no more indices to update (cursor < 0 &&
1082 * nextIndex < 0 && lastRet < 0) or as a special exception, when
1083 * lastRet >= 0, because hasNext() is about to return false for the
1084 * first time. Call only from iterating thread.
1085 */
1086 private void detach() {
1087 // Switch to detached mode
1088 // assert lock.getHoldCount() == 1;
1089 // assert cursor == NONE;
1090 // assert nextIndex < 0;
1091 // assert lastRet < 0 || nextItem == null;
1092 // assert lastRet < 0 ^ lastItem != null;
1093 if (prevTakeIndex >= 0) {
1094 // assert itrs != null;
1095 prevTakeIndex = DETACHED;
1096 // try to unlink from itrs (but not too hard)
1097 itrs.doSomeSweeping(true);
1098 }
1099 }
1100
1101 /**
1102 * For performance reasons, we would like not to acquire a lock in
1103 * hasNext in the common case. To allow for this, we only access
1104 * fields (i.e. nextItem) that are not modified by update operations
1105 * triggered by queue modifications.
1106 */
1107 public boolean hasNext() {
1108 // assert lock.getHoldCount() == 0;
1109 if (nextItem != null)
1110 return true;
1111 noNext();
1112 return false;
1113 }
1114
1115 private void noNext() {
1116 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1117 lock.lock();
1118 try {
1119 // assert cursor == NONE;
1120 // assert nextIndex == NONE;
1121 if (!isDetached()) {
1122 // assert lastRet >= 0;
1123 incorporateDequeues(); // might update lastRet
1124 if (lastRet >= 0) {
1125 lastItem = itemAt(lastRet);
1126 // assert lastItem != null;
1127 detach();
1128 }
1129 }
1130 // assert isDetached();
1131 // assert lastRet < 0 ^ lastItem != null;
1132 } finally {
1133 lock.unlock();
1134 }
1135 }
1136
1137 public E next() {
1138 // assert lock.getHoldCount() == 0;
1139 final E x = nextItem;
1140 if (x == null)
1141 throw new NoSuchElementException();
1142 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1143 lock.lock();
1144 try {
1145 if (!isDetached())
1146 incorporateDequeues();
1147 // assert nextIndex != NONE;
1148 // assert lastItem == null;
1149 lastRet = nextIndex;
1150 final int cursor = this.cursor;
1151 if (cursor >= 0) {
1152 nextItem = itemAt(nextIndex = cursor);
1153 // assert nextItem != null;
1154 this.cursor = incCursor(cursor);
1155 } else {
1156 nextIndex = NONE;
1157 nextItem = null;
1158 }
1159 } finally {
1160 lock.unlock();
1161 }
1162 return x;
1163 }
1164
1165 public void remove() {
1166 // assert lock.getHoldCount() == 0;
1167 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1168 lock.lock();
1169 try {
1170 if (!isDetached())
1171 incorporateDequeues(); // might update lastRet or detach
1172 final int lastRet = this.lastRet;
1173 this.lastRet = NONE;
1174 if (lastRet >= 0) {
1175 if (!isDetached())
1176 removeAt(lastRet);
1177 else {
1178 final E lastItem = this.lastItem;
1179 // assert lastItem != null;
1180 this.lastItem = null;
1181 if (itemAt(lastRet) == lastItem)
1182 removeAt(lastRet);
1183 }
1184 } else if (lastRet == NONE)
1185 throw new IllegalStateException();
1186 // else lastRet == REMOVED and the last returned element was
1187 // previously asynchronously removed via an operation other
1188 // than this.remove(), so nothing to do.
1189
1190 if (cursor < 0 && nextIndex < 0)
1191 detach();
1192 } finally {
1193 lock.unlock();
1194 // assert lastRet == NONE;
1195 // assert lastItem == null;
1196 }
1197 }
1198
1199 /**
1200 * Called to notify the iterator that the queue is empty, or that it
1201 * has fallen hopelessly behind, so that it should abandon any
1202 * further iteration, except possibly to return one more element
1203 * from next(), as promised by returning true from hasNext().
1204 */
1205 void shutdown() {
1206 // assert lock.getHoldCount() == 1;
1207 cursor = NONE;
1208 if (nextIndex >= 0)
1209 nextIndex = REMOVED;
1210 if (lastRet >= 0) {
1211 lastRet = REMOVED;
1212 lastItem = null;
1213 }
1214 prevTakeIndex = DETACHED;
1215 // Don't set nextItem to null because we must continue to be
1216 // able to return it on next().
1217 //
1218 // Caller will unlink from itrs when convenient.
1219 }
1220
1221 private int distance(int index, int prevTakeIndex, int length) {
1222 int distance = index - prevTakeIndex;
1223 if (distance < 0)
1224 distance += length;
1225 return distance;
1226 }
1227
1228 /**
1229 * Called whenever an interior remove (not at takeIndex) occurred.
1230 *
1231 * @return true if this iterator should be unlinked from itrs
1232 */
1233 boolean removedAt(int removedIndex) {
1234 // assert lock.getHoldCount() == 1;
1235 if (isDetached())
1236 return true;
1237
1238 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1239 final int prevTakeIndex = this.prevTakeIndex;
1240 final int len = items.length;
1241 // distance from prevTakeIndex to removedIndex
1242 final int removedDistance =
1243 len * (itrs.cycles - this.prevCycles
1244 + ((removedIndex < takeIndex) ? 1 : 0))
1245 + (removedIndex - prevTakeIndex);
1246 // assert itrs.cycles - this.prevCycles >= 0;
1247 // assert itrs.cycles - this.prevCycles <= 1;
1248 // assert removedDistance > 0;
1249 // assert removedIndex != takeIndex;
1250 int cursor = this.cursor;
1251 if (cursor >= 0) {
1252 int x = distance(cursor, prevTakeIndex, len);
1253 if (x == removedDistance) {
1254 if (cursor == putIndex)
1255 this.cursor = cursor = NONE;
1256 }
1257 else if (x > removedDistance) {
1258 // assert cursor != prevTakeIndex;
1259 this.cursor = cursor = dec(cursor);
1260 }
1261 }
1262 int lastRet = this.lastRet;
1263 if (lastRet >= 0) {
1264 int x = distance(lastRet, prevTakeIndex, len);
1265 if (x == removedDistance)
1266 this.lastRet = lastRet = REMOVED;
1267 else if (x > removedDistance)
1268 this.lastRet = lastRet = dec(lastRet);
1269 }
1270 int nextIndex = this.nextIndex;
1271 if (nextIndex >= 0) {
1272 int x = distance(nextIndex, prevTakeIndex, len);
1273 if (x == removedDistance)
1274 this.nextIndex = nextIndex = REMOVED;
1275 else if (x > removedDistance)
1276 this.nextIndex = nextIndex = dec(nextIndex);
1277 }
1278 if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1279 this.prevTakeIndex = DETACHED;
1280 return true;
1281 }
1282 return false;
1283 }
1284
1285 /**
1286 * Called whenever takeIndex wraps around to zero.
1287 *
1288 * @return true if this iterator should be unlinked from itrs
1289 */
1290 boolean takeIndexWrapped() {
1291 // assert lock.getHoldCount() == 1;
1292 if (isDetached())
1293 return true;
1294 if (itrs.cycles - prevCycles > 1) {
1295 // All the elements that existed at the time of the last
1296 // operation are gone, so abandon further iteration.
1297 shutdown();
1298 return true;
1299 }
1300 return false;
1301 }
1302
1303 // /** Uncomment for debugging. */
1304 // public String toString() {
1305 // return ("cursor=" + cursor + " " +
1306 // "nextIndex=" + nextIndex + " " +
1307 // "lastRet=" + lastRet + " " +
1308 // "nextItem=" + nextItem + " " +
1309 // "lastItem=" + lastItem + " " +
1310 // "prevCycles=" + prevCycles + " " +
1311 // "prevTakeIndex=" + prevTakeIndex + " " +
1312 // "size()=" + size() + " " +
1313 // "remainingCapacity()=" + remainingCapacity());
1314 // }
1315 }
1316
1317 /**
1318 * Returns a {@link Spliterator} over the elements in this queue.
1319 *
1320 * <p>The returned spliterator is
1321 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1322 *
1323 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1324 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1325 *
1326 * @implNote
1327 * The {@code Spliterator} implements {@code trySplit} to permit limited
1328 * parallelism.
1329 *
1330 * @return a {@code Spliterator} over the elements in this queue
1331 * @since 1.8
1332 */
1333 public Spliterator<E> spliterator() {
1334 return Spliterators.spliterator
1335 (this, (Spliterator.ORDERED |
1336 Spliterator.NONNULL |
1337 Spliterator.CONCURRENT));
1338 }
1339
1340 }