ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.104
Committed: Thu May 2 06:02:17 2013 UTC (11 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.103: +0 -1 lines
Log Message:
port to latest lambda

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.38 * Expert Group and released to the public domain, as explained at
4 jsr166 1.78 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.85 import java.util.concurrent.locks.Condition;
9     import java.util.concurrent.locks.ReentrantLock;
10     import java.util.AbstractQueue;
11     import java.util.Collection;
12     import java.util.Iterator;
13     import java.util.NoSuchElementException;
14 jsr166 1.89 import java.lang.ref.WeakReference;
15 dl 1.102 import java.util.Spliterators;
16 dl 1.100 import java.util.Spliterator;
17     import java.util.stream.Stream;
18     import java.util.function.Consumer;
19 tim 1.1
20     /**
21 dl 1.25 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
22     * array. This queue orders elements FIFO (first-in-first-out). The
23     * <em>head</em> of the queue is that element that has been on the
24     * queue the longest time. The <em>tail</em> of the queue is that
25     * element that has been on the queue the shortest time. New elements
26     * are inserted at the tail of the queue, and the queue retrieval
27     * operations obtain elements at the head of the queue.
28 dholmes 1.13 *
29 dl 1.40 * <p>This is a classic &quot;bounded buffer&quot;, in which a
30     * fixed-sized array holds elements inserted by producers and
31     * extracted by consumers. Once created, the capacity cannot be
32 jsr166 1.69 * changed. Attempts to {@code put} an element into a full queue
33     * will result in the operation blocking; attempts to {@code take} an
34 dl 1.40 * element from an empty queue will similarly block.
35 dl 1.11 *
36 jsr166 1.72 * <p>This class supports an optional fairness policy for ordering
37 dl 1.42 * waiting producer and consumer threads. By default, this ordering
38     * is not guaranteed. However, a queue constructed with fairness set
39 jsr166 1.69 * to {@code true} grants threads access in FIFO order. Fairness
40 dl 1.42 * generally decreases throughput but reduces variability and avoids
41     * starvation.
42 brian 1.7 *
43 dl 1.43 * <p>This class and its iterator implement all of the
44     * <em>optional</em> methods of the {@link Collection} and {@link
45 jsr166 1.47 * Iterator} interfaces.
46 dl 1.26 *
47 dl 1.41 * <p>This class is a member of the
48 jsr166 1.54 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
49 dl 1.41 * Java Collections Framework</a>.
50     *
51 dl 1.8 * @since 1.5
52     * @author Doug Lea
53 dl 1.33 * @param <E> the type of elements held in this collection
54 dl 1.8 */
55 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
56 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
57    
58 dl 1.36 /**
59 dl 1.42 * Serialization ID. This class relies on default serialization
60     * even for the items array, which is default-serialized, even if
61     * it is empty. Otherwise it could not be declared final, which is
62 dl 1.36 * necessary here.
63     */
64     private static final long serialVersionUID = -817911632652898426L;
65    
66 jsr166 1.73 /** The queued items */
67 jsr166 1.68 final Object[] items;
68 jsr166 1.73
69     /** items index for next take, poll, peek or remove */
70 jsr166 1.58 int takeIndex;
71 jsr166 1.73
72     /** items index for next put, offer, or add */
73 jsr166 1.58 int putIndex;
74 jsr166 1.73
75     /** Number of elements in the queue */
76 jsr166 1.59 int count;
77 tim 1.12
78 dl 1.5 /*
79 dl 1.36 * Concurrency control uses the classic two-condition algorithm
80 dl 1.5 * found in any textbook.
81     */
82    
83 dl 1.11 /** Main lock guarding all access */
84 jsr166 1.58 final ReentrantLock lock;
85 jsr166 1.85
86 dholmes 1.13 /** Condition for waiting takes */
87 dl 1.37 private final Condition notEmpty;
88 jsr166 1.85
89 dl 1.35 /** Condition for waiting puts */
90 dl 1.37 private final Condition notFull;
91 dl 1.5
92 jsr166 1.89 /**
93     * Shared state for currently active iterators, or null if there
94     * are known not to be any. Allows queue operations to update
95     * iterator state.
96     */
97     transient Itrs itrs = null;
98    
99 dl 1.5 // Internal helper methods
100    
101     /**
102 jsr166 1.71 * Circularly decrement i.
103     */
104     final int dec(int i) {
105     return ((i == 0) ? items.length : i) - 1;
106     }
107    
108 jsr166 1.68 /**
109     * Returns item at index i.
110     */
111 jsr166 1.96 @SuppressWarnings("unchecked")
112 jsr166 1.68 final E itemAt(int i) {
113 jsr166 1.96 return (E) items[i];
114 jsr166 1.68 }
115    
116     /**
117     * Throws NullPointerException if argument is null.
118     *
119     * @param v the element
120     */
121     private static void checkNotNull(Object v) {
122     if (v == null)
123     throw new NullPointerException();
124     }
125    
126 dl 1.5 /**
127 jsr166 1.47 * Inserts element at current put position, advances, and signals.
128 dl 1.9 * Call only when holding lock.
129 dl 1.5 */
130 jsr166 1.88 private void enqueue(E x) {
131 jsr166 1.95 // assert lock.getHoldCount() == 1;
132     // assert items[putIndex] == null;
133 dl 1.100 final Object[] items = this.items;
134 dl 1.5 items[putIndex] = x;
135 dl 1.100 if (++putIndex == items.length)
136     putIndex = 0;
137 jsr166 1.89 count++;
138 dl 1.9 notEmpty.signal();
139 tim 1.1 }
140 tim 1.12
141 dl 1.5 /**
142 jsr166 1.47 * Extracts element at current take position, advances, and signals.
143 dl 1.9 * Call only when holding lock.
144 dl 1.5 */
145 jsr166 1.88 private E dequeue() {
146 jsr166 1.95 // assert lock.getHoldCount() == 1;
147     // assert items[takeIndex] != null;
148 jsr166 1.68 final Object[] items = this.items;
149 jsr166 1.97 @SuppressWarnings("unchecked")
150     E x = (E) items[takeIndex];
151 dl 1.5 items[takeIndex] = null;
152 dl 1.100 if (++takeIndex == items.length)
153     takeIndex = 0;
154 jsr166 1.89 count--;
155     if (itrs != null)
156     itrs.elementDequeued();
157 dl 1.9 notFull.signal();
158 dl 1.5 return x;
159     }
160    
161     /**
162 jsr166 1.90 * Deletes item at array index removeIndex.
163 jsr166 1.89 * Utility for remove(Object) and iterator.remove.
164 dl 1.9 * Call only when holding lock.
165 dl 1.5 */
166 jsr166 1.90 void removeAt(final int removeIndex) {
167 jsr166 1.95 // assert lock.getHoldCount() == 1;
168     // assert items[removeIndex] != null;
169     // assert removeIndex >= 0 && removeIndex < items.length;
170 jsr166 1.68 final Object[] items = this.items;
171 jsr166 1.90 if (removeIndex == takeIndex) {
172 jsr166 1.89 // removing front item; just advance
173 dl 1.9 items[takeIndex] = null;
174 dl 1.100 if (++takeIndex == items.length)
175     takeIndex = 0;
176 jsr166 1.90 count--;
177 jsr166 1.89 if (itrs != null)
178     itrs.elementDequeued();
179 tim 1.23 } else {
180 jsr166 1.89 // an "interior" remove
181 jsr166 1.90
182 dl 1.9 // slide over all others up through putIndex.
183 jsr166 1.90 final int putIndex = this.putIndex;
184     for (int i = removeIndex;;) {
185 dl 1.100 int next = i + 1;
186     if (next == items.length)
187     next = 0;
188 jsr166 1.90 if (next != putIndex) {
189     items[i] = items[next];
190     i = next;
191 tim 1.23 } else {
192 dl 1.9 items[i] = null;
193 jsr166 1.90 this.putIndex = i;
194 dl 1.9 break;
195     }
196 dl 1.5 }
197 jsr166 1.90 count--;
198     if (itrs != null)
199     itrs.removedAt(removeIndex);
200 dl 1.5 }
201 dl 1.9 notFull.signal();
202 tim 1.1 }
203    
/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity, delegating to the two-argument constructor with
 * fairness turned off.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
214 dl 1.2
215 dl 1.5 /**
216 jsr166 1.69 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
217 dholmes 1.13 * capacity and the specified access policy.
218 jsr166 1.50 *
219 dholmes 1.13 * @param capacity the capacity of this queue
220 jsr166 1.69 * @param fair if {@code true} then queue accesses for threads blocked
221 jsr166 1.50 * on insertion or removal, are processed in FIFO order;
222 jsr166 1.69 * if {@code false} the access order is unspecified.
223     * @throws IllegalArgumentException if {@code capacity < 1}
224 dl 1.11 */
225 dholmes 1.13 public ArrayBlockingQueue(int capacity, boolean fair) {
226 dl 1.36 if (capacity <= 0)
227     throw new IllegalArgumentException();
228 jsr166 1.68 this.items = new Object[capacity];
229 dl 1.36 lock = new ReentrantLock(fair);
230     notEmpty = lock.newCondition();
231     notFull = lock.newCondition();
232 dl 1.5 }
233    
/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity, the specified access policy and initially containing the
 * elements of the given collection, added in traversal order of the
 * collection's iterator.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true}, queue accesses for threads blocked
 *        on insertion or removal are processed in FIFO order;
 *        if {@code false}, the access order is unspecified
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1. When raised because the
 *         elements did not fit, the triggering
 *         {@code ArrayIndexOutOfBoundsException} is attached as the cause.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                // Storing past the end of the array is how an oversized
                // collection is detected (see the catch below).
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            // Keep the original exception as the cause so the stack
            // trace shows where the out-of-bounds access happened.
            throw new IllegalArgumentException(ex);
        }
        count = i;
        // A completely filled array wraps the put position back to 0.
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
272 dl 1.2
273 dholmes 1.13 /**
274 jsr166 1.50 * Inserts the specified element at the tail of this queue if it is
275     * possible to do so immediately without exceeding the queue's capacity,
276 jsr166 1.69 * returning {@code true} upon success and throwing an
277     * {@code IllegalStateException} if this queue is full.
278 dl 1.25 *
279 jsr166 1.50 * @param e the element to add
280 jsr166 1.69 * @return {@code true} (as specified by {@link Collection#add})
281 jsr166 1.50 * @throws IllegalStateException if this queue is full
282     * @throws NullPointerException if the specified element is null
283     */
284     public boolean add(E e) {
285 jsr166 1.56 return super.add(e);
286 jsr166 1.50 }
287    
288     /**
289     * Inserts the specified element at the tail of this queue if it is
290     * possible to do so immediately without exceeding the queue's capacity,
291 jsr166 1.69 * returning {@code true} upon success and {@code false} if this queue
292 jsr166 1.50 * is full. This method is generally preferable to method {@link #add},
293     * which can fail to insert an element only by throwing an exception.
294     *
295     * @throws NullPointerException if the specified element is null
296 dholmes 1.13 */
297 jsr166 1.49 public boolean offer(E e) {
298 jsr166 1.68 checkNotNull(e);
299 dl 1.36 final ReentrantLock lock = this.lock;
300 dl 1.5 lock.lock();
301     try {
302 jsr166 1.59 if (count == items.length)
303 dl 1.2 return false;
304 dl 1.5 else {
305 jsr166 1.88 enqueue(e);
306 dl 1.5 return true;
307     }
308 tim 1.23 } finally {
309 tim 1.12 lock.unlock();
310 dl 1.2 }
311 dl 1.5 }
312 dl 1.2
313 dholmes 1.13 /**
314 jsr166 1.50 * Inserts the specified element at the tail of this queue, waiting
315     * for space to become available if the queue is full.
316     *
317     * @throws InterruptedException {@inheritDoc}
318     * @throws NullPointerException {@inheritDoc}
319     */
320     public void put(E e) throws InterruptedException {
321 jsr166 1.68 checkNotNull(e);
322 jsr166 1.50 final ReentrantLock lock = this.lock;
323     lock.lockInterruptibly();
324     try {
325 jsr166 1.59 while (count == items.length)
326 jsr166 1.58 notFull.await();
327 jsr166 1.88 enqueue(e);
328 jsr166 1.50 } finally {
329     lock.unlock();
330     }
331     }
332    
333     /**
334     * Inserts the specified element at the tail of this queue, waiting
335     * up to the specified wait time for space to become available if
336     * the queue is full.
337     *
338     * @throws InterruptedException {@inheritDoc}
339     * @throws NullPointerException {@inheritDoc}
340 brian 1.7 */
341 jsr166 1.49 public boolean offer(E e, long timeout, TimeUnit unit)
342 dholmes 1.13 throws InterruptedException {
343 dl 1.2
344 jsr166 1.68 checkNotNull(e);
345 jsr166 1.56 long nanos = unit.toNanos(timeout);
346 dl 1.36 final ReentrantLock lock = this.lock;
347 dl 1.5 lock.lockInterruptibly();
348     try {
349 jsr166 1.59 while (count == items.length) {
350 dl 1.5 if (nanos <= 0)
351     return false;
352 jsr166 1.58 nanos = notFull.awaitNanos(nanos);
353 dl 1.5 }
354 jsr166 1.88 enqueue(e);
355 jsr166 1.58 return true;
356 tim 1.23 } finally {
357 dl 1.5 lock.unlock();
358 dl 1.2 }
359 dl 1.5 }
360 dl 1.2
361 dholmes 1.13 public E poll() {
362 dl 1.36 final ReentrantLock lock = this.lock;
363 dholmes 1.13 lock.lock();
364     try {
365 jsr166 1.88 return (count == 0) ? null : dequeue();
366 tim 1.23 } finally {
367 tim 1.15 lock.unlock();
368 dholmes 1.13 }
369     }
370    
371 jsr166 1.50 public E take() throws InterruptedException {
372     final ReentrantLock lock = this.lock;
373     lock.lockInterruptibly();
374     try {
375 jsr166 1.59 while (count == 0)
376 jsr166 1.58 notEmpty.await();
377 jsr166 1.88 return dequeue();
378 jsr166 1.50 } finally {
379     lock.unlock();
380     }
381     }
382    
383 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
384 jsr166 1.56 long nanos = unit.toNanos(timeout);
385 dl 1.36 final ReentrantLock lock = this.lock;
386 dl 1.5 lock.lockInterruptibly();
387     try {
388 jsr166 1.59 while (count == 0) {
389 dl 1.5 if (nanos <= 0)
390     return null;
391 jsr166 1.58 nanos = notEmpty.awaitNanos(nanos);
392 dl 1.2 }
393 jsr166 1.88 return dequeue();
394 tim 1.23 } finally {
395 dl 1.5 lock.unlock();
396     }
397     }
398 dl 1.2
399 dholmes 1.13 public E peek() {
400 dl 1.36 final ReentrantLock lock = this.lock;
401 dholmes 1.13 lock.lock();
402     try {
403 jsr166 1.99 return itemAt(takeIndex); // null when queue is empty
404 tim 1.23 } finally {
405 dholmes 1.13 lock.unlock();
406     }
407     }
408    
409     // this doc comment is overridden to remove the reference to collections
410     // greater in size than Integer.MAX_VALUE
411 tim 1.15 /**
412 dl 1.25 * Returns the number of elements in this queue.
413     *
414 jsr166 1.50 * @return the number of elements in this queue
415 dholmes 1.13 */
416     public int size() {
417 dl 1.36 final ReentrantLock lock = this.lock;
418 dholmes 1.13 lock.lock();
419     try {
420     return count;
421 tim 1.23 } finally {
422 dholmes 1.13 lock.unlock();
423     }
424     }
425    
426     // this doc comment is a modified copy of the inherited doc comment,
427     // without the reference to unlimited queues.
428 tim 1.15 /**
429 jsr166 1.48 * Returns the number of additional elements that this queue can ideally
430     * (in the absence of memory or resource constraints) accept without
431 dholmes 1.13 * blocking. This is always equal to the initial capacity of this queue
432 jsr166 1.69 * less the current {@code size} of this queue.
433 jsr166 1.48 *
434     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
435 jsr166 1.69 * an element will succeed by inspecting {@code remainingCapacity}
436 jsr166 1.48 * because it may be the case that another thread is about to
437 jsr166 1.50 * insert or remove an element.
438 dholmes 1.13 */
439     public int remainingCapacity() {
440 dl 1.36 final ReentrantLock lock = this.lock;
441 dholmes 1.13 lock.lock();
442     try {
443     return items.length - count;
444 tim 1.23 } finally {
445 dholmes 1.13 lock.unlock();
446     }
447     }
448    
449 jsr166 1.50 /**
450     * Removes a single instance of the specified element from this queue,
451 jsr166 1.69 * if it is present. More formally, removes an element {@code e} such
452     * that {@code o.equals(e)}, if this queue contains one or more such
453 jsr166 1.50 * elements.
454 jsr166 1.69 * Returns {@code true} if this queue contained the specified element
455 jsr166 1.50 * (or equivalently, if this queue changed as a result of the call).
456     *
457 jsr166 1.64 * <p>Removal of interior elements in circular array based queues
458 dl 1.60 * is an intrinsically slow and disruptive operation, so should
459     * be undertaken only in exceptional circumstances, ideally
460     * only when the queue is known not to be accessible by other
461     * threads.
462     *
463 jsr166 1.50 * @param o element to be removed from this queue, if present
464 jsr166 1.69 * @return {@code true} if this queue changed as a result of the call
465 jsr166 1.50 */
466     public boolean remove(Object o) {
467     if (o == null) return false;
468 jsr166 1.68 final Object[] items = this.items;
469 jsr166 1.50 final ReentrantLock lock = this.lock;
470     lock.lock();
471     try {
472 jsr166 1.86 if (count > 0) {
473     final int putIndex = this.putIndex;
474     int i = takeIndex;
475     do {
476     if (o.equals(items[i])) {
477     removeAt(i);
478     return true;
479     }
480 dl 1.100 if (++i == items.length)
481     i = 0;
482     } while (i != putIndex);
483 jsr166 1.50 }
484 jsr166 1.68 return false;
485 jsr166 1.50 } finally {
486     lock.unlock();
487     }
488     }
489 dholmes 1.13
490 jsr166 1.50 /**
491 jsr166 1.69 * Returns {@code true} if this queue contains the specified element.
492     * More formally, returns {@code true} if and only if this queue contains
493     * at least one element {@code e} such that {@code o.equals(e)}.
494 jsr166 1.50 *
495     * @param o object to be checked for containment in this queue
496 jsr166 1.69 * @return {@code true} if this queue contains the specified element
497 jsr166 1.50 */
498 dholmes 1.21 public boolean contains(Object o) {
499     if (o == null) return false;
500 jsr166 1.68 final Object[] items = this.items;
501 dl 1.36 final ReentrantLock lock = this.lock;
502 dl 1.5 lock.lock();
503     try {
504 jsr166 1.86 if (count > 0) {
505     final int putIndex = this.putIndex;
506     int i = takeIndex;
507     do {
508     if (o.equals(items[i]))
509     return true;
510 dl 1.100 if (++i == items.length)
511     i = 0;
512     } while (i != putIndex);
513 jsr166 1.86 }
514 dl 1.2 return false;
515 tim 1.23 } finally {
516 dl 1.5 lock.unlock();
517     }
518     }
519 brian 1.7
520 jsr166 1.50 /**
521     * Returns an array containing all of the elements in this queue, in
522     * proper sequence.
523     *
524     * <p>The returned array will be "safe" in that no references to it are
525     * maintained by this queue. (In other words, this method must allocate
526     * a new array). The caller is thus free to modify the returned array.
527 jsr166 1.51 *
528 jsr166 1.50 * <p>This method acts as bridge between array-based and collection-based
529     * APIs.
530     *
531     * @return an array containing all of the elements in this queue
532     */
533 dl 1.5 public Object[] toArray() {
534 dl 1.100 Object[] a;
535 dl 1.36 final ReentrantLock lock = this.lock;
536 dl 1.5 lock.lock();
537     try {
538 jsr166 1.68 final int count = this.count;
539 dl 1.100 a = new Object[count];
540 jsr166 1.98 int n = items.length - takeIndex;
541     if (count <= n)
542     System.arraycopy(items, takeIndex, a, 0, count);
543     else {
544     System.arraycopy(items, takeIndex, a, 0, n);
545     System.arraycopy(items, 0, a, n, count - n);
546     }
547 tim 1.23 } finally {
548 dl 1.5 lock.unlock();
549     }
550 dl 1.100 return a;
551 dl 1.5 }
552 brian 1.7
553 jsr166 1.50 /**
554     * Returns an array containing all of the elements in this queue, in
555     * proper sequence; the runtime type of the returned array is that of
556     * the specified array. If the queue fits in the specified array, it
557     * is returned therein. Otherwise, a new array is allocated with the
558     * runtime type of the specified array and the size of this queue.
559     *
560     * <p>If this queue fits in the specified array with room to spare
561     * (i.e., the array has more elements than this queue), the element in
562     * the array immediately following the end of the queue is set to
563 jsr166 1.69 * {@code null}.
564 jsr166 1.50 *
565     * <p>Like the {@link #toArray()} method, this method acts as bridge between
566     * array-based and collection-based APIs. Further, this method allows
567     * precise control over the runtime type of the output array, and may,
568     * under certain circumstances, be used to save allocation costs.
569     *
570 jsr166 1.69 * <p>Suppose {@code x} is a queue known to contain only strings.
571 jsr166 1.50 * The following code can be used to dump the queue into a newly
572 jsr166 1.69 * allocated array of {@code String}:
573 jsr166 1.50 *
574 jsr166 1.80 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
575 jsr166 1.50 *
576 jsr166 1.69 * Note that {@code toArray(new Object[0])} is identical in function to
577     * {@code toArray()}.
578 jsr166 1.50 *
579     * @param a the array into which the elements of the queue are to
580     * be stored, if it is big enough; otherwise, a new array of the
581     * same runtime type is allocated for this purpose
582     * @return an array containing all of the elements in this queue
583     * @throws ArrayStoreException if the runtime type of the specified array
584     * is not a supertype of the runtime type of every element in
585     * this queue
586     * @throws NullPointerException if the specified array is null
587     */
588 jsr166 1.68 @SuppressWarnings("unchecked")
589 dl 1.5 public <T> T[] toArray(T[] a) {
590 jsr166 1.68 final Object[] items = this.items;
591 dl 1.36 final ReentrantLock lock = this.lock;
592 dl 1.5 lock.lock();
593     try {
594 jsr166 1.68 final int count = this.count;
595     final int len = a.length;
596     if (len < count)
597 dholmes 1.16 a = (T[])java.lang.reflect.Array.newInstance(
598 jsr166 1.68 a.getClass().getComponentType(), count);
599 jsr166 1.98 int n = items.length - takeIndex;
600     if (count <= n)
601     System.arraycopy(items, takeIndex, a, 0, count);
602     else {
603     System.arraycopy(items, takeIndex, a, 0, n);
604     System.arraycopy(items, 0, a, n, count - n);
605     }
606 jsr166 1.68 if (len > count)
607 dl 1.5 a[count] = null;
608 tim 1.23 } finally {
609 dl 1.5 lock.unlock();
610     }
611 dl 1.100 return a;
612 dl 1.5 }
613 dl 1.6
614     public String toString() {
615 dl 1.36 final ReentrantLock lock = this.lock;
616 dl 1.6 lock.lock();
617     try {
618 jsr166 1.68 int k = count;
619     if (k == 0)
620     return "[]";
621    
622 dl 1.100 final Object[] items = this.items;
623 jsr166 1.68 StringBuilder sb = new StringBuilder();
624     sb.append('[');
625 dl 1.100 for (int i = takeIndex; ; ) {
626 jsr166 1.68 Object e = items[i];
627     sb.append(e == this ? "(this Collection)" : e);
628     if (--k == 0)
629     return sb.append(']').toString();
630     sb.append(',').append(' ');
631 dl 1.100 if (++i == items.length)
632     i = 0;
633 jsr166 1.68 }
634 tim 1.23 } finally {
635 dl 1.6 lock.unlock();
636     }
637     }
638 tim 1.12
639 dl 1.44 /**
640     * Atomically removes all of the elements from this queue.
641     * The queue will be empty after this call returns.
642     */
643 dl 1.30 public void clear() {
644 jsr166 1.68 final Object[] items = this.items;
645 dl 1.36 final ReentrantLock lock = this.lock;
646 dl 1.30 lock.lock();
647     try {
648 jsr166 1.86 int k = count;
649     if (k > 0) {
650     final int putIndex = this.putIndex;
651     int i = takeIndex;
652     do {
653     items[i] = null;
654 dl 1.100 if (++i == items.length)
655     i = 0;
656     } while (i != putIndex);
657 jsr166 1.86 takeIndex = putIndex;
658     count = 0;
659 jsr166 1.89 if (itrs != null)
660     itrs.queueIsEmpty();
661 jsr166 1.86 for (; k > 0 && lock.hasWaiters(notFull); k--)
662     notFull.signal();
663     }
664 dl 1.30 } finally {
665     lock.unlock();
666     }
667     }
668    
669 jsr166 1.50 /**
670     * @throws UnsupportedOperationException {@inheritDoc}
671     * @throws ClassCastException {@inheritDoc}
672     * @throws NullPointerException {@inheritDoc}
673     * @throws IllegalArgumentException {@inheritDoc}
674     */
675 dl 1.30 public int drainTo(Collection<? super E> c) {
676 jsr166 1.81 return drainTo(c, Integer.MAX_VALUE);
677 dl 1.30 }
678    
679 jsr166 1.50 /**
680     * @throws UnsupportedOperationException {@inheritDoc}
681     * @throws ClassCastException {@inheritDoc}
682     * @throws NullPointerException {@inheritDoc}
683     * @throws IllegalArgumentException {@inheritDoc}
684     */
685 dl 1.30 public int drainTo(Collection<? super E> c, int maxElements) {
686 jsr166 1.68 checkNotNull(c);
687 dl 1.30 if (c == this)
688     throw new IllegalArgumentException();
689     if (maxElements <= 0)
690     return 0;
691 jsr166 1.68 final Object[] items = this.items;
692 dl 1.36 final ReentrantLock lock = this.lock;
693 dl 1.30 lock.lock();
694     try {
695 jsr166 1.82 int n = Math.min(maxElements, count);
696     int take = takeIndex;
697     int i = 0;
698     try {
699     while (i < n) {
700 jsr166 1.97 @SuppressWarnings("unchecked")
701     E x = (E) items[take];
702 jsr166 1.84 c.add(x);
703 jsr166 1.82 items[take] = null;
704 dl 1.100 if (++take == items.length)
705     take = 0;
706 jsr166 1.86 i++;
707 jsr166 1.82 }
708     return n;
709     } finally {
710     // Restore invariants even if c.add() threw
711 jsr166 1.89 if (i > 0) {
712     count -= i;
713     takeIndex = take;
714     if (itrs != null) {
715     if (count == 0)
716     itrs.queueIsEmpty();
717     else if (i > take)
718     itrs.takeIndexWrapped();
719     }
720     for (; i > 0 && lock.hasWaiters(notFull); i--)
721     notFull.signal();
722     }
723 dl 1.30 }
724     } finally {
725     lock.unlock();
726     }
727     }
728    
729 brian 1.7 /**
730 dl 1.75 * Returns an iterator over the elements in this queue in proper sequence.
731 jsr166 1.77 * The elements will be returned in order from first (head) to last (tail).
732     *
733 jsr166 1.87 * <p>The returned iterator is a "weakly consistent" iterator that
734 jsr166 1.77 * will never throw {@link java.util.ConcurrentModificationException
735 jsr166 1.87 * ConcurrentModificationException}, and guarantees to traverse
736     * elements as they existed upon construction of the iterator, and
737     * may (but is not guaranteed to) reflect any modifications
738     * subsequent to construction.
739 brian 1.7 *
740 jsr166 1.50 * @return an iterator over the elements in this queue in proper sequence
741 brian 1.7 */
742 dl 1.5 public Iterator<E> iterator() {
743 jsr166 1.68 return new Itr();
744 dl 1.5 }
745 dl 1.8
746     /**
747 jsr166 1.94 * Shared data between iterators and their queue, allowing queue
748 jsr166 1.89 * modifications to update iterators when elements are removed.
749     *
750 jsr166 1.94 * This adds a lot of complexity for the sake of correctly
751     * handling some uncommon operations, but the combination of
752     * circular-arrays and supporting interior removes (i.e., those
753     * not at head) would cause iterators to sometimes lose their
754     * places and/or (re)report elements they shouldn't. To avoid
755     * this, when a queue has one or more iterators, it keeps iterator
756     * state consistent by:
757     *
758     * (1) keeping track of the number of "cycles", that is, the
759     * number of times takeIndex has wrapped around to 0.
760     * (2) notifying all iterators via the callback removedAt whenever
761     * an interior element is removed (and thus other elements may
762     * be shifted).
763     *
764     * These suffice to eliminate iterator inconsistencies, but
765     * unfortunately add the secondary responsibility of maintaining
766     * the list of iterators. We track all active iterators in a
767     * simple linked list (accessed only when the queue's lock is
768     * held) of weak references to Itr. The list is cleaned up using
769     * 3 different mechanisms:
770 jsr166 1.89 *
771     * (1) Whenever a new iterator is created, do some O(1) checking for
772     * stale list elements.
773     *
774     * (2) Whenever takeIndex wraps around to 0, check for iterators
775     * that have been unused for more than one wrap-around cycle.
776     *
777     * (3) Whenever the queue becomes empty, all iterators are notified
778     * and this entire data structure is discarded.
779     *
780 jsr166 1.94 * So in addition to the removedAt callback that is necessary for
781     * correctness, iterators have the shutdown and takeIndexWrapped
782     * callbacks that help remove stale iterators from the list.
783     *
784     * Whenever a list element is examined, it is expunged if either
785     * the GC has determined that the iterator is discarded, or if the
786     * iterator reports that it is "detached" (does not need any
787     * further state updates). Overhead is maximal when takeIndex
788     * never advances, iterators are discarded before they are
789     * exhausted, and all removals are interior removes, in which case
790     * all stale iterators are discovered by the GC. But even in this
791     * case we don't increase the amortized complexity.
792     *
793     * Care must be taken to keep list sweeping methods from
794     * reentrantly invoking another such method, causing subtle
795     * corruption bugs.
796 jsr166 1.89 */
797     class Itrs {
798    
799     /**
800     * Node in a linked list of weak iterator references.
801     */
802     private class Node extends WeakReference<Itr> {
803     Node next;
804    
805     Node(Itr iterator, Node next) {
806     super(iterator);
807     this.next = next;
808     }
809     }
810    
811     /** Incremented whenever takeIndex wraps around to 0 */
812     int cycles = 0;
813    
814     /** Linked list of weak iterator references */
815 jsr166 1.93 private Node head;
816 jsr166 1.89
817     /** Used to expunge stale iterators */
818     private Node sweeper = null;
819    
820     private static final int SHORT_SWEEP_PROBES = 4;
821     private static final int LONG_SWEEP_PROBES = 16;
822    
/**
 * Creates the tracking structure and registers {@code initial} as
 * its first tracked iterator.
 */
Itrs(Itr initial) {
    register(initial);
}
826    
827 jsr166 1.89 /**
828     * Sweeps itrs, looking for and expunging stale iterators.
829     * If at least one was found, tries harder to find more.
830 jsr166 1.94 * Called only from iterating thread.
831 jsr166 1.89 *
832     * @param tryHarder whether to start in try-harder mode, because
833     * there is known to be at least one iterator to collect
834     */
835     void doSomeSweeping(boolean tryHarder) {
836 jsr166 1.95 // assert lock.getHoldCount() == 1;
837     // assert head != null;
838 jsr166 1.89 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
839     Node o, p;
840     final Node sweeper = this.sweeper;
841 jsr166 1.93 boolean passedGo; // to limit search to one full sweep
842 jsr166 1.89
843 jsr166 1.93 if (sweeper == null) {
844     o = null;
845     p = head;
846     passedGo = true;
847     } else {
848 jsr166 1.89 o = sweeper;
849     p = o.next;
850 jsr166 1.93 passedGo = false;
851 jsr166 1.89 }
852    
853 jsr166 1.93 for (; probes > 0; probes--) {
854     if (p == null) {
855     if (passedGo)
856     break;
857     o = null;
858     p = head;
859     passedGo = true;
860     }
861 jsr166 1.89 final Itr it = p.get();
862     final Node next = p.next;
863     if (it == null || it.isDetached()) {
864     // found a discarded/exhausted iterator
865     probes = LONG_SWEEP_PROBES; // "try harder"
866     // unlink p
867     p.clear();
868     p.next = null;
869 jsr166 1.93 if (o == null) {
870 jsr166 1.89 head = next;
871 jsr166 1.93 if (next == null) {
872     // We've run out of iterators to track; retire
873     itrs = null;
874     return;
875     }
876     }
877 jsr166 1.89 else
878     o.next = next;
879     } else {
880     o = p;
881     }
882     p = next;
883     }
884    
885     this.sweeper = (p == null) ? null : o;
886     }
887    
888     /**
889     * Adds a new iterator to the linked list of tracked iterators.
890     */
891     void register(Itr itr) {
892 jsr166 1.95 // assert lock.getHoldCount() == 1;
893 jsr166 1.89 head = new Node(itr, head);
894     }
895    
896     /**
897     * Called whenever takeIndex wraps around to 0.
898     *
899     * Notifies all iterators, and expunges any that are now stale.
900     */
901     void takeIndexWrapped() {
902 jsr166 1.95 // assert lock.getHoldCount() == 1;
903 jsr166 1.89 cycles++;
904     for (Node o = null, p = head; p != null;) {
905     final Itr it = p.get();
906     final Node next = p.next;
907     if (it == null || it.takeIndexWrapped()) {
908     // unlink p
909 jsr166 1.95 // assert it == null || it.isDetached();
910 jsr166 1.89 p.clear();
911     p.next = null;
912     if (o == null)
913     head = next;
914     else
915     o.next = next;
916     } else {
917     o = p;
918     }
919     p = next;
920     }
921     if (head == null) // no more iterators to track
922     itrs = null;
923     }
924    
925     /**
926 jsr166 1.90 * Called whenever an interior remove (not at takeIndex) occured.
927 jsr166 1.93 *
928     * Notifies all iterators, and expunges any that are now stale.
929 jsr166 1.89 */
930 jsr166 1.90 void removedAt(int removedIndex) {
931 jsr166 1.89 for (Node o = null, p = head; p != null;) {
932     final Itr it = p.get();
933     final Node next = p.next;
934 jsr166 1.90 if (it == null || it.removedAt(removedIndex)) {
935 jsr166 1.89 // unlink p
936 jsr166 1.95 // assert it == null || it.isDetached();
937 jsr166 1.89 p.clear();
938     p.next = null;
939     if (o == null)
940     head = next;
941     else
942     o.next = next;
943     } else {
944     o = p;
945     }
946     p = next;
947     }
948     if (head == null) // no more iterators to track
949     itrs = null;
950     }
951    
952     /**
953     * Called whenever the queue becomes empty.
954     *
955     * Notifies all active iterators that the queue is empty,
956     * clears all weak refs, and unlinks the itrs datastructure.
957     */
958     void queueIsEmpty() {
959 jsr166 1.95 // assert lock.getHoldCount() == 1;
960 jsr166 1.89 for (Node p = head; p != null; p = p.next) {
961     Itr it = p.get();
962     if (it != null) {
963     p.clear();
964     it.shutdown();
965     }
966     }
967     head = null;
968     itrs = null;
969     }
970    
971     /**
972 jsr166 1.90 * Called whenever an element has been dequeued (at takeIndex).
973 jsr166 1.89 */
974     void elementDequeued() {
975 jsr166 1.95 // assert lock.getHoldCount() == 1;
976 jsr166 1.89 if (count == 0)
977     queueIsEmpty();
978     else if (takeIndex == 0)
979     takeIndexWrapped();
980     }
981     }
982    
983     /**
984     * Iterator for ArrayBlockingQueue.
985     *
986     * To maintain weak consistency with respect to puts and takes, we
987     * read ahead one slot, so as to not report hasNext true but then
988     * not have an element to return.
989     *
990     * We switch into "detached" mode (allowing prompt unlinking from
991     * itrs without help from the GC) when all indices are negative, or
992     * when hasNext returns false for the first time. This allows the
993     * iterator to track concurrent updates completely accurately,
994     * except for the corner case of the user calling Iterator.remove()
995     * after hasNext() returned false. Even in this case, we ensure
996     * that we don't remove the wrong element by keeping track of the
997     * expected element to remove, in lastItem. Yes, we may fail to
998     * remove lastItem from the queue if it moved due to an interleaved
999     * interior remove while in detached mode.
1000 dl 1.8 */
1001 dl 1.5 private class Itr implements Iterator<E> {
1002 jsr166 1.91 /** Index to look for new nextItem; NONE at end */
1003 jsr166 1.89 private int cursor;
1004    
1005     /** Element to be returned by next call to next(); null if none */
1006     private E nextItem;
1007    
1008 jsr166 1.91 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
1009 jsr166 1.89 private int nextIndex;
1010    
1011     /** Last element returned; null if none or not detached. */
1012     private E lastItem;
1013    
1014 jsr166 1.91 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1015 jsr166 1.89 private int lastRet;
1016    
1017 jsr166 1.91 /** Previous value of takeIndex, or DETACHED when detached */
1018 jsr166 1.89 private int prevTakeIndex;
1019    
1020     /** Previous value of iters.cycles */
1021     private int prevCycles;
1022 tim 1.12
1023 jsr166 1.91 /** Special index value indicating "not available" or "undefined" */
1024     private static final int NONE = -1;
1025    
1026     /**
1027     * Special index value indicating "removed elsewhere", that is,
1028     * removed by some operation other than a call to this.remove().
1029     */
1030     private static final int REMOVED = -2;
1031    
1032     /** Special value for prevTakeIndex indicating "detached mode" */
1033     private static final int DETACHED = -3;
1034    
1035 dl 1.66 Itr() {
1036 jsr166 1.95 // assert lock.getHoldCount() == 0;
1037 jsr166 1.91 lastRet = NONE;
1038 jsr166 1.68 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1039     lock.lock();
1040     try {
1041 jsr166 1.89 if (count == 0) {
1042 jsr166 1.95 // assert itrs == null;
1043 jsr166 1.91 cursor = NONE;
1044     nextIndex = NONE;
1045     prevTakeIndex = DETACHED;
1046 jsr166 1.89 } else {
1047     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1048     prevTakeIndex = takeIndex;
1049 jsr166 1.68 nextItem = itemAt(nextIndex = takeIndex);
1050 jsr166 1.89 cursor = incCursor(takeIndex);
1051     if (itrs == null) {
1052 jsr166 1.93 itrs = new Itrs(this);
1053 jsr166 1.89 } else {
1054     itrs.register(this); // in this order
1055     itrs.doSomeSweeping(false);
1056     }
1057     prevCycles = itrs.cycles;
1058 jsr166 1.95 // assert takeIndex >= 0;
1059     // assert prevTakeIndex == takeIndex;
1060     // assert nextIndex >= 0;
1061     // assert nextItem != null;
1062 jsr166 1.89 }
1063 jsr166 1.68 } finally {
1064     lock.unlock();
1065     }
1066 dl 1.5 }
1067 tim 1.12
1068 jsr166 1.89 boolean isDetached() {
1069 jsr166 1.95 // assert lock.getHoldCount() == 1;
1070 jsr166 1.89 return prevTakeIndex < 0;
1071     }
1072    
1073     private int incCursor(int index) {
1074 jsr166 1.95 // assert lock.getHoldCount() == 1;
1075 dl 1.100 if (++index == items.length)
1076     index = 0;
1077 jsr166 1.89 if (index == putIndex)
1078 jsr166 1.91 index = NONE;
1079 jsr166 1.89 return index;
1080     }
1081    
1082     /**
1083     * Returns true if index is invalidated by the given number of
1084     * dequeues, starting from prevTakeIndex.
1085     */
1086     private boolean invalidated(int index, int prevTakeIndex,
1087     long dequeues, int length) {
1088     if (index < 0)
1089     return false;
1090     int distance = index - prevTakeIndex;
1091     if (distance < 0)
1092     distance += length;
1093     return dequeues > distance;
1094     }
1095    
1096     /**
1097     * Adjusts indices to incorporate all dequeues since the last
1098     * operation on this iterator. Call only from iterating thread.
1099     */
1100     private void incorporateDequeues() {
1101 jsr166 1.95 // assert lock.getHoldCount() == 1;
1102     // assert itrs != null;
1103     // assert !isDetached();
1104     // assert count > 0;
1105 jsr166 1.89
1106     final int cycles = itrs.cycles;
1107     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1108     final int prevCycles = this.prevCycles;
1109     final int prevTakeIndex = this.prevTakeIndex;
1110    
1111     if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1112     final int len = items.length;
1113     // how far takeIndex has advanced since the previous
1114     // operation of this iterator
1115     long dequeues = (cycles - prevCycles) * len
1116     + (takeIndex - prevTakeIndex);
1117    
1118     // Check indices for invalidation
1119     if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1120 jsr166 1.91 lastRet = REMOVED;
1121 jsr166 1.89 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1122 jsr166 1.91 nextIndex = REMOVED;
1123 jsr166 1.89 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1124     cursor = takeIndex;
1125    
1126     if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1127     detach();
1128     else {
1129     this.prevCycles = cycles;
1130     this.prevTakeIndex = takeIndex;
1131     }
1132     }
1133     }
1134    
1135     /**
1136     * Called when itrs should stop tracking this iterator, either
1137     * because there are no more indices to update (cursor < 0 &&
1138     * nextIndex < 0 && lastRet < 0) or as a special exception, when
1139     * lastRet >= 0, because hasNext() is about to return false for the
1140     * first time. Call only from iterating thread.
1141     */
1142     private void detach() {
1143     // Switch to detached mode
1144 jsr166 1.95 // assert lock.getHoldCount() == 1;
1145     // assert cursor == NONE;
1146     // assert nextIndex < 0;
1147     // assert lastRet < 0 || nextItem == null;
1148     // assert lastRet < 0 ^ lastItem != null;
1149 jsr166 1.89 if (prevTakeIndex >= 0) {
1150 jsr166 1.95 // assert itrs != null;
1151 jsr166 1.91 prevTakeIndex = DETACHED;
1152 jsr166 1.89 // try to unlink from itrs (but not too hard)
1153     itrs.doSomeSweeping(true);
1154     }
1155     }
1156    
1157     /**
1158     * For performance reasons, we would like not to acquire a lock in
1159     * hasNext in the common case. To allow for this, we only access
1160     * fields (i.e. nextItem) that are not modified by update operations
1161     * triggered by queue modifications.
1162     */
1163 dl 1.5 public boolean hasNext() {
1164 jsr166 1.95 // assert lock.getHoldCount() == 0;
1165 jsr166 1.89 if (nextItem != null)
1166     return true;
1167     noNext();
1168     return false;
1169     }
1170    
1171     private void noNext() {
1172     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1173     lock.lock();
1174     try {
1175 jsr166 1.95 // assert cursor == NONE;
1176     // assert nextIndex == NONE;
1177 jsr166 1.89 if (!isDetached()) {
1178 jsr166 1.95 // assert lastRet >= 0;
1179 jsr166 1.89 incorporateDequeues(); // might update lastRet
1180     if (lastRet >= 0) {
1181     lastItem = itemAt(lastRet);
1182 jsr166 1.95 // assert lastItem != null;
1183 jsr166 1.89 detach();
1184     }
1185     }
1186 jsr166 1.95 // assert isDetached();
1187     // assert lastRet < 0 ^ lastItem != null;
1188 jsr166 1.89 } finally {
1189     lock.unlock();
1190     }
1191 dl 1.5 }
1192 tim 1.12
1193 dl 1.5 public E next() {
1194 jsr166 1.95 // assert lock.getHoldCount() == 0;
1195 jsr166 1.89 final E x = nextItem;
1196     if (x == null)
1197     throw new NoSuchElementException();
1198 dl 1.66 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1199     lock.lock();
1200     try {
1201 jsr166 1.92 if (!isDetached())
1202 jsr166 1.89 incorporateDequeues();
1203 jsr166 1.95 // assert nextIndex != NONE;
1204     // assert lastItem == null;
1205 dl 1.66 lastRet = nextIndex;
1206 jsr166 1.89 final int cursor = this.cursor;
1207     if (cursor >= 0) {
1208     nextItem = itemAt(nextIndex = cursor);
1209 jsr166 1.95 // assert nextItem != null;
1210 jsr166 1.89 this.cursor = incCursor(cursor);
1211     } else {
1212 jsr166 1.91 nextIndex = NONE;
1213 jsr166 1.89 nextItem = null;
1214     }
1215 dl 1.66 } finally {
1216     lock.unlock();
1217 dl 1.2 }
1218 jsr166 1.89 return x;
1219 dl 1.5 }
1220 tim 1.12
1221 dl 1.5 public void remove() {
1222 jsr166 1.95 // assert lock.getHoldCount() == 0;
1223 dl 1.36 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1224 dl 1.5 lock.lock();
1225     try {
1226 jsr166 1.92 if (!isDetached())
1227     incorporateDequeues(); // might update lastRet or detach
1228     final int lastRet = this.lastRet;
1229     this.lastRet = NONE;
1230 jsr166 1.89 if (lastRet >= 0) {
1231 jsr166 1.92 if (!isDetached())
1232     removeAt(lastRet);
1233     else {
1234     final E lastItem = this.lastItem;
1235 jsr166 1.95 // assert lastItem != null;
1236 jsr166 1.92 this.lastItem = null;
1237 jsr166 1.89 if (itemAt(lastRet) == lastItem)
1238     removeAt(lastRet);
1239     }
1240 jsr166 1.91 } else if (lastRet == NONE)
1241 dl 1.66 throw new IllegalStateException();
1242 jsr166 1.91 // else lastRet == REMOVED and the last returned element was
1243 jsr166 1.89 // previously asynchronously removed via an operation other
1244     // than this.remove(), so nothing to do.
1245    
1246     if (cursor < 0 && nextIndex < 0)
1247     detach();
1248     } finally {
1249     lock.unlock();
1250 jsr166 1.95 // assert lastRet == NONE;
1251     // assert lastItem == null;
1252 jsr166 1.89 }
1253     }
1254    
1255     /**
1256     * Called to notify the iterator that the queue is empty, or that it
1257     * has fallen hopelessly behind, so that it should abandon any
1258     * further iteration, except possibly to return one more element
1259     * from next(), as promised by returning true from hasNext().
1260     */
1261     void shutdown() {
1262 jsr166 1.95 // assert lock.getHoldCount() == 1;
1263 jsr166 1.91 cursor = NONE;
1264 jsr166 1.89 if (nextIndex >= 0)
1265 jsr166 1.91 nextIndex = REMOVED;
1266 jsr166 1.89 if (lastRet >= 0) {
1267 jsr166 1.91 lastRet = REMOVED;
1268 jsr166 1.61 lastItem = null;
1269 jsr166 1.89 }
1270 jsr166 1.91 prevTakeIndex = DETACHED;
1271 jsr166 1.89 // Don't set nextItem to null because we must continue to be
1272     // able to return it on next().
1273     //
1274     // Caller will unlink from itrs when convenient.
1275     }
1276    
1277     private int distance(int index, int prevTakeIndex, int length) {
1278     int distance = index - prevTakeIndex;
1279     if (distance < 0)
1280     distance += length;
1281     return distance;
1282     }
1283    
1284     /**
1285 jsr166 1.90 * Called whenever an interior remove (not at takeIndex) occured.
1286 jsr166 1.89 *
1287 jsr166 1.90 * @return true if this iterator should be unlinked from itrs
1288 jsr166 1.89 */
1289 jsr166 1.90 boolean removedAt(int removedIndex) {
1290 jsr166 1.95 // assert lock.getHoldCount() == 1;
1291 jsr166 1.89 if (isDetached())
1292     return true;
1293    
1294     final int cycles = itrs.cycles;
1295     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1296     final int prevCycles = this.prevCycles;
1297     final int prevTakeIndex = this.prevTakeIndex;
1298     final int len = items.length;
1299     int cycleDiff = cycles - prevCycles;
1300 jsr166 1.90 if (removedIndex < takeIndex)
1301 jsr166 1.89 cycleDiff++;
1302     final int removedDistance =
1303 jsr166 1.90 (cycleDiff * len) + (removedIndex - prevTakeIndex);
1304 jsr166 1.95 // assert removedDistance >= 0;
1305 jsr166 1.89 int cursor = this.cursor;
1306     if (cursor >= 0) {
1307     int x = distance(cursor, prevTakeIndex, len);
1308     if (x == removedDistance) {
1309 jsr166 1.90 if (cursor == putIndex)
1310 jsr166 1.91 this.cursor = cursor = NONE;
1311 jsr166 1.89 }
1312     else if (x > removedDistance) {
1313 jsr166 1.95 // assert cursor != prevTakeIndex;
1314 jsr166 1.89 this.cursor = cursor = dec(cursor);
1315 jsr166 1.71 }
1316 dl 1.5 }
1317 jsr166 1.89 int lastRet = this.lastRet;
1318     if (lastRet >= 0) {
1319     int x = distance(lastRet, prevTakeIndex, len);
1320     if (x == removedDistance)
1321 jsr166 1.91 this.lastRet = lastRet = REMOVED;
1322 jsr166 1.89 else if (x > removedDistance)
1323     this.lastRet = lastRet = dec(lastRet);
1324     }
1325     int nextIndex = this.nextIndex;
1326     if (nextIndex >= 0) {
1327     int x = distance(nextIndex, prevTakeIndex, len);
1328     if (x == removedDistance)
1329 jsr166 1.91 this.nextIndex = nextIndex = REMOVED;
1330 jsr166 1.89 else if (x > removedDistance)
1331     this.nextIndex = nextIndex = dec(nextIndex);
1332     }
1333     else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1334 jsr166 1.91 this.prevTakeIndex = DETACHED;
1335 jsr166 1.89 return true;
1336     }
1337     return false;
1338     }
1339    
1340     /**
1341     * Called whenever takeIndex wraps around to zero.
1342     *
1343 jsr166 1.90 * @return true if this iterator should be unlinked from itrs
1344 jsr166 1.89 */
1345     boolean takeIndexWrapped() {
1346 jsr166 1.95 // assert lock.getHoldCount() == 1;
1347 jsr166 1.89 if (isDetached())
1348     return true;
1349     if (itrs.cycles - prevCycles > 1) {
1350     // All the elements that existed at the time of the last
1351     // operation are gone, so abandon further iteration.
1352     shutdown();
1353     return true;
1354     }
1355     return false;
1356 dl 1.5 }
1357 jsr166 1.89
1358     // /** Uncomment for debugging. */
1359     // public String toString() {
1360     // return ("cursor=" + cursor + " " +
1361     // "nextIndex=" + nextIndex + " " +
1362     // "lastRet=" + lastRet + " " +
1363     // "nextItem=" + nextItem + " " +
1364     // "lastItem=" + lastItem + " " +
1365     // "prevCycles=" + prevCycles + " " +
1366     // "prevTakeIndex=" + prevTakeIndex + " " +
1367     // "size()=" + size() + " " +
1368     // "remainingCapacity()=" + remainingCapacity());
1369     // }
1370 tim 1.1 }
1371 dl 1.100
1372 dl 1.103 public Spliterator<E> spliterator() {
1373 dl 1.102 return Spliterators.spliterator
1374 dl 1.100 (this, Spliterator.ORDERED | Spliterator.NONNULL |
1375     Spliterator.CONCURRENT);
1376     }
1377    
1378 tim 1.1 }