ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.98
Committed: Sun Feb 17 06:11:39 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.97: +14 -4 lines
Log Message:
optimize toArray

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.Condition;
9 import java.util.concurrent.locks.ReentrantLock;
10 import java.util.AbstractQueue;
11 import java.util.Collection;
12 import java.util.Iterator;
13 import java.util.NoSuchElementException;
14 import java.lang.ref.WeakReference;
15
16 /**
17 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
18 * array. This queue orders elements FIFO (first-in-first-out). The
19 * <em>head</em> of the queue is that element that has been on the
20 * queue the longest time. The <em>tail</em> of the queue is that
21 * element that has been on the queue the shortest time. New elements
22 * are inserted at the tail of the queue, and the queue retrieval
23 * operations obtain elements at the head of the queue.
24 *
25 * <p>This is a classic &quot;bounded buffer&quot;, in which a
26 * fixed-sized array holds elements inserted by producers and
27 * extracted by consumers. Once created, the capacity cannot be
28 * changed. Attempts to {@code put} an element into a full queue
29 * will result in the operation blocking; attempts to {@code take} an
30 * element from an empty queue will similarly block.
31 *
32 * <p>This class supports an optional fairness policy for ordering
33 * waiting producer and consumer threads. By default, this ordering
34 * is not guaranteed. However, a queue constructed with fairness set
35 * to {@code true} grants threads access in FIFO order. Fairness
36 * generally decreases throughput but reduces variability and avoids
37 * starvation.
38 *
39 * <p>This class and its iterator implement all of the
40 * <em>optional</em> methods of the {@link Collection} and {@link
41 * Iterator} interfaces.
42 *
43 * <p>This class is a member of the
44 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
45 * Java Collections Framework</a>.
46 *
47 * @since 1.5
48 * @author Doug Lea
49 * @param <E> the type of elements held in this collection
50 */
51 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
52 implements BlockingQueue<E>, java.io.Serializable {
53
54 /**
55 * Serialization ID. This class relies on default serialization
56 * even for the items array, which is default-serialized, even if
57 * it is empty. Otherwise it could not be declared final, which is
58 * necessary here.
59 */
60 private static final long serialVersionUID = -817911632652898426L;
61
62 /** The queued items; a fixed-length array indexed circularly via inc(int) and dec(int) */
63 final Object[] items;
64
65 /** items index for next take, poll, peek or remove; advanced with inc(int) as elements leave the head */
66 int takeIndex;
67
68 /** items index for next put, offer, or add; advanced with inc(int) by enqueue */
69 int putIndex;
70
71 /** Number of elements in the queue; the queue is full when this equals items.length */
72 int count;
73
74 /*
75 * Concurrency control uses the classic two-condition algorithm
76 * found in any textbook: one lock plus the notEmpty and notFull
 * conditions declared below.
77 */
78
79 /** Main lock guarding all access; its fairness is chosen in the constructor */
80 final ReentrantLock lock;
81
82 /** Condition for waiting takes; signalled by enqueue */
83 private final Condition notEmpty;
84
85 /** Condition for waiting puts; signalled by dequeue, removeAt, clear and drainTo */
86 private final Condition notFull;
87
88 /**
89 * Shared state for currently active iterators, or null if there
90 * are known not to be any. Allows queue operations to update
91 * iterator state. Declared transient, so it is not part of the
 * default-serialized form.
92 */
93 transient Itrs itrs = null;
94
95 // Internal helper methods
96
97 /**
98 * Circularly increment i.
99 */
100 final int inc(int i) {
101 return (++i == items.length) ? 0 : i;
102 }
103
104 /**
105 * Circularly decrement i.
106 */
107 final int dec(int i) {
108 return ((i == 0) ? items.length : i) - 1;
109 }
110
111 /**
112 * Returns item at index i.
113 */
114 @SuppressWarnings("unchecked")
115 final E itemAt(int i) {
116 return (E) items[i];
117 }
118
119 /**
120 * Throws NullPointerException if argument is null.
121 *
122 * @param v the element
123 */
124 private static void checkNotNull(Object v) {
125 if (v == null)
126 throw new NullPointerException();
127 }
128
129 /**
130 * Inserts element at current put position, advances, and signals.
131 * Call only when holding lock.
132 */
133 private void enqueue(E x) {
134 // assert lock.getHoldCount() == 1;
135 // assert items[putIndex] == null;
136 items[putIndex] = x;
137 putIndex = inc(putIndex);
138 count++;
139 notEmpty.signal();
140 }
141
142 /**
143 * Extracts element at current take position, advances, and signals.
144 * Call only when holding lock.
145 */
146 private E dequeue() {
147 // assert lock.getHoldCount() == 1;
148 // assert items[takeIndex] != null;
149 final Object[] items = this.items;
150 @SuppressWarnings("unchecked")
151 E x = (E) items[takeIndex];
152 items[takeIndex] = null;
153 takeIndex = inc(takeIndex);
154 count--;
155 if (itrs != null)
156 itrs.elementDequeued();
157 notFull.signal();
158 return x;
159 }
160
161 /**
162 * Deletes item at array index removeIndex.
163 * Utility for remove(Object) and iterator.remove.
164 * Call only when holding lock.
165 */
166 void removeAt(final int removeIndex) {
167 // assert lock.getHoldCount() == 1;
168 // assert items[removeIndex] != null;
169 // assert removeIndex >= 0 && removeIndex < items.length;
170 final Object[] items = this.items;
171 if (removeIndex == takeIndex) {
172 // removing front item; just advance
173 items[takeIndex] = null;
174 takeIndex = inc(takeIndex);
175 count--;
176 if (itrs != null)
177 itrs.elementDequeued();
178 } else {
179 // an "interior" remove
180
181 // slide over all others up through putIndex.
182 final int putIndex = this.putIndex;
183 for (int i = removeIndex;;) {
184 int next = inc(i);
185 if (next != putIndex) {
186 items[i] = items[next];
187 i = next;
188 } else {
189 items[i] = null;
190 this.putIndex = i;
191 break;
192 }
193 }
194 count--;
195 if (itrs != null)
196 itrs.removedAt(removeIndex);
197 }
198 notFull.signal();
199 }
200
/**
 * Builds an empty {@code ArrayBlockingQueue} of the given fixed
 * capacity, using the default (non-fair) access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    // Delegates with fair == false.
    this(capacity, false);
}
211
212 /**
213 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
214 * capacity and the specified access policy.
215 *
216 * @param capacity the capacity of this queue
217 * @param fair if {@code true} then queue accesses for threads blocked
218 * on insertion or removal, are processed in FIFO order;
219 * if {@code false} the access order is unspecified.
220 * @throws IllegalArgumentException if {@code capacity < 1}
221 */
222 public ArrayBlockingQueue(int capacity, boolean fair) {
223 if (capacity <= 0)
224 throw new IllegalArgumentException();
225 this.items = new Object[capacity];
226 lock = new ReentrantLock(fair);
227 notEmpty = lock.newCondition();
228 notFull = lock.newCondition();
229 }
230
/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity, the specified access policy and initially containing the
 * elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * <p>The capacity bound is checked explicitly while copying, so an
 * {@code ArrayIndexOutOfBoundsException} raised by the collection's
 * own iterator is propagated as-is rather than being reported as an
 * {@code IllegalArgumentException}.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        final Object[] items = this.items;
        int i = 0;
        for (E e : c) {
            // Null check first, so a null element is still reported
            // as NullPointerException even when it is also the
            // element that would overflow the array.
            checkNotNull(e);
            if (i == capacity)
                throw new IllegalArgumentException(
                    "collection holds more than " + capacity + " elements");
            items[i++] = e;
        }
        count = i;
        // A completely filled array wraps the put position to slot 0.
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
269
270 /**
271 * Inserts the specified element at the tail of this queue if it is
272 * possible to do so immediately without exceeding the queue's capacity,
273 * returning {@code true} upon success and throwing an
274 * {@code IllegalStateException} if this queue is full.
275 *
276 * @param e the element to add
277 * @return {@code true} (as specified by {@link Collection#add})
278 * @throws IllegalStateException if this queue is full
279 * @throws NullPointerException if the specified element is null
280 */
281 public boolean add(E e) {
282 return super.add(e);
283 }
284
285 /**
286 * Inserts the specified element at the tail of this queue if it is
287 * possible to do so immediately without exceeding the queue's capacity,
288 * returning {@code true} upon success and {@code false} if this queue
289 * is full. This method is generally preferable to method {@link #add},
290 * which can fail to insert an element only by throwing an exception.
291 *
292 * @throws NullPointerException if the specified element is null
293 */
294 public boolean offer(E e) {
295 checkNotNull(e);
296 final ReentrantLock lock = this.lock;
297 lock.lock();
298 try {
299 if (count == items.length)
300 return false;
301 else {
302 enqueue(e);
303 return true;
304 }
305 } finally {
306 lock.unlock();
307 }
308 }
309
310 /**
311 * Inserts the specified element at the tail of this queue, waiting
312 * for space to become available if the queue is full.
313 *
314 * @throws InterruptedException {@inheritDoc}
315 * @throws NullPointerException {@inheritDoc}
316 */
317 public void put(E e) throws InterruptedException {
318 checkNotNull(e);
319 final ReentrantLock lock = this.lock;
320 lock.lockInterruptibly();
321 try {
322 while (count == items.length)
323 notFull.await();
324 enqueue(e);
325 } finally {
326 lock.unlock();
327 }
328 }
329
330 /**
331 * Inserts the specified element at the tail of this queue, waiting
332 * up to the specified wait time for space to become available if
333 * the queue is full.
334 *
335 * @throws InterruptedException {@inheritDoc}
336 * @throws NullPointerException {@inheritDoc}
337 */
338 public boolean offer(E e, long timeout, TimeUnit unit)
339 throws InterruptedException {
340
341 checkNotNull(e);
342 long nanos = unit.toNanos(timeout);
343 final ReentrantLock lock = this.lock;
344 lock.lockInterruptibly();
345 try {
346 while (count == items.length) {
347 if (nanos <= 0)
348 return false;
349 nanos = notFull.awaitNanos(nanos);
350 }
351 enqueue(e);
352 return true;
353 } finally {
354 lock.unlock();
355 }
356 }
357
358 public E poll() {
359 final ReentrantLock lock = this.lock;
360 lock.lock();
361 try {
362 return (count == 0) ? null : dequeue();
363 } finally {
364 lock.unlock();
365 }
366 }
367
368 public E take() throws InterruptedException {
369 final ReentrantLock lock = this.lock;
370 lock.lockInterruptibly();
371 try {
372 while (count == 0)
373 notEmpty.await();
374 return dequeue();
375 } finally {
376 lock.unlock();
377 }
378 }
379
380 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
381 long nanos = unit.toNanos(timeout);
382 final ReentrantLock lock = this.lock;
383 lock.lockInterruptibly();
384 try {
385 while (count == 0) {
386 if (nanos <= 0)
387 return null;
388 nanos = notEmpty.awaitNanos(nanos);
389 }
390 return dequeue();
391 } finally {
392 lock.unlock();
393 }
394 }
395
396 public E peek() {
397 final ReentrantLock lock = this.lock;
398 lock.lock();
399 try {
400 return (count == 0) ? null : itemAt(takeIndex);
401 } finally {
402 lock.unlock();
403 }
404 }
405
406 // this doc comment is overridden to remove the reference to collections
407 // greater in size than Integer.MAX_VALUE
408 /**
409 * Returns the number of elements in this queue.
410 *
411 * @return the number of elements in this queue
412 */
413 public int size() {
414 final ReentrantLock lock = this.lock;
415 lock.lock();
416 try {
417 return count;
418 } finally {
419 lock.unlock();
420 }
421 }
422
423 // this doc comment is a modified copy of the inherited doc comment,
424 // without the reference to unlimited queues.
425 /**
426 * Returns the number of additional elements that this queue can ideally
427 * (in the absence of memory or resource constraints) accept without
428 * blocking. This is always equal to the initial capacity of this queue
429 * less the current {@code size} of this queue.
430 *
431 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
432 * an element will succeed by inspecting {@code remainingCapacity}
433 * because it may be the case that another thread is about to
434 * insert or remove an element.
435 */
436 public int remainingCapacity() {
437 final ReentrantLock lock = this.lock;
438 lock.lock();
439 try {
440 return items.length - count;
441 } finally {
442 lock.unlock();
443 }
444 }
445
446 /**
447 * Removes a single instance of the specified element from this queue,
448 * if it is present. More formally, removes an element {@code e} such
449 * that {@code o.equals(e)}, if this queue contains one or more such
450 * elements.
451 * Returns {@code true} if this queue contained the specified element
452 * (or equivalently, if this queue changed as a result of the call).
453 *
454 * <p>Removal of interior elements in circular array based queues
455 * is an intrinsically slow and disruptive operation, so should
456 * be undertaken only in exceptional circumstances, ideally
457 * only when the queue is known not to be accessible by other
458 * threads.
459 *
460 * @param o element to be removed from this queue, if present
461 * @return {@code true} if this queue changed as a result of the call
462 */
463 public boolean remove(Object o) {
464 if (o == null) return false;
465 final Object[] items = this.items;
466 final ReentrantLock lock = this.lock;
467 lock.lock();
468 try {
469 if (count > 0) {
470 final int putIndex = this.putIndex;
471 int i = takeIndex;
472 do {
473 if (o.equals(items[i])) {
474 removeAt(i);
475 return true;
476 }
477 } while ((i = inc(i)) != putIndex);
478 }
479 return false;
480 } finally {
481 lock.unlock();
482 }
483 }
484
485 /**
486 * Returns {@code true} if this queue contains the specified element.
487 * More formally, returns {@code true} if and only if this queue contains
488 * at least one element {@code e} such that {@code o.equals(e)}.
489 *
490 * @param o object to be checked for containment in this queue
491 * @return {@code true} if this queue contains the specified element
492 */
493 public boolean contains(Object o) {
494 if (o == null) return false;
495 final Object[] items = this.items;
496 final ReentrantLock lock = this.lock;
497 lock.lock();
498 try {
499 if (count > 0) {
500 final int putIndex = this.putIndex;
501 int i = takeIndex;
502 do {
503 if (o.equals(items[i]))
504 return true;
505 } while ((i = inc(i)) != putIndex);
506 }
507 return false;
508 } finally {
509 lock.unlock();
510 }
511 }
512
513 /**
514 * Returns an array containing all of the elements in this queue, in
515 * proper sequence.
516 *
517 * <p>The returned array will be "safe" in that no references to it are
518 * maintained by this queue. (In other words, this method must allocate
519 * a new array). The caller is thus free to modify the returned array.
520 *
521 * <p>This method acts as bridge between array-based and collection-based
522 * APIs.
523 *
524 * @return an array containing all of the elements in this queue
525 */
526 public Object[] toArray() {
527 final Object[] items = this.items;
528 final ReentrantLock lock = this.lock;
529 lock.lock();
530 try {
531 final int count = this.count;
532 Object[] a = new Object[count];
533 int n = items.length - takeIndex;
534 if (count <= n)
535 System.arraycopy(items, takeIndex, a, 0, count);
536 else {
537 System.arraycopy(items, takeIndex, a, 0, n);
538 System.arraycopy(items, 0, a, n, count - n);
539 }
540 return a;
541 } finally {
542 lock.unlock();
543 }
544 }
545
546 /**
547 * Returns an array containing all of the elements in this queue, in
548 * proper sequence; the runtime type of the returned array is that of
549 * the specified array. If the queue fits in the specified array, it
550 * is returned therein. Otherwise, a new array is allocated with the
551 * runtime type of the specified array and the size of this queue.
552 *
553 * <p>If this queue fits in the specified array with room to spare
554 * (i.e., the array has more elements than this queue), the element in
555 * the array immediately following the end of the queue is set to
556 * {@code null}.
557 *
558 * <p>Like the {@link #toArray()} method, this method acts as bridge between
559 * array-based and collection-based APIs. Further, this method allows
560 * precise control over the runtime type of the output array, and may,
561 * under certain circumstances, be used to save allocation costs.
562 *
563 * <p>Suppose {@code x} is a queue known to contain only strings.
564 * The following code can be used to dump the queue into a newly
565 * allocated array of {@code String}:
566 *
567 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
568 *
569 * Note that {@code toArray(new Object[0])} is identical in function to
570 * {@code toArray()}.
571 *
572 * @param a the array into which the elements of the queue are to
573 * be stored, if it is big enough; otherwise, a new array of the
574 * same runtime type is allocated for this purpose
575 * @return an array containing all of the elements in this queue
576 * @throws ArrayStoreException if the runtime type of the specified array
577 * is not a supertype of the runtime type of every element in
578 * this queue
579 * @throws NullPointerException if the specified array is null
580 */
581 @SuppressWarnings("unchecked")
582 public <T> T[] toArray(T[] a) {
583 final Object[] items = this.items;
584 final ReentrantLock lock = this.lock;
585 lock.lock();
586 try {
587 final int count = this.count;
588 final int len = a.length;
589 if (len < count)
590 a = (T[])java.lang.reflect.Array.newInstance(
591 a.getClass().getComponentType(), count);
592 int n = items.length - takeIndex;
593 if (count <= n)
594 System.arraycopy(items, takeIndex, a, 0, count);
595 else {
596 System.arraycopy(items, takeIndex, a, 0, n);
597 System.arraycopy(items, 0, a, n, count - n);
598 }
599 if (len > count)
600 a[count] = null;
601 return a;
602 } finally {
603 lock.unlock();
604 }
605 }
606
607 public String toString() {
608 final ReentrantLock lock = this.lock;
609 lock.lock();
610 try {
611 int k = count;
612 if (k == 0)
613 return "[]";
614
615 StringBuilder sb = new StringBuilder();
616 sb.append('[');
617 for (int i = takeIndex; ; i = inc(i)) {
618 Object e = items[i];
619 sb.append(e == this ? "(this Collection)" : e);
620 if (--k == 0)
621 return sb.append(']').toString();
622 sb.append(',').append(' ');
623 }
624 } finally {
625 lock.unlock();
626 }
627 }
628
629 /**
630 * Atomically removes all of the elements from this queue.
631 * The queue will be empty after this call returns.
632 */
633 public void clear() {
634 final Object[] items = this.items;
635 final ReentrantLock lock = this.lock;
636 lock.lock();
637 try {
638 int k = count;
639 if (k > 0) {
640 final int putIndex = this.putIndex;
641 int i = takeIndex;
642 do {
643 items[i] = null;
644 } while ((i = inc(i)) != putIndex);
645 takeIndex = putIndex;
646 count = 0;
647 if (itrs != null)
648 itrs.queueIsEmpty();
649 for (; k > 0 && lock.hasWaiters(notFull); k--)
650 notFull.signal();
651 }
652 } finally {
653 lock.unlock();
654 }
655 }
656
657 /**
658 * @throws UnsupportedOperationException {@inheritDoc}
659 * @throws ClassCastException {@inheritDoc}
660 * @throws NullPointerException {@inheritDoc}
661 * @throws IllegalArgumentException {@inheritDoc}
662 */
663 public int drainTo(Collection<? super E> c) {
664 return drainTo(c, Integer.MAX_VALUE);
665 }
666
667 /**
668 * @throws UnsupportedOperationException {@inheritDoc}
669 * @throws ClassCastException {@inheritDoc}
670 * @throws NullPointerException {@inheritDoc}
671 * @throws IllegalArgumentException {@inheritDoc}
672 */
673 public int drainTo(Collection<? super E> c, int maxElements) {
674 checkNotNull(c);
675 if (c == this)
676 throw new IllegalArgumentException();
677 if (maxElements <= 0)
678 return 0;
679 final Object[] items = this.items;
680 final ReentrantLock lock = this.lock;
681 lock.lock();
682 try {
683 int n = Math.min(maxElements, count);
684 int take = takeIndex;
685 int i = 0;
686 try {
687 while (i < n) {
688 @SuppressWarnings("unchecked")
689 E x = (E) items[take];
690 c.add(x);
691 items[take] = null;
692 take = inc(take);
693 i++;
694 }
695 return n;
696 } finally {
697 // Restore invariants even if c.add() threw
698 if (i > 0) {
699 count -= i;
700 takeIndex = take;
701 if (itrs != null) {
702 if (count == 0)
703 itrs.queueIsEmpty();
704 else if (i > take)
705 itrs.takeIndexWrapped();
706 }
707 for (; i > 0 && lock.hasWaiters(notFull); i--)
708 notFull.signal();
709 }
710 }
711 } finally {
712 lock.unlock();
713 }
714 }
715
716 /**
717 * Returns an iterator over the elements in this queue in proper sequence.
718 * The elements will be returned in order from first (head) to last (tail).
719 *
720 * <p>The returned iterator is a "weakly consistent" iterator that
721 * will never throw {@link java.util.ConcurrentModificationException
722 * ConcurrentModificationException}, and guarantees to traverse
723 * elements as they existed upon construction of the iterator, and
724 * may (but is not guaranteed to) reflect any modifications
725 * subsequent to construction.
726 *
727 * @return an iterator over the elements in this queue in proper sequence
728 */
729 public Iterator<E> iterator() {
730 return new Itr();
731 }
732
733 /**
734 * Shared data between iterators and their queue, allowing queue
735 * modifications to update iterators when elements are removed.
736 *
737 * This adds a lot of complexity for the sake of correctly
738 * handling some uncommon operations, but the combination of
739 * circular-arrays and supporting interior removes (i.e., those
740 * not at head) would cause iterators to sometimes lose their
741 * places and/or (re)report elements they shouldn't. To avoid
742 * this, when a queue has one or more iterators, it keeps iterator
743 * state consistent by:
744 *
745 * (1) keeping track of the number of "cycles", that is, the
746 * number of times takeIndex has wrapped around to 0.
747 * (2) notifying all iterators via the callback removedAt whenever
748 * an interior element is removed (and thus other elements may
749 * be shifted).
750 *
751 * These suffice to eliminate iterator inconsistencies, but
752 * unfortunately add the secondary responsibility of maintaining
753 * the list of iterators. We track all active iterators in a
754 * simple linked list (accessed only when the queue's lock is
755 * held) of weak references to Itr. The list is cleaned up using
756 * 3 different mechanisms:
757 *
758 * (1) Whenever a new iterator is created, do some O(1) checking for
759 * stale list elements.
760 *
761 * (2) Whenever takeIndex wraps around to 0, check for iterators
762 * that have been unused for more than one wrap-around cycle.
763 *
764 * (3) Whenever the queue becomes empty, all iterators are notified
765 * and this entire data structure is discarded.
766 *
767 * So in addition to the removedAt callback that is necessary for
768 * correctness, iterators have the shutdown and takeIndexWrapped
769 * callbacks that help remove stale iterators from the list.
770 *
771 * Whenever a list element is examined, it is expunged if either
772 * the GC has determined that the iterator is discarded, or if the
773 * iterator reports that it is "detached" (does not need any
774 * further state updates). Overhead is maximal when takeIndex
775 * never advances, iterators are discarded before they are
776 * exhausted, and all removals are interior removes, in which case
777 * all stale iterators are discovered by the GC. But even in this
778 * case we don't increase the amortized complexity.
779 *
780 * Care must be taken to keep list sweeping methods from
781 * reentrantly invoking another such method, causing subtle
782 * corruption bugs.
783 */
784 class Itrs {
785
786 /**
787 * Node in a singly linked list of weak iterator references. The weak referent is the tracked Itr; next is the following node.
788 */
789 private class Node extends WeakReference<Itr> {
790 Node next;
791
792 Node(Itr iterator, Node next) {
793 super(iterator);
794 this.next = next;
795 }
796 }
797
798 /** Incremented whenever takeIndex wraps around to 0 */
799 int cycles = 0;
800
801 /** Linked list of weak iterator references; register() pushes new nodes at the front */
802 private Node head;
803
804 /** Used to expunge stale iterators: the node after which the next doSomeSweeping call resumes, or null to start from head */
805 private Node sweeper = null;
806
807 private static final int SHORT_SWEEP_PROBES = 4; // probe budget for a sweep not started in try-harder mode
808 private static final int LONG_SWEEP_PROBES = 16; // probe budget in try-harder mode; also re-armed whenever a stale node is found
809
810 Itrs(Itr initial) {
811 register(initial); // the list starts out holding just this iterator
812 }
813
814 /**
815 * Sweeps itrs, looking for and expunging stale iterators.
816 * If at least one was found, tries harder to find more.
817 * Called only from iterating thread.
818 *
819 * @param tryHarder whether to start in try-harder mode, because
820 * there is known to be at least one iterator to collect
821 */
822 void doSomeSweeping(boolean tryHarder) {
823 // assert lock.getHoldCount() == 1;
824 // assert head != null;
825 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
826 Node o, p; // o is the last node kept before p (null when p is at the head); p is the node being examined
827 final Node sweeper = this.sweeper;
828 boolean passedGo; // to limit search to one full sweep
829
830 if (sweeper == null) {
831 o = null;
832 p = head;
833 passedGo = true; // starting at head: reaching the end of the list ends the sweep
834 } else {
835 o = sweeper;
836 p = o.next;
837 passedGo = false; // resuming mid-list: one wrap back to head is still allowed
838 }
839
840 for (; probes > 0; probes--) {
841 if (p == null) { // ran off the end of the list
842 if (passedGo)
843 break;
844 o = null;
845 p = head;
846 passedGo = true;
847 }
848 final Itr it = p.get();
849 final Node next = p.next; // saved first, because unlinking clears p.next
850 if (it == null || it.isDetached()) {
851 // found a discarded/exhausted iterator
852 probes = LONG_SWEEP_PROBES; // "try harder"
853 // unlink p
854 p.clear();
855 p.next = null;
856 if (o == null) {
857 head = next;
858 if (next == null) {
859 // We've run out of iterators to track; retire
860 itrs = null;
861 return;
862 }
863 }
864 else
865 o.next = next;
866 } else {
867 o = p;
868 }
869 p = next;
870 }
871
872 this.sweeper = (p == null) ? null : o; // remember where to resume; null means restart from head
873 }
874
875 /**
876 * Adds a new iterator to the linked list of tracked iterators, as the new head node.
877 */
878 void register(Itr itr) {
879 // assert lock.getHoldCount() == 1;
880 head = new Node(itr, head);
881 }
882
883 /**
884 * Called whenever takeIndex wraps around to 0.
885 *
886 * Notifies all iterators, and expunges any that are now stale.
887 */
888 void takeIndexWrapped() {
889 // assert lock.getHoldCount() == 1;
890 cycles++;
891 for (Node o = null, p = head; p != null;) {
892 final Itr it = p.get();
893 final Node next = p.next;
894 if (it == null || it.takeIndexWrapped()) { // referent cleared, or the iterator's callback returned true
895 // unlink p
896 // assert it == null || it.isDetached();
897 p.clear();
898 p.next = null;
899 if (o == null)
900 head = next;
901 else
902 o.next = next;
903 } else {
904 o = p;
905 }
906 p = next;
907 }
908 if (head == null) // no more iterators to track
909 itrs = null;
910 }
911
912 /**
913 * Called whenever an interior remove (not at takeIndex) occurred.
914 *
915 * Notifies all iterators, and expunges any that are now stale.
916 */
917 void removedAt(int removedIndex) {
918 for (Node o = null, p = head; p != null;) {
919 final Itr it = p.get();
920 final Node next = p.next;
921 if (it == null || it.removedAt(removedIndex)) { // referent cleared, or the iterator's callback returned true
922 // unlink p
923 // assert it == null || it.isDetached();
924 p.clear();
925 p.next = null;
926 if (o == null)
927 head = next;
928 else
929 o.next = next;
930 } else {
931 o = p;
932 }
933 p = next;
934 }
935 if (head == null) // no more iterators to track
936 itrs = null;
937 }
938
939 /**
940 * Called whenever the queue becomes empty.
941 *
942 * Notifies all active iterators that the queue is empty,
943 * clears all weak refs, and unlinks the itrs datastructure.
944 */
945 void queueIsEmpty() {
946 // assert lock.getHoldCount() == 1;
947 for (Node p = head; p != null; p = p.next) {
948 Itr it = p.get();
949 if (it != null) { // nodes whose referent is already gone need no notification
950 p.clear();
951 it.shutdown();
952 }
953 }
954 head = null; // drop the whole list at once rather than unlinking node by node
955 itrs = null;
956 }
957
958 /**
959 * Called whenever an element has been dequeued (at takeIndex).
960 * Dispatches to queueIsEmpty() when count is 0, otherwise to takeIndexWrapped() when takeIndex is 0.
961 */
962 void elementDequeued() {
963 // assert lock.getHoldCount() == 1;
964 if (count == 0)
965 queueIsEmpty();
966 else if (takeIndex == 0)
967 takeIndexWrapped();
968 }
969 }
969
970 /**
971 * Iterator for ArrayBlockingQueue.
972 *
973 * To maintain weak consistency with respect to puts and takes, we
974 * read ahead one slot, so as to not report hasNext true but then
975 * not have an element to return.
976 *
977 * We switch into "detached" mode (allowing prompt unlinking from
978 * itrs without help from the GC) when all indices are negative, or
979 * when hasNext returns false for the first time. This allows the
980 * iterator to track concurrent updates completely accurately,
981 * except for the corner case of the user calling Iterator.remove()
982 * after hasNext() returned false. Even in this case, we ensure
983 * that we don't remove the wrong element by keeping track of the
984 * expected element to remove, in lastItem. Yes, we may fail to
985 * remove lastItem from the queue if it moved due to an interleaved
986 * interior remove while in detached mode.
987 */
988 private class Itr implements Iterator<E> {
989 /** Index to look for new nextItem; NONE at end */
990 private int cursor;
991
992 /** Element to be returned by next call to next(); null if none */
993 private E nextItem;
994
995 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
996 private int nextIndex;
997
998 /** Last element returned; null if none or not detached. */
999 private E lastItem;
1000
1001 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1002 private int lastRet;
1003
1004 /** Previous value of takeIndex, or DETACHED when detached */
1005 private int prevTakeIndex;
1006
1007 /** Previous value of itrs.cycles */
1008 private int prevCycles;
1009
1010 /** Special index value indicating "not available" or "undefined" */
1011 private static final int NONE = -1;
1012
1013 /**
1014 * Special index value indicating "removed elsewhere", that is,
1015 * removed by some operation other than a call to this.remove().
1016 */
1017 private static final int REMOVED = -2;
1018
1019 /** Special value for prevTakeIndex indicating "detached mode"; negative, which is what isDetached() tests for */
1020 private static final int DETACHED = -3;
1021
/**
 * Creates an iterator positioned at the current head of the queue.
 * For an empty queue the iterator starts out detached; otherwise it
 * captures the head element and index, and joins the queue's list of
 * tracked iterators (creating that list if needed).
 */
Itr() {
    // assert lock.getHoldCount() == 0;
    lastRet = NONE;
    final ReentrantLock mainLock = ArrayBlockingQueue.this.lock;
    mainLock.lock();
    try {
        if (count != 0) {
            final int headSlot = ArrayBlockingQueue.this.takeIndex;
            prevTakeIndex = headSlot;
            nextIndex = headSlot;
            nextItem = itemAt(headSlot);
            cursor = incCursor(headSlot);
            if (itrs != null) {
                itrs.register(this); // in this order
                itrs.doSomeSweeping(false);
            } else {
                itrs = new Itrs(this);
            }
            prevCycles = itrs.cycles;
            // assert takeIndex >= 0;
            // assert prevTakeIndex == takeIndex;
            // assert nextIndex >= 0;
            // assert nextItem != null;
        } else {
            // assert itrs == null;
            cursor = NONE;
            nextIndex = NONE;
            prevTakeIndex = DETACHED;
        }
    } finally {
        mainLock.unlock();
    }
}
1054
1055 boolean isDetached() {
1056 // assert lock.getHoldCount() == 1;
1057 return prevTakeIndex < 0;
1058 }
1059
1060 private int incCursor(int index) {
1061 // assert lock.getHoldCount() == 1;
1062 index = inc(index);
1063 if (index == putIndex)
1064 index = NONE;
1065 return index;
1066 }
1067
1068 /**
1069 * Returns true if index is invalidated by the given number of
1070 * dequeues, starting from prevTakeIndex.
1071 */
1072 private boolean invalidated(int index, int prevTakeIndex,
1073 long dequeues, int length) {
1074 if (index < 0)
1075 return false;
1076 int distance = index - prevTakeIndex;
1077 if (distance < 0)
1078 distance += length;
1079 return dequeues > distance;
1080 }
1081
1082 /**
1083 * Adjusts indices to incorporate all dequeues since the last
1084 * operation on this iterator. Call only from iterating thread.
1085 */
1086 private void incorporateDequeues() {
1087 // assert lock.getHoldCount() == 1;
1088 // assert itrs != null;
1089 // assert !isDetached();
1090 // assert count > 0;
1091
1092 final int cycles = itrs.cycles;
1093 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1094 final int prevCycles = this.prevCycles;
1095 final int prevTakeIndex = this.prevTakeIndex;
1096
1097 if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1098 final int len = items.length;
1099 // how far takeIndex has advanced since the previous
1100 // operation of this iterator
1101 long dequeues = (cycles - prevCycles) * len
1102 + (takeIndex - prevTakeIndex);
1103
1104 // Check indices for invalidation
1105 if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1106 lastRet = REMOVED;
1107 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1108 nextIndex = REMOVED;
1109 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1110 cursor = takeIndex;
1111
1112 if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1113 detach();
1114 else {
1115 this.prevCycles = cycles;
1116 this.prevTakeIndex = takeIndex;
1117 }
1118 }
1119 }
1120
1121 /**
1122 * Called when itrs should stop tracking this iterator, either
1123 * because there are no more indices to update (cursor < 0 &&
1124 * nextIndex < 0 && lastRet < 0) or as a special exception, when
1125 * lastRet >= 0, because hasNext() is about to return false for the
1126 * first time. Call only from iterating thread.
1127 */
1128 private void detach() {
1129 // Switch to detached mode
1130 // assert lock.getHoldCount() == 1;
1131 // assert cursor == NONE;
1132 // assert nextIndex < 0;
1133 // assert lastRet < 0 || nextItem == null;
1134 // assert lastRet < 0 ^ lastItem != null;
1135 if (prevTakeIndex >= 0) {
1136 // assert itrs != null;
1137 prevTakeIndex = DETACHED;
1138 // try to unlink from itrs (but not too hard)
1139 itrs.doSomeSweeping(true);
1140 }
1141 }
1142
1143 /**
1144 * For performance reasons, we would like not to acquire a lock in
1145 * hasNext in the common case. To allow for this, we only access
1146 * fields (i.e. nextItem) that are not modified by update operations
1147 * triggered by queue modifications.
1148 */
1149 public boolean hasNext() {
1150 // assert lock.getHoldCount() == 0;
1151 if (nextItem != null)
1152 return true;
1153 noNext();
1154 return false;
1155 }
1156
1157 private void noNext() {
1158 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1159 lock.lock();
1160 try {
1161 // assert cursor == NONE;
1162 // assert nextIndex == NONE;
1163 if (!isDetached()) {
1164 // assert lastRet >= 0;
1165 incorporateDequeues(); // might update lastRet
1166 if (lastRet >= 0) {
1167 lastItem = itemAt(lastRet);
1168 // assert lastItem != null;
1169 detach();
1170 }
1171 }
1172 // assert isDetached();
1173 // assert lastRet < 0 ^ lastItem != null;
1174 } finally {
1175 lock.unlock();
1176 }
1177 }
1178
1179 public E next() {
1180 // assert lock.getHoldCount() == 0;
1181 final E x = nextItem;
1182 if (x == null)
1183 throw new NoSuchElementException();
1184 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1185 lock.lock();
1186 try {
1187 if (!isDetached())
1188 incorporateDequeues();
1189 // assert nextIndex != NONE;
1190 // assert lastItem == null;
1191 lastRet = nextIndex;
1192 final int cursor = this.cursor;
1193 if (cursor >= 0) {
1194 nextItem = itemAt(nextIndex = cursor);
1195 // assert nextItem != null;
1196 this.cursor = incCursor(cursor);
1197 } else {
1198 nextIndex = NONE;
1199 nextItem = null;
1200 }
1201 } finally {
1202 lock.unlock();
1203 }
1204 return x;
1205 }
1206
1207 public void remove() {
1208 // assert lock.getHoldCount() == 0;
1209 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1210 lock.lock();
1211 try {
1212 if (!isDetached())
1213 incorporateDequeues(); // might update lastRet or detach
1214 final int lastRet = this.lastRet;
1215 this.lastRet = NONE;
1216 if (lastRet >= 0) {
1217 if (!isDetached())
1218 removeAt(lastRet);
1219 else {
1220 final E lastItem = this.lastItem;
1221 // assert lastItem != null;
1222 this.lastItem = null;
1223 if (itemAt(lastRet) == lastItem)
1224 removeAt(lastRet);
1225 }
1226 } else if (lastRet == NONE)
1227 throw new IllegalStateException();
1228 // else lastRet == REMOVED and the last returned element was
1229 // previously asynchronously removed via an operation other
1230 // than this.remove(), so nothing to do.
1231
1232 if (cursor < 0 && nextIndex < 0)
1233 detach();
1234 } finally {
1235 lock.unlock();
1236 // assert lastRet == NONE;
1237 // assert lastItem == null;
1238 }
1239 }
1240
1241 /**
1242 * Called to notify the iterator that the queue is empty, or that it
1243 * has fallen hopelessly behind, so that it should abandon any
1244 * further iteration, except possibly to return one more element
1245 * from next(), as promised by returning true from hasNext().
1246 */
1247 void shutdown() {
1248 // assert lock.getHoldCount() == 1;
1249 cursor = NONE;
1250 if (nextIndex >= 0)
1251 nextIndex = REMOVED;
1252 if (lastRet >= 0) {
1253 lastRet = REMOVED;
1254 lastItem = null;
1255 }
1256 prevTakeIndex = DETACHED;
1257 // Don't set nextItem to null because we must continue to be
1258 // able to return it on next().
1259 //
1260 // Caller will unlink from itrs when convenient.
1261 }
1262
1263 private int distance(int index, int prevTakeIndex, int length) {
1264 int distance = index - prevTakeIndex;
1265 if (distance < 0)
1266 distance += length;
1267 return distance;
1268 }
1269
1270 /**
1271 * Called whenever an interior remove (not at takeIndex) occured.
1272 *
1273 * @return true if this iterator should be unlinked from itrs
1274 */
1275 boolean removedAt(int removedIndex) {
1276 // assert lock.getHoldCount() == 1;
1277 if (isDetached())
1278 return true;
1279
1280 final int cycles = itrs.cycles;
1281 final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1282 final int prevCycles = this.prevCycles;
1283 final int prevTakeIndex = this.prevTakeIndex;
1284 final int len = items.length;
1285 int cycleDiff = cycles - prevCycles;
1286 if (removedIndex < takeIndex)
1287 cycleDiff++;
1288 final int removedDistance =
1289 (cycleDiff * len) + (removedIndex - prevTakeIndex);
1290 // assert removedDistance >= 0;
1291 int cursor = this.cursor;
1292 if (cursor >= 0) {
1293 int x = distance(cursor, prevTakeIndex, len);
1294 if (x == removedDistance) {
1295 if (cursor == putIndex)
1296 this.cursor = cursor = NONE;
1297 }
1298 else if (x > removedDistance) {
1299 // assert cursor != prevTakeIndex;
1300 this.cursor = cursor = dec(cursor);
1301 }
1302 }
1303 int lastRet = this.lastRet;
1304 if (lastRet >= 0) {
1305 int x = distance(lastRet, prevTakeIndex, len);
1306 if (x == removedDistance)
1307 this.lastRet = lastRet = REMOVED;
1308 else if (x > removedDistance)
1309 this.lastRet = lastRet = dec(lastRet);
1310 }
1311 int nextIndex = this.nextIndex;
1312 if (nextIndex >= 0) {
1313 int x = distance(nextIndex, prevTakeIndex, len);
1314 if (x == removedDistance)
1315 this.nextIndex = nextIndex = REMOVED;
1316 else if (x > removedDistance)
1317 this.nextIndex = nextIndex = dec(nextIndex);
1318 }
1319 else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1320 this.prevTakeIndex = DETACHED;
1321 return true;
1322 }
1323 return false;
1324 }
1325
1326 /**
1327 * Called whenever takeIndex wraps around to zero.
1328 *
1329 * @return true if this iterator should be unlinked from itrs
1330 */
1331 boolean takeIndexWrapped() {
1332 // assert lock.getHoldCount() == 1;
1333 if (isDetached())
1334 return true;
1335 if (itrs.cycles - prevCycles > 1) {
1336 // All the elements that existed at the time of the last
1337 // operation are gone, so abandon further iteration.
1338 shutdown();
1339 return true;
1340 }
1341 return false;
1342 }
1343
1344 // /** Uncomment for debugging. */
1345 // public String toString() {
1346 // return ("cursor=" + cursor + " " +
1347 // "nextIndex=" + nextIndex + " " +
1348 // "lastRet=" + lastRet + " " +
1349 // "nextItem=" + nextItem + " " +
1350 // "lastItem=" + lastItem + " " +
1351 // "prevCycles=" + prevCycles + " " +
1352 // "prevTakeIndex=" + prevTakeIndex + " " +
1353 // "size()=" + size() + " " +
1354 // "remainingCapacity()=" + remainingCapacity());
1355 // }
1356 }
1357 }