ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.90
Committed: Fri Jul 15 19:45:00 2011 UTC (12 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.89: +28 -26 lines
Log Message:
minor improvements to removeAt

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.Condition;
9 import java.util.concurrent.locks.ReentrantLock;
10 import java.util.AbstractQueue;
11 import java.util.Collection;
12 import java.util.Iterator;
13 import java.util.NoSuchElementException;
14 import java.lang.ref.WeakReference;
15
16 /**
17 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
18 * array. This queue orders elements FIFO (first-in-first-out). The
19 * <em>head</em> of the queue is that element that has been on the
20 * queue the longest time. The <em>tail</em> of the queue is that
21 * element that has been on the queue the shortest time. New elements
22 * are inserted at the tail of the queue, and the queue retrieval
23 * operations obtain elements at the head of the queue.
24 *
25 * <p>This is a classic &quot;bounded buffer&quot;, in which a
26 * fixed-sized array holds elements inserted by producers and
27 * extracted by consumers. Once created, the capacity cannot be
28 * changed. Attempts to {@code put} an element into a full queue
29 * will result in the operation blocking; attempts to {@code take} an
30 * element from an empty queue will similarly block.
31 *
32 * <p>This class supports an optional fairness policy for ordering
33 * waiting producer and consumer threads. By default, this ordering
34 * is not guaranteed. However, a queue constructed with fairness set
35 * to {@code true} grants threads access in FIFO order. Fairness
36 * generally decreases throughput but reduces variability and avoids
37 * starvation.
38 *
39 * <p>This class and its iterator implement all of the
40 * <em>optional</em> methods of the {@link Collection} and {@link
41 * Iterator} interfaces.
42 *
43 * <p>This class is a member of the
44 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
45 * Java Collections Framework</a>.
46 *
47 * @since 1.5
48 * @author Doug Lea
49 * @param <E> the type of elements held in this collection
50 */
51 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
52 implements BlockingQueue<E>, java.io.Serializable {
53
54 /**
55 * Serialization ID. This class relies on default serialization
56 * even for the items array, which is default-serialized, even if
57 * it is empty. Otherwise it could not be declared final, which is
58 * necessary here.
59 */
60 private static final long serialVersionUID = -817911632652898426L;
61
62 /** The queued items */
63 final Object[] items;
64
65 /** items index for next take, poll, peek or remove */
66 int takeIndex;
67
68 /** items index for next put, offer, or add */
69 int putIndex;
70
71 /** Number of elements in the queue */
72 int count;
73
74 /*
75 * Concurrency control uses the classic two-condition algorithm
76 * found in any textbook.
77 */
78
79 /** Main lock guarding all access */
80 final ReentrantLock lock;
81
82 /** Condition for waiting takes */
83 private final Condition notEmpty;
84
85 /** Condition for waiting puts */
86 private final Condition notFull;
87
88 /**
89 * Shared state for currently active iterators, or null if there
90 * are known not to be any. Allows queue operations to update
91 * iterator state.
92 */
93 transient Itrs itrs = null;
94
95 // Internal helper methods
96
97 /**
98 * Circularly increment i.
99 */
100 final int inc(int i) {
101 return (++i == items.length) ? 0 : i;
102 }
103
104 /**
105 * Circularly decrement i.
106 */
107 final int dec(int i) {
108 return ((i == 0) ? items.length : i) - 1;
109 }
110
111 /**
112 * Returns item at index i.
113 */
114 final E itemAt(int i) {
115 @SuppressWarnings("unchecked") E x = (E) items[i];
116 return x;
117 }
118
119 /**
120 * Throws NullPointerException if argument is null.
121 *
122 * @param v the element
123 */
124 private static void checkNotNull(Object v) {
125 if (v == null)
126 throw new NullPointerException();
127 }
128
129 /**
130 * Inserts element at current put position, advances, and signals.
131 * Call only when holding lock.
132 */
133 private void enqueue(E x) {
134 assert lock.getHoldCount() == 1;
135 assert items[putIndex] == null;
136 items[putIndex] = x;
137 putIndex = inc(putIndex);
138 count++;
139 notEmpty.signal();
140 }
141
142 /**
143 * Extracts element at current take position, advances, and signals.
144 * Call only when holding lock.
145 */
146 private E dequeue() {
147 assert lock.getHoldCount() == 1;
148 assert items[takeIndex] != null;
149 final Object[] items = this.items;
150 @SuppressWarnings("unchecked") E x = (E) items[takeIndex];
151 items[takeIndex] = null;
152 takeIndex = inc(takeIndex);
153 count--;
154 if (itrs != null)
155 itrs.elementDequeued();
156 notFull.signal();
157 return x;
158 }
159
160 /**
161 * Deletes item at array index removeIndex.
162 * Utility for remove(Object) and iterator.remove.
163 * Call only when holding lock.
164 */
165 void removeAt(final int removeIndex) {
166 assert lock.getHoldCount() == 1;
167 assert items[removeIndex] != null;
168 assert removeIndex >= 0 && removeIndex < items.length;
169 final Object[] items = this.items;
170 if (removeIndex == takeIndex) {
171 // removing front item; just advance
172 items[takeIndex] = null;
173 takeIndex = inc(takeIndex);
174 count--;
175 if (itrs != null)
176 itrs.elementDequeued();
177 } else {
178 // an "interior" remove
179
180 // slide over all others up through putIndex.
181 final int putIndex = this.putIndex;
182 for (int i = removeIndex;;) {
183 int next = inc(i);
184 if (next != putIndex) {
185 items[i] = items[next];
186 i = next;
187 } else {
188 items[i] = null;
189 this.putIndex = i;
190 break;
191 }
192 }
193 count--;
194 if (itrs != null)
195 itrs.removedAt(removeIndex);
196 }
197 notFull.signal();
198 }
199
200 /**
201 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
202 * capacity and default access policy.
203 *
204 * @param capacity the capacity of this queue
205 * @throws IllegalArgumentException if {@code capacity < 1}
206 */
207 public ArrayBlockingQueue(int capacity) {
208 this(capacity, false);
209 }
210
211 /**
212 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
213 * capacity and the specified access policy.
214 *
215 * @param capacity the capacity of this queue
216 * @param fair if {@code true} then queue accesses for threads blocked
217 * on insertion or removal, are processed in FIFO order;
218 * if {@code false} the access order is unspecified.
219 * @throws IllegalArgumentException if {@code capacity < 1}
220 */
221 public ArrayBlockingQueue(int capacity, boolean fair) {
222 if (capacity <= 0)
223 throw new IllegalArgumentException();
224 this.items = new Object[capacity];
225 lock = new ReentrantLock(fair);
226 notEmpty = lock.newCondition();
227 notFull = lock.newCondition();
228 }
229
230 /**
231 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
232 * capacity, the specified access policy and initially containing the
233 * elements of the given collection,
234 * added in traversal order of the collection's iterator.
235 *
236 * @param capacity the capacity of this queue
237 * @param fair if {@code true} then queue accesses for threads blocked
238 * on insertion or removal, are processed in FIFO order;
239 * if {@code false} the access order is unspecified.
240 * @param c the collection of elements to initially contain
241 * @throws IllegalArgumentException if {@code capacity} is less than
242 * {@code c.size()}, or less than 1.
243 * @throws NullPointerException if the specified collection or any
244 * of its elements are null
245 */
246 public ArrayBlockingQueue(int capacity, boolean fair,
247 Collection<? extends E> c) {
248 this(capacity, fair);
249
250 final ReentrantLock lock = this.lock;
251 lock.lock(); // Lock only for visibility, not mutual exclusion
252 try {
253 int i = 0;
254 try {
255 for (E e : c) {
256 checkNotNull(e);
257 items[i++] = e;
258 }
259 } catch (ArrayIndexOutOfBoundsException ex) {
260 throw new IllegalArgumentException();
261 }
262 count = i;
263 putIndex = (i == capacity) ? 0 : i;
264 } finally {
265 lock.unlock();
266 }
267 }
268
269 /**
270 * Inserts the specified element at the tail of this queue if it is
271 * possible to do so immediately without exceeding the queue's capacity,
272 * returning {@code true} upon success and throwing an
273 * {@code IllegalStateException} if this queue is full.
274 *
275 * @param e the element to add
276 * @return {@code true} (as specified by {@link Collection#add})
277 * @throws IllegalStateException if this queue is full
278 * @throws NullPointerException if the specified element is null
279 */
280 public boolean add(E e) {
281 return super.add(e);
282 }
283
284 /**
285 * Inserts the specified element at the tail of this queue if it is
286 * possible to do so immediately without exceeding the queue's capacity,
287 * returning {@code true} upon success and {@code false} if this queue
288 * is full. This method is generally preferable to method {@link #add},
289 * which can fail to insert an element only by throwing an exception.
290 *
291 * @throws NullPointerException if the specified element is null
292 */
293 public boolean offer(E e) {
294 checkNotNull(e);
295 final ReentrantLock lock = this.lock;
296 lock.lock();
297 try {
298 if (count == items.length)
299 return false;
300 else {
301 enqueue(e);
302 return true;
303 }
304 } finally {
305 lock.unlock();
306 }
307 }
308
309 /**
310 * Inserts the specified element at the tail of this queue, waiting
311 * for space to become available if the queue is full.
312 *
313 * @throws InterruptedException {@inheritDoc}
314 * @throws NullPointerException {@inheritDoc}
315 */
316 public void put(E e) throws InterruptedException {
317 checkNotNull(e);
318 final ReentrantLock lock = this.lock;
319 lock.lockInterruptibly();
320 try {
321 while (count == items.length)
322 notFull.await();
323 enqueue(e);
324 } finally {
325 lock.unlock();
326 }
327 }
328
329 /**
330 * Inserts the specified element at the tail of this queue, waiting
331 * up to the specified wait time for space to become available if
332 * the queue is full.
333 *
334 * @throws InterruptedException {@inheritDoc}
335 * @throws NullPointerException {@inheritDoc}
336 */
337 public boolean offer(E e, long timeout, TimeUnit unit)
338 throws InterruptedException {
339
340 checkNotNull(e);
341 long nanos = unit.toNanos(timeout);
342 final ReentrantLock lock = this.lock;
343 lock.lockInterruptibly();
344 try {
345 while (count == items.length) {
346 if (nanos <= 0)
347 return false;
348 nanos = notFull.awaitNanos(nanos);
349 }
350 enqueue(e);
351 return true;
352 } finally {
353 lock.unlock();
354 }
355 }
356
357 public E poll() {
358 final ReentrantLock lock = this.lock;
359 lock.lock();
360 try {
361 return (count == 0) ? null : dequeue();
362 } finally {
363 lock.unlock();
364 }
365 }
366
367 public E take() throws InterruptedException {
368 final ReentrantLock lock = this.lock;
369 lock.lockInterruptibly();
370 try {
371 while (count == 0)
372 notEmpty.await();
373 return dequeue();
374 } finally {
375 lock.unlock();
376 }
377 }
378
379 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
380 long nanos = unit.toNanos(timeout);
381 final ReentrantLock lock = this.lock;
382 lock.lockInterruptibly();
383 try {
384 while (count == 0) {
385 if (nanos <= 0)
386 return null;
387 nanos = notEmpty.awaitNanos(nanos);
388 }
389 return dequeue();
390 } finally {
391 lock.unlock();
392 }
393 }
394
395 public E peek() {
396 final ReentrantLock lock = this.lock;
397 lock.lock();
398 try {
399 return (count == 0) ? null : itemAt(takeIndex);
400 } finally {
401 lock.unlock();
402 }
403 }
404
405 // this doc comment is overridden to remove the reference to collections
406 // greater in size than Integer.MAX_VALUE
407 /**
408 * Returns the number of elements in this queue.
409 *
410 * @return the number of elements in this queue
411 */
412 public int size() {
413 final ReentrantLock lock = this.lock;
414 lock.lock();
415 try {
416 return count;
417 } finally {
418 lock.unlock();
419 }
420 }
421
422 // this doc comment is a modified copy of the inherited doc comment,
423 // without the reference to unlimited queues.
424 /**
425 * Returns the number of additional elements that this queue can ideally
426 * (in the absence of memory or resource constraints) accept without
427 * blocking. This is always equal to the initial capacity of this queue
428 * less the current {@code size} of this queue.
429 *
430 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
431 * an element will succeed by inspecting {@code remainingCapacity}
432 * because it may be the case that another thread is about to
433 * insert or remove an element.
434 */
435 public int remainingCapacity() {
436 final ReentrantLock lock = this.lock;
437 lock.lock();
438 try {
439 return items.length - count;
440 } finally {
441 lock.unlock();
442 }
443 }
444
445 /**
446 * Removes a single instance of the specified element from this queue,
447 * if it is present. More formally, removes an element {@code e} such
448 * that {@code o.equals(e)}, if this queue contains one or more such
449 * elements.
450 * Returns {@code true} if this queue contained the specified element
451 * (or equivalently, if this queue changed as a result of the call).
452 *
453 * <p>Removal of interior elements in circular array based queues
454 * is an intrinsically slow and disruptive operation, so should
455 * be undertaken only in exceptional circumstances, ideally
456 * only when the queue is known not to be accessible by other
457 * threads.
458 *
459 * @param o element to be removed from this queue, if present
460 * @return {@code true} if this queue changed as a result of the call
461 */
462 public boolean remove(Object o) {
463 if (o == null) return false;
464 final Object[] items = this.items;
465 final ReentrantLock lock = this.lock;
466 lock.lock();
467 try {
468 if (count > 0) {
469 final int putIndex = this.putIndex;
470 int i = takeIndex;
471 do {
472 if (o.equals(items[i])) {
473 removeAt(i);
474 return true;
475 }
476 } while ((i = inc(i)) != putIndex);
477 }
478 return false;
479 } finally {
480 lock.unlock();
481 }
482 }
483
484 /**
485 * Returns {@code true} if this queue contains the specified element.
486 * More formally, returns {@code true} if and only if this queue contains
487 * at least one element {@code e} such that {@code o.equals(e)}.
488 *
489 * @param o object to be checked for containment in this queue
490 * @return {@code true} if this queue contains the specified element
491 */
492 public boolean contains(Object o) {
493 if (o == null) return false;
494 final Object[] items = this.items;
495 final ReentrantLock lock = this.lock;
496 lock.lock();
497 try {
498 if (count > 0) {
499 final int putIndex = this.putIndex;
500 int i = takeIndex;
501 do {
502 if (o.equals(items[i]))
503 return true;
504 } while ((i = inc(i)) != putIndex);
505 }
506 return false;
507 } finally {
508 lock.unlock();
509 }
510 }
511
512 /**
513 * Returns an array containing all of the elements in this queue, in
514 * proper sequence.
515 *
516 * <p>The returned array will be "safe" in that no references to it are
517 * maintained by this queue. (In other words, this method must allocate
518 * a new array). The caller is thus free to modify the returned array.
519 *
520 * <p>This method acts as bridge between array-based and collection-based
521 * APIs.
522 *
523 * @return an array containing all of the elements in this queue
524 */
525 public Object[] toArray() {
526 final Object[] items = this.items;
527 final ReentrantLock lock = this.lock;
528 lock.lock();
529 try {
530 final int count = this.count;
531 Object[] a = new Object[count];
532 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
533 a[k] = items[i];
534 return a;
535 } finally {
536 lock.unlock();
537 }
538 }
539
540 /**
541 * Returns an array containing all of the elements in this queue, in
542 * proper sequence; the runtime type of the returned array is that of
543 * the specified array. If the queue fits in the specified array, it
544 * is returned therein. Otherwise, a new array is allocated with the
545 * runtime type of the specified array and the size of this queue.
546 *
547 * <p>If this queue fits in the specified array with room to spare
548 * (i.e., the array has more elements than this queue), the element in
549 * the array immediately following the end of the queue is set to
550 * {@code null}.
551 *
552 * <p>Like the {@link #toArray()} method, this method acts as bridge between
553 * array-based and collection-based APIs. Further, this method allows
554 * precise control over the runtime type of the output array, and may,
555 * under certain circumstances, be used to save allocation costs.
556 *
557 * <p>Suppose {@code x} is a queue known to contain only strings.
558 * The following code can be used to dump the queue into a newly
559 * allocated array of {@code String}:
560 *
561 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
562 *
563 * Note that {@code toArray(new Object[0])} is identical in function to
564 * {@code toArray()}.
565 *
566 * @param a the array into which the elements of the queue are to
567 * be stored, if it is big enough; otherwise, a new array of the
568 * same runtime type is allocated for this purpose
569 * @return an array containing all of the elements in this queue
570 * @throws ArrayStoreException if the runtime type of the specified array
571 * is not a supertype of the runtime type of every element in
572 * this queue
573 * @throws NullPointerException if the specified array is null
574 */
575 @SuppressWarnings("unchecked")
576 public <T> T[] toArray(T[] a) {
577 final Object[] items = this.items;
578 final ReentrantLock lock = this.lock;
579 lock.lock();
580 try {
581 final int count = this.count;
582 final int len = a.length;
583 if (len < count)
584 a = (T[])java.lang.reflect.Array.newInstance(
585 a.getClass().getComponentType(), count);
586 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
587 a[k] = (T) items[i];
588 if (len > count)
589 a[count] = null;
590 return a;
591 } finally {
592 lock.unlock();
593 }
594 }
595
596 public String toString() {
597 final ReentrantLock lock = this.lock;
598 lock.lock();
599 try {
600 int k = count;
601 if (k == 0)
602 return "[]";
603
604 StringBuilder sb = new StringBuilder();
605 sb.append('[');
606 for (int i = takeIndex; ; i = inc(i)) {
607 Object e = items[i];
608 sb.append(e == this ? "(this Collection)" : e);
609 if (--k == 0)
610 return sb.append(']').toString();
611 sb.append(',').append(' ');
612 }
613 } finally {
614 lock.unlock();
615 }
616 }
617
618 /**
619 * Atomically removes all of the elements from this queue.
620 * The queue will be empty after this call returns.
621 */
622 public void clear() {
623 final Object[] items = this.items;
624 final ReentrantLock lock = this.lock;
625 lock.lock();
626 try {
627 int k = count;
628 if (k > 0) {
629 final int putIndex = this.putIndex;
630 int i = takeIndex;
631 do {
632 items[i] = null;
633 } while ((i = inc(i)) != putIndex);
634 takeIndex = putIndex;
635 count = 0;
636 if (itrs != null)
637 itrs.queueIsEmpty();
638 for (; k > 0 && lock.hasWaiters(notFull); k--)
639 notFull.signal();
640 }
641 } finally {
642 lock.unlock();
643 }
644 }
645
646 /**
647 * @throws UnsupportedOperationException {@inheritDoc}
648 * @throws ClassCastException {@inheritDoc}
649 * @throws NullPointerException {@inheritDoc}
650 * @throws IllegalArgumentException {@inheritDoc}
651 */
652 public int drainTo(Collection<? super E> c) {
653 return drainTo(c, Integer.MAX_VALUE);
654 }
655
656 /**
657 * @throws UnsupportedOperationException {@inheritDoc}
658 * @throws ClassCastException {@inheritDoc}
659 * @throws NullPointerException {@inheritDoc}
660 * @throws IllegalArgumentException {@inheritDoc}
661 */
662 public int drainTo(Collection<? super E> c, int maxElements) {
663 checkNotNull(c);
664 if (c == this)
665 throw new IllegalArgumentException();
666 if (maxElements <= 0)
667 return 0;
668 final Object[] items = this.items;
669 final ReentrantLock lock = this.lock;
670 lock.lock();
671 try {
672 int n = Math.min(maxElements, count);
673 int take = takeIndex;
674 int i = 0;
675 try {
676 while (i < n) {
677 @SuppressWarnings("unchecked") E x = (E) items[take];
678 c.add(x);
679 items[take] = null;
680 take = inc(take);
681 i++;
682 }
683 return n;
684 } finally {
685 // Restore invariants even if c.add() threw
686 if (i > 0) {
687 count -= i;
688 takeIndex = take;
689 if (itrs != null) {
690 if (count == 0)
691 itrs.queueIsEmpty();
692 else if (i > take)
693 itrs.takeIndexWrapped();
694 }
695 for (; i > 0 && lock.hasWaiters(notFull); i--)
696 notFull.signal();
697 }
698 }
699 } finally {
700 lock.unlock();
701 }
702 }
703
704 /**
705 * Returns an iterator over the elements in this queue in proper sequence.
706 * The elements will be returned in order from first (head) to last (tail).
707 *
708 * <p>The returned iterator is a "weakly consistent" iterator that
709 * will never throw {@link java.util.ConcurrentModificationException
710 * ConcurrentModificationException}, and guarantees to traverse
711 * elements as they existed upon construction of the iterator, and
712 * may (but is not guaranteed to) reflect any modifications
713 * subsequent to construction.
714 *
715 * @return an iterator over the elements in this queue in proper sequence
716 */
717 public Iterator<E> iterator() {
718 return new Itr();
719 }
720
721 /**
722 * Shared data between iterators and their queue. Allows queue
723 * modifications to update iterators when elements are removed.
724 *
725 * We track all active iterators in a linked list of weak references
726 * to Itr, so that queue operations can update the iterators. The
727 * list is cleaned up using 3 different mechanisms:
728 *
729 * (1) Whenever a new iterator is created, do some O(1) checking for
730 * stale list elements.
731 *
732 * (2) Whenever takeIndex wraps around to 0, check for iterators
733 * that have been unused for more than one wrap-around cycle.
734 *
735 * (3) Whenever the queue becomes empty, all iterators are notified
736 * and this entire data structure is discarded.
737 *
738 * Whenever a list element is examined, it is expunged if either the
739 * GC has determined that the iterator is discarded, or if the
740 * iterator reports that it is "detached" (does not need any further
741 * state updates).
742 *
743 * This achieves our goals of a high-reliability iterator that adds
744 * very little overhead if users don't create iterators. The most
745 * overhead is seen if takeIndex never advances, iterators are
746 * discarded before they are exhausted, and all removals are
747 * interior removes, in which case all stale iterators are
748 * discovered by the GC. But even in this case we don't increase
749 * the amortized complexity.
750 */
751 class Itrs {
752
753 /**
754 * Node in a linked list of weak iterator references.
755 */
756 private class Node extends WeakReference<Itr> {
757 Node next;
758
759 Node(Itr iterator, Node next) {
760 super(iterator);
761 this.next = next;
762 }
763 }
764
765 /** Incremented whenever takeIndex wraps around to 0 */
766 int cycles = 0;
767
768 /** Linked list of weak iterator references */
769 private Node head = null;
770
771 /** Used to expunge stale iterators */
772 private Node sweeper = null;
773
774 private static final int SHORT_SWEEP_PROBES = 4;
775 private static final int LONG_SWEEP_PROBES = 16;
776
777 /**
778 * Sweeps itrs, looking for and expunging stale iterators.
779 * If at least one was found, tries harder to find more.
780 *
781 * @param tryHarder whether to start in try-harder mode, because
782 * there is known to be at least one iterator to collect
783 */
784 void doSomeSweeping(boolean tryHarder) {
785 assert lock.getHoldCount() == 1;
786 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
787 Node o, p;
788 final Node sweeper = this.sweeper;
789
790 if (sweeper != null && sweeper.get() != null) {
791 o = sweeper;
792 p = o.next;
793 } else {
794 o = null;
795 p = head;
796 }
797
798 for (; p != null && probes > 0; probes--) {
799 final Itr it = p.get();
800 final Node next = p.next;
801 if (it == null || it.isDetached()) {
802 // found a discarded/exhausted iterator
803 probes = LONG_SWEEP_PROBES; // "try harder"
804 // unlink p
805 p.clear();
806 p.next = null;
807 if (o == null)
808 head = next;
809 else
810 o.next = next;
811 } else {
812 o = p;
813 }
814 p = next;
815 }
816 if (head == null) // no more iterators to track
817 itrs = null;
818
819 this.sweeper = (p == null) ? null : o;
820 }
821
822 /**
823 * Adds a new iterator to the linked list of tracked iterators.
824 */
825 void register(Itr itr) {
826 assert lock.getHoldCount() == 1;
827 head = new Node(itr, head);
828 }
829
830 /**
831 * Called whenever takeIndex wraps around to 0.
832 *
833 * Notifies all iterators, and expunges any that are now stale.
834 */
835 void takeIndexWrapped() {
836 assert lock.getHoldCount() == 1;
837 cycles++;
838 for (Node o = null, p = head; p != null;) {
839 final Itr it = p.get();
840 final Node next = p.next;
841 if (it == null || it.takeIndexWrapped()) {
842 // unlink p
843 assert it == null || it.isDetached();
844 p.clear();
845 p.next = null;
846 if (o == null)
847 head = next;
848 else
849 o.next = next;
850 } else {
851 o = p;
852 }
853 p = next;
854 }
855 if (head == null) // no more iterators to track
856 itrs = null;
857 }
858
859 /**
860 * Called whenever an interior remove (not at takeIndex) occured.
861 */
862 void removedAt(int removedIndex) {
863 for (Node o = null, p = head; p != null;) {
864 final Itr it = p.get();
865 final Node next = p.next;
866 if (it == null || it.removedAt(removedIndex)) {
867 // unlink p
868 assert it == null || it.isDetached();
869 p.clear();
870 p.next = null;
871 if (o == null)
872 head = next;
873 else
874 o.next = next;
875 } else {
876 o = p;
877 }
878 p = next;
879 }
880 if (head == null) // no more iterators to track
881 itrs = null;
882 }
883
884 /**
885 * Called whenever the queue becomes empty.
886 *
887 * Notifies all active iterators that the queue is empty,
888 * clears all weak refs, and unlinks the itrs datastructure.
889 */
890 void queueIsEmpty() {
891 assert lock.getHoldCount() == 1;
892 for (Node p = head; p != null; p = p.next) {
893 Itr it = p.get();
894 if (it != null) {
895 p.clear();
896 it.shutdown();
897 }
898 }
899 head = null;
900 itrs = null;
901 }
902
903 /**
904 * Called whenever an element has been dequeued (at takeIndex).
905 */
906 void elementDequeued() {
907 assert lock.getHoldCount() == 1;
908 if (count == 0)
909 queueIsEmpty();
910 else if (takeIndex == 0)
911 takeIndexWrapped();
912 }
913 }
914
915 /**
916 * Iterator for ArrayBlockingQueue.
917 *
918 * To maintain weak consistency with respect to puts and takes, we
919 * read ahead one slot, so as to not report hasNext true but then
920 * not have an element to return.
921 *
922 * We switch into "detached" mode (allowing prompt unlinking from
923 * itrs without help from the GC) when all indices are negative, or
924 * when hasNext returns false for the first time. This allows the
925 * iterator to track concurrent updates completely accurately,
926 * except for the corner case of the user calling Iterator.remove()
927 * after hasNext() returned false. Even in this case, we ensure
928 * that we don't remove the wrong element by keeping track of the
929 * expected element to remove, in lastItem. Yes, we may fail to
930 * remove lastItem from the queue if it moved due to an interleaved
931 * interior remove while in detached mode.
932 */
933 private class Itr implements Iterator<E> {
934 /** Index to look for new nextItem; -1 at end */
935 private int cursor;
936
937 /** Element to be returned by next call to next(); null if none */
938 private E nextItem;
939
940 /** Index of nextItem; -1 if none, -2 if removed elsewhere */
941 private int nextIndex;
942
943 /** Last element returned; null if none or not detached. */
944 private E lastItem;
945
946 /** Index of lastItem, -1 if none, -2 if removed elsewhere */
947 private int lastRet;
948
949 /** Previous value of takeIndex; -1 if detached */
950 private int prevTakeIndex;
951
952 /** Previous value of iters.cycles */
953 private int prevCycles;
954
955 Itr() {
956 assert lock.getHoldCount() == 0;
957 lastRet = -1;
958 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
959 lock.lock();
960 try {
961 if (count == 0) {
962 assert itrs == null;
963 cursor = -1;
964 nextIndex = -1;
965 prevTakeIndex = -1;
966 } else {
967 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
968 prevTakeIndex = takeIndex;
969 nextItem = itemAt(nextIndex = takeIndex);
970 cursor = incCursor(takeIndex);
971 if (itrs == null) {
972 itrs = new Itrs();
973 itrs.register(this);
974 } else {
975 itrs.register(this); // in this order
976 itrs.doSomeSweeping(false);
977 }
978 prevCycles = itrs.cycles;
979 assert takeIndex >= 0;
980 assert prevTakeIndex == takeIndex;
981 assert nextIndex >= 0;
982 assert nextItem != null;
983 }
984 } finally {
985 lock.unlock();
986 }
987 }
988
989 boolean isDetached() {
990 assert lock.getHoldCount() == 1;
991 return prevTakeIndex < 0;
992 }
993
994 private int incCursor(int index) {
995 assert lock.getHoldCount() == 1;
996 index = inc(index);
997 if (index == putIndex)
998 index = -1;
999 return index;
1000 }
1001
1002 /**
1003 * Returns true if index is invalidated by the given number of
1004 * dequeues, starting from prevTakeIndex.
1005 */
1006 private boolean invalidated(int index, int prevTakeIndex,
1007 long dequeues, int length) {
1008 if (index < 0)
1009 return false;
1010 int distance = index - prevTakeIndex;
1011 if (distance < 0)
1012 distance += length;
1013 return dequeues > distance;
1014 }
1015
1016 /**
1017 * Adjusts indices to incorporate all dequeues since the last
1018 * operation on this iterator. Call only from iterating thread.
1019 */
1020 private void incorporateDequeues() {
1021 assert lock.getHoldCount() == 1;
1022 assert itrs != null;
1023 assert !isDetached();
1024 assert count > 0;
1025
1026 final int cycles = itrs.cycles;
1027 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1028 final int prevCycles = this.prevCycles;
1029 final int prevTakeIndex = this.prevTakeIndex;
1030
1031 if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1032 final int len = items.length;
1033 // how far takeIndex has advanced since the previous
1034 // operation of this iterator
1035 long dequeues = (cycles - prevCycles) * len
1036 + (takeIndex - prevTakeIndex);
1037
1038 // Check indices for invalidation
1039 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1040 lastRet = -2;
1041 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1042 nextIndex = -2;
1043 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1044 cursor = takeIndex;
1045
1046 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1047 detach();
1048 else {
1049 this.prevCycles = cycles;
1050 this.prevTakeIndex = takeIndex;
1051 }
1052 }
1053 }
1054
1055 /**
1056 * Called when itrs should stop tracking this iterator, either
1057 * because there are no more indices to update (cursor < 0 &&
1058 * nextIndex < 0 && lastRet < 0) or as a special exception, when
1059 * lastRet >= 0, because hasNext() is about to return false for the
1060 * first time. Call only from iterating thread.
1061 */
1062 private void detach() {
1063 // Switch to detached mode
1064 assert lock.getHoldCount() == 1;
1065 assert cursor == -1;
1066 assert nextIndex < 0;
1067 assert lastRet < 0 || nextItem == null;
1068 assert lastRet < 0 ^ lastItem != null;
1069 if (prevTakeIndex >= 0) {
1070 assert itrs != null;
1071 prevTakeIndex = -1;
1072 // try to unlink from itrs (but not too hard)
1073 itrs.doSomeSweeping(true);
1074 }
1075 }
1076
1077 /**
1078 * For performance reasons, we would like not to acquire a lock in
1079 * hasNext in the common case. To allow for this, we only access
1080 * fields (i.e. nextItem) that are not modified by update operations
1081 * triggered by queue modifications.
1082 */
1083 public boolean hasNext() {
1084 assert lock.getHoldCount() == 0;
1085 if (nextItem != null)
1086 return true;
1087 noNext();
1088 return false;
1089 }
1090
1091 private void noNext() {
1092 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1093 lock.lock();
1094 try {
1095 assert cursor == -1;
1096 assert nextIndex == -1;
1097 if (!isDetached()) {
1098 assert itrs != null;
1099 assert lastRet >= 0;
1100 incorporateDequeues(); // might update lastRet
1101 if (lastRet >= 0) {
1102 lastItem = itemAt(lastRet);
1103 assert lastItem != null;
1104 detach();
1105 }
1106 }
1107 assert isDetached();
1108 assert lastRet < 0 ^ lastItem != null;
1109 } finally {
1110 lock.unlock();
1111 }
1112 }
1113
1114 public E next() {
1115 assert lock.getHoldCount() == 0;
1116 final E x = nextItem;
1117 if (x == null)
1118 throw new NoSuchElementException();
1119 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1120 lock.lock();
1121 try {
1122 if (!isDetached()) {
1123 assert itrs != null;
1124 incorporateDequeues();
1125 }
1126 assert nextIndex != -1;
1127 assert lastItem == null;
1128 lastRet = nextIndex;
1129 final int cursor = this.cursor;
1130 if (cursor >= 0) {
1131 nextItem = itemAt(nextIndex = cursor);
1132 assert nextItem != null;
1133 this.cursor = incCursor(cursor);
1134 } else {
1135 nextIndex = -1;
1136 nextItem = null;
1137 }
1138 } finally {
1139 lock.unlock();
1140 }
1141 return x;
1142 }
1143
1144 public void remove() {
1145 assert lock.getHoldCount() == 0;
1146 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1147 lock.lock();
1148 try {
1149 if (lastRet >= 0) {
1150 if (isDetached()) {
1151 assert lastItem != null;
1152 if (itemAt(lastRet) == lastItem)
1153 removeAt(lastRet);
1154 lastItem = null;
1155 } else {
1156 assert lastItem == null;
1157 incorporateDequeues(); // might update lastRet
1158 if (lastRet >= 0)
1159 removeAt(lastRet);
1160 }
1161 } else if (lastRet == -1)
1162 throw new IllegalStateException();
1163 // else lastRet == -2 and the last returned element was
1164 // previously asynchronously removed via an operation other
1165 // than this.remove(), so nothing to do.
1166
1167 lastRet = -1;
1168 if (cursor < 0 && nextIndex < 0)
1169 detach();
1170 } finally {
1171 lock.unlock();
1172 assert lastRet == -1;
1173 assert lastItem == null;
1174 }
1175 }
1176
1177 /**
1178 * Called to notify the iterator that the queue is empty, or that it
1179 * has fallen hopelessly behind, so that it should abandon any
1180 * further iteration, except possibly to return one more element
1181 * from next(), as promised by returning true from hasNext().
1182 */
1183 void shutdown() {
1184 assert lock.getHoldCount() == 1;
1185 cursor = -1;
1186 if (nextIndex >= 0)
1187 nextIndex = -2;
1188 if (lastRet >= 0) {
1189 lastRet = -2;
1190 lastItem = null;
1191 }
1192 prevTakeIndex = -1;
1193 // Don't set nextItem to null because we must continue to be
1194 // able to return it on next().
1195 //
1196 // Caller will unlink from itrs when convenient.
1197 }
1198
1199 private int distance(int index, int prevTakeIndex, int length) {
1200 int distance = index - prevTakeIndex;
1201 if (distance < 0)
1202 distance += length;
1203 return distance;
1204 }
1205
1206 /**
1207 * Called whenever an interior remove (not at takeIndex) occured.
1208 *
1209 * @return true if this iterator should be unlinked from itrs
1210 */
1211 boolean removedAt(int removedIndex) {
1212 assert lock.getHoldCount() == 1;
1213 if (isDetached())
1214 return true;
1215
1216 final int cycles = itrs.cycles;
1217 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1218 final int prevCycles = this.prevCycles;
1219 final int prevTakeIndex = this.prevTakeIndex;
1220 final int len = items.length;
1221 int cycleDiff = cycles - prevCycles;
1222 if (removedIndex < takeIndex)
1223 cycleDiff++;
1224 final int removedDistance =
1225 (cycleDiff * len) + (removedIndex - prevTakeIndex);
1226 assert removedDistance >= 0;
1227 int cursor = this.cursor;
1228 if (cursor >= 0) {
1229 int x = distance(cursor, prevTakeIndex, len);
1230 if (x == removedDistance) {
1231 if (cursor == putIndex)
1232 this.cursor = cursor = -1;
1233 }
1234 else if (x > removedDistance) {
1235 assert cursor != prevTakeIndex;
1236 this.cursor = cursor = dec(cursor);
1237 }
1238 }
1239 int lastRet = this.lastRet;
1240 if (lastRet >= 0) {
1241 int x = distance(lastRet, prevTakeIndex, len);
1242 if (x == removedDistance)
1243 this.lastRet = lastRet = -2;
1244 else if (x > removedDistance)
1245 this.lastRet = lastRet = dec(lastRet);
1246 }
1247 int nextIndex = this.nextIndex;
1248 if (nextIndex >= 0) {
1249 int x = distance(nextIndex, prevTakeIndex, len);
1250 if (x == removedDistance)
1251 this.nextIndex = nextIndex = -2;
1252 else if (x > removedDistance)
1253 this.nextIndex = nextIndex = dec(nextIndex);
1254 }
1255 else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1256 this.prevTakeIndex = -1;
1257 return true;
1258 }
1259 return false;
1260 }
1261
1262 /**
1263 * Called whenever takeIndex wraps around to zero.
1264 *
1265 * @return true if this iterator should be unlinked from itrs
1266 */
1267 boolean takeIndexWrapped() {
1268 assert lock.getHoldCount() == 1;
1269 if (isDetached())
1270 return true;
1271 if (itrs.cycles - prevCycles > 1) {
1272 // All the elements that existed at the time of the last
1273 // operation are gone, so abandon further iteration.
1274 shutdown();
1275 return true;
1276 }
1277 return false;
1278 }
1279
1280 // /** Uncomment for debugging. */
1281 // public String toString() {
1282 // return ("cursor=" + cursor + " " +
1283 // "nextIndex=" + nextIndex + " " +
1284 // "lastRet=" + lastRet + " " +
1285 // "nextItem=" + nextItem + " " +
1286 // "lastItem=" + lastItem + " " +
1287 // "prevCycles=" + prevCycles + " " +
1288 // "prevTakeIndex=" + prevTakeIndex + " " +
1289 // "size()=" + size() + " " +
1290 // "remainingCapacity()=" + remainingCapacity());
1291 // }
1292 }
1293 }