ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.121
Committed: Fri Feb 27 07:33:00 2015 UTC (9 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.120: +1 -1 lines
Log Message:
elide null assignment in <init>

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.lang.ref.WeakReference;
10 import java.util.Arrays;
11 import java.util.AbstractQueue;
12 import java.util.Collection;
13 import java.util.Iterator;
14 import java.util.NoSuchElementException;
15 import java.util.Objects;
16 import java.util.Spliterator;
17 import java.util.Spliterators;
18 import java.util.concurrent.locks.Condition;
19 import java.util.concurrent.locks.ReentrantLock;
20
21 /**
22 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
23 * array. This queue orders elements FIFO (first-in-first-out). The
24 * <em>head</em> of the queue is that element that has been on the
25 * queue the longest time. The <em>tail</em> of the queue is that
26 * element that has been on the queue the shortest time. New elements
27 * are inserted at the tail of the queue, and the queue retrieval
28 * operations obtain elements at the head of the queue.
29 *
30 * <p>This is a classic &quot;bounded buffer&quot;, in which a
31 * fixed-sized array holds elements inserted by producers and
32 * extracted by consumers. Once created, the capacity cannot be
33 * changed. Attempts to {@code put} an element into a full queue
34 * will result in the operation blocking; attempts to {@code take} an
35 * element from an empty queue will similarly block.
36 *
37 * <p>This class supports an optional fairness policy for ordering
38 * waiting producer and consumer threads. By default, this ordering
39 * is not guaranteed. However, a queue constructed with fairness set
40 * to {@code true} grants threads access in FIFO order. Fairness
41 * generally decreases throughput but reduces variability and avoids
42 * starvation.
43 *
44 * <p>This class and its iterator implement all of the
45 * <em>optional</em> methods of the {@link Collection} and {@link
46 * Iterator} interfaces.
47 *
48 * <p>This class is a member of the
49 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
50 * Java Collections Framework</a>.
51 *
52 * @since 1.5
53 * @author Doug Lea
54 * @param <E> the type of elements held in this queue
55 */
56 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
57 implements BlockingQueue<E>, java.io.Serializable {
58
59 /**
60 * Serialization ID. This class relies on default serialization
61 * even for the items array, which is default-serialized, even if
62 * it is empty. Otherwise it could not be declared final, which is
63 * necessary here.
64 */
65 private static final long serialVersionUID = -817911632652898426L;
66
67 /** The queued items */
68 final Object[] items;
69
70 /** items index for next take, poll, peek or remove */
71 int takeIndex;
72
73 /** items index for next put, offer, or add */
74 int putIndex;
75
76 /** Number of elements in the queue */
77 int count;
78
79 /*
80 * Concurrency control uses the classic two-condition algorithm
81 * found in any textbook.
82 */
83
84 /** Main lock guarding all access */
85 final ReentrantLock lock;
86
87 /** Condition for waiting takes */
88 private final Condition notEmpty;
89
90 /** Condition for waiting puts */
91 private final Condition notFull;
92
93 /**
94 * Shared state for currently active iterators, or null if there
95 * are known not to be any. Allows queue operations to update
96 * iterator state.
97 */
98 transient Itrs itrs;
99
100 // Internal helper methods
101
102 /**
103 * Circularly decrements array index i.
104 */
105 final int dec(int i) {
106 return ((i == 0) ? items.length : i) - 1;
107 }
108
109 /**
110 * Returns item at index i.
111 */
112 @SuppressWarnings("unchecked")
113 final E itemAt(int i) {
114 return (E) items[i];
115 }
116
117 /**
118 * Inserts element at current put position, advances, and signals.
119 * Call only when holding lock.
120 */
121 private void enqueue(E x) {
122 // assert lock.getHoldCount() == 1;
123 // assert items[putIndex] == null;
124 final Object[] items = this.items;
125 items[putIndex] = x;
126 if (++putIndex == items.length) putIndex = 0;
127 count++;
128 notEmpty.signal();
129 }
130
131 /**
132 * Extracts element at current take position, advances, and signals.
133 * Call only when holding lock.
134 */
135 private E dequeue() {
136 // assert lock.getHoldCount() == 1;
137 // assert items[takeIndex] != null;
138 final Object[] items = this.items;
139 @SuppressWarnings("unchecked")
140 E x = (E) items[takeIndex];
141 items[takeIndex] = null;
142 if (++takeIndex == items.length) takeIndex = 0;
143 count--;
144 if (itrs != null)
145 itrs.elementDequeued();
146 notFull.signal();
147 return x;
148 }
149
150 /**
151 * Deletes item at array index removeIndex.
152 * Utility for remove(Object) and iterator.remove.
153 * Call only when holding lock.
154 */
155 void removeAt(final int removeIndex) {
156 // assert lock.getHoldCount() == 1;
157 // assert items[removeIndex] != null;
158 // assert removeIndex >= 0 && removeIndex < items.length;
159 final Object[] items = this.items;
160 if (removeIndex == takeIndex) {
161 // removing front item; just advance
162 items[takeIndex] = null;
163 if (++takeIndex == items.length) takeIndex = 0;
164 count--;
165 if (itrs != null)
166 itrs.elementDequeued();
167 } else {
168 // an "interior" remove
169
170 // slide over all others up through putIndex.
171 for (int i = removeIndex, putIndex = this.putIndex;;) {
172 int pred = i;
173 if (++i == items.length) i = 0;
174 if (i == putIndex) {
175 items[pred] = null;
176 this.putIndex = pred;
177 break;
178 }
179 items[pred] = items[i];
180 }
181 count--;
182 if (itrs != null)
183 itrs.removedAt(removeIndex);
184 }
185 notFull.signal();
186 }
187
/**
 * Creates an {@code ArrayBlockingQueue} of the given fixed capacity
 * using the default (non-fair) access policy; delegates to
 * {@link #ArrayBlockingQueue(int, boolean)} with {@code fair == false}.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
198
/**
 * Creates an {@code ArrayBlockingQueue} of the given fixed capacity
 * with the specified access policy. Allocates the backing array and
 * creates the lock together with its two conditions.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity < 1) {
        throw new IllegalArgumentException();
    }
    this.items = new Object[capacity];
    this.lock = new ReentrantLock(fair);
    this.notEmpty = this.lock.newCondition();
    this.notFull = this.lock.newCondition();
}
217
/**
 * Creates an {@code ArrayBlockingQueue} of the given fixed capacity
 * and access policy, pre-populated with the elements of the given
 * collection in the traversal order of the collection's iterator.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock guard = this.lock;
    guard.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int filled = 0;
        try {
            final Iterator<? extends E> source = c.iterator();
            while (source.hasNext()) {
                // null check happens before the (bounds-checked) store
                items[filled] = Objects.requireNonNull(source.next());
                filled++;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            // more elements than slots: capacity is too small
            throw new IllegalArgumentException();
        }
        count = filled;
        putIndex = (filled == capacity) ? 0 : filled;
    } finally {
        guard.unlock();
    }
}
254
255 /**
256 * Inserts the specified element at the tail of this queue if it is
257 * possible to do so immediately without exceeding the queue's capacity,
258 * returning {@code true} upon success and throwing an
259 * {@code IllegalStateException} if this queue is full.
260 *
261 * @param e the element to add
262 * @return {@code true} (as specified by {@link Collection#add})
263 * @throws IllegalStateException if this queue is full
264 * @throws NullPointerException if the specified element is null
265 */
266 public boolean add(E e) {
267 return super.add(e);
268 }
269
270 /**
271 * Inserts the specified element at the tail of this queue if it is
272 * possible to do so immediately without exceeding the queue's capacity,
273 * returning {@code true} upon success and {@code false} if this queue
274 * is full. This method is generally preferable to method {@link #add},
275 * which can fail to insert an element only by throwing an exception.
276 *
277 * @throws NullPointerException if the specified element is null
278 */
279 public boolean offer(E e) {
280 Objects.requireNonNull(e);
281 final ReentrantLock lock = this.lock;
282 lock.lock();
283 try {
284 if (count == items.length)
285 return false;
286 else {
287 enqueue(e);
288 return true;
289 }
290 } finally {
291 lock.unlock();
292 }
293 }
294
295 /**
296 * Inserts the specified element at the tail of this queue, waiting
297 * for space to become available if the queue is full.
298 *
299 * @throws InterruptedException {@inheritDoc}
300 * @throws NullPointerException {@inheritDoc}
301 */
302 public void put(E e) throws InterruptedException {
303 Objects.requireNonNull(e);
304 final ReentrantLock lock = this.lock;
305 lock.lockInterruptibly();
306 try {
307 while (count == items.length)
308 notFull.await();
309 enqueue(e);
310 } finally {
311 lock.unlock();
312 }
313 }
314
315 /**
316 * Inserts the specified element at the tail of this queue, waiting
317 * up to the specified wait time for space to become available if
318 * the queue is full.
319 *
320 * @throws InterruptedException {@inheritDoc}
321 * @throws NullPointerException {@inheritDoc}
322 */
323 public boolean offer(E e, long timeout, TimeUnit unit)
324 throws InterruptedException {
325
326 Objects.requireNonNull(e);
327 long nanos = unit.toNanos(timeout);
328 final ReentrantLock lock = this.lock;
329 lock.lockInterruptibly();
330 try {
331 while (count == items.length) {
332 if (nanos <= 0)
333 return false;
334 nanos = notFull.awaitNanos(nanos);
335 }
336 enqueue(e);
337 return true;
338 } finally {
339 lock.unlock();
340 }
341 }
342
343 public E poll() {
344 final ReentrantLock lock = this.lock;
345 lock.lock();
346 try {
347 return (count == 0) ? null : dequeue();
348 } finally {
349 lock.unlock();
350 }
351 }
352
353 public E take() throws InterruptedException {
354 final ReentrantLock lock = this.lock;
355 lock.lockInterruptibly();
356 try {
357 while (count == 0)
358 notEmpty.await();
359 return dequeue();
360 } finally {
361 lock.unlock();
362 }
363 }
364
365 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
366 long nanos = unit.toNanos(timeout);
367 final ReentrantLock lock = this.lock;
368 lock.lockInterruptibly();
369 try {
370 while (count == 0) {
371 if (nanos <= 0)
372 return null;
373 nanos = notEmpty.awaitNanos(nanos);
374 }
375 return dequeue();
376 } finally {
377 lock.unlock();
378 }
379 }
380
381 public E peek() {
382 final ReentrantLock lock = this.lock;
383 lock.lock();
384 try {
385 return itemAt(takeIndex); // null when queue is empty
386 } finally {
387 lock.unlock();
388 }
389 }
390
391 // this doc comment is overridden to remove the reference to collections
392 // greater in size than Integer.MAX_VALUE
393 /**
394 * Returns the number of elements in this queue.
395 *
396 * @return the number of elements in this queue
397 */
398 public int size() {
399 final ReentrantLock lock = this.lock;
400 lock.lock();
401 try {
402 return count;
403 } finally {
404 lock.unlock();
405 }
406 }
407
408 // this doc comment is a modified copy of the inherited doc comment,
409 // without the reference to unlimited queues.
410 /**
411 * Returns the number of additional elements that this queue can ideally
412 * (in the absence of memory or resource constraints) accept without
413 * blocking. This is always equal to the initial capacity of this queue
414 * less the current {@code size} of this queue.
415 *
416 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
417 * an element will succeed by inspecting {@code remainingCapacity}
418 * because it may be the case that another thread is about to
419 * insert or remove an element.
420 */
421 public int remainingCapacity() {
422 final ReentrantLock lock = this.lock;
423 lock.lock();
424 try {
425 return items.length - count;
426 } finally {
427 lock.unlock();
428 }
429 }
430
431 /**
432 * Removes a single instance of the specified element from this queue,
433 * if it is present. More formally, removes an element {@code e} such
434 * that {@code o.equals(e)}, if this queue contains one or more such
435 * elements.
436 * Returns {@code true} if this queue contained the specified element
437 * (or equivalently, if this queue changed as a result of the call).
438 *
439 * <p>Removal of interior elements in circular array based queues
440 * is an intrinsically slow and disruptive operation, so should
441 * be undertaken only in exceptional circumstances, ideally
442 * only when the queue is known not to be accessible by other
443 * threads.
444 *
445 * @param o element to be removed from this queue, if present
446 * @return {@code true} if this queue changed as a result of the call
447 */
448 public boolean remove(Object o) {
449 if (o == null) return false;
450 final ReentrantLock lock = this.lock;
451 lock.lock();
452 try {
453 if (count > 0) {
454 final Object[] items = this.items;
455 final int putIndex = this.putIndex;
456 int i = takeIndex;
457 do {
458 if (o.equals(items[i])) {
459 removeAt(i);
460 return true;
461 }
462 if (++i == items.length) i = 0;
463 } while (i != putIndex);
464 }
465 return false;
466 } finally {
467 lock.unlock();
468 }
469 }
470
471 /**
472 * Returns {@code true} if this queue contains the specified element.
473 * More formally, returns {@code true} if and only if this queue contains
474 * at least one element {@code e} such that {@code o.equals(e)}.
475 *
476 * @param o object to be checked for containment in this queue
477 * @return {@code true} if this queue contains the specified element
478 */
479 public boolean contains(Object o) {
480 if (o == null) return false;
481 final ReentrantLock lock = this.lock;
482 lock.lock();
483 try {
484 if (count > 0) {
485 final Object[] items = this.items;
486 final int putIndex = this.putIndex;
487 int i = takeIndex;
488 do {
489 if (o.equals(items[i]))
490 return true;
491 if (++i == items.length) i = 0;
492 } while (i != putIndex);
493 }
494 return false;
495 } finally {
496 lock.unlock();
497 }
498 }
499
500 /**
501 * Returns an array containing all of the elements in this queue, in
502 * proper sequence.
503 *
504 * <p>The returned array will be "safe" in that no references to it are
505 * maintained by this queue. (In other words, this method must allocate
506 * a new array). The caller is thus free to modify the returned array.
507 *
508 * <p>This method acts as bridge between array-based and collection-based
509 * APIs.
510 *
511 * @return an array containing all of the elements in this queue
512 */
513 public Object[] toArray() {
514 final ReentrantLock lock = this.lock;
515 lock.lock();
516 try {
517 final Object[] items = this.items;
518 final int end = takeIndex + count;
519 final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
520 if (end != putIndex)
521 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
522 return a;
523 } finally {
524 lock.unlock();
525 }
526 }
527
528 /**
529 * Returns an array containing all of the elements in this queue, in
530 * proper sequence; the runtime type of the returned array is that of
531 * the specified array. If the queue fits in the specified array, it
532 * is returned therein. Otherwise, a new array is allocated with the
533 * runtime type of the specified array and the size of this queue.
534 *
535 * <p>If this queue fits in the specified array with room to spare
536 * (i.e., the array has more elements than this queue), the element in
537 * the array immediately following the end of the queue is set to
538 * {@code null}.
539 *
540 * <p>Like the {@link #toArray()} method, this method acts as bridge between
541 * array-based and collection-based APIs. Further, this method allows
542 * precise control over the runtime type of the output array, and may,
543 * under certain circumstances, be used to save allocation costs.
544 *
545 * <p>Suppose {@code x} is a queue known to contain only strings.
546 * The following code can be used to dump the queue into a newly
547 * allocated array of {@code String}:
548 *
549 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
550 *
551 * Note that {@code toArray(new Object[0])} is identical in function to
552 * {@code toArray()}.
553 *
554 * @param a the array into which the elements of the queue are to
555 * be stored, if it is big enough; otherwise, a new array of the
556 * same runtime type is allocated for this purpose
557 * @return an array containing all of the elements in this queue
558 * @throws ArrayStoreException if the runtime type of the specified array
559 * is not a supertype of the runtime type of every element in
560 * this queue
561 * @throws NullPointerException if the specified array is null
562 */
563 @SuppressWarnings("unchecked")
564 public <T> T[] toArray(T[] a) {
565 final ReentrantLock lock = this.lock;
566 lock.lock();
567 try {
568 final Object[] items = this.items;
569 final int count = this.count;
570 final int firstLeg = Math.min(items.length - takeIndex, count);
571 if (a.length < count) {
572 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
573 a.getClass());
574 } else {
575 System.arraycopy(items, takeIndex, a, 0, firstLeg);
576 if (a.length > count)
577 a[count] = null;
578 }
579 if (firstLeg < count)
580 System.arraycopy(items, 0, a, firstLeg, putIndex);
581 return a;
582 } finally {
583 lock.unlock();
584 }
585 }
586
587 public String toString() {
588 final ReentrantLock lock = this.lock;
589 lock.lock();
590 try {
591 int k = count;
592 if (k == 0)
593 return "[]";
594
595 final Object[] items = this.items;
596 StringBuilder sb = new StringBuilder();
597 sb.append('[');
598 for (int i = takeIndex; ; ) {
599 Object e = items[i];
600 sb.append(e == this ? "(this Collection)" : e);
601 if (--k == 0)
602 return sb.append(']').toString();
603 sb.append(',').append(' ');
604 if (++i == items.length) i = 0;
605 }
606 } finally {
607 lock.unlock();
608 }
609 }
610
611 /**
612 * Atomically removes all of the elements from this queue.
613 * The queue will be empty after this call returns.
614 */
615 public void clear() {
616 final Object[] items = this.items;
617 final ReentrantLock lock = this.lock;
618 lock.lock();
619 try {
620 int k = count;
621 if (k > 0) {
622 final int putIndex = this.putIndex;
623 int i = takeIndex;
624 do {
625 items[i] = null;
626 if (++i == items.length) i = 0;
627 } while (i != putIndex);
628 takeIndex = putIndex;
629 count = 0;
630 if (itrs != null)
631 itrs.queueIsEmpty();
632 for (; k > 0 && lock.hasWaiters(notFull); k--)
633 notFull.signal();
634 }
635 } finally {
636 lock.unlock();
637 }
638 }
639
640 /**
641 * @throws UnsupportedOperationException {@inheritDoc}
642 * @throws ClassCastException {@inheritDoc}
643 * @throws NullPointerException {@inheritDoc}
644 * @throws IllegalArgumentException {@inheritDoc}
645 */
646 public int drainTo(Collection<? super E> c) {
647 return drainTo(c, Integer.MAX_VALUE);
648 }
649
650 /**
651 * @throws UnsupportedOperationException {@inheritDoc}
652 * @throws ClassCastException {@inheritDoc}
653 * @throws NullPointerException {@inheritDoc}
654 * @throws IllegalArgumentException {@inheritDoc}
655 */
656 public int drainTo(Collection<? super E> c, int maxElements) {
657 Objects.requireNonNull(c);
658 if (c == this)
659 throw new IllegalArgumentException();
660 if (maxElements <= 0)
661 return 0;
662 final Object[] items = this.items;
663 final ReentrantLock lock = this.lock;
664 lock.lock();
665 try {
666 int n = Math.min(maxElements, count);
667 int take = takeIndex;
668 int i = 0;
669 try {
670 while (i < n) {
671 @SuppressWarnings("unchecked")
672 E x = (E) items[take];
673 c.add(x);
674 items[take] = null;
675 if (++take == items.length) take = 0;
676 i++;
677 }
678 return n;
679 } finally {
680 // Restore invariants even if c.add() threw
681 if (i > 0) {
682 count -= i;
683 takeIndex = take;
684 if (itrs != null) {
685 if (count == 0)
686 itrs.queueIsEmpty();
687 else if (i > take)
688 itrs.takeIndexWrapped();
689 }
690 for (; i > 0 && lock.hasWaiters(notFull); i--)
691 notFull.signal();
692 }
693 }
694 } finally {
695 lock.unlock();
696 }
697 }
698
699 /**
700 * Returns an iterator over the elements in this queue in proper sequence.
701 * The elements will be returned in order from first (head) to last (tail).
702 *
703 * <p>The returned iterator is
704 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
705 *
706 * @return an iterator over the elements in this queue in proper sequence
707 */
708 public Iterator<E> iterator() {
709 return new Itr();
710 }
711
712 /**
713 * Shared data between iterators and their queue, allowing queue
714 * modifications to update iterators when elements are removed.
715 *
716 * This adds a lot of complexity for the sake of correctly
717 * handling some uncommon operations, but the combination of
718 * circular-arrays and supporting interior removes (i.e., those
719 * not at head) would cause iterators to sometimes lose their
720 * places and/or (re)report elements they shouldn't. To avoid
721 * this, when a queue has one or more iterators, it keeps iterator
722 * state consistent by:
723 *
724 * (1) keeping track of the number of "cycles", that is, the
725 * number of times takeIndex has wrapped around to 0.
726 * (2) notifying all iterators via the callback removedAt whenever
727 * an interior element is removed (and thus other elements may
728 * be shifted).
729 *
730 * These suffice to eliminate iterator inconsistencies, but
731 * unfortunately add the secondary responsibility of maintaining
732 * the list of iterators. We track all active iterators in a
733 * simple linked list (accessed only when the queue's lock is
734 * held) of weak references to Itr. The list is cleaned up using
735 * 3 different mechanisms:
736 *
737 * (1) Whenever a new iterator is created, do some O(1) checking for
738 * stale list elements.
739 *
740 * (2) Whenever takeIndex wraps around to 0, check for iterators
741 * that have been unused for more than one wrap-around cycle.
742 *
743 * (3) Whenever the queue becomes empty, all iterators are notified
744 * and this entire data structure is discarded.
745 *
746 * So in addition to the removedAt callback that is necessary for
747 * correctness, iterators have the shutdown and takeIndexWrapped
748 * callbacks that help remove stale iterators from the list.
749 *
750 * Whenever a list element is examined, it is expunged if either
751 * the GC has determined that the iterator is discarded, or if the
752 * iterator reports that it is "detached" (does not need any
753 * further state updates). Overhead is maximal when takeIndex
754 * never advances, iterators are discarded before they are
755 * exhausted, and all removals are interior removes, in which case
756 * all stale iterators are discovered by the GC. But even in this
757 * case we don't increase the amortized complexity.
758 *
759 * Care must be taken to keep list sweeping methods from
760 * reentrantly invoking another such method, causing subtle
761 * corruption bugs.
762 */
763 class Itrs {
764
765 /**
766 * Node in a linked list of weak iterator references.
767 */
768 private class Node extends WeakReference<Itr> {
769 Node next;
770
771 Node(Itr iterator, Node next) {
772 super(iterator);
773 this.next = next;
774 }
775 }
776
777 /** Incremented whenever takeIndex wraps around to 0 */
778 int cycles;
779
780 /** Linked list of weak iterator references */
781 private Node head;
782
783 /** Used to expunge stale iterators */
784 private Node sweeper;
785
786 private static final int SHORT_SWEEP_PROBES = 4;
787 private static final int LONG_SWEEP_PROBES = 16;
788
789 Itrs(Itr initial) {
790 register(initial);
791 }
792
793 /**
794 * Sweeps itrs, looking for and expunging stale iterators.
795 * If at least one was found, tries harder to find more.
796 * Called only from iterating thread.
797 *
798 * @param tryHarder whether to start in try-harder mode, because
799 * there is known to be at least one iterator to collect
800 */
801 void doSomeSweeping(boolean tryHarder) {
802 // assert lock.getHoldCount() == 1;
803 // assert head != null;
804 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
805 Node o, p;
806 final Node sweeper = this.sweeper;
807 boolean passedGo; // to limit search to one full sweep
808
809 if (sweeper == null) {
810 o = null;
811 p = head;
812 passedGo = true;
813 } else {
814 o = sweeper;
815 p = o.next;
816 passedGo = false;
817 }
818
819 for (; probes > 0; probes--) {
820 if (p == null) {
821 if (passedGo)
822 break;
823 o = null;
824 p = head;
825 passedGo = true;
826 }
827 final Itr it = p.get();
828 final Node next = p.next;
829 if (it == null || it.isDetached()) {
830 // found a discarded/exhausted iterator
831 probes = LONG_SWEEP_PROBES; // "try harder"
832 // unlink p
833 p.clear();
834 p.next = null;
835 if (o == null) {
836 head = next;
837 if (next == null) {
838 // We've run out of iterators to track; retire
839 itrs = null;
840 return;
841 }
842 }
843 else
844 o.next = next;
845 } else {
846 o = p;
847 }
848 p = next;
849 }
850
851 this.sweeper = (p == null) ? null : o;
852 }
853
854 /**
855 * Adds a new iterator to the linked list of tracked iterators.
856 */
857 void register(Itr itr) {
858 // assert lock.getHoldCount() == 1;
859 head = new Node(itr, head);
860 }
861
862 /**
863 * Called whenever takeIndex wraps around to 0.
864 *
865 * Notifies all iterators, and expunges any that are now stale.
866 */
867 void takeIndexWrapped() {
868 // assert lock.getHoldCount() == 1;
869 cycles++;
870 for (Node o = null, p = head; p != null;) {
871 final Itr it = p.get();
872 final Node next = p.next;
873 if (it == null || it.takeIndexWrapped()) {
874 // unlink p
875 // assert it == null || it.isDetached();
876 p.clear();
877 p.next = null;
878 if (o == null)
879 head = next;
880 else
881 o.next = next;
882 } else {
883 o = p;
884 }
885 p = next;
886 }
887 if (head == null) // no more iterators to track
888 itrs = null;
889 }
890
891 /**
892 * Called whenever an interior remove (not at takeIndex) occurred.
893 *
894 * Notifies all iterators, and expunges any that are now stale.
895 */
896 void removedAt(int removedIndex) {
897 for (Node o = null, p = head; p != null;) {
898 final Itr it = p.get();
899 final Node next = p.next;
900 if (it == null || it.removedAt(removedIndex)) {
901 // unlink p
902 // assert it == null || it.isDetached();
903 p.clear();
904 p.next = null;
905 if (o == null)
906 head = next;
907 else
908 o.next = next;
909 } else {
910 o = p;
911 }
912 p = next;
913 }
914 if (head == null) // no more iterators to track
915 itrs = null;
916 }
917
918 /**
919 * Called whenever the queue becomes empty.
920 *
921 * Notifies all active iterators that the queue is empty,
922 * clears all weak refs, and unlinks the itrs datastructure.
923 */
924 void queueIsEmpty() {
925 // assert lock.getHoldCount() == 1;
926 for (Node p = head; p != null; p = p.next) {
927 Itr it = p.get();
928 if (it != null) {
929 p.clear();
930 it.shutdown();
931 }
932 }
933 head = null;
934 itrs = null;
935 }
936
937 /**
938 * Called whenever an element has been dequeued (at takeIndex).
939 */
940 void elementDequeued() {
941 // assert lock.getHoldCount() == 1;
942 if (count == 0)
943 queueIsEmpty();
944 else if (takeIndex == 0)
945 takeIndexWrapped();
946 }
947 }
948
949 /**
950 * Iterator for ArrayBlockingQueue.
951 *
952 * To maintain weak consistency with respect to puts and takes, we
953 * read ahead one slot, so as to not report hasNext true but then
954 * not have an element to return.
955 *
956 * We switch into "detached" mode (allowing prompt unlinking from
957 * itrs without help from the GC) when all indices are negative, or
958 * when hasNext returns false for the first time. This allows the
959 * iterator to track concurrent updates completely accurately,
960 * except for the corner case of the user calling Iterator.remove()
961 * after hasNext() returned false. Even in this case, we ensure
962 * that we don't remove the wrong element by keeping track of the
963 * expected element to remove, in lastItem. Yes, we may fail to
964 * remove lastItem from the queue if it moved due to an interleaved
965 * interior remove while in detached mode.
966 */
967 private class Itr implements Iterator<E> {
968 /** Index to look for new nextItem; NONE at end */
969 private int cursor;
970
971 /** Element to be returned by next call to next(); null if none */
972 private E nextItem;
973
974 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
975 private int nextIndex;
976
977 /** Last element returned; null if none or not detached. */
978 private E lastItem;
979
980 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
981 private int lastRet;
982
983 /** Previous value of takeIndex, or DETACHED when detached */
984 private int prevTakeIndex;
985
986 /** Previous value of itrs.cycles */
987 private int prevCycles;
988
989 /** Special index value indicating "not available" or "undefined" */
990 private static final int NONE = -1;
991
992 /**
993 * Special index value indicating "removed elsewhere", that is,
994 * removed by some operation other than a call to this.remove().
995 */
996 private static final int REMOVED = -2;
997
998 /** Special value for prevTakeIndex indicating "detached mode" */
999 private static final int DETACHED = -3;
1000
/**
 * Creates an iterator positioned at the current head of the queue.
 * On an empty queue the iterator starts out detached with no
 * indices to track; otherwise it pre-reads the head element and
 * registers itself with the queue's iterator tracker, creating the
 * tracker if there is none yet.
 */
Itr() {
    // assert lock.getHoldCount() == 0;
    lastRet = NONE;
    final ReentrantLock guard = ArrayBlockingQueue.this.lock;
    guard.lock();
    try {
        if (count != 0) {
            final int headIndex = ArrayBlockingQueue.this.takeIndex;
            prevTakeIndex = headIndex;
            nextIndex = headIndex;
            nextItem = itemAt(headIndex);
            cursor = incCursor(headIndex);
            if (itrs != null) {
                itrs.register(this); // in this order
                itrs.doSomeSweeping(false);
            } else {
                itrs = new Itrs(this);
            }
            prevCycles = itrs.cycles;
            // assert takeIndex >= 0;
            // assert prevTakeIndex == takeIndex;
            // assert nextIndex >= 0;
            // assert nextItem != null;
        } else {
            // assert itrs == null;
            cursor = NONE;
            nextIndex = NONE;
            prevTakeIndex = DETACHED;
        }
    } finally {
        guard.unlock();
    }
}
1033
1034 boolean isDetached() {
1035 // assert lock.getHoldCount() == 1;
1036 return prevTakeIndex < 0;
1037 }
1038
1039 private int incCursor(int index) {
1040 // assert lock.getHoldCount() == 1;
1041 if (++index == items.length) index = 0;
1042 if (index == putIndex) index = NONE;
1043 return index;
1044 }
1045
1046 /**
1047 * Returns true if index is invalidated by the given number of
1048 * dequeues, starting from prevTakeIndex.
1049 */
1050 private boolean invalidated(int index, int prevTakeIndex,
1051 long dequeues, int length) {
1052 if (index < 0)
1053 return false;
1054 int distance = index - prevTakeIndex;
1055 if (distance < 0)
1056 distance += length;
1057 return dequeues > distance;
1058 }
1059
1060 /**
1061 * Adjusts indices to incorporate all dequeues since the last
1062 * operation on this iterator. Call only from iterating thread.
1063 */
1064 private void incorporateDequeues() {
1065 // assert lock.getHoldCount() == 1;
1066 // assert itrs != null;
1067 // assert !isDetached();
1068 // assert count > 0;
1069
1070 final int cycles = itrs.cycles;
1071 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1072 final int prevCycles = this.prevCycles;
1073 final int prevTakeIndex = this.prevTakeIndex;
1074
1075 if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1076 final int len = items.length;
1077 // how far takeIndex has advanced since the previous
1078 // operation of this iterator
1079 long dequeues = (cycles - prevCycles) * len
1080 + (takeIndex - prevTakeIndex);
1081
1082 // Check indices for invalidation
1083 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1084 lastRet = REMOVED;
1085 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1086 nextIndex = REMOVED;
1087 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1088 cursor = takeIndex;
1089
1090 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1091 detach();
1092 else {
1093 this.prevCycles = cycles;
1094 this.prevTakeIndex = takeIndex;
1095 }
1096 }
1097 }
1098
1099 /**
1100 * Called when itrs should stop tracking this iterator, either
1101 * because there are no more indices to update (cursor < 0 &&
1102 * nextIndex < 0 && lastRet < 0) or as a special exception, when
1103 * lastRet >= 0, because hasNext() is about to return false for the
1104 * first time. Call only from iterating thread.
1105 */
1106 private void detach() {
1107 // Switch to detached mode
1108 // assert lock.getHoldCount() == 1;
1109 // assert cursor == NONE;
1110 // assert nextIndex < 0;
1111 // assert lastRet < 0 || nextItem == null;
1112 // assert lastRet < 0 ^ lastItem != null;
1113 if (prevTakeIndex >= 0) {
1114 // assert itrs != null;
1115 prevTakeIndex = DETACHED;
1116 // try to unlink from itrs (but not too hard)
1117 itrs.doSomeSweeping(true);
1118 }
1119 }
1120
1121 /**
1122 * For performance reasons, we would like not to acquire a lock in
1123 * hasNext in the common case. To allow for this, we only access
1124 * fields (i.e. nextItem) that are not modified by update operations
1125 * triggered by queue modifications.
1126 */
1127 public boolean hasNext() {
1128 // assert lock.getHoldCount() == 0;
1129 if (nextItem != null)
1130 return true;
1131 noNext();
1132 return false;
1133 }
1134
1135 private void noNext() {
1136 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1137 lock.lock();
1138 try {
1139 // assert cursor == NONE;
1140 // assert nextIndex == NONE;
1141 if (!isDetached()) {
1142 // assert lastRet >= 0;
1143 incorporateDequeues(); // might update lastRet
1144 if (lastRet >= 0) {
1145 lastItem = itemAt(lastRet);
1146 // assert lastItem != null;
1147 detach();
1148 }
1149 }
1150 // assert isDetached();
1151 // assert lastRet < 0 ^ lastItem != null;
1152 } finally {
1153 lock.unlock();
1154 }
1155 }
1156
1157 public E next() {
1158 // assert lock.getHoldCount() == 0;
1159 final E x = nextItem;
1160 if (x == null)
1161 throw new NoSuchElementException();
1162 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1163 lock.lock();
1164 try {
1165 if (!isDetached())
1166 incorporateDequeues();
1167 // assert nextIndex != NONE;
1168 // assert lastItem == null;
1169 lastRet = nextIndex;
1170 final int cursor = this.cursor;
1171 if (cursor >= 0) {
1172 nextItem = itemAt(nextIndex = cursor);
1173 // assert nextItem != null;
1174 this.cursor = incCursor(cursor);
1175 } else {
1176 nextIndex = NONE;
1177 nextItem = null;
1178 }
1179 } finally {
1180 lock.unlock();
1181 }
1182 return x;
1183 }
1184
1185 public void remove() {
1186 // assert lock.getHoldCount() == 0;
1187 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1188 lock.lock();
1189 try {
1190 if (!isDetached())
1191 incorporateDequeues(); // might update lastRet or detach
1192 final int lastRet = this.lastRet;
1193 this.lastRet = NONE;
1194 if (lastRet >= 0) {
1195 if (!isDetached())
1196 removeAt(lastRet);
1197 else {
1198 final E lastItem = this.lastItem;
1199 // assert lastItem != null;
1200 this.lastItem = null;
1201 if (itemAt(lastRet) == lastItem)
1202 removeAt(lastRet);
1203 }
1204 } else if (lastRet == NONE)
1205 throw new IllegalStateException();
1206 // else lastRet == REMOVED and the last returned element was
1207 // previously asynchronously removed via an operation other
1208 // than this.remove(), so nothing to do.
1209
1210 if (cursor < 0 && nextIndex < 0)
1211 detach();
1212 } finally {
1213 lock.unlock();
1214 // assert lastRet == NONE;
1215 // assert lastItem == null;
1216 }
1217 }
1218
1219 /**
1220 * Called to notify the iterator that the queue is empty, or that it
1221 * has fallen hopelessly behind, so that it should abandon any
1222 * further iteration, except possibly to return one more element
1223 * from next(), as promised by returning true from hasNext().
1224 */
1225 void shutdown() {
1226 // assert lock.getHoldCount() == 1;
1227 cursor = NONE;
1228 if (nextIndex >= 0)
1229 nextIndex = REMOVED;
1230 if (lastRet >= 0) {
1231 lastRet = REMOVED;
1232 lastItem = null;
1233 }
1234 prevTakeIndex = DETACHED;
1235 // Don't set nextItem to null because we must continue to be
1236 // able to return it on next().
1237 //
1238 // Caller will unlink from itrs when convenient.
1239 }
1240
1241 private int distance(int index, int prevTakeIndex, int length) {
1242 int distance = index - prevTakeIndex;
1243 if (distance < 0)
1244 distance += length;
1245 return distance;
1246 }
1247
1248 /**
1249 * Called whenever an interior remove (not at takeIndex) occurred.
1250 *
1251 * @return true if this iterator should be unlinked from itrs
1252 */
1253 boolean removedAt(int removedIndex) {
1254 // assert lock.getHoldCount() == 1;
1255 if (isDetached())
1256 return true;
1257
1258 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1259 final int prevTakeIndex = this.prevTakeIndex;
1260 final int len = items.length;
1261 // distance from prevTakeIndex to removedIndex
1262 final int removedDistance =
1263 len * (itrs.cycles - this.prevCycles
1264 + ((removedIndex < takeIndex) ? 1 : 0))
1265 + (removedIndex - prevTakeIndex);
1266 // assert itrs.cycles - this.prevCycles >= 0;
1267 // assert itrs.cycles - this.prevCycles <= 1;
1268 // assert removedDistance > 0;
1269 // assert removedIndex != takeIndex;
1270 int cursor = this.cursor;
1271 if (cursor >= 0) {
1272 int x = distance(cursor, prevTakeIndex, len);
1273 if (x == removedDistance) {
1274 if (cursor == putIndex)
1275 this.cursor = cursor = NONE;
1276 }
1277 else if (x > removedDistance) {
1278 // assert cursor != prevTakeIndex;
1279 this.cursor = cursor = dec(cursor);
1280 }
1281 }
1282 int lastRet = this.lastRet;
1283 if (lastRet >= 0) {
1284 int x = distance(lastRet, prevTakeIndex, len);
1285 if (x == removedDistance)
1286 this.lastRet = lastRet = REMOVED;
1287 else if (x > removedDistance)
1288 this.lastRet = lastRet = dec(lastRet);
1289 }
1290 int nextIndex = this.nextIndex;
1291 if (nextIndex >= 0) {
1292 int x = distance(nextIndex, prevTakeIndex, len);
1293 if (x == removedDistance)
1294 this.nextIndex = nextIndex = REMOVED;
1295 else if (x > removedDistance)
1296 this.nextIndex = nextIndex = dec(nextIndex);
1297 }
1298 if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1299 this.prevTakeIndex = DETACHED;
1300 return true;
1301 }
1302 return false;
1303 }
1304
1305 /**
1306 * Called whenever takeIndex wraps around to zero.
1307 *
1308 * @return true if this iterator should be unlinked from itrs
1309 */
1310 boolean takeIndexWrapped() {
1311 // assert lock.getHoldCount() == 1;
1312 if (isDetached())
1313 return true;
1314 if (itrs.cycles - prevCycles > 1) {
1315 // All the elements that existed at the time of the last
1316 // operation are gone, so abandon further iteration.
1317 shutdown();
1318 return true;
1319 }
1320 return false;
1321 }
1322
1323 // /** Uncomment for debugging. */
1324 // public String toString() {
1325 // return ("cursor=" + cursor + " " +
1326 // "nextIndex=" + nextIndex + " " +
1327 // "lastRet=" + lastRet + " " +
1328 // "nextItem=" + nextItem + " " +
1329 // "lastItem=" + lastItem + " " +
1330 // "prevCycles=" + prevCycles + " " +
1331 // "prevTakeIndex=" + prevTakeIndex + " " +
1332 // "size()=" + size() + " " +
1333 // "remainingCapacity()=" + remainingCapacity());
1334 // }
1335 }
1336
1337 /**
1338 * Returns a {@link Spliterator} over the elements in this queue.
1339 *
1340 * <p>The returned spliterator is
1341 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1342 *
1343 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1344 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1345 *
1346 * @implNote
1347 * The {@code Spliterator} implements {@code trySplit} to permit limited
1348 * parallelism.
1349 *
1350 * @return a {@code Spliterator} over the elements in this queue
1351 * @since 1.8
1352 */
1353 public Spliterator<E> spliterator() {
1354 return Spliterators.spliterator
1355 (this, Spliterator.ORDERED | Spliterator.NONNULL |
1356 Spliterator.CONCURRENT);
1357 }
1358
1359 }