ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.94
Committed: Mon Jul 18 20:08:18 2011 UTC (12 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.93: +38 -16 lines
Log Message:
Improve the class comment for class Itrs

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.Condition;
9 import java.util.concurrent.locks.ReentrantLock;
10 import java.util.AbstractQueue;
11 import java.util.Collection;
12 import java.util.Iterator;
13 import java.util.NoSuchElementException;
14 import java.lang.ref.WeakReference;
15
16 /**
17 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
18 * array. This queue orders elements FIFO (first-in-first-out). The
19 * <em>head</em> of the queue is that element that has been on the
20 * queue the longest time. The <em>tail</em> of the queue is that
21 * element that has been on the queue the shortest time. New elements
22 * are inserted at the tail of the queue, and the queue retrieval
23 * operations obtain elements at the head of the queue.
24 *
25 * <p>This is a classic &quot;bounded buffer&quot;, in which a
26 * fixed-sized array holds elements inserted by producers and
27 * extracted by consumers. Once created, the capacity cannot be
28 * changed. Attempts to {@code put} an element into a full queue
29 * will result in the operation blocking; attempts to {@code take} an
30 * element from an empty queue will similarly block.
31 *
32 * <p>This class supports an optional fairness policy for ordering
33 * waiting producer and consumer threads. By default, this ordering
34 * is not guaranteed. However, a queue constructed with fairness set
35 * to {@code true} grants threads access in FIFO order. Fairness
36 * generally decreases throughput but reduces variability and avoids
37 * starvation.
38 *
39 * <p>This class and its iterator implement all of the
40 * <em>optional</em> methods of the {@link Collection} and {@link
41 * Iterator} interfaces.
42 *
43 * <p>This class is a member of the
44 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
45 * Java Collections Framework</a>.
46 *
47 * @since 1.5
48 * @author Doug Lea
49 * @param <E> the type of elements held in this collection
50 */
51 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
52 implements BlockingQueue<E>, java.io.Serializable {
53
54 /**
55 * Serialization ID. This class relies on default serialization
56 * even for the items array, which is default-serialized, even if
57 * it is empty. Otherwise it could not be declared final, which is
58 * necessary here.
59 */
60 private static final long serialVersionUID = -817911632652898426L;
61
62 /** The queued items */
63 final Object[] items;
64
65 /** items index for next take, poll, peek or remove */
66 int takeIndex;
67
68 /** items index for next put, offer, or add */
69 int putIndex;
70
71 /** Number of elements in the queue */
72 int count;
73
74 /*
75 * Concurrency control uses the classic two-condition algorithm
76 * found in any textbook.
77 */
78
79 /** Main lock guarding all access */
80 final ReentrantLock lock;
81
82 /** Condition for waiting takes */
83 private final Condition notEmpty;
84
85 /** Condition for waiting puts */
86 private final Condition notFull;
87
88 /**
89 * Shared state for currently active iterators, or null if there
90 * are known not to be any. Allows queue operations to update
91 * iterator state.
92 */
93 transient Itrs itrs = null;
94
95 // Internal helper methods
96
97 /**
98 * Circularly increment i.
99 */
100 final int inc(int i) {
101 return (++i == items.length) ? 0 : i;
102 }
103
104 /**
105 * Circularly decrement i.
106 */
107 final int dec(int i) {
108 return ((i == 0) ? items.length : i) - 1;
109 }
110
111 /**
112 * Returns item at index i.
113 */
114 final E itemAt(int i) {
115 @SuppressWarnings("unchecked") E x = (E) items[i];
116 return x;
117 }
118
119 /**
120 * Throws NullPointerException if argument is null.
121 *
122 * @param v the element
123 */
124 private static void checkNotNull(Object v) {
125 if (v == null)
126 throw new NullPointerException();
127 }
128
129 /**
130 * Inserts element at current put position, advances, and signals.
131 * Call only when holding lock.
132 */
133 private void enqueue(E x) {
134 assert lock.getHoldCount() == 1;
135 assert items[putIndex] == null;
136 items[putIndex] = x;
137 putIndex = inc(putIndex);
138 count++;
139 notEmpty.signal();
140 }
141
142 /**
143 * Extracts element at current take position, advances, and signals.
144 * Call only when holding lock.
145 */
146 private E dequeue() {
147 assert lock.getHoldCount() == 1;
148 assert items[takeIndex] != null;
149 final Object[] items = this.items;
150 @SuppressWarnings("unchecked") E x = (E) items[takeIndex];
151 items[takeIndex] = null;
152 takeIndex = inc(takeIndex);
153 count--;
154 if (itrs != null)
155 itrs.elementDequeued();
156 notFull.signal();
157 return x;
158 }
159
160 /**
161 * Deletes item at array index removeIndex.
162 * Utility for remove(Object) and iterator.remove.
163 * Call only when holding lock.
164 */
165 void removeAt(final int removeIndex) {
166 assert lock.getHoldCount() == 1;
167 assert items[removeIndex] != null;
168 assert removeIndex >= 0 && removeIndex < items.length;
169 final Object[] items = this.items;
170 if (removeIndex == takeIndex) {
171 // removing front item; just advance
172 items[takeIndex] = null;
173 takeIndex = inc(takeIndex);
174 count--;
175 if (itrs != null)
176 itrs.elementDequeued();
177 } else {
178 // an "interior" remove
179
180 // slide over all others up through putIndex.
181 final int putIndex = this.putIndex;
182 for (int i = removeIndex;;) {
183 int next = inc(i);
184 if (next != putIndex) {
185 items[i] = items[next];
186 i = next;
187 } else {
188 items[i] = null;
189 this.putIndex = i;
190 break;
191 }
192 }
193 count--;
194 if (itrs != null)
195 itrs.removedAt(removeIndex);
196 }
197 notFull.signal();
198 }
199
200 /**
201 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
202 * capacity and default access policy.
203 *
204 * @param capacity the capacity of this queue
205 * @throws IllegalArgumentException if {@code capacity < 1}
206 */
public ArrayBlockingQueue(int capacity) {
    // Default access policy: delegate with fair == false.
    this(capacity, false);
}
210
211 /**
212 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
213 * capacity and the specified access policy.
214 *
215 * @param capacity the capacity of this queue
216 * @param fair if {@code true} then queue accesses for threads blocked
217 * on insertion or removal, are processed in FIFO order;
218 * if {@code false} the access order is unspecified.
219 * @throws IllegalArgumentException if {@code capacity < 1}
220 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity < 1)
        throw new IllegalArgumentException();
    items = new Object[capacity];
    lock = new ReentrantLock(fair);
    // Both wait conditions belong to the one main lock.
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
229
230 /**
231 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
232 * capacity, the specified access policy and initially containing the
233 * elements of the given collection,
234 * added in traversal order of the collection's iterator.
235 *
236 * @param capacity the capacity of this queue
237 * @param fair if {@code true} then queue accesses for threads blocked
238 * on insertion or removal, are processed in FIFO order;
239 * if {@code false} the access order is unspecified.
240 * @param c the collection of elements to initially contain
241 * @throws IllegalArgumentException if {@code capacity} is less than
242 * {@code c.size()}, or less than 1.
243 * @throws NullPointerException if the specified collection or any
244 * of its elements are null
245 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock mainLock = this.lock;
    mainLock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int filled = 0;
        try {
            final Iterator<? extends E> source = c.iterator();
            while (source.hasNext()) {
                final E e = source.next();
                checkNotNull(e);
                items[filled++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            // More elements than capacity: report as a bad argument
            throw new IllegalArgumentException();
        }
        count = filled;
        // A completely filled array wraps the next put position to 0
        putIndex = (filled == capacity) ? 0 : filled;
    } finally {
        mainLock.unlock();
    }
}
268
269 /**
270 * Inserts the specified element at the tail of this queue if it is
271 * possible to do so immediately without exceeding the queue's capacity,
272 * returning {@code true} upon success and throwing an
273 * {@code IllegalStateException} if this queue is full.
274 *
275 * @param e the element to add
276 * @return {@code true} (as specified by {@link Collection#add})
277 * @throws IllegalStateException if this queue is full
278 * @throws NullPointerException if the specified element is null
279 */
280 public boolean add(E e) {
281 return super.add(e);
282 }
283
284 /**
285 * Inserts the specified element at the tail of this queue if it is
286 * possible to do so immediately without exceeding the queue's capacity,
287 * returning {@code true} upon success and {@code false} if this queue
288 * is full. This method is generally preferable to method {@link #add},
289 * which can fail to insert an element only by throwing an exception.
290 *
291 * @throws NullPointerException if the specified element is null
292 */
293 public boolean offer(E e) {
294 checkNotNull(e);
295 final ReentrantLock lock = this.lock;
296 lock.lock();
297 try {
298 if (count == items.length)
299 return false;
300 else {
301 enqueue(e);
302 return true;
303 }
304 } finally {
305 lock.unlock();
306 }
307 }
308
309 /**
310 * Inserts the specified element at the tail of this queue, waiting
311 * for space to become available if the queue is full.
312 *
313 * @throws InterruptedException {@inheritDoc}
314 * @throws NullPointerException {@inheritDoc}
315 */
316 public void put(E e) throws InterruptedException {
317 checkNotNull(e);
318 final ReentrantLock lock = this.lock;
319 lock.lockInterruptibly();
320 try {
321 while (count == items.length)
322 notFull.await();
323 enqueue(e);
324 } finally {
325 lock.unlock();
326 }
327 }
328
329 /**
330 * Inserts the specified element at the tail of this queue, waiting
331 * up to the specified wait time for space to become available if
332 * the queue is full.
333 *
334 * @throws InterruptedException {@inheritDoc}
335 * @throws NullPointerException {@inheritDoc}
336 */
337 public boolean offer(E e, long timeout, TimeUnit unit)
338 throws InterruptedException {
339
340 checkNotNull(e);
341 long nanos = unit.toNanos(timeout);
342 final ReentrantLock lock = this.lock;
343 lock.lockInterruptibly();
344 try {
345 while (count == items.length) {
346 if (nanos <= 0)
347 return false;
348 nanos = notFull.awaitNanos(nanos);
349 }
350 enqueue(e);
351 return true;
352 } finally {
353 lock.unlock();
354 }
355 }
356
357 public E poll() {
358 final ReentrantLock lock = this.lock;
359 lock.lock();
360 try {
361 return (count == 0) ? null : dequeue();
362 } finally {
363 lock.unlock();
364 }
365 }
366
367 public E take() throws InterruptedException {
368 final ReentrantLock lock = this.lock;
369 lock.lockInterruptibly();
370 try {
371 while (count == 0)
372 notEmpty.await();
373 return dequeue();
374 } finally {
375 lock.unlock();
376 }
377 }
378
379 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
380 long nanos = unit.toNanos(timeout);
381 final ReentrantLock lock = this.lock;
382 lock.lockInterruptibly();
383 try {
384 while (count == 0) {
385 if (nanos <= 0)
386 return null;
387 nanos = notEmpty.awaitNanos(nanos);
388 }
389 return dequeue();
390 } finally {
391 lock.unlock();
392 }
393 }
394
395 public E peek() {
396 final ReentrantLock lock = this.lock;
397 lock.lock();
398 try {
399 return (count == 0) ? null : itemAt(takeIndex);
400 } finally {
401 lock.unlock();
402 }
403 }
404
405 // this doc comment is overridden to remove the reference to collections
406 // greater in size than Integer.MAX_VALUE
407 /**
408 * Returns the number of elements in this queue.
409 *
410 * @return the number of elements in this queue
411 */
412 public int size() {
413 final ReentrantLock lock = this.lock;
414 lock.lock();
415 try {
416 return count;
417 } finally {
418 lock.unlock();
419 }
420 }
421
422 // this doc comment is a modified copy of the inherited doc comment,
423 // without the reference to unlimited queues.
424 /**
425 * Returns the number of additional elements that this queue can ideally
426 * (in the absence of memory or resource constraints) accept without
427 * blocking. This is always equal to the initial capacity of this queue
428 * less the current {@code size} of this queue.
429 *
430 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
431 * an element will succeed by inspecting {@code remainingCapacity}
432 * because it may be the case that another thread is about to
433 * insert or remove an element.
434 */
435 public int remainingCapacity() {
436 final ReentrantLock lock = this.lock;
437 lock.lock();
438 try {
439 return items.length - count;
440 } finally {
441 lock.unlock();
442 }
443 }
444
445 /**
446 * Removes a single instance of the specified element from this queue,
447 * if it is present. More formally, removes an element {@code e} such
448 * that {@code o.equals(e)}, if this queue contains one or more such
449 * elements.
450 * Returns {@code true} if this queue contained the specified element
451 * (or equivalently, if this queue changed as a result of the call).
452 *
453 * <p>Removal of interior elements in circular array based queues
454 * is an intrinsically slow and disruptive operation, so should
455 * be undertaken only in exceptional circumstances, ideally
456 * only when the queue is known not to be accessible by other
457 * threads.
458 *
459 * @param o element to be removed from this queue, if present
460 * @return {@code true} if this queue changed as a result of the call
461 */
462 public boolean remove(Object o) {
463 if (o == null) return false;
464 final Object[] items = this.items;
465 final ReentrantLock lock = this.lock;
466 lock.lock();
467 try {
468 if (count > 0) {
469 final int putIndex = this.putIndex;
470 int i = takeIndex;
471 do {
472 if (o.equals(items[i])) {
473 removeAt(i);
474 return true;
475 }
476 } while ((i = inc(i)) != putIndex);
477 }
478 return false;
479 } finally {
480 lock.unlock();
481 }
482 }
483
484 /**
485 * Returns {@code true} if this queue contains the specified element.
486 * More formally, returns {@code true} if and only if this queue contains
487 * at least one element {@code e} such that {@code o.equals(e)}.
488 *
489 * @param o object to be checked for containment in this queue
490 * @return {@code true} if this queue contains the specified element
491 */
492 public boolean contains(Object o) {
493 if (o == null) return false;
494 final Object[] items = this.items;
495 final ReentrantLock lock = this.lock;
496 lock.lock();
497 try {
498 if (count > 0) {
499 final int putIndex = this.putIndex;
500 int i = takeIndex;
501 do {
502 if (o.equals(items[i]))
503 return true;
504 } while ((i = inc(i)) != putIndex);
505 }
506 return false;
507 } finally {
508 lock.unlock();
509 }
510 }
511
512 /**
513 * Returns an array containing all of the elements in this queue, in
514 * proper sequence.
515 *
516 * <p>The returned array will be "safe" in that no references to it are
517 * maintained by this queue. (In other words, this method must allocate
518 * a new array). The caller is thus free to modify the returned array.
519 *
520 * <p>This method acts as bridge between array-based and collection-based
521 * APIs.
522 *
523 * @return an array containing all of the elements in this queue
524 */
525 public Object[] toArray() {
526 final Object[] items = this.items;
527 final ReentrantLock lock = this.lock;
528 lock.lock();
529 try {
530 final int count = this.count;
531 Object[] a = new Object[count];
532 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
533 a[k] = items[i];
534 return a;
535 } finally {
536 lock.unlock();
537 }
538 }
539
540 /**
541 * Returns an array containing all of the elements in this queue, in
542 * proper sequence; the runtime type of the returned array is that of
543 * the specified array. If the queue fits in the specified array, it
544 * is returned therein. Otherwise, a new array is allocated with the
545 * runtime type of the specified array and the size of this queue.
546 *
547 * <p>If this queue fits in the specified array with room to spare
548 * (i.e., the array has more elements than this queue), the element in
549 * the array immediately following the end of the queue is set to
550 * {@code null}.
551 *
552 * <p>Like the {@link #toArray()} method, this method acts as bridge between
553 * array-based and collection-based APIs. Further, this method allows
554 * precise control over the runtime type of the output array, and may,
555 * under certain circumstances, be used to save allocation costs.
556 *
557 * <p>Suppose {@code x} is a queue known to contain only strings.
558 * The following code can be used to dump the queue into a newly
559 * allocated array of {@code String}:
560 *
561 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
562 *
563 * Note that {@code toArray(new Object[0])} is identical in function to
564 * {@code toArray()}.
565 *
566 * @param a the array into which the elements of the queue are to
567 * be stored, if it is big enough; otherwise, a new array of the
568 * same runtime type is allocated for this purpose
569 * @return an array containing all of the elements in this queue
570 * @throws ArrayStoreException if the runtime type of the specified array
571 * is not a supertype of the runtime type of every element in
572 * this queue
573 * @throws NullPointerException if the specified array is null
574 */
575 @SuppressWarnings("unchecked")
576 public <T> T[] toArray(T[] a) {
577 final Object[] items = this.items;
578 final ReentrantLock lock = this.lock;
579 lock.lock();
580 try {
581 final int count = this.count;
582 final int len = a.length;
583 if (len < count)
584 a = (T[])java.lang.reflect.Array.newInstance(
585 a.getClass().getComponentType(), count);
586 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
587 a[k] = (T) items[i];
588 if (len > count)
589 a[count] = null;
590 return a;
591 } finally {
592 lock.unlock();
593 }
594 }
595
596 public String toString() {
597 final ReentrantLock lock = this.lock;
598 lock.lock();
599 try {
600 int k = count;
601 if (k == 0)
602 return "[]";
603
604 StringBuilder sb = new StringBuilder();
605 sb.append('[');
606 for (int i = takeIndex; ; i = inc(i)) {
607 Object e = items[i];
608 sb.append(e == this ? "(this Collection)" : e);
609 if (--k == 0)
610 return sb.append(']').toString();
611 sb.append(',').append(' ');
612 }
613 } finally {
614 lock.unlock();
615 }
616 }
617
618 /**
619 * Atomically removes all of the elements from this queue.
620 * The queue will be empty after this call returns.
621 */
622 public void clear() {
623 final Object[] items = this.items;
624 final ReentrantLock lock = this.lock;
625 lock.lock();
626 try {
627 int k = count;
628 if (k > 0) {
629 final int putIndex = this.putIndex;
630 int i = takeIndex;
631 do {
632 items[i] = null;
633 } while ((i = inc(i)) != putIndex);
634 takeIndex = putIndex;
635 count = 0;
636 if (itrs != null)
637 itrs.queueIsEmpty();
638 for (; k > 0 && lock.hasWaiters(notFull); k--)
639 notFull.signal();
640 }
641 } finally {
642 lock.unlock();
643 }
644 }
645
646 /**
647 * @throws UnsupportedOperationException {@inheritDoc}
648 * @throws ClassCastException {@inheritDoc}
649 * @throws NullPointerException {@inheritDoc}
650 * @throws IllegalArgumentException {@inheritDoc}
651 */
652 public int drainTo(Collection<? super E> c) {
653 return drainTo(c, Integer.MAX_VALUE);
654 }
655
656 /**
657 * @throws UnsupportedOperationException {@inheritDoc}
658 * @throws ClassCastException {@inheritDoc}
659 * @throws NullPointerException {@inheritDoc}
660 * @throws IllegalArgumentException {@inheritDoc}
661 */
662 public int drainTo(Collection<? super E> c, int maxElements) {
663 checkNotNull(c);
664 if (c == this)
665 throw new IllegalArgumentException();
666 if (maxElements <= 0)
667 return 0;
668 final Object[] items = this.items;
669 final ReentrantLock lock = this.lock;
670 lock.lock();
671 try {
672 int n = Math.min(maxElements, count);
673 int take = takeIndex;
674 int i = 0;
675 try {
676 while (i < n) {
677 @SuppressWarnings("unchecked") E x = (E) items[take];
678 c.add(x);
679 items[take] = null;
680 take = inc(take);
681 i++;
682 }
683 return n;
684 } finally {
685 // Restore invariants even if c.add() threw
686 if (i > 0) {
687 count -= i;
688 takeIndex = take;
689 if (itrs != null) {
690 if (count == 0)
691 itrs.queueIsEmpty();
692 else if (i > take)
693 itrs.takeIndexWrapped();
694 }
695 for (; i > 0 && lock.hasWaiters(notFull); i--)
696 notFull.signal();
697 }
698 }
699 } finally {
700 lock.unlock();
701 }
702 }
703
704 /**
705 * Returns an iterator over the elements in this queue in proper sequence.
706 * The elements will be returned in order from first (head) to last (tail).
707 *
708 * <p>The returned iterator is a "weakly consistent" iterator that
709 * will never throw {@link java.util.ConcurrentModificationException
710 * ConcurrentModificationException}, and guarantees to traverse
711 * elements as they existed upon construction of the iterator, and
712 * may (but is not guaranteed to) reflect any modifications
713 * subsequent to construction.
714 *
715 * @return an iterator over the elements in this queue in proper sequence
716 */
717 public Iterator<E> iterator() {
718 return new Itr();
719 }
720
721 /**
722 * Shared data between iterators and their queue, allowing queue
723 * modifications to update iterators when elements are removed.
724 *
725 * This adds a lot of complexity for the sake of correctly
726 * handling some uncommon operations, but the combination of
727 * circular-arrays and supporting interior removes (i.e., those
728 * not at head) would cause iterators to sometimes lose their
729 * places and/or (re)report elements they shouldn't. To avoid
730 * this, when a queue has one or more iterators, it keeps iterator
731 * state consistent by:
732 *
733 * (1) keeping track of the number of "cycles", that is, the
734 * number of times takeIndex has wrapped around to 0.
735 * (2) notifying all iterators via the callback removedAt whenever
736 * an interior element is removed (and thus other elements may
737 * be shifted).
738 *
739 * These suffice to eliminate iterator inconsistencies, but
740 * unfortunately add the secondary responsibility of maintaining
741 * the list of iterators. We track all active iterators in a
742 * simple linked list (accessed only when the queue's lock is
743 * held) of weak references to Itr. The list is cleaned up using
744 * 3 different mechanisms:
745 *
746 * (1) Whenever a new iterator is created, do some O(1) checking for
747 * stale list elements.
748 *
749 * (2) Whenever takeIndex wraps around to 0, check for iterators
750 * that have been unused for more than one wrap-around cycle.
751 *
752 * (3) Whenever the queue becomes empty, all iterators are notified
753 * and this entire data structure is discarded.
754 *
755 * So in addition to the removedAt callback that is necessary for
756 * correctness, iterators have the shutdown and takeIndexWrapped
757 * callbacks that help remove stale iterators from the list.
758 *
759 * Whenever a list element is examined, it is expunged if either
760 * the GC has determined that the iterator is discarded, or if the
761 * iterator reports that it is "detached" (does not need any
762 * further state updates). Overhead is maximal when takeIndex
763 * never advances, iterators are discarded before they are
764 * exhausted, and all removals are interior removes, in which case
765 * all stale iterators are discovered by the GC. But even in this
766 * case we don't increase the amortized complexity.
767 *
768 * Care must be taken to keep list sweeping methods from
769 * reentrantly invoking another such method, causing subtle
770 * corruption bugs.
771 */
772 class Itrs {
773
774 /**
775 * Node in a linked list of weak iterator references.
776 */
777 private class Node extends WeakReference<Itr> {
778 Node next;
779
780 Node(Itr iterator, Node next) {
781 super(iterator);
782 this.next = next;
783 }
784 }
785
786 /** Incremented whenever takeIndex wraps around to 0 */
787 int cycles = 0;
788
789 /** Linked list of weak iterator references */
790 private Node head;
791
792 /** Used to expunge stale iterators */
793 private Node sweeper = null;
794
795 private static final int SHORT_SWEEP_PROBES = 4;
796 private static final int LONG_SWEEP_PROBES = 16;
797
798 Itrs(Itr initial) {
799 register(initial);
800 }
801
802 /**
803 * Sweeps itrs, looking for and expunging stale iterators.
804 * If at least one was found, tries harder to find more.
805 * Called only from iterating thread.
806 *
807 * @param tryHarder whether to start in try-harder mode, because
808 * there is known to be at least one iterator to collect
809 */
810 void doSomeSweeping(boolean tryHarder) {
811 assert lock.getHoldCount() == 1;
812 assert head != null;
813 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
814 Node o, p;
815 final Node sweeper = this.sweeper;
816 boolean passedGo; // to limit search to one full sweep
817
818 if (sweeper == null) {
819 o = null;
820 p = head;
821 passedGo = true;
822 } else {
823 o = sweeper;
824 p = o.next;
825 passedGo = false;
826 }
827
828 for (; probes > 0; probes--) {
829 if (p == null) {
830 if (passedGo)
831 break;
832 o = null;
833 p = head;
834 passedGo = true;
835 }
836 final Itr it = p.get();
837 final Node next = p.next;
838 if (it == null || it.isDetached()) {
839 // found a discarded/exhausted iterator
840 probes = LONG_SWEEP_PROBES; // "try harder"
841 // unlink p
842 p.clear();
843 p.next = null;
844 if (o == null) {
845 head = next;
846 if (next == null) {
847 // We've run out of iterators to track; retire
848 itrs = null;
849 return;
850 }
851 }
852 else
853 o.next = next;
854 } else {
855 o = p;
856 }
857 p = next;
858 }
859
860 this.sweeper = (p == null) ? null : o;
861 }
862
863 /**
864 * Adds a new iterator to the linked list of tracked iterators.
865 */
866 void register(Itr itr) {
867 assert lock.getHoldCount() == 1;
868 head = new Node(itr, head);
869 }
870
871 /**
872 * Called whenever takeIndex wraps around to 0.
873 *
874 * Notifies all iterators, and expunges any that are now stale.
875 */
876 void takeIndexWrapped() {
877 assert lock.getHoldCount() == 1;
878 cycles++;
879 for (Node o = null, p = head; p != null;) {
880 final Itr it = p.get();
881 final Node next = p.next;
882 if (it == null || it.takeIndexWrapped()) {
883 // unlink p
884 assert it == null || it.isDetached();
885 p.clear();
886 p.next = null;
887 if (o == null)
888 head = next;
889 else
890 o.next = next;
891 } else {
892 o = p;
893 }
894 p = next;
895 }
896 if (head == null) // no more iterators to track
897 itrs = null;
898 }
899
900 /**
901 * Called whenever an interior remove (not at takeIndex) occured.
902 *
903 * Notifies all iterators, and expunges any that are now stale.
904 */
905 void removedAt(int removedIndex) {
906 for (Node o = null, p = head; p != null;) {
907 final Itr it = p.get();
908 final Node next = p.next;
909 if (it == null || it.removedAt(removedIndex)) {
910 // unlink p
911 assert it == null || it.isDetached();
912 p.clear();
913 p.next = null;
914 if (o == null)
915 head = next;
916 else
917 o.next = next;
918 } else {
919 o = p;
920 }
921 p = next;
922 }
923 if (head == null) // no more iterators to track
924 itrs = null;
925 }
926
927 /**
928 * Called whenever the queue becomes empty.
929 *
930 * Notifies all active iterators that the queue is empty,
931 * clears all weak refs, and unlinks the itrs datastructure.
932 */
933 void queueIsEmpty() {
934 assert lock.getHoldCount() == 1;
935 for (Node p = head; p != null; p = p.next) {
936 Itr it = p.get();
937 if (it != null) {
938 p.clear();
939 it.shutdown();
940 }
941 }
942 head = null;
943 itrs = null;
944 }
945
946 /**
947 * Called whenever an element has been dequeued (at takeIndex).
948 */
949 void elementDequeued() {
950 assert lock.getHoldCount() == 1;
951 if (count == 0)
952 queueIsEmpty();
953 else if (takeIndex == 0)
954 takeIndexWrapped();
955 }
956 }
957
958 /**
959 * Iterator for ArrayBlockingQueue.
960 *
961 * To maintain weak consistency with respect to puts and takes, we
962 * read ahead one slot, so as to not report hasNext true but then
963 * not have an element to return.
964 *
965 * We switch into "detached" mode (allowing prompt unlinking from
966 * itrs without help from the GC) when all indices are negative, or
967 * when hasNext returns false for the first time. This allows the
968 * iterator to track concurrent updates completely accurately,
969 * except for the corner case of the user calling Iterator.remove()
970 * after hasNext() returned false. Even in this case, we ensure
971 * that we don't remove the wrong element by keeping track of the
972 * expected element to remove, in lastItem. Yes, we may fail to
973 * remove lastItem from the queue if it moved due to an interleaved
974 * interior remove while in detached mode.
975 */
976 private class Itr implements Iterator<E> {
977 /** Index to look for new nextItem; NONE at end */
978 private int cursor;
979
980 /** Element to be returned by next call to next(); null if none */
981 private E nextItem;
982
983 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
984 private int nextIndex;
985
986 /** Last element returned; null if none or not detached. */
987 private E lastItem;
988
989 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
990 private int lastRet;
991
992 /** Previous value of takeIndex, or DETACHED when detached */
993 private int prevTakeIndex;
994
995 /** Previous value of iters.cycles */
996 private int prevCycles;
997
998 /** Special index value indicating "not available" or "undefined" */
999 private static final int NONE = -1;
1000
1001 /**
1002 * Special index value indicating "removed elsewhere", that is,
1003 * removed by some operation other than a call to this.remove().
1004 */
1005 private static final int REMOVED = -2;
1006
1007 /** Special value for prevTakeIndex indicating "detached mode" */
1008 private static final int DETACHED = -3;
1009
1010 Itr() {
1011 assert lock.getHoldCount() == 0;
1012 lastRet = NONE;
1013 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1014 lock.lock();
1015 try {
1016 if (count == 0) {
1017 assert itrs == null;
1018 cursor = NONE;
1019 nextIndex = NONE;
1020 prevTakeIndex = DETACHED;
1021 } else {
1022 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1023 prevTakeIndex = takeIndex;
1024 nextItem = itemAt(nextIndex = takeIndex);
1025 cursor = incCursor(takeIndex);
1026 if (itrs == null) {
1027 itrs = new Itrs(this);
1028 } else {
1029 itrs.register(this); // in this order
1030 itrs.doSomeSweeping(false);
1031 }
1032 prevCycles = itrs.cycles;
1033 assert takeIndex >= 0;
1034 assert prevTakeIndex == takeIndex;
1035 assert nextIndex >= 0;
1036 assert nextItem != null;
1037 }
1038 } finally {
1039 lock.unlock();
1040 }
1041 }
1042
1043 boolean isDetached() {
1044 assert lock.getHoldCount() == 1;
1045 return prevTakeIndex < 0;
1046 }
1047
1048 private int incCursor(int index) {
1049 assert lock.getHoldCount() == 1;
1050 index = inc(index);
1051 if (index == putIndex)
1052 index = NONE;
1053 return index;
1054 }
1055
1056 /**
1057 * Returns true if index is invalidated by the given number of
1058 * dequeues, starting from prevTakeIndex.
1059 */
1060 private boolean invalidated(int index, int prevTakeIndex,
1061 long dequeues, int length) {
1062 if (index < 0)
1063 return false;
1064 int distance = index - prevTakeIndex;
1065 if (distance < 0)
1066 distance += length;
1067 return dequeues > distance;
1068 }
1069
1070 /**
1071 * Adjusts indices to incorporate all dequeues since the last
1072 * operation on this iterator. Call only from iterating thread.
1073 */
1074 private void incorporateDequeues() {
1075 assert lock.getHoldCount() == 1;
1076 assert itrs != null;
1077 assert !isDetached();
1078 assert count > 0;
1079
1080 final int cycles = itrs.cycles;
1081 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1082 final int prevCycles = this.prevCycles;
1083 final int prevTakeIndex = this.prevTakeIndex;
1084
1085 if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1086 final int len = items.length;
1087 // how far takeIndex has advanced since the previous
1088 // operation of this iterator
1089 long dequeues = (cycles - prevCycles) * len
1090 + (takeIndex - prevTakeIndex);
1091
1092 // Check indices for invalidation
1093 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1094 lastRet = REMOVED;
1095 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1096 nextIndex = REMOVED;
1097 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1098 cursor = takeIndex;
1099
1100 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1101 detach();
1102 else {
1103 this.prevCycles = cycles;
1104 this.prevTakeIndex = takeIndex;
1105 }
1106 }
1107 }
1108
1109 /**
1110 * Called when itrs should stop tracking this iterator, either
1111 * because there are no more indices to update (cursor < 0 &&
1112 * nextIndex < 0 && lastRet < 0) or as a special exception, when
1113 * lastRet >= 0, because hasNext() is about to return false for the
1114 * first time. Call only from iterating thread.
1115 */
1116 private void detach() {
1117 // Switch to detached mode
1118 assert lock.getHoldCount() == 1;
1119 assert cursor == NONE;
1120 assert nextIndex < 0;
1121 assert lastRet < 0 || nextItem == null;
1122 assert lastRet < 0 ^ lastItem != null;
1123 if (prevTakeIndex >= 0) {
1124 assert itrs != null;
1125 prevTakeIndex = DETACHED;
1126 // try to unlink from itrs (but not too hard)
1127 itrs.doSomeSweeping(true);
1128 }
1129 }
1130
1131 /**
1132 * For performance reasons, we would like not to acquire a lock in
1133 * hasNext in the common case. To allow for this, we only access
1134 * fields (i.e. nextItem) that are not modified by update operations
1135 * triggered by queue modifications.
1136 */
1137 public boolean hasNext() {
1138 assert lock.getHoldCount() == 0;
1139 if (nextItem != null)
1140 return true;
1141 noNext();
1142 return false;
1143 }
1144
1145 private void noNext() {
1146 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1147 lock.lock();
1148 try {
1149 assert cursor == NONE;
1150 assert nextIndex == NONE;
1151 if (!isDetached()) {
1152 assert lastRet >= 0;
1153 incorporateDequeues(); // might update lastRet
1154 if (lastRet >= 0) {
1155 lastItem = itemAt(lastRet);
1156 assert lastItem != null;
1157 detach();
1158 }
1159 }
1160 assert isDetached();
1161 assert lastRet < 0 ^ lastItem != null;
1162 } finally {
1163 lock.unlock();
1164 }
1165 }
1166
1167 public E next() {
1168 assert lock.getHoldCount() == 0;
1169 final E x = nextItem;
1170 if (x == null)
1171 throw new NoSuchElementException();
1172 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1173 lock.lock();
1174 try {
1175 if (!isDetached())
1176 incorporateDequeues();
1177 assert nextIndex != NONE;
1178 assert lastItem == null;
1179 lastRet = nextIndex;
1180 final int cursor = this.cursor;
1181 if (cursor >= 0) {
1182 nextItem = itemAt(nextIndex = cursor);
1183 assert nextItem != null;
1184 this.cursor = incCursor(cursor);
1185 } else {
1186 nextIndex = NONE;
1187 nextItem = null;
1188 }
1189 } finally {
1190 lock.unlock();
1191 }
1192 return x;
1193 }
1194
1195 public void remove() {
1196 assert lock.getHoldCount() == 0;
1197 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1198 lock.lock();
1199 try {
1200 if (!isDetached())
1201 incorporateDequeues(); // might update lastRet or detach
1202 final int lastRet = this.lastRet;
1203 this.lastRet = NONE;
1204 if (lastRet >= 0) {
1205 if (!isDetached())
1206 removeAt(lastRet);
1207 else {
1208 final E lastItem = this.lastItem;
1209 assert lastItem != null;
1210 this.lastItem = null;
1211 if (itemAt(lastRet) == lastItem)
1212 removeAt(lastRet);
1213 }
1214 } else if (lastRet == NONE)
1215 throw new IllegalStateException();
1216 // else lastRet == REMOVED and the last returned element was
1217 // previously asynchronously removed via an operation other
1218 // than this.remove(), so nothing to do.
1219
1220 if (cursor < 0 && nextIndex < 0)
1221 detach();
1222 } finally {
1223 lock.unlock();
1224 assert lastRet == NONE;
1225 assert lastItem == null;
1226 }
1227 }
1228
1229 /**
1230 * Called to notify the iterator that the queue is empty, or that it
1231 * has fallen hopelessly behind, so that it should abandon any
1232 * further iteration, except possibly to return one more element
1233 * from next(), as promised by returning true from hasNext().
1234 */
1235 void shutdown() {
1236 assert lock.getHoldCount() == 1;
1237 cursor = NONE;
1238 if (nextIndex >= 0)
1239 nextIndex = REMOVED;
1240 if (lastRet >= 0) {
1241 lastRet = REMOVED;
1242 lastItem = null;
1243 }
1244 prevTakeIndex = DETACHED;
1245 // Don't set nextItem to null because we must continue to be
1246 // able to return it on next().
1247 //
1248 // Caller will unlink from itrs when convenient.
1249 }
1250
1251 private int distance(int index, int prevTakeIndex, int length) {
1252 int distance = index - prevTakeIndex;
1253 if (distance < 0)
1254 distance += length;
1255 return distance;
1256 }
1257
1258 /**
1259 * Called whenever an interior remove (not at takeIndex) occured.
1260 *
1261 * @return true if this iterator should be unlinked from itrs
1262 */
1263 boolean removedAt(int removedIndex) {
1264 assert lock.getHoldCount() == 1;
1265 if (isDetached())
1266 return true;
1267
1268 final int cycles = itrs.cycles;
1269 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1270 final int prevCycles = this.prevCycles;
1271 final int prevTakeIndex = this.prevTakeIndex;
1272 final int len = items.length;
1273 int cycleDiff = cycles - prevCycles;
1274 if (removedIndex < takeIndex)
1275 cycleDiff++;
1276 final int removedDistance =
1277 (cycleDiff * len) + (removedIndex - prevTakeIndex);
1278 assert removedDistance >= 0;
1279 int cursor = this.cursor;
1280 if (cursor >= 0) {
1281 int x = distance(cursor, prevTakeIndex, len);
1282 if (x == removedDistance) {
1283 if (cursor == putIndex)
1284 this.cursor = cursor = NONE;
1285 }
1286 else if (x > removedDistance) {
1287 assert cursor != prevTakeIndex;
1288 this.cursor = cursor = dec(cursor);
1289 }
1290 }
1291 int lastRet = this.lastRet;
1292 if (lastRet >= 0) {
1293 int x = distance(lastRet, prevTakeIndex, len);
1294 if (x == removedDistance)
1295 this.lastRet = lastRet = REMOVED;
1296 else if (x > removedDistance)
1297 this.lastRet = lastRet = dec(lastRet);
1298 }
1299 int nextIndex = this.nextIndex;
1300 if (nextIndex >= 0) {
1301 int x = distance(nextIndex, prevTakeIndex, len);
1302 if (x == removedDistance)
1303 this.nextIndex = nextIndex = REMOVED;
1304 else if (x > removedDistance)
1305 this.nextIndex = nextIndex = dec(nextIndex);
1306 }
1307 else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1308 this.prevTakeIndex = DETACHED;
1309 return true;
1310 }
1311 return false;
1312 }
1313
1314 /**
1315 * Called whenever takeIndex wraps around to zero.
1316 *
1317 * @return true if this iterator should be unlinked from itrs
1318 */
1319 boolean takeIndexWrapped() {
1320 assert lock.getHoldCount() == 1;
1321 if (isDetached())
1322 return true;
1323 if (itrs.cycles - prevCycles > 1) {
1324 // All the elements that existed at the time of the last
1325 // operation are gone, so abandon further iteration.
1326 shutdown();
1327 return true;
1328 }
1329 return false;
1330 }
1331
1332 // /** Uncomment for debugging. */
1333 // public String toString() {
1334 // return ("cursor=" + cursor + " " +
1335 // "nextIndex=" + nextIndex + " " +
1336 // "lastRet=" + lastRet + " " +
1337 // "nextItem=" + nextItem + " " +
1338 // "lastItem=" + lastItem + " " +
1339 // "prevCycles=" + prevCycles + " " +
1340 // "prevTakeIndex=" + prevTakeIndex + " " +
1341 // "size()=" + size() + " " +
1342 // "remainingCapacity()=" + remainingCapacity());
1343 // }
1344 }
1345 }