ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.122
Committed: Wed Mar 4 00:22:30 2015 UTC (9 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.121: +1 -21 lines
Log Message:
optimize toString() methods; introduce Helpers

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.38 * Expert Group and released to the public domain, as explained at
4 jsr166 1.78 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.110
9     import java.lang.ref.WeakReference;
10 jsr166 1.120 import java.util.Arrays;
11 jsr166 1.85 import java.util.AbstractQueue;
12     import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15 jsr166 1.119 import java.util.Objects;
16 jsr166 1.110 import java.util.Spliterator;
17 dl 1.102 import java.util.Spliterators;
18 jsr166 1.110 import java.util.concurrent.locks.Condition;
19     import java.util.concurrent.locks.ReentrantLock;
20 tim 1.1
21     /**
22 dl 1.25 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
23     * array. This queue orders elements FIFO (first-in-first-out). The
24     * <em>head</em> of the queue is that element that has been on the
25     * queue the longest time. The <em>tail</em> of the queue is that
26     * element that has been on the queue the shortest time. New elements
27     * are inserted at the tail of the queue, and the queue retrieval
28     * operations obtain elements at the head of the queue.
29 dholmes 1.13 *
30 dl 1.40 * <p>This is a classic &quot;bounded buffer&quot;, in which a
31     * fixed-sized array holds elements inserted by producers and
32     * extracted by consumers. Once created, the capacity cannot be
33 jsr166 1.69 * changed. Attempts to {@code put} an element into a full queue
34     * will result in the operation blocking; attempts to {@code take} an
35 dl 1.40 * element from an empty queue will similarly block.
36 dl 1.11 *
37 jsr166 1.72 * <p>This class supports an optional fairness policy for ordering
38 dl 1.42 * waiting producer and consumer threads. By default, this ordering
39     * is not guaranteed. However, a queue constructed with fairness set
40 jsr166 1.69 * to {@code true} grants threads access in FIFO order. Fairness
41 dl 1.42 * generally decreases throughput but reduces variability and avoids
42     * starvation.
43 brian 1.7 *
44 dl 1.43 * <p>This class and its iterator implement all of the
45     * <em>optional</em> methods of the {@link Collection} and {@link
46 jsr166 1.47 * Iterator} interfaces.
47 dl 1.26 *
48 dl 1.41 * <p>This class is a member of the
49 jsr166 1.54 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
50 dl 1.41 * Java Collections Framework</a>.
51     *
52 dl 1.8 * @since 1.5
53     * @author Doug Lea
54 jsr166 1.109 * @param <E> the type of elements held in this queue
55 dl 1.8 */
56 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
57 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
58    
59 dl 1.36 /**
60 dl 1.42 * Serialization ID. This class relies on default serialization
61     * even for the items array, which is default-serialized, even if
62     * it is empty. Otherwise it could not be declared final, which is
63 dl 1.36 * necessary here.
64     */
65     private static final long serialVersionUID = -817911632652898426L;
66    
67 jsr166 1.73 /** The queued items */
68 jsr166 1.68 final Object[] items;
69 jsr166 1.73
70     /** items index for next take, poll, peek or remove */
71 jsr166 1.58 int takeIndex;
72 jsr166 1.73
73     /** items index for next put, offer, or add */
74 jsr166 1.58 int putIndex;
75 jsr166 1.73
76     /** Number of elements in the queue */
77 jsr166 1.59 int count;
78 tim 1.12
79 dl 1.5 /*
80 dl 1.36 * Concurrency control uses the classic two-condition algorithm
81 dl 1.5 * found in any textbook.
82     */
83    
84 dl 1.11 /** Main lock guarding all access */
85 jsr166 1.58 final ReentrantLock lock;
86 jsr166 1.85
87 dholmes 1.13 /** Condition for waiting takes */
88 dl 1.37 private final Condition notEmpty;
89 jsr166 1.85
90 dl 1.35 /** Condition for waiting puts */
91 dl 1.37 private final Condition notFull;
92 dl 1.5
93 jsr166 1.89 /**
94     * Shared state for currently active iterators, or null if there
95     * are known not to be any. Allows queue operations to update
96     * iterator state.
97     */
98 jsr166 1.121 transient Itrs itrs;
99 jsr166 1.89
100 dl 1.5 // Internal helper methods
101    
102     /**
103 jsr166 1.113 * Circularly decrements array index i.
104 jsr166 1.71 */
105     final int dec(int i) {
106     return ((i == 0) ? items.length : i) - 1;
107     }
108    
109 jsr166 1.68 /**
110     * Returns item at index i.
111     */
112 jsr166 1.96 @SuppressWarnings("unchecked")
113 jsr166 1.68 final E itemAt(int i) {
114 jsr166 1.96 return (E) items[i];
115 jsr166 1.68 }
116    
117     /**
118 jsr166 1.47 * Inserts element at current put position, advances, and signals.
119 dl 1.9 * Call only when holding lock.
120 dl 1.5 */
121 jsr166 1.88 private void enqueue(E x) {
122 jsr166 1.95 // assert lock.getHoldCount() == 1;
123     // assert items[putIndex] == null;
124 dl 1.100 final Object[] items = this.items;
125 dl 1.5 items[putIndex] = x;
126 jsr166 1.116 if (++putIndex == items.length) putIndex = 0;
127 jsr166 1.89 count++;
128 dl 1.9 notEmpty.signal();
129 tim 1.1 }
130 tim 1.12
131 dl 1.5 /**
132 jsr166 1.47 * Extracts element at current take position, advances, and signals.
133 dl 1.9 * Call only when holding lock.
134 dl 1.5 */
135 jsr166 1.88 private E dequeue() {
136 jsr166 1.95 // assert lock.getHoldCount() == 1;
137     // assert items[takeIndex] != null;
138 jsr166 1.68 final Object[] items = this.items;
139 jsr166 1.97 @SuppressWarnings("unchecked")
140     E x = (E) items[takeIndex];
141 dl 1.5 items[takeIndex] = null;
142 jsr166 1.116 if (++takeIndex == items.length) takeIndex = 0;
143 jsr166 1.89 count--;
144     if (itrs != null)
145     itrs.elementDequeued();
146 dl 1.9 notFull.signal();
147 dl 1.5 return x;
148     }
149    
150     /**
151 jsr166 1.90 * Deletes item at array index removeIndex.
152 jsr166 1.89 * Utility for remove(Object) and iterator.remove.
153 dl 1.9 * Call only when holding lock.
154 dl 1.5 */
155 jsr166 1.90 void removeAt(final int removeIndex) {
156 jsr166 1.95 // assert lock.getHoldCount() == 1;
157     // assert items[removeIndex] != null;
158     // assert removeIndex >= 0 && removeIndex < items.length;
159 jsr166 1.68 final Object[] items = this.items;
160 jsr166 1.90 if (removeIndex == takeIndex) {
161 jsr166 1.89 // removing front item; just advance
162 dl 1.9 items[takeIndex] = null;
163 jsr166 1.116 if (++takeIndex == items.length) takeIndex = 0;
164 jsr166 1.90 count--;
165 jsr166 1.89 if (itrs != null)
166     itrs.elementDequeued();
167 tim 1.23 } else {
168 jsr166 1.89 // an "interior" remove
169 jsr166 1.90
170 dl 1.9 // slide over all others up through putIndex.
171 jsr166 1.115 for (int i = removeIndex, putIndex = this.putIndex;;) {
172     int pred = i;
173     if (++i == items.length) i = 0;
174     if (i == putIndex) {
175     items[pred] = null;
176     this.putIndex = pred;
177 dl 1.9 break;
178     }
179 jsr166 1.115 items[pred] = items[i];
180 dl 1.5 }
181 jsr166 1.90 count--;
182     if (itrs != null)
183     itrs.removedAt(removeIndex);
184 dl 1.5 }
185 dl 1.9 notFull.signal();
186 tim 1.1 }
187    
188     /**
189 jsr166 1.69 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
190 dholmes 1.13 * capacity and default access policy.
191 jsr166 1.50 *
192 dholmes 1.13 * @param capacity the capacity of this queue
193 jsr166 1.69 * @throws IllegalArgumentException if {@code capacity < 1}
194 dl 1.5 */
public ArrayBlockingQueue(int capacity) {
    // Same as the two-argument constructor with fair == false.
    this(capacity, false);
}
198 dl 1.2
199 dl 1.5 /**
200 jsr166 1.69 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
201 dholmes 1.13 * capacity and the specified access policy.
202 jsr166 1.50 *
203 dholmes 1.13 * @param capacity the capacity of this queue
204 jsr166 1.69 * @param fair if {@code true} then queue accesses for threads blocked
205 jsr166 1.50 * on insertion or removal, are processed in FIFO order;
206 jsr166 1.69 * if {@code false} the access order is unspecified.
207     * @throws IllegalArgumentException if {@code capacity < 1}
208 dl 1.11 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    // Reject zero and negative capacities, naming the offending value so
    // the failure is diagnosable from the exception alone.
    if (capacity <= 0)
        throw new IllegalArgumentException(
            "capacity must be positive: " + capacity);
    this.items = new Object[capacity];
    // Both conditions are created from (and therefore bound to) the
    // single main lock.
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
217    
218 dholmes 1.16 /**
219 jsr166 1.69 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
220 dholmes 1.21 * capacity, the specified access policy and initially containing the
221 tim 1.17 * elements of the given collection,
222 dholmes 1.16 * added in traversal order of the collection's iterator.
223 jsr166 1.50 *
224 dholmes 1.16 * @param capacity the capacity of this queue
225 jsr166 1.69 * @param fair if {@code true} then queue accesses for threads blocked
226 jsr166 1.50 * on insertion or removal, are processed in FIFO order;
227 jsr166 1.69 * if {@code false} the access order is unspecified.
228 dholmes 1.16 * @param c the collection of elements to initially contain
229 jsr166 1.69 * @throws IllegalArgumentException if {@code capacity} is less than
230     * {@code c.size()}, or less than 1.
231 jsr166 1.50 * @throws NullPointerException if the specified collection or any
232     * of its elements are null
233 dholmes 1.16 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            // Copy in iteration order; a null element raises
            // NullPointerException via requireNonNull.
            for (E e : c)
                items[i++] = Objects.requireNonNull(e);
        } catch (ArrayIndexOutOfBoundsException ex) {
            // Storing past the end of items means the collection holds
            // more elements than the requested capacity. Keep the
            // original exception as the cause so its stack trace shows
            // where the out-of-bounds access actually happened.
            throw new IllegalArgumentException(
                "capacity " + capacity
                + " is too small for the given collection", ex);
        }
        count = i;
        // A completely filled array wraps the insertion index back to 0.
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
254 dl 1.2
255 dholmes 1.13 /**
256 jsr166 1.50 * Inserts the specified element at the tail of this queue if it is
257     * possible to do so immediately without exceeding the queue's capacity,
258 jsr166 1.69 * returning {@code true} upon success and throwing an
259     * {@code IllegalStateException} if this queue is full.
260 dl 1.25 *
261 jsr166 1.50 * @param e the element to add
262 jsr166 1.69 * @return {@code true} (as specified by {@link Collection#add})
263 jsr166 1.50 * @throws IllegalStateException if this queue is full
264     * @throws NullPointerException if the specified element is null
265     */
266     public boolean add(E e) {
267 jsr166 1.56 return super.add(e);
268 jsr166 1.50 }
269    
270     /**
271     * Inserts the specified element at the tail of this queue if it is
272     * possible to do so immediately without exceeding the queue's capacity,
273 jsr166 1.69 * returning {@code true} upon success and {@code false} if this queue
274 jsr166 1.50 * is full. This method is generally preferable to method {@link #add},
275     * which can fail to insert an element only by throwing an exception.
276     *
277     * @throws NullPointerException if the specified element is null
278 dholmes 1.13 */
279 jsr166 1.49 public boolean offer(E e) {
280 jsr166 1.119 Objects.requireNonNull(e);
281 dl 1.36 final ReentrantLock lock = this.lock;
282 dl 1.5 lock.lock();
283     try {
284 jsr166 1.59 if (count == items.length)
285 dl 1.2 return false;
286 dl 1.5 else {
287 jsr166 1.88 enqueue(e);
288 dl 1.5 return true;
289     }
290 tim 1.23 } finally {
291 tim 1.12 lock.unlock();
292 dl 1.2 }
293 dl 1.5 }
294 dl 1.2
295 dholmes 1.13 /**
296 jsr166 1.50 * Inserts the specified element at the tail of this queue, waiting
297     * for space to become available if the queue is full.
298     *
299     * @throws InterruptedException {@inheritDoc}
300     * @throws NullPointerException {@inheritDoc}
301     */
302     public void put(E e) throws InterruptedException {
303 jsr166 1.119 Objects.requireNonNull(e);
304 jsr166 1.50 final ReentrantLock lock = this.lock;
305     lock.lockInterruptibly();
306     try {
307 jsr166 1.59 while (count == items.length)
308 jsr166 1.58 notFull.await();
309 jsr166 1.88 enqueue(e);
310 jsr166 1.50 } finally {
311     lock.unlock();
312     }
313     }
314    
315     /**
316     * Inserts the specified element at the tail of this queue, waiting
317     * up to the specified wait time for space to become available if
318     * the queue is full.
319     *
320     * @throws InterruptedException {@inheritDoc}
321     * @throws NullPointerException {@inheritDoc}
322 brian 1.7 */
323 jsr166 1.49 public boolean offer(E e, long timeout, TimeUnit unit)
324 dholmes 1.13 throws InterruptedException {
325 dl 1.2
326 jsr166 1.119 Objects.requireNonNull(e);
327 jsr166 1.56 long nanos = unit.toNanos(timeout);
328 dl 1.36 final ReentrantLock lock = this.lock;
329 dl 1.5 lock.lockInterruptibly();
330     try {
331 jsr166 1.59 while (count == items.length) {
332 dl 1.5 if (nanos <= 0)
333     return false;
334 jsr166 1.58 nanos = notFull.awaitNanos(nanos);
335 dl 1.5 }
336 jsr166 1.88 enqueue(e);
337 jsr166 1.58 return true;
338 tim 1.23 } finally {
339 dl 1.5 lock.unlock();
340 dl 1.2 }
341 dl 1.5 }
342 dl 1.2
343 dholmes 1.13 public E poll() {
344 dl 1.36 final ReentrantLock lock = this.lock;
345 dholmes 1.13 lock.lock();
346     try {
347 jsr166 1.88 return (count == 0) ? null : dequeue();
348 tim 1.23 } finally {
349 tim 1.15 lock.unlock();
350 dholmes 1.13 }
351     }
352    
353 jsr166 1.50 public E take() throws InterruptedException {
354     final ReentrantLock lock = this.lock;
355     lock.lockInterruptibly();
356     try {
357 jsr166 1.59 while (count == 0)
358 jsr166 1.58 notEmpty.await();
359 jsr166 1.88 return dequeue();
360 jsr166 1.50 } finally {
361     lock.unlock();
362     }
363     }
364    
365 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
366 jsr166 1.56 long nanos = unit.toNanos(timeout);
367 dl 1.36 final ReentrantLock lock = this.lock;
368 dl 1.5 lock.lockInterruptibly();
369     try {
370 jsr166 1.59 while (count == 0) {
371 dl 1.5 if (nanos <= 0)
372     return null;
373 jsr166 1.58 nanos = notEmpty.awaitNanos(nanos);
374 dl 1.2 }
375 jsr166 1.88 return dequeue();
376 tim 1.23 } finally {
377 dl 1.5 lock.unlock();
378     }
379     }
380 dl 1.2
381 dholmes 1.13 public E peek() {
382 dl 1.36 final ReentrantLock lock = this.lock;
383 dholmes 1.13 lock.lock();
384     try {
385 jsr166 1.99 return itemAt(takeIndex); // null when queue is empty
386 tim 1.23 } finally {
387 dholmes 1.13 lock.unlock();
388     }
389     }
390    
391     // this doc comment is overridden to remove the reference to collections
392     // greater in size than Integer.MAX_VALUE
393 tim 1.15 /**
394 dl 1.25 * Returns the number of elements in this queue.
395     *
396 jsr166 1.50 * @return the number of elements in this queue
397 dholmes 1.13 */
398     public int size() {
399 dl 1.36 final ReentrantLock lock = this.lock;
400 dholmes 1.13 lock.lock();
401     try {
402     return count;
403 tim 1.23 } finally {
404 dholmes 1.13 lock.unlock();
405     }
406     }
407    
408     // this doc comment is a modified copy of the inherited doc comment,
409     // without the reference to unlimited queues.
410 tim 1.15 /**
411 jsr166 1.48 * Returns the number of additional elements that this queue can ideally
412     * (in the absence of memory or resource constraints) accept without
413 dholmes 1.13 * blocking. This is always equal to the initial capacity of this queue
414 jsr166 1.69 * less the current {@code size} of this queue.
415 jsr166 1.48 *
416     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
417 jsr166 1.69 * an element will succeed by inspecting {@code remainingCapacity}
418 jsr166 1.48 * because it may be the case that another thread is about to
419 jsr166 1.50 * insert or remove an element.
420 dholmes 1.13 */
421     public int remainingCapacity() {
422 dl 1.36 final ReentrantLock lock = this.lock;
423 dholmes 1.13 lock.lock();
424     try {
425     return items.length - count;
426 tim 1.23 } finally {
427 dholmes 1.13 lock.unlock();
428     }
429     }
430    
431 jsr166 1.50 /**
432     * Removes a single instance of the specified element from this queue,
433 jsr166 1.69 * if it is present. More formally, removes an element {@code e} such
434     * that {@code o.equals(e)}, if this queue contains one or more such
435 jsr166 1.50 * elements.
436 jsr166 1.69 * Returns {@code true} if this queue contained the specified element
437 jsr166 1.50 * (or equivalently, if this queue changed as a result of the call).
438     *
439 jsr166 1.64 * <p>Removal of interior elements in circular array based queues
440 dl 1.60 * is an intrinsically slow and disruptive operation, so should
441     * be undertaken only in exceptional circumstances, ideally
442     * only when the queue is known not to be accessible by other
443     * threads.
444     *
445 jsr166 1.50 * @param o element to be removed from this queue, if present
446 jsr166 1.69 * @return {@code true} if this queue changed as a result of the call
447 jsr166 1.50 */
448     public boolean remove(Object o) {
449     if (o == null) return false;
450     final ReentrantLock lock = this.lock;
451     lock.lock();
452     try {
453 jsr166 1.86 if (count > 0) {
454 jsr166 1.114 final Object[] items = this.items;
455 jsr166 1.86 final int putIndex = this.putIndex;
456     int i = takeIndex;
457     do {
458     if (o.equals(items[i])) {
459     removeAt(i);
460     return true;
461     }
462 jsr166 1.116 if (++i == items.length) i = 0;
463 dl 1.100 } while (i != putIndex);
464 jsr166 1.50 }
465 jsr166 1.68 return false;
466 jsr166 1.50 } finally {
467     lock.unlock();
468     }
469     }
470 dholmes 1.13
471 jsr166 1.50 /**
472 jsr166 1.69 * Returns {@code true} if this queue contains the specified element.
473     * More formally, returns {@code true} if and only if this queue contains
474     * at least one element {@code e} such that {@code o.equals(e)}.
475 jsr166 1.50 *
476     * @param o object to be checked for containment in this queue
477 jsr166 1.69 * @return {@code true} if this queue contains the specified element
478 jsr166 1.50 */
479 dholmes 1.21 public boolean contains(Object o) {
480     if (o == null) return false;
481 dl 1.36 final ReentrantLock lock = this.lock;
482 dl 1.5 lock.lock();
483     try {
484 jsr166 1.86 if (count > 0) {
485 jsr166 1.114 final Object[] items = this.items;
486 jsr166 1.86 final int putIndex = this.putIndex;
487     int i = takeIndex;
488     do {
489     if (o.equals(items[i]))
490     return true;
491 jsr166 1.116 if (++i == items.length) i = 0;
492 dl 1.100 } while (i != putIndex);
493 jsr166 1.86 }
494 dl 1.2 return false;
495 tim 1.23 } finally {
496 dl 1.5 lock.unlock();
497     }
498     }
499 brian 1.7
500 jsr166 1.50 /**
501     * Returns an array containing all of the elements in this queue, in
502     * proper sequence.
503     *
504     * <p>The returned array will be "safe" in that no references to it are
505     * maintained by this queue. (In other words, this method must allocate
506     * a new array). The caller is thus free to modify the returned array.
507 jsr166 1.51 *
508 jsr166 1.50 * <p>This method acts as a bridge between array-based and collection-based
509     * APIs.
510     *
511     * @return an array containing all of the elements in this queue
512     */
513 dl 1.5 public Object[] toArray() {
514 dl 1.36 final ReentrantLock lock = this.lock;
515 dl 1.5 lock.lock();
516     try {
517 jsr166 1.120 final Object[] items = this.items;
518     final int end = takeIndex + count;
519     final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
520     if (end != putIndex)
521     System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
522     return a;
523 tim 1.23 } finally {
524 dl 1.5 lock.unlock();
525     }
526     }
527 brian 1.7
528 jsr166 1.50 /**
529     * Returns an array containing all of the elements in this queue, in
530     * proper sequence; the runtime type of the returned array is that of
531     * the specified array. If the queue fits in the specified array, it
532     * is returned therein. Otherwise, a new array is allocated with the
533     * runtime type of the specified array and the size of this queue.
534     *
535     * <p>If this queue fits in the specified array with room to spare
536     * (i.e., the array has more elements than this queue), the element in
537     * the array immediately following the end of the queue is set to
538 jsr166 1.69 * {@code null}.
539 jsr166 1.50 *
540     * <p>Like the {@link #toArray()} method, this method acts as a bridge between
541     * array-based and collection-based APIs. Further, this method allows
542     * precise control over the runtime type of the output array, and may,
543     * under certain circumstances, be used to save allocation costs.
544     *
545 jsr166 1.69 * <p>Suppose {@code x} is a queue known to contain only strings.
546 jsr166 1.50 * The following code can be used to dump the queue into a newly
547 jsr166 1.69 * allocated array of {@code String}:
548 jsr166 1.50 *
549 jsr166 1.117 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
550 jsr166 1.50 *
551 jsr166 1.69 * Note that {@code toArray(new Object[0])} is identical in function to
552     * {@code toArray()}.
553 jsr166 1.50 *
554     * @param a the array into which the elements of the queue are to
555     * be stored, if it is big enough; otherwise, a new array of the
556     * same runtime type is allocated for this purpose
557     * @return an array containing all of the elements in this queue
558     * @throws ArrayStoreException if the runtime type of the specified array
559     * is not a supertype of the runtime type of every element in
560     * this queue
561     * @throws NullPointerException if the specified array is null
562     */
563 jsr166 1.68 @SuppressWarnings("unchecked")
564 dl 1.5 public <T> T[] toArray(T[] a) {
565 dl 1.36 final ReentrantLock lock = this.lock;
566 dl 1.5 lock.lock();
567     try {
568 jsr166 1.120 final Object[] items = this.items;
569 jsr166 1.68 final int count = this.count;
570 jsr166 1.120 final int firstLeg = Math.min(items.length - takeIndex, count);
571     if (a.length < count) {
572     a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
573     a.getClass());
574     } else {
575     System.arraycopy(items, takeIndex, a, 0, firstLeg);
576     if (a.length > count)
577     a[count] = null;
578     }
579     if (firstLeg < count)
580     System.arraycopy(items, 0, a, firstLeg, putIndex);
581     return a;
582 tim 1.23 } finally {
583 dl 1.5 lock.unlock();
584     }
585     }
586 dl 1.6
587     public String toString() {
588 jsr166 1.122 return Helpers.collectionToString(this);
589 dl 1.6 }
590 tim 1.12
591 dl 1.44 /**
592     * Atomically removes all of the elements from this queue.
593     * The queue will be empty after this call returns.
594     */
595 dl 1.30 public void clear() {
596 jsr166 1.68 final Object[] items = this.items;
597 dl 1.36 final ReentrantLock lock = this.lock;
598 dl 1.30 lock.lock();
599     try {
600 jsr166 1.86 int k = count;
601     if (k > 0) {
602     final int putIndex = this.putIndex;
603     int i = takeIndex;
604     do {
605     items[i] = null;
606 jsr166 1.116 if (++i == items.length) i = 0;
607 dl 1.100 } while (i != putIndex);
608 jsr166 1.86 takeIndex = putIndex;
609     count = 0;
610 jsr166 1.89 if (itrs != null)
611     itrs.queueIsEmpty();
612 jsr166 1.86 for (; k > 0 && lock.hasWaiters(notFull); k--)
613     notFull.signal();
614     }
615 dl 1.30 } finally {
616     lock.unlock();
617     }
618     }
619    
620 jsr166 1.50 /**
621     * @throws UnsupportedOperationException {@inheritDoc}
622     * @throws ClassCastException {@inheritDoc}
623     * @throws NullPointerException {@inheritDoc}
624     * @throws IllegalArgumentException {@inheritDoc}
625     */
626 dl 1.30 public int drainTo(Collection<? super E> c) {
627 jsr166 1.81 return drainTo(c, Integer.MAX_VALUE);
628 dl 1.30 }
629    
630 jsr166 1.50 /**
631     * @throws UnsupportedOperationException {@inheritDoc}
632     * @throws ClassCastException {@inheritDoc}
633     * @throws NullPointerException {@inheritDoc}
634     * @throws IllegalArgumentException {@inheritDoc}
635     */
636 dl 1.30 public int drainTo(Collection<? super E> c, int maxElements) {
637 jsr166 1.119 Objects.requireNonNull(c);
638 dl 1.30 if (c == this)
639     throw new IllegalArgumentException();
640     if (maxElements <= 0)
641     return 0;
642 jsr166 1.68 final Object[] items = this.items;
643 dl 1.36 final ReentrantLock lock = this.lock;
644 dl 1.30 lock.lock();
645     try {
646 jsr166 1.82 int n = Math.min(maxElements, count);
647     int take = takeIndex;
648     int i = 0;
649     try {
650     while (i < n) {
651 jsr166 1.97 @SuppressWarnings("unchecked")
652     E x = (E) items[take];
653 jsr166 1.84 c.add(x);
654 jsr166 1.82 items[take] = null;
655 jsr166 1.116 if (++take == items.length) take = 0;
656 jsr166 1.86 i++;
657 jsr166 1.82 }
658     return n;
659     } finally {
660     // Restore invariants even if c.add() threw
661 jsr166 1.89 if (i > 0) {
662     count -= i;
663     takeIndex = take;
664     if (itrs != null) {
665     if (count == 0)
666     itrs.queueIsEmpty();
667     else if (i > take)
668     itrs.takeIndexWrapped();
669     }
670     for (; i > 0 && lock.hasWaiters(notFull); i--)
671     notFull.signal();
672     }
673 dl 1.30 }
674     } finally {
675     lock.unlock();
676     }
677     }
678    
679 brian 1.7 /**
680 dl 1.75 * Returns an iterator over the elements in this queue in proper sequence.
681 jsr166 1.77 * The elements will be returned in order from first (head) to last (tail).
682     *
683 jsr166 1.106 * <p>The returned iterator is
684     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
685 brian 1.7 *
686 jsr166 1.50 * @return an iterator over the elements in this queue in proper sequence
687 brian 1.7 */
688 dl 1.5 public Iterator<E> iterator() {
689 jsr166 1.68 return new Itr();
690 dl 1.5 }
691 dl 1.8
692     /**
693 jsr166 1.94 * Shared data between iterators and their queue, allowing queue
694 jsr166 1.89 * modifications to update iterators when elements are removed.
695     *
696 jsr166 1.94 * This adds a lot of complexity for the sake of correctly
697     * handling some uncommon operations, but the combination of
698     * circular-arrays and supporting interior removes (i.e., those
699     * not at head) would cause iterators to sometimes lose their
700     * places and/or (re)report elements they shouldn't. To avoid
701     * this, when a queue has one or more iterators, it keeps iterator
702     * state consistent by:
703     *
704     * (1) keeping track of the number of "cycles", that is, the
705     * number of times takeIndex has wrapped around to 0.
706     * (2) notifying all iterators via the callback removedAt whenever
707     * an interior element is removed (and thus other elements may
708     * be shifted).
709     *
710     * These suffice to eliminate iterator inconsistencies, but
711     * unfortunately add the secondary responsibility of maintaining
712     * the list of iterators. We track all active iterators in a
713     * simple linked list (accessed only when the queue's lock is
714     * held) of weak references to Itr. The list is cleaned up using
715     * 3 different mechanisms:
716 jsr166 1.89 *
717     * (1) Whenever a new iterator is created, do some O(1) checking for
718     * stale list elements.
719     *
720     * (2) Whenever takeIndex wraps around to 0, check for iterators
721     * that have been unused for more than one wrap-around cycle.
722     *
723     * (3) Whenever the queue becomes empty, all iterators are notified
724     * and this entire data structure is discarded.
725     *
726 jsr166 1.94 * So in addition to the removedAt callback that is necessary for
727     * correctness, iterators have the shutdown and takeIndexWrapped
728     * callbacks that help remove stale iterators from the list.
729     *
730     * Whenever a list element is examined, it is expunged if either
731     * the GC has determined that the iterator is discarded, or if the
732     * iterator reports that it is "detached" (does not need any
733     * further state updates). Overhead is maximal when takeIndex
734     * never advances, iterators are discarded before they are
735     * exhausted, and all removals are interior removes, in which case
736     * all stale iterators are discovered by the GC. But even in this
737     * case we don't increase the amortized complexity.
738     *
739     * Care must be taken to keep list sweeping methods from
740     * reentrantly invoking another such method, causing subtle
741     * corruption bugs.
742 jsr166 1.89 */
743     class Itrs {
744    
745     /**
746     * Node in a linked list of weak iterator references.
747     */
748     private class Node extends WeakReference<Itr> {
749     Node next;
750    
751     Node(Itr iterator, Node next) {
752     super(iterator);
753     this.next = next;
754     }
755     }
756    
757     /** Incremented whenever takeIndex wraps around to 0 */
758 jsr166 1.108 int cycles;
759 jsr166 1.89
760     /** Linked list of weak iterator references */
761 jsr166 1.93 private Node head;
762 jsr166 1.89
763     /** Used to expunge stale iterators */
764 jsr166 1.108 private Node sweeper;
765 jsr166 1.89
766     private static final int SHORT_SWEEP_PROBES = 4;
767     private static final int LONG_SWEEP_PROBES = 16;
768    
769 jsr166 1.93 Itrs(Itr initial) {
770     register(initial);
771     }
772    
773 jsr166 1.89 /**
774     * Sweeps itrs, looking for and expunging stale iterators.
775     * If at least one was found, tries harder to find more.
776 jsr166 1.94 * Called only from iterating thread.
777 jsr166 1.89 *
778     * @param tryHarder whether to start in try-harder mode, because
779     * there is known to be at least one iterator to collect
780     */
781     void doSomeSweeping(boolean tryHarder) {
782 jsr166 1.95 // assert lock.getHoldCount() == 1;
783     // assert head != null;
784 jsr166 1.89 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
785     Node o, p;
786     final Node sweeper = this.sweeper;
787 jsr166 1.93 boolean passedGo; // to limit search to one full sweep
788 jsr166 1.89
789 jsr166 1.93 if (sweeper == null) {
790     o = null;
791     p = head;
792     passedGo = true;
793     } else {
794 jsr166 1.89 o = sweeper;
795     p = o.next;
796 jsr166 1.93 passedGo = false;
797 jsr166 1.89 }
798    
799 jsr166 1.93 for (; probes > 0; probes--) {
800     if (p == null) {
801     if (passedGo)
802     break;
803     o = null;
804     p = head;
805     passedGo = true;
806     }
807 jsr166 1.89 final Itr it = p.get();
808     final Node next = p.next;
809     if (it == null || it.isDetached()) {
810     // found a discarded/exhausted iterator
811     probes = LONG_SWEEP_PROBES; // "try harder"
812     // unlink p
813     p.clear();
814     p.next = null;
815 jsr166 1.93 if (o == null) {
816 jsr166 1.89 head = next;
817 jsr166 1.93 if (next == null) {
818     // We've run out of iterators to track; retire
819     itrs = null;
820     return;
821     }
822     }
823 jsr166 1.89 else
824     o.next = next;
825     } else {
826     o = p;
827     }
828     p = next;
829     }
830    
831     this.sweeper = (p == null) ? null : o;
832     }
833    
834     /**
835     * Adds a new iterator to the linked list of tracked iterators.
836     */
837     void register(Itr itr) {
838 jsr166 1.95 // assert lock.getHoldCount() == 1;
839 jsr166 1.89 head = new Node(itr, head);
840     }
841    
842     /**
843     * Called whenever takeIndex wraps around to 0.
844     *
845     * Notifies all iterators, and expunges any that are now stale.
846     */
847     void takeIndexWrapped() {
848 jsr166 1.95 // assert lock.getHoldCount() == 1;
849 jsr166 1.89 cycles++;
850     for (Node o = null, p = head; p != null;) {
851     final Itr it = p.get();
852     final Node next = p.next;
853     if (it == null || it.takeIndexWrapped()) {
854     // unlink p
855 jsr166 1.95 // assert it == null || it.isDetached();
856 jsr166 1.89 p.clear();
857     p.next = null;
858     if (o == null)
859     head = next;
860     else
861     o.next = next;
862     } else {
863     o = p;
864     }
865     p = next;
866     }
867     if (head == null) // no more iterators to track
868     itrs = null;
869     }
870    
871     /**
872 jsr166 1.107 * Called whenever an interior remove (not at takeIndex) occurred.
873 jsr166 1.93 *
874     * Notifies all iterators, and expunges any that are now stale.
875 jsr166 1.89 */
876 jsr166 1.90 void removedAt(int removedIndex) {
877 jsr166 1.89 for (Node o = null, p = head; p != null;) {
878     final Itr it = p.get();
879     final Node next = p.next;
880 jsr166 1.90 if (it == null || it.removedAt(removedIndex)) {
881 jsr166 1.89 // unlink p
882 jsr166 1.95 // assert it == null || it.isDetached();
883 jsr166 1.89 p.clear();
884     p.next = null;
885     if (o == null)
886     head = next;
887     else
888     o.next = next;
889     } else {
890     o = p;
891     }
892     p = next;
893     }
894     if (head == null) // no more iterators to track
895     itrs = null;
896     }
897    
898     /**
899     * Called whenever the queue becomes empty.
900     *
901     * Notifies all active iterators that the queue is empty,
902     * clears all weak refs, and unlinks the itrs datastructure.
903     */
904     void queueIsEmpty() {
905 jsr166 1.95 // assert lock.getHoldCount() == 1;
906 jsr166 1.89 for (Node p = head; p != null; p = p.next) {
907     Itr it = p.get();
908     if (it != null) {
909     p.clear();
910     it.shutdown();
911     }
912     }
913     head = null;
914     itrs = null;
915     }
916    
917     /**
918 jsr166 1.90 * Called whenever an element has been dequeued (at takeIndex).
919 jsr166 1.89 */
920     void elementDequeued() {
921 jsr166 1.95 // assert lock.getHoldCount() == 1;
922 jsr166 1.89 if (count == 0)
923     queueIsEmpty();
924     else if (takeIndex == 0)
925     takeIndexWrapped();
926     }
927     }
928    
929     /**
930     * Iterator for ArrayBlockingQueue.
931     *
932     * To maintain weak consistency with respect to puts and takes, we
933     * read ahead one slot, so as to not report hasNext true but then
934     * not have an element to return.
935     *
936     * We switch into "detached" mode (allowing prompt unlinking from
937     * itrs without help from the GC) when all indices are negative, or
938     * when hasNext returns false for the first time. This allows the
939     * iterator to track concurrent updates completely accurately,
940     * except for the corner case of the user calling Iterator.remove()
941     * after hasNext() returned false. Even in this case, we ensure
942     * that we don't remove the wrong element by keeping track of the
943     * expected element to remove, in lastItem. Yes, we may fail to
944     * remove lastItem from the queue if it moved due to an interleaved
945     * interior remove while in detached mode.
946 dl 1.8 */
947 dl 1.5 private class Itr implements Iterator<E> {
948 jsr166 1.91 /** Index to look for new nextItem; NONE at end */
949 jsr166 1.89 private int cursor;
950    
951     /** Element to be returned by next call to next(); null if none */
952     private E nextItem;
953    
954 jsr166 1.91 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
955 jsr166 1.89 private int nextIndex;
956    
957     /** Last element returned; null if none or not detached. */
958     private E lastItem;
959    
960 jsr166 1.91 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
961 jsr166 1.89 private int lastRet;
962    
963 jsr166 1.91 /** Previous value of takeIndex, or DETACHED when detached */
964 jsr166 1.89 private int prevTakeIndex;
965    
966     /** Previous value of iters.cycles */
967     private int prevCycles;
968 tim 1.12
969 jsr166 1.91 /** Special index value indicating "not available" or "undefined" */
970     private static final int NONE = -1;
971    
972     /**
973     * Special index value indicating "removed elsewhere", that is,
974     * removed by some operation other than a call to this.remove().
975     */
976     private static final int REMOVED = -2;
977    
978     /** Special value for prevTakeIndex indicating "detached mode" */
979     private static final int DETACHED = -3;
980    
981 dl 1.66 Itr() {
982 jsr166 1.95 // assert lock.getHoldCount() == 0;
983 jsr166 1.91 lastRet = NONE;
984 jsr166 1.68 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
985     lock.lock();
986     try {
987 jsr166 1.89 if (count == 0) {
988 jsr166 1.95 // assert itrs == null;
989 jsr166 1.91 cursor = NONE;
990     nextIndex = NONE;
991     prevTakeIndex = DETACHED;
992 jsr166 1.89 } else {
993     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
994     prevTakeIndex = takeIndex;
995 jsr166 1.68 nextItem = itemAt(nextIndex = takeIndex);
996 jsr166 1.89 cursor = incCursor(takeIndex);
997     if (itrs == null) {
998 jsr166 1.93 itrs = new Itrs(this);
999 jsr166 1.89 } else {
1000     itrs.register(this); // in this order
1001     itrs.doSomeSweeping(false);
1002     }
1003     prevCycles = itrs.cycles;
1004 jsr166 1.95 // assert takeIndex >= 0;
1005     // assert prevTakeIndex == takeIndex;
1006     // assert nextIndex >= 0;
1007     // assert nextItem != null;
1008 jsr166 1.89 }
1009 jsr166 1.68 } finally {
1010     lock.unlock();
1011     }
1012 dl 1.5 }
1013 tim 1.12
1014 jsr166 1.89 boolean isDetached() {
1015 jsr166 1.95 // assert lock.getHoldCount() == 1;
1016 jsr166 1.89 return prevTakeIndex < 0;
1017     }
1018    
1019     private int incCursor(int index) {
1020 jsr166 1.95 // assert lock.getHoldCount() == 1;
1021 jsr166 1.116 if (++index == items.length) index = 0;
1022     if (index == putIndex) index = NONE;
1023 jsr166 1.89 return index;
1024     }
1025    
1026     /**
1027     * Returns true if index is invalidated by the given number of
1028     * dequeues, starting from prevTakeIndex.
1029     */
1030     private boolean invalidated(int index, int prevTakeIndex,
1031     long dequeues, int length) {
1032     if (index < 0)
1033     return false;
1034     int distance = index - prevTakeIndex;
1035     if (distance < 0)
1036     distance += length;
1037     return dequeues > distance;
1038     }
1039    
1040     /**
1041     * Adjusts indices to incorporate all dequeues since the last
1042     * operation on this iterator. Call only from iterating thread.
1043     */
1044     private void incorporateDequeues() {
1045 jsr166 1.95 // assert lock.getHoldCount() == 1;
1046     // assert itrs != null;
1047     // assert !isDetached();
1048     // assert count > 0;
1049 jsr166 1.89
1050     final int cycles = itrs.cycles;
1051     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1052     final int prevCycles = this.prevCycles;
1053     final int prevTakeIndex = this.prevTakeIndex;
1054    
1055     if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1056     final int len = items.length;
1057     // how far takeIndex has advanced since the previous
1058     // operation of this iterator
1059     long dequeues = (cycles - prevCycles) * len
1060     + (takeIndex - prevTakeIndex);
1061    
1062     // Check indices for invalidation
1063     if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1064 jsr166 1.91 lastRet = REMOVED;
1065 jsr166 1.89 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1066 jsr166 1.91 nextIndex = REMOVED;
1067 jsr166 1.89 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1068     cursor = takeIndex;
1069    
1070     if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1071     detach();
1072     else {
1073     this.prevCycles = cycles;
1074     this.prevTakeIndex = takeIndex;
1075     }
1076     }
1077     }
1078    
1079     /**
1080     * Called when itrs should stop tracking this iterator, either
1081     * because there are no more indices to update (cursor < 0 &&
1082     * nextIndex < 0 && lastRet < 0) or as a special exception, when
1083     * lastRet >= 0, because hasNext() is about to return false for the
1084     * first time. Call only from iterating thread.
1085     */
1086     private void detach() {
1087     // Switch to detached mode
1088 jsr166 1.95 // assert lock.getHoldCount() == 1;
1089     // assert cursor == NONE;
1090     // assert nextIndex < 0;
1091     // assert lastRet < 0 || nextItem == null;
1092     // assert lastRet < 0 ^ lastItem != null;
1093 jsr166 1.89 if (prevTakeIndex >= 0) {
1094 jsr166 1.95 // assert itrs != null;
1095 jsr166 1.91 prevTakeIndex = DETACHED;
1096 jsr166 1.89 // try to unlink from itrs (but not too hard)
1097     itrs.doSomeSweeping(true);
1098     }
1099     }
1100    
1101     /**
1102     * For performance reasons, we would like not to acquire a lock in
1103     * hasNext in the common case. To allow for this, we only access
1104     * fields (i.e. nextItem) that are not modified by update operations
1105     * triggered by queue modifications.
1106     */
1107 dl 1.5 public boolean hasNext() {
1108 jsr166 1.95 // assert lock.getHoldCount() == 0;
1109 jsr166 1.89 if (nextItem != null)
1110     return true;
1111     noNext();
1112     return false;
1113     }
1114    
1115     private void noNext() {
1116     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1117     lock.lock();
1118     try {
1119 jsr166 1.95 // assert cursor == NONE;
1120     // assert nextIndex == NONE;
1121 jsr166 1.89 if (!isDetached()) {
1122 jsr166 1.95 // assert lastRet >= 0;
1123 jsr166 1.89 incorporateDequeues(); // might update lastRet
1124     if (lastRet >= 0) {
1125     lastItem = itemAt(lastRet);
1126 jsr166 1.95 // assert lastItem != null;
1127 jsr166 1.89 detach();
1128     }
1129     }
1130 jsr166 1.95 // assert isDetached();
1131     // assert lastRet < 0 ^ lastItem != null;
1132 jsr166 1.89 } finally {
1133     lock.unlock();
1134     }
1135 dl 1.5 }
1136 tim 1.12
1137 dl 1.5 public E next() {
1138 jsr166 1.95 // assert lock.getHoldCount() == 0;
1139 jsr166 1.89 final E x = nextItem;
1140     if (x == null)
1141     throw new NoSuchElementException();
1142 dl 1.66 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1143     lock.lock();
1144     try {
1145 jsr166 1.92 if (!isDetached())
1146 jsr166 1.89 incorporateDequeues();
1147 jsr166 1.95 // assert nextIndex != NONE;
1148     // assert lastItem == null;
1149 dl 1.66 lastRet = nextIndex;
1150 jsr166 1.89 final int cursor = this.cursor;
1151     if (cursor >= 0) {
1152     nextItem = itemAt(nextIndex = cursor);
1153 jsr166 1.95 // assert nextItem != null;
1154 jsr166 1.89 this.cursor = incCursor(cursor);
1155     } else {
1156 jsr166 1.91 nextIndex = NONE;
1157 jsr166 1.89 nextItem = null;
1158     }
1159 dl 1.66 } finally {
1160     lock.unlock();
1161 dl 1.2 }
1162 jsr166 1.89 return x;
1163 dl 1.5 }
1164 tim 1.12
1165 dl 1.5 public void remove() {
1166 jsr166 1.95 // assert lock.getHoldCount() == 0;
1167 dl 1.36 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1168 dl 1.5 lock.lock();
1169     try {
1170 jsr166 1.92 if (!isDetached())
1171     incorporateDequeues(); // might update lastRet or detach
1172     final int lastRet = this.lastRet;
1173     this.lastRet = NONE;
1174 jsr166 1.89 if (lastRet >= 0) {
1175 jsr166 1.92 if (!isDetached())
1176     removeAt(lastRet);
1177     else {
1178     final E lastItem = this.lastItem;
1179 jsr166 1.95 // assert lastItem != null;
1180 jsr166 1.92 this.lastItem = null;
1181 jsr166 1.89 if (itemAt(lastRet) == lastItem)
1182     removeAt(lastRet);
1183     }
1184 jsr166 1.91 } else if (lastRet == NONE)
1185 dl 1.66 throw new IllegalStateException();
1186 jsr166 1.91 // else lastRet == REMOVED and the last returned element was
1187 jsr166 1.89 // previously asynchronously removed via an operation other
1188     // than this.remove(), so nothing to do.
1189    
1190     if (cursor < 0 && nextIndex < 0)
1191     detach();
1192     } finally {
1193     lock.unlock();
1194 jsr166 1.95 // assert lastRet == NONE;
1195     // assert lastItem == null;
1196 jsr166 1.89 }
1197     }
1198    
1199     /**
1200     * Called to notify the iterator that the queue is empty, or that it
1201     * has fallen hopelessly behind, so that it should abandon any
1202     * further iteration, except possibly to return one more element
1203     * from next(), as promised by returning true from hasNext().
1204     */
1205     void shutdown() {
1206 jsr166 1.95 // assert lock.getHoldCount() == 1;
1207 jsr166 1.91 cursor = NONE;
1208 jsr166 1.89 if (nextIndex >= 0)
1209 jsr166 1.91 nextIndex = REMOVED;
1210 jsr166 1.89 if (lastRet >= 0) {
1211 jsr166 1.91 lastRet = REMOVED;
1212 jsr166 1.61 lastItem = null;
1213 jsr166 1.89 }
1214 jsr166 1.91 prevTakeIndex = DETACHED;
1215 jsr166 1.89 // Don't set nextItem to null because we must continue to be
1216     // able to return it on next().
1217     //
1218     // Caller will unlink from itrs when convenient.
1219     }
1220    
1221     private int distance(int index, int prevTakeIndex, int length) {
1222     int distance = index - prevTakeIndex;
1223     if (distance < 0)
1224     distance += length;
1225     return distance;
1226     }
1227    
1228     /**
1229 jsr166 1.107 * Called whenever an interior remove (not at takeIndex) occurred.
1230 jsr166 1.89 *
1231 jsr166 1.90 * @return true if this iterator should be unlinked from itrs
1232 jsr166 1.89 */
1233 jsr166 1.90 boolean removedAt(int removedIndex) {
1234 jsr166 1.95 // assert lock.getHoldCount() == 1;
1235 jsr166 1.89 if (isDetached())
1236     return true;
1237    
1238     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1239     final int prevTakeIndex = this.prevTakeIndex;
1240     final int len = items.length;
1241 jsr166 1.112 // distance from prevTakeIndex to removedIndex
1242 jsr166 1.89 final int removedDistance =
1243 jsr166 1.112 len * (itrs.cycles - this.prevCycles
1244     + ((removedIndex < takeIndex) ? 1 : 0))
1245     + (removedIndex - prevTakeIndex);
1246     // assert itrs.cycles - this.prevCycles >= 0;
1247     // assert itrs.cycles - this.prevCycles <= 1;
1248     // assert removedDistance > 0;
1249     // assert removedIndex != takeIndex;
1250 jsr166 1.89 int cursor = this.cursor;
1251     if (cursor >= 0) {
1252     int x = distance(cursor, prevTakeIndex, len);
1253     if (x == removedDistance) {
1254 jsr166 1.90 if (cursor == putIndex)
1255 jsr166 1.91 this.cursor = cursor = NONE;
1256 jsr166 1.89 }
1257     else if (x > removedDistance) {
1258 jsr166 1.95 // assert cursor != prevTakeIndex;
1259 jsr166 1.89 this.cursor = cursor = dec(cursor);
1260 jsr166 1.71 }
1261 dl 1.5 }
1262 jsr166 1.89 int lastRet = this.lastRet;
1263     if (lastRet >= 0) {
1264     int x = distance(lastRet, prevTakeIndex, len);
1265     if (x == removedDistance)
1266 jsr166 1.91 this.lastRet = lastRet = REMOVED;
1267 jsr166 1.89 else if (x > removedDistance)
1268     this.lastRet = lastRet = dec(lastRet);
1269     }
1270     int nextIndex = this.nextIndex;
1271     if (nextIndex >= 0) {
1272     int x = distance(nextIndex, prevTakeIndex, len);
1273     if (x == removedDistance)
1274 jsr166 1.91 this.nextIndex = nextIndex = REMOVED;
1275 jsr166 1.89 else if (x > removedDistance)
1276     this.nextIndex = nextIndex = dec(nextIndex);
1277     }
1278 jsr166 1.112 if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1279 jsr166 1.91 this.prevTakeIndex = DETACHED;
1280 jsr166 1.89 return true;
1281     }
1282     return false;
1283     }
1284    
1285     /**
1286     * Called whenever takeIndex wraps around to zero.
1287     *
1288 jsr166 1.90 * @return true if this iterator should be unlinked from itrs
1289 jsr166 1.89 */
1290     boolean takeIndexWrapped() {
1291 jsr166 1.95 // assert lock.getHoldCount() == 1;
1292 jsr166 1.89 if (isDetached())
1293     return true;
1294     if (itrs.cycles - prevCycles > 1) {
1295     // All the elements that existed at the time of the last
1296     // operation are gone, so abandon further iteration.
1297     shutdown();
1298     return true;
1299     }
1300     return false;
1301 dl 1.5 }
1302 jsr166 1.89
1303     // /** Uncomment for debugging. */
1304     // public String toString() {
1305     // return ("cursor=" + cursor + " " +
1306     // "nextIndex=" + nextIndex + " " +
1307     // "lastRet=" + lastRet + " " +
1308     // "nextItem=" + nextItem + " " +
1309     // "lastItem=" + lastItem + " " +
1310     // "prevCycles=" + prevCycles + " " +
1311     // "prevTakeIndex=" + prevTakeIndex + " " +
1312     // "size()=" + size() + " " +
1313     // "remainingCapacity()=" + remainingCapacity());
1314     // }
1315 tim 1.1 }
1316 dl 1.100
1317 jsr166 1.105 /**
1318     * Returns a {@link Spliterator} over the elements in this queue.
1319     *
1320 jsr166 1.106 * <p>The returned spliterator is
1321     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1322     *
1323 jsr166 1.105 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1324     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1325     *
1326     * @implNote
1327     * The {@code Spliterator} implements {@code trySplit} to permit limited
1328     * parallelism.
1329     *
1330     * @return a {@code Spliterator} over the elements in this queue
1331     * @since 1.8
1332     */
1333 dl 1.103 public Spliterator<E> spliterator() {
1334 dl 1.102 return Spliterators.spliterator
1335 dl 1.100 (this, Spliterator.ORDERED | Spliterator.NONNULL |
1336     Spliterator.CONCURRENT);
1337     }
1338    
1339 tim 1.1 }