ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.129
Committed: Sun Nov 6 04:18:30 2016 UTC (7 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.128: +8 -7 lines
Log Message:
Optimize contains() using nested loop trick, as in ArrayDeque

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.38 * Expert Group and released to the public domain, as explained at
4 jsr166 1.78 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.110
9     import java.lang.ref.WeakReference;
10 jsr166 1.125 import java.util.AbstractQueue;
11 jsr166 1.120 import java.util.Arrays;
12 jsr166 1.85 import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15 jsr166 1.119 import java.util.Objects;
16 jsr166 1.110 import java.util.Spliterator;
17 dl 1.102 import java.util.Spliterators;
18 jsr166 1.110 import java.util.concurrent.locks.Condition;
19     import java.util.concurrent.locks.ReentrantLock;
20 jsr166 1.127 import java.util.function.Consumer;
21 tim 1.1
22     /**
23 dl 1.25 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
24     * array. This queue orders elements FIFO (first-in-first-out). The
25     * <em>head</em> of the queue is that element that has been on the
26     * queue the longest time. The <em>tail</em> of the queue is that
27     * element that has been on the queue the shortest time. New elements
28     * are inserted at the tail of the queue, and the queue retrieval
29     * operations obtain elements at the head of the queue.
30 dholmes 1.13 *
31 dl 1.40 * <p>This is a classic &quot;bounded buffer&quot;, in which a
32     * fixed-sized array holds elements inserted by producers and
33     * extracted by consumers. Once created, the capacity cannot be
34 jsr166 1.69 * changed. Attempts to {@code put} an element into a full queue
35     * will result in the operation blocking; attempts to {@code take} an
36 dl 1.40 * element from an empty queue will similarly block.
37 dl 1.11 *
38 jsr166 1.72 * <p>This class supports an optional fairness policy for ordering
39 dl 1.42 * waiting producer and consumer threads. By default, this ordering
40     * is not guaranteed. However, a queue constructed with fairness set
41 jsr166 1.69 * to {@code true} grants threads access in FIFO order. Fairness
42 dl 1.42 * generally decreases throughput but reduces variability and avoids
43     * starvation.
44 brian 1.7 *
45 dl 1.43 * <p>This class and its iterator implement all of the
46     * <em>optional</em> methods of the {@link Collection} and {@link
47 jsr166 1.47 * Iterator} interfaces.
48 dl 1.26 *
49 dl 1.41 * <p>This class is a member of the
50 jsr166 1.54 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
51 dl 1.41 * Java Collections Framework</a>.
52     *
53 dl 1.8 * @since 1.5
54     * @author Doug Lea
55 jsr166 1.109 * @param <E> the type of elements held in this queue
56 dl 1.8 */
57 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
58 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
59    
60 dl 1.36 /**
61 dl 1.42 * Serialization ID. This class relies on default serialization
62     * even for the items array, which is default-serialized, even if
63     * it is empty. Otherwise it could not be declared final, which is
64 dl 1.36 * necessary here.
65     */
66     private static final long serialVersionUID = -817911632652898426L;
67    
68 jsr166 1.73 /** The queued items */
69 jsr166 1.68 final Object[] items;
70 jsr166 1.73
71     /** items index for next take, poll, peek or remove */
72 jsr166 1.58 int takeIndex;
73 jsr166 1.73
74     /** items index for next put, offer, or add */
75 jsr166 1.58 int putIndex;
76 jsr166 1.73
77     /** Number of elements in the queue */
78 jsr166 1.59 int count;
79 tim 1.12
80 dl 1.5 /*
81 dl 1.36 * Concurrency control uses the classic two-condition algorithm
82 dl 1.5 * found in any textbook.
83     */
84    
85 dl 1.11 /** Main lock guarding all access */
86 jsr166 1.58 final ReentrantLock lock;
87 jsr166 1.85
88 dholmes 1.13 /** Condition for waiting takes */
89 dl 1.37 private final Condition notEmpty;
90 jsr166 1.85
91 dl 1.35 /** Condition for waiting puts */
92 dl 1.37 private final Condition notFull;
93 dl 1.5
94 jsr166 1.89 /**
95     * Shared state for currently active iterators, or null if there
96     * are known not to be any. Allows queue operations to update
97     * iterator state.
98     */
99 jsr166 1.121 transient Itrs itrs;
100 jsr166 1.89
101 dl 1.5 // Internal helper methods
102    
103     /**
104 jsr166 1.113 * Circularly decrements array index i.
105 jsr166 1.71 */
106     final int dec(int i) {
107     return ((i == 0) ? items.length : i) - 1;
108     }
109    
110 jsr166 1.68 /**
111 jsr166 1.128 * Decrements i, mod modulus.
112     * Precondition and postcondition: 0 <= i < modulus.
113     */
114     static final int dec(int i, int modulus) {
115     if (--i < 0) i = modulus - 1;
116     return i;
117     }
118    
119     /**
120 jsr166 1.68 * Returns item at index i.
121     */
122 jsr166 1.96 @SuppressWarnings("unchecked")
123 jsr166 1.68 final E itemAt(int i) {
124 jsr166 1.96 return (E) items[i];
125 jsr166 1.68 }
126    
127     /**
128 jsr166 1.128 * Returns element at array index i.
129     * This is a slight abuse of generics, accepted by javac.
130     */
131     @SuppressWarnings("unchecked")
132     final static <E> E itemAt(Object[] es, int i) {
133     return (E) es[i];
134     }
135    
136     /**
137 jsr166 1.47 * Inserts element at current put position, advances, and signals.
138 dl 1.9 * Call only when holding lock.
139 dl 1.5 */
140 jsr166 1.88 private void enqueue(E x) {
141 jsr166 1.128 // checkInvariants();
142 jsr166 1.95 // assert lock.getHoldCount() == 1;
143     // assert items[putIndex] == null;
144 dl 1.100 final Object[] items = this.items;
145 dl 1.5 items[putIndex] = x;
146 jsr166 1.116 if (++putIndex == items.length) putIndex = 0;
147 jsr166 1.89 count++;
148 dl 1.9 notEmpty.signal();
149 jsr166 1.128 // checkInvariants();
150 tim 1.1 }
151 tim 1.12
152 dl 1.5 /**
153 jsr166 1.47 * Extracts element at current take position, advances, and signals.
154 dl 1.9 * Call only when holding lock.
155 dl 1.5 */
156 jsr166 1.88 private E dequeue() {
157 jsr166 1.128 // checkInvariants();
158 jsr166 1.95 // assert lock.getHoldCount() == 1;
159     // assert items[takeIndex] != null;
160 jsr166 1.68 final Object[] items = this.items;
161 jsr166 1.97 @SuppressWarnings("unchecked")
162     E x = (E) items[takeIndex];
163 dl 1.5 items[takeIndex] = null;
164 jsr166 1.116 if (++takeIndex == items.length) takeIndex = 0;
165 jsr166 1.89 count--;
166     if (itrs != null)
167     itrs.elementDequeued();
168 dl 1.9 notFull.signal();
169 jsr166 1.128 // checkInvariants();
170 dl 1.5 return x;
171     }
172    
173     /**
174 jsr166 1.90 * Deletes item at array index removeIndex.
175 jsr166 1.89 * Utility for remove(Object) and iterator.remove.
176 dl 1.9 * Call only when holding lock.
177 dl 1.5 */
178 jsr166 1.90 void removeAt(final int removeIndex) {
179 jsr166 1.128 // checkInvariants();
180 jsr166 1.95 // assert lock.getHoldCount() == 1;
181     // assert items[removeIndex] != null;
182     // assert removeIndex >= 0 && removeIndex < items.length;
183 jsr166 1.68 final Object[] items = this.items;
184 jsr166 1.90 if (removeIndex == takeIndex) {
185 jsr166 1.89 // removing front item; just advance
186 dl 1.9 items[takeIndex] = null;
187 jsr166 1.116 if (++takeIndex == items.length) takeIndex = 0;
188 jsr166 1.90 count--;
189 jsr166 1.89 if (itrs != null)
190     itrs.elementDequeued();
191 tim 1.23 } else {
192 jsr166 1.89 // an "interior" remove
193 jsr166 1.90
194 dl 1.9 // slide over all others up through putIndex.
195 jsr166 1.115 for (int i = removeIndex, putIndex = this.putIndex;;) {
196     int pred = i;
197     if (++i == items.length) i = 0;
198     if (i == putIndex) {
199     items[pred] = null;
200     this.putIndex = pred;
201 dl 1.9 break;
202     }
203 jsr166 1.115 items[pred] = items[i];
204 dl 1.5 }
205 jsr166 1.90 count--;
206     if (itrs != null)
207     itrs.removedAt(removeIndex);
208 dl 1.5 }
209 dl 1.9 notFull.signal();
210 tim 1.1 }
211    
212     /**
213 jsr166 1.69 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
214 dholmes 1.13 * capacity and default access policy.
215 jsr166 1.50 *
216 dholmes 1.13 * @param capacity the capacity of this queue
217 jsr166 1.69 * @throws IllegalArgumentException if {@code capacity < 1}
218 dl 1.5 */
219 dholmes 1.13 public ArrayBlockingQueue(int capacity) {
220 dl 1.36 this(capacity, false);
221 dl 1.5 }
222 dl 1.2
223 dl 1.5 /**
224 jsr166 1.69 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
225 dholmes 1.13 * capacity and the specified access policy.
226 jsr166 1.50 *
227 dholmes 1.13 * @param capacity the capacity of this queue
228 jsr166 1.69 * @param fair if {@code true} then queue accesses for threads blocked
229 jsr166 1.50 * on insertion or removal, are processed in FIFO order;
230 jsr166 1.69 * if {@code false} the access order is unspecified.
231     * @throws IllegalArgumentException if {@code capacity < 1}
232 dl 1.11 */
233 dholmes 1.13 public ArrayBlockingQueue(int capacity, boolean fair) {
234 dl 1.36 if (capacity <= 0)
235     throw new IllegalArgumentException();
236 jsr166 1.68 this.items = new Object[capacity];
237 dl 1.36 lock = new ReentrantLock(fair);
238     notEmpty = lock.newCondition();
239     notFull = lock.newCondition();
240 dl 1.5 }
241    
242 dholmes 1.16 /**
243 jsr166 1.69 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
244 dholmes 1.21 * capacity, the specified access policy and initially containing the
245 tim 1.17 * elements of the given collection,
246 dholmes 1.16 * added in traversal order of the collection's iterator.
247 jsr166 1.50 *
248 dholmes 1.16 * @param capacity the capacity of this queue
249 jsr166 1.69 * @param fair if {@code true} then queue accesses for threads blocked
250 jsr166 1.50 * on insertion or removal, are processed in FIFO order;
251 jsr166 1.69 * if {@code false} the access order is unspecified.
252 dholmes 1.16 * @param c the collection of elements to initially contain
253 jsr166 1.69 * @throws IllegalArgumentException if {@code capacity} is less than
254     * {@code c.size()}, or less than 1.
255 jsr166 1.50 * @throws NullPointerException if the specified collection or any
256     * of its elements are null
257 dholmes 1.16 */
258 tim 1.20 public ArrayBlockingQueue(int capacity, boolean fair,
259 dholmes 1.18 Collection<? extends E> c) {
260 dl 1.36 this(capacity, fair);
261 dholmes 1.16
262 jsr166 1.68 final ReentrantLock lock = this.lock;
263     lock.lock(); // Lock only for visibility, not mutual exclusion
264     try {
265     int i = 0;
266     try {
267 jsr166 1.119 for (E e : c)
268     items[i++] = Objects.requireNonNull(e);
269 jsr166 1.68 } catch (ArrayIndexOutOfBoundsException ex) {
270     throw new IllegalArgumentException();
271     }
272     count = i;
273     putIndex = (i == capacity) ? 0 : i;
274     } finally {
275     lock.unlock();
276     }
277 dholmes 1.16 }
278 dl 1.2
279 dholmes 1.13 /**
280 jsr166 1.50 * Inserts the specified element at the tail of this queue if it is
281     * possible to do so immediately without exceeding the queue's capacity,
282 jsr166 1.69 * returning {@code true} upon success and throwing an
283     * {@code IllegalStateException} if this queue is full.
284 dl 1.25 *
285 jsr166 1.50 * @param e the element to add
286 jsr166 1.69 * @return {@code true} (as specified by {@link Collection#add})
287 jsr166 1.50 * @throws IllegalStateException if this queue is full
288     * @throws NullPointerException if the specified element is null
289     */
290     public boolean add(E e) {
291 jsr166 1.56 return super.add(e);
292 jsr166 1.50 }
293    
294     /**
295     * Inserts the specified element at the tail of this queue if it is
296     * possible to do so immediately without exceeding the queue's capacity,
297 jsr166 1.69 * returning {@code true} upon success and {@code false} if this queue
298 jsr166 1.50 * is full. This method is generally preferable to method {@link #add},
299     * which can fail to insert an element only by throwing an exception.
300     *
301     * @throws NullPointerException if the specified element is null
302 dholmes 1.13 */
303 jsr166 1.49 public boolean offer(E e) {
304 jsr166 1.119 Objects.requireNonNull(e);
305 dl 1.36 final ReentrantLock lock = this.lock;
306 dl 1.5 lock.lock();
307     try {
308 jsr166 1.59 if (count == items.length)
309 dl 1.2 return false;
310 dl 1.5 else {
311 jsr166 1.88 enqueue(e);
312 dl 1.5 return true;
313     }
314 tim 1.23 } finally {
315 tim 1.12 lock.unlock();
316 dl 1.2 }
317 dl 1.5 }
318 dl 1.2
319 dholmes 1.13 /**
320 jsr166 1.50 * Inserts the specified element at the tail of this queue, waiting
321     * for space to become available if the queue is full.
322     *
323     * @throws InterruptedException {@inheritDoc}
324     * @throws NullPointerException {@inheritDoc}
325     */
326     public void put(E e) throws InterruptedException {
327 jsr166 1.119 Objects.requireNonNull(e);
328 jsr166 1.50 final ReentrantLock lock = this.lock;
329     lock.lockInterruptibly();
330     try {
331 jsr166 1.59 while (count == items.length)
332 jsr166 1.58 notFull.await();
333 jsr166 1.88 enqueue(e);
334 jsr166 1.50 } finally {
335     lock.unlock();
336     }
337     }
338    
339     /**
340     * Inserts the specified element at the tail of this queue, waiting
341     * up to the specified wait time for space to become available if
342     * the queue is full.
343     *
344     * @throws InterruptedException {@inheritDoc}
345     * @throws NullPointerException {@inheritDoc}
346 brian 1.7 */
347 jsr166 1.49 public boolean offer(E e, long timeout, TimeUnit unit)
348 dholmes 1.13 throws InterruptedException {
349 dl 1.2
350 jsr166 1.119 Objects.requireNonNull(e);
351 jsr166 1.56 long nanos = unit.toNanos(timeout);
352 dl 1.36 final ReentrantLock lock = this.lock;
353 dl 1.5 lock.lockInterruptibly();
354     try {
355 jsr166 1.59 while (count == items.length) {
356 jsr166 1.126 if (nanos <= 0L)
357 dl 1.5 return false;
358 jsr166 1.58 nanos = notFull.awaitNanos(nanos);
359 dl 1.5 }
360 jsr166 1.88 enqueue(e);
361 jsr166 1.128 // checkInvariants();
362 jsr166 1.58 return true;
363 tim 1.23 } finally {
364 dl 1.5 lock.unlock();
365 dl 1.2 }
366 dl 1.5 }
367 dl 1.2
368 dholmes 1.13 public E poll() {
369 dl 1.36 final ReentrantLock lock = this.lock;
370 dholmes 1.13 lock.lock();
371     try {
372 jsr166 1.88 return (count == 0) ? null : dequeue();
373 tim 1.23 } finally {
374 tim 1.15 lock.unlock();
375 dholmes 1.13 }
376     }
377    
378 jsr166 1.50 public E take() throws InterruptedException {
379     final ReentrantLock lock = this.lock;
380     lock.lockInterruptibly();
381     try {
382 jsr166 1.59 while (count == 0)
383 jsr166 1.58 notEmpty.await();
384 jsr166 1.88 return dequeue();
385 jsr166 1.50 } finally {
386     lock.unlock();
387     }
388     }
389    
390 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
391 jsr166 1.56 long nanos = unit.toNanos(timeout);
392 dl 1.36 final ReentrantLock lock = this.lock;
393 dl 1.5 lock.lockInterruptibly();
394     try {
395 jsr166 1.59 while (count == 0) {
396 jsr166 1.126 if (nanos <= 0L)
397 dl 1.5 return null;
398 jsr166 1.58 nanos = notEmpty.awaitNanos(nanos);
399 dl 1.2 }
400 jsr166 1.128 // checkInvariants();
401 jsr166 1.88 return dequeue();
402 tim 1.23 } finally {
403 dl 1.5 lock.unlock();
404     }
405     }
406 dl 1.2
407 dholmes 1.13 public E peek() {
408 dl 1.36 final ReentrantLock lock = this.lock;
409 dholmes 1.13 lock.lock();
410     try {
411 jsr166 1.99 return itemAt(takeIndex); // null when queue is empty
412 tim 1.23 } finally {
413 dholmes 1.13 lock.unlock();
414     }
415     }
416    
417     // this doc comment is overridden to remove the reference to collections
418     // greater in size than Integer.MAX_VALUE
419 tim 1.15 /**
420 dl 1.25 * Returns the number of elements in this queue.
421     *
422 jsr166 1.50 * @return the number of elements in this queue
423 dholmes 1.13 */
424     public int size() {
425 dl 1.36 final ReentrantLock lock = this.lock;
426 dholmes 1.13 lock.lock();
427     try {
428     return count;
429 tim 1.23 } finally {
430 dholmes 1.13 lock.unlock();
431     }
432     }
433    
434     // this doc comment is a modified copy of the inherited doc comment,
435     // without the reference to unlimited queues.
436 tim 1.15 /**
437 jsr166 1.48 * Returns the number of additional elements that this queue can ideally
438     * (in the absence of memory or resource constraints) accept without
439 dholmes 1.13 * blocking. This is always equal to the initial capacity of this queue
440 jsr166 1.69 * less the current {@code size} of this queue.
441 jsr166 1.48 *
442     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
443 jsr166 1.69 * an element will succeed by inspecting {@code remainingCapacity}
444 jsr166 1.48 * because it may be the case that another thread is about to
445 jsr166 1.50 * insert or remove an element.
446 dholmes 1.13 */
447     public int remainingCapacity() {
448 dl 1.36 final ReentrantLock lock = this.lock;
449 dholmes 1.13 lock.lock();
450     try {
451     return items.length - count;
452 tim 1.23 } finally {
453 dholmes 1.13 lock.unlock();
454     }
455     }
456    
457 jsr166 1.50 /**
458     * Removes a single instance of the specified element from this queue,
459 jsr166 1.69 * if it is present. More formally, removes an element {@code e} such
460     * that {@code o.equals(e)}, if this queue contains one or more such
461 jsr166 1.50 * elements.
462 jsr166 1.69 * Returns {@code true} if this queue contained the specified element
463 jsr166 1.50 * (or equivalently, if this queue changed as a result of the call).
464     *
465 jsr166 1.64 * <p>Removal of interior elements in circular array based queues
466 dl 1.60 * is an intrinsically slow and disruptive operation, so should
467     * be undertaken only in exceptional circumstances, ideally
468     * only when the queue is known not to be accessible by other
469     * threads.
470     *
471 jsr166 1.50 * @param o element to be removed from this queue, if present
472 jsr166 1.69 * @return {@code true} if this queue changed as a result of the call
473 jsr166 1.50 */
474     public boolean remove(Object o) {
475     if (o == null) return false;
476     final ReentrantLock lock = this.lock;
477     lock.lock();
478     try {
479 jsr166 1.86 if (count > 0) {
480 jsr166 1.114 final Object[] items = this.items;
481 jsr166 1.86 final int putIndex = this.putIndex;
482     int i = takeIndex;
483     do {
484     if (o.equals(items[i])) {
485     removeAt(i);
486     return true;
487     }
488 jsr166 1.116 if (++i == items.length) i = 0;
489 dl 1.100 } while (i != putIndex);
490 jsr166 1.50 }
491 jsr166 1.68 return false;
492 jsr166 1.50 } finally {
493     lock.unlock();
494     }
495     }
496 dholmes 1.13
497 jsr166 1.50 /**
498 jsr166 1.69 * Returns {@code true} if this queue contains the specified element.
499     * More formally, returns {@code true} if and only if this queue contains
500     * at least one element {@code e} such that {@code o.equals(e)}.
501 jsr166 1.50 *
502     * @param o object to be checked for containment in this queue
503 jsr166 1.69 * @return {@code true} if this queue contains the specified element
504 jsr166 1.50 */
505 dholmes 1.21 public boolean contains(Object o) {
506     if (o == null) return false;
507 dl 1.36 final ReentrantLock lock = this.lock;
508 dl 1.5 lock.lock();
509     try {
510 jsr166 1.86 if (count > 0) {
511 jsr166 1.114 final Object[] items = this.items;
512 jsr166 1.129 for (int i = takeIndex, end = putIndex,
513     to = (i < end) ? end : items.length;
514     ; i = 0, to = end) {
515     for (; i < to; i++)
516     if (o.equals(items[i]))
517     return true;
518     if (to == end) break;
519     }
520 jsr166 1.86 }
521 dl 1.2 return false;
522 tim 1.23 } finally {
523 dl 1.5 lock.unlock();
524     }
525     }
526 brian 1.7
527 jsr166 1.50 /**
528     * Returns an array containing all of the elements in this queue, in
529     * proper sequence.
530     *
531     * <p>The returned array will be "safe" in that no references to it are
532     * maintained by this queue. (In other words, this method must allocate
533     * a new array). The caller is thus free to modify the returned array.
534 jsr166 1.51 *
535 jsr166 1.50 * <p>This method acts as bridge between array-based and collection-based
536     * APIs.
537     *
538     * @return an array containing all of the elements in this queue
539     */
540 dl 1.5 public Object[] toArray() {
541 dl 1.36 final ReentrantLock lock = this.lock;
542 dl 1.5 lock.lock();
543     try {
544 jsr166 1.120 final Object[] items = this.items;
545     final int end = takeIndex + count;
546     final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
547     if (end != putIndex)
548     System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
549     return a;
550 tim 1.23 } finally {
551 dl 1.5 lock.unlock();
552     }
553     }
554 brian 1.7
555 jsr166 1.50 /**
556     * Returns an array containing all of the elements in this queue, in
557     * proper sequence; the runtime type of the returned array is that of
558     * the specified array. If the queue fits in the specified array, it
559     * is returned therein. Otherwise, a new array is allocated with the
560     * runtime type of the specified array and the size of this queue.
561     *
562     * <p>If this queue fits in the specified array with room to spare
563     * (i.e., the array has more elements than this queue), the element in
564     * the array immediately following the end of the queue is set to
565 jsr166 1.69 * {@code null}.
566 jsr166 1.50 *
567     * <p>Like the {@link #toArray()} method, this method acts as bridge between
568     * array-based and collection-based APIs. Further, this method allows
569     * precise control over the runtime type of the output array, and may,
570     * under certain circumstances, be used to save allocation costs.
571     *
572 jsr166 1.69 * <p>Suppose {@code x} is a queue known to contain only strings.
573 jsr166 1.50 * The following code can be used to dump the queue into a newly
574 jsr166 1.69 * allocated array of {@code String}:
575 jsr166 1.50 *
576 jsr166 1.117 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
577 jsr166 1.50 *
578 jsr166 1.69 * Note that {@code toArray(new Object[0])} is identical in function to
579     * {@code toArray()}.
580 jsr166 1.50 *
581     * @param a the array into which the elements of the queue are to
582     * be stored, if it is big enough; otherwise, a new array of the
583     * same runtime type is allocated for this purpose
584     * @return an array containing all of the elements in this queue
585     * @throws ArrayStoreException if the runtime type of the specified array
586     * is not a supertype of the runtime type of every element in
587     * this queue
588     * @throws NullPointerException if the specified array is null
589     */
590 jsr166 1.68 @SuppressWarnings("unchecked")
591 dl 1.5 public <T> T[] toArray(T[] a) {
592 dl 1.36 final ReentrantLock lock = this.lock;
593 dl 1.5 lock.lock();
594     try {
595 jsr166 1.120 final Object[] items = this.items;
596 jsr166 1.68 final int count = this.count;
597 jsr166 1.120 final int firstLeg = Math.min(items.length - takeIndex, count);
598     if (a.length < count) {
599     a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
600     a.getClass());
601     } else {
602     System.arraycopy(items, takeIndex, a, 0, firstLeg);
603     if (a.length > count)
604     a[count] = null;
605     }
606     if (firstLeg < count)
607     System.arraycopy(items, 0, a, firstLeg, putIndex);
608     return a;
609 tim 1.23 } finally {
610 dl 1.5 lock.unlock();
611     }
612     }
613 dl 1.6
614     public String toString() {
615 jsr166 1.122 return Helpers.collectionToString(this);
616 dl 1.6 }
617 tim 1.12
618 dl 1.44 /**
619     * Atomically removes all of the elements from this queue.
620     * The queue will be empty after this call returns.
621     */
622 dl 1.30 public void clear() {
623 dl 1.36 final ReentrantLock lock = this.lock;
624 dl 1.30 lock.lock();
625     try {
626 jsr166 1.86 int k = count;
627     if (k > 0) {
628 jsr166 1.123 final Object[] items = this.items;
629 jsr166 1.86 final int putIndex = this.putIndex;
630     int i = takeIndex;
631     do {
632     items[i] = null;
633 jsr166 1.116 if (++i == items.length) i = 0;
634 dl 1.100 } while (i != putIndex);
635 jsr166 1.86 takeIndex = putIndex;
636     count = 0;
637 jsr166 1.89 if (itrs != null)
638     itrs.queueIsEmpty();
639 jsr166 1.86 for (; k > 0 && lock.hasWaiters(notFull); k--)
640     notFull.signal();
641     }
642 dl 1.30 } finally {
643     lock.unlock();
644     }
645     }
646    
647 jsr166 1.50 /**
648     * @throws UnsupportedOperationException {@inheritDoc}
649     * @throws ClassCastException {@inheritDoc}
650     * @throws NullPointerException {@inheritDoc}
651     * @throws IllegalArgumentException {@inheritDoc}
652     */
653 dl 1.30 public int drainTo(Collection<? super E> c) {
654 jsr166 1.81 return drainTo(c, Integer.MAX_VALUE);
655 dl 1.30 }
656    
657 jsr166 1.50 /**
658     * @throws UnsupportedOperationException {@inheritDoc}
659     * @throws ClassCastException {@inheritDoc}
660     * @throws NullPointerException {@inheritDoc}
661     * @throws IllegalArgumentException {@inheritDoc}
662     */
663 dl 1.30 public int drainTo(Collection<? super E> c, int maxElements) {
664 jsr166 1.119 Objects.requireNonNull(c);
665 dl 1.30 if (c == this)
666     throw new IllegalArgumentException();
667     if (maxElements <= 0)
668     return 0;
669 jsr166 1.68 final Object[] items = this.items;
670 dl 1.36 final ReentrantLock lock = this.lock;
671 dl 1.30 lock.lock();
672     try {
673 jsr166 1.82 int n = Math.min(maxElements, count);
674     int take = takeIndex;
675     int i = 0;
676     try {
677     while (i < n) {
678 jsr166 1.97 @SuppressWarnings("unchecked")
679     E x = (E) items[take];
680 jsr166 1.84 c.add(x);
681 jsr166 1.82 items[take] = null;
682 jsr166 1.116 if (++take == items.length) take = 0;
683 jsr166 1.86 i++;
684 jsr166 1.82 }
685     return n;
686     } finally {
687     // Restore invariants even if c.add() threw
688 jsr166 1.89 if (i > 0) {
689     count -= i;
690     takeIndex = take;
691     if (itrs != null) {
692     if (count == 0)
693     itrs.queueIsEmpty();
694     else if (i > take)
695     itrs.takeIndexWrapped();
696     }
697     for (; i > 0 && lock.hasWaiters(notFull); i--)
698     notFull.signal();
699     }
700 dl 1.30 }
701     } finally {
702     lock.unlock();
703     }
704     }
705    
706 brian 1.7 /**
707 dl 1.75 * Returns an iterator over the elements in this queue in proper sequence.
708 jsr166 1.77 * The elements will be returned in order from first (head) to last (tail).
709     *
710 jsr166 1.106 * <p>The returned iterator is
711     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
712 brian 1.7 *
713 jsr166 1.50 * @return an iterator over the elements in this queue in proper sequence
714 brian 1.7 */
715 dl 1.5 public Iterator<E> iterator() {
716 jsr166 1.68 return new Itr();
717 dl 1.5 }
718 dl 1.8
719     /**
720 jsr166 1.94 * Shared data between iterators and their queue, allowing queue
721 jsr166 1.89 * modifications to update iterators when elements are removed.
722     *
723 jsr166 1.94 * This adds a lot of complexity for the sake of correctly
724     * handling some uncommon operations, but the combination of
725     * circular-arrays and supporting interior removes (i.e., those
726     * not at head) would cause iterators to sometimes lose their
727     * places and/or (re)report elements they shouldn't. To avoid
728     * this, when a queue has one or more iterators, it keeps iterator
729     * state consistent by:
730     *
731     * (1) keeping track of the number of "cycles", that is, the
732     * number of times takeIndex has wrapped around to 0.
733     * (2) notifying all iterators via the callback removedAt whenever
734     * an interior element is removed (and thus other elements may
735     * be shifted).
736     *
737     * These suffice to eliminate iterator inconsistencies, but
738     * unfortunately add the secondary responsibility of maintaining
739     * the list of iterators. We track all active iterators in a
740     * simple linked list (accessed only when the queue's lock is
741     * held) of weak references to Itr. The list is cleaned up using
742     * 3 different mechanisms:
743 jsr166 1.89 *
744     * (1) Whenever a new iterator is created, do some O(1) checking for
745     * stale list elements.
746     *
747     * (2) Whenever takeIndex wraps around to 0, check for iterators
748     * that have been unused for more than one wrap-around cycle.
749     *
750     * (3) Whenever the queue becomes empty, all iterators are notified
751     * and this entire data structure is discarded.
752     *
753 jsr166 1.94 * So in addition to the removedAt callback that is necessary for
754     * correctness, iterators have the shutdown and takeIndexWrapped
755     * callbacks that help remove stale iterators from the list.
756     *
757     * Whenever a list element is examined, it is expunged if either
758     * the GC has determined that the iterator is discarded, or if the
759     * iterator reports that it is "detached" (does not need any
760     * further state updates). Overhead is maximal when takeIndex
761     * never advances, iterators are discarded before they are
762     * exhausted, and all removals are interior removes, in which case
763     * all stale iterators are discovered by the GC. But even in this
764     * case we don't increase the amortized complexity.
765     *
766     * Care must be taken to keep list sweeping methods from
767     * reentrantly invoking another such method, causing subtle
768     * corruption bugs.
769 jsr166 1.89 */
770     class Itrs {
771    
772     /**
773     * Node in a linked list of weak iterator references.
774     */
775     private class Node extends WeakReference<Itr> {
776     Node next;
777    
778     Node(Itr iterator, Node next) {
779     super(iterator);
780     this.next = next;
781     }
782     }
783    
784     /** Incremented whenever takeIndex wraps around to 0 */
785 jsr166 1.108 int cycles;
786 jsr166 1.89
787     /** Linked list of weak iterator references */
788 jsr166 1.93 private Node head;
789 jsr166 1.89
790     /** Used to expunge stale iterators */
791 jsr166 1.108 private Node sweeper;
792 jsr166 1.89
793     private static final int SHORT_SWEEP_PROBES = 4;
794     private static final int LONG_SWEEP_PROBES = 16;
795    
796 jsr166 1.93 Itrs(Itr initial) {
797     register(initial);
798     }
799    
800 jsr166 1.89 /**
801     * Sweeps itrs, looking for and expunging stale iterators.
802     * If at least one was found, tries harder to find more.
803 jsr166 1.94 * Called only from iterating thread.
804 jsr166 1.89 *
805     * @param tryHarder whether to start in try-harder mode, because
806     * there is known to be at least one iterator to collect
807     */
808     void doSomeSweeping(boolean tryHarder) {
809 jsr166 1.95 // assert lock.getHoldCount() == 1;
810     // assert head != null;
811 jsr166 1.89 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
812     Node o, p;
813     final Node sweeper = this.sweeper;
814 jsr166 1.93 boolean passedGo; // to limit search to one full sweep
815 jsr166 1.89
816 jsr166 1.93 if (sweeper == null) {
817     o = null;
818     p = head;
819     passedGo = true;
820     } else {
821 jsr166 1.89 o = sweeper;
822     p = o.next;
823 jsr166 1.93 passedGo = false;
824 jsr166 1.89 }
825    
826 jsr166 1.93 for (; probes > 0; probes--) {
827     if (p == null) {
828     if (passedGo)
829     break;
830     o = null;
831     p = head;
832     passedGo = true;
833     }
834 jsr166 1.89 final Itr it = p.get();
835     final Node next = p.next;
836     if (it == null || it.isDetached()) {
837     // found a discarded/exhausted iterator
838     probes = LONG_SWEEP_PROBES; // "try harder"
839     // unlink p
840     p.clear();
841     p.next = null;
842 jsr166 1.93 if (o == null) {
843 jsr166 1.89 head = next;
844 jsr166 1.93 if (next == null) {
845     // We've run out of iterators to track; retire
846     itrs = null;
847     return;
848     }
849     }
850 jsr166 1.89 else
851     o.next = next;
852     } else {
853     o = p;
854     }
855     p = next;
856     }
857    
858     this.sweeper = (p == null) ? null : o;
859     }
860    
861     /**
862     * Adds a new iterator to the linked list of tracked iterators.
863     */
864     void register(Itr itr) {
865 jsr166 1.95 // assert lock.getHoldCount() == 1;
866 jsr166 1.89 head = new Node(itr, head);
867     }
868    
869     /**
870     * Called whenever takeIndex wraps around to 0.
871     *
872     * Notifies all iterators, and expunges any that are now stale.
873     */
874     void takeIndexWrapped() {
875 jsr166 1.95 // assert lock.getHoldCount() == 1;
876 jsr166 1.89 cycles++;
877     for (Node o = null, p = head; p != null;) {
878     final Itr it = p.get();
879     final Node next = p.next;
880     if (it == null || it.takeIndexWrapped()) {
881     // unlink p
882 jsr166 1.95 // assert it == null || it.isDetached();
883 jsr166 1.89 p.clear();
884     p.next = null;
885     if (o == null)
886     head = next;
887     else
888     o.next = next;
889     } else {
890     o = p;
891     }
892     p = next;
893     }
894     if (head == null) // no more iterators to track
895     itrs = null;
896     }
897    
898     /**
899 jsr166 1.107 * Called whenever an interior remove (not at takeIndex) occurred.
900 jsr166 1.93 *
901     * Notifies all iterators, and expunges any that are now stale.
902 jsr166 1.89 */
903 jsr166 1.90 void removedAt(int removedIndex) {
904 jsr166 1.89 for (Node o = null, p = head; p != null;) {
905     final Itr it = p.get();
906     final Node next = p.next;
907 jsr166 1.90 if (it == null || it.removedAt(removedIndex)) {
908 jsr166 1.89 // unlink p
909 jsr166 1.95 // assert it == null || it.isDetached();
910 jsr166 1.89 p.clear();
911     p.next = null;
912     if (o == null)
913     head = next;
914     else
915     o.next = next;
916     } else {
917     o = p;
918     }
919     p = next;
920     }
921     if (head == null) // no more iterators to track
922     itrs = null;
923     }
924    
925     /**
926     * Called whenever the queue becomes empty.
927     *
928     * Notifies all active iterators that the queue is empty,
929     * clears all weak refs, and unlinks the itrs datastructure.
930     */
931     void queueIsEmpty() {
932 jsr166 1.95 // assert lock.getHoldCount() == 1;
933 jsr166 1.89 for (Node p = head; p != null; p = p.next) {
934     Itr it = p.get();
935     if (it != null) {
936     p.clear();
937     it.shutdown();
938     }
939     }
940     head = null;
941     itrs = null;
942     }
943    
944     /**
945 jsr166 1.90 * Called whenever an element has been dequeued (at takeIndex).
946 jsr166 1.89 */
947     void elementDequeued() {
948 jsr166 1.95 // assert lock.getHoldCount() == 1;
949 jsr166 1.89 if (count == 0)
950     queueIsEmpty();
951     else if (takeIndex == 0)
952     takeIndexWrapped();
953     }
954     }
955    
956     /**
957     * Iterator for ArrayBlockingQueue.
958     *
959     * To maintain weak consistency with respect to puts and takes, we
960     * read ahead one slot, so as to not report hasNext true but then
961     * not have an element to return.
962     *
963     * We switch into "detached" mode (allowing prompt unlinking from
964     * itrs without help from the GC) when all indices are negative, or
965     * when hasNext returns false for the first time. This allows the
966     * iterator to track concurrent updates completely accurately,
967     * except for the corner case of the user calling Iterator.remove()
968     * after hasNext() returned false. Even in this case, we ensure
969     * that we don't remove the wrong element by keeping track of the
970     * expected element to remove, in lastItem. Yes, we may fail to
971     * remove lastItem from the queue if it moved due to an interleaved
972     * interior remove while in detached mode.
973 dl 1.8 */
974 dl 1.5 private class Itr implements Iterator<E> {
975 jsr166 1.91 /** Index to look for new nextItem; NONE at end */
976 jsr166 1.89 private int cursor;
977    
978     /** Element to be returned by next call to next(); null if none */
979     private E nextItem;
980    
981 jsr166 1.91 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
982 jsr166 1.89 private int nextIndex;
983    
984     /** Last element returned; null if none or not detached. */
985     private E lastItem;
986    
987 jsr166 1.91 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
988 jsr166 1.89 private int lastRet;
989    
990 jsr166 1.91 /** Previous value of takeIndex, or DETACHED when detached */
991 jsr166 1.89 private int prevTakeIndex;
992    
993     /** Previous value of iters.cycles */
994     private int prevCycles;
995 tim 1.12
996 jsr166 1.91 /** Special index value indicating "not available" or "undefined" */
997     private static final int NONE = -1;
998    
999     /**
1000     * Special index value indicating "removed elsewhere", that is,
1001     * removed by some operation other than a call to this.remove().
1002     */
1003     private static final int REMOVED = -2;
1004    
1005     /** Special value for prevTakeIndex indicating "detached mode" */
1006     private static final int DETACHED = -3;
1007    
1008 dl 1.66 Itr() {
1009 jsr166 1.95 // assert lock.getHoldCount() == 0;
1010 jsr166 1.91 lastRet = NONE;
1011 jsr166 1.68 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1012     lock.lock();
1013     try {
1014 jsr166 1.89 if (count == 0) {
1015 jsr166 1.95 // assert itrs == null;
1016 jsr166 1.91 cursor = NONE;
1017     nextIndex = NONE;
1018     prevTakeIndex = DETACHED;
1019 jsr166 1.89 } else {
1020     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1021     prevTakeIndex = takeIndex;
1022 jsr166 1.68 nextItem = itemAt(nextIndex = takeIndex);
1023 jsr166 1.89 cursor = incCursor(takeIndex);
1024     if (itrs == null) {
1025 jsr166 1.93 itrs = new Itrs(this);
1026 jsr166 1.89 } else {
1027     itrs.register(this); // in this order
1028     itrs.doSomeSweeping(false);
1029     }
1030     prevCycles = itrs.cycles;
1031 jsr166 1.95 // assert takeIndex >= 0;
1032     // assert prevTakeIndex == takeIndex;
1033     // assert nextIndex >= 0;
1034     // assert nextItem != null;
1035 jsr166 1.89 }
1036 jsr166 1.68 } finally {
1037     lock.unlock();
1038     }
1039 dl 1.5 }
1040 tim 1.12
1041 jsr166 1.89 boolean isDetached() {
1042 jsr166 1.95 // assert lock.getHoldCount() == 1;
1043 jsr166 1.89 return prevTakeIndex < 0;
1044     }
1045    
1046     private int incCursor(int index) {
1047 jsr166 1.95 // assert lock.getHoldCount() == 1;
1048 jsr166 1.116 if (++index == items.length) index = 0;
1049     if (index == putIndex) index = NONE;
1050 jsr166 1.89 return index;
1051     }
1052    
1053     /**
1054     * Returns true if index is invalidated by the given number of
1055     * dequeues, starting from prevTakeIndex.
1056     */
1057     private boolean invalidated(int index, int prevTakeIndex,
1058     long dequeues, int length) {
1059     if (index < 0)
1060     return false;
1061     int distance = index - prevTakeIndex;
1062     if (distance < 0)
1063     distance += length;
1064     return dequeues > distance;
1065     }
1066    
1067     /**
1068     * Adjusts indices to incorporate all dequeues since the last
1069     * operation on this iterator. Call only from iterating thread.
1070     */
1071     private void incorporateDequeues() {
1072 jsr166 1.95 // assert lock.getHoldCount() == 1;
1073     // assert itrs != null;
1074     // assert !isDetached();
1075     // assert count > 0;
1076 jsr166 1.89
1077     final int cycles = itrs.cycles;
1078     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1079     final int prevCycles = this.prevCycles;
1080     final int prevTakeIndex = this.prevTakeIndex;
1081    
1082     if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1083     final int len = items.length;
1084     // how far takeIndex has advanced since the previous
1085     // operation of this iterator
1086     long dequeues = (cycles - prevCycles) * len
1087     + (takeIndex - prevTakeIndex);
1088    
1089     // Check indices for invalidation
1090     if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1091 jsr166 1.91 lastRet = REMOVED;
1092 jsr166 1.89 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1093 jsr166 1.91 nextIndex = REMOVED;
1094 jsr166 1.89 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1095     cursor = takeIndex;
1096    
1097     if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1098     detach();
1099     else {
1100     this.prevCycles = cycles;
1101     this.prevTakeIndex = takeIndex;
1102     }
1103     }
1104     }
1105    
1106     /**
1107     * Called when itrs should stop tracking this iterator, either
1108     * because there are no more indices to update (cursor < 0 &&
1109     * nextIndex < 0 && lastRet < 0) or as a special exception, when
1110     * lastRet >= 0, because hasNext() is about to return false for the
1111     * first time. Call only from iterating thread.
1112     */
1113     private void detach() {
1114     // Switch to detached mode
1115 jsr166 1.95 // assert lock.getHoldCount() == 1;
1116     // assert cursor == NONE;
1117     // assert nextIndex < 0;
1118     // assert lastRet < 0 || nextItem == null;
1119     // assert lastRet < 0 ^ lastItem != null;
1120 jsr166 1.89 if (prevTakeIndex >= 0) {
1121 jsr166 1.95 // assert itrs != null;
1122 jsr166 1.91 prevTakeIndex = DETACHED;
1123 jsr166 1.89 // try to unlink from itrs (but not too hard)
1124     itrs.doSomeSweeping(true);
1125     }
1126     }
1127    
1128     /**
1129     * For performance reasons, we would like not to acquire a lock in
1130     * hasNext in the common case. To allow for this, we only access
1131     * fields (i.e. nextItem) that are not modified by update operations
1132     * triggered by queue modifications.
1133     */
1134 dl 1.5 public boolean hasNext() {
1135 jsr166 1.95 // assert lock.getHoldCount() == 0;
1136 jsr166 1.89 if (nextItem != null)
1137     return true;
1138     noNext();
1139     return false;
1140     }
1141    
1142     private void noNext() {
1143     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1144     lock.lock();
1145     try {
1146 jsr166 1.95 // assert cursor == NONE;
1147     // assert nextIndex == NONE;
1148 jsr166 1.89 if (!isDetached()) {
1149 jsr166 1.95 // assert lastRet >= 0;
1150 jsr166 1.89 incorporateDequeues(); // might update lastRet
1151     if (lastRet >= 0) {
1152     lastItem = itemAt(lastRet);
1153 jsr166 1.95 // assert lastItem != null;
1154 jsr166 1.89 detach();
1155     }
1156     }
1157 jsr166 1.95 // assert isDetached();
1158     // assert lastRet < 0 ^ lastItem != null;
1159 jsr166 1.89 } finally {
1160     lock.unlock();
1161     }
1162 dl 1.5 }
1163 tim 1.12
1164 dl 1.5 public E next() {
1165 jsr166 1.95 // assert lock.getHoldCount() == 0;
1166 jsr166 1.89 final E x = nextItem;
1167     if (x == null)
1168     throw new NoSuchElementException();
1169 dl 1.66 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1170     lock.lock();
1171     try {
1172 jsr166 1.92 if (!isDetached())
1173 jsr166 1.89 incorporateDequeues();
1174 jsr166 1.95 // assert nextIndex != NONE;
1175     // assert lastItem == null;
1176 dl 1.66 lastRet = nextIndex;
1177 jsr166 1.89 final int cursor = this.cursor;
1178     if (cursor >= 0) {
1179     nextItem = itemAt(nextIndex = cursor);
1180 jsr166 1.95 // assert nextItem != null;
1181 jsr166 1.89 this.cursor = incCursor(cursor);
1182     } else {
1183 jsr166 1.91 nextIndex = NONE;
1184 jsr166 1.89 nextItem = null;
1185     }
1186 dl 1.66 } finally {
1187     lock.unlock();
1188 dl 1.2 }
1189 jsr166 1.89 return x;
1190 dl 1.5 }
1191 tim 1.12
1192 dl 1.5 public void remove() {
1193 jsr166 1.95 // assert lock.getHoldCount() == 0;
1194 dl 1.36 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1195 dl 1.5 lock.lock();
1196     try {
1197 jsr166 1.92 if (!isDetached())
1198     incorporateDequeues(); // might update lastRet or detach
1199     final int lastRet = this.lastRet;
1200     this.lastRet = NONE;
1201 jsr166 1.89 if (lastRet >= 0) {
1202 jsr166 1.92 if (!isDetached())
1203     removeAt(lastRet);
1204     else {
1205     final E lastItem = this.lastItem;
1206 jsr166 1.95 // assert lastItem != null;
1207 jsr166 1.92 this.lastItem = null;
1208 jsr166 1.89 if (itemAt(lastRet) == lastItem)
1209     removeAt(lastRet);
1210     }
1211 jsr166 1.91 } else if (lastRet == NONE)
1212 dl 1.66 throw new IllegalStateException();
1213 jsr166 1.91 // else lastRet == REMOVED and the last returned element was
1214 jsr166 1.89 // previously asynchronously removed via an operation other
1215     // than this.remove(), so nothing to do.
1216    
1217     if (cursor < 0 && nextIndex < 0)
1218     detach();
1219     } finally {
1220     lock.unlock();
1221 jsr166 1.95 // assert lastRet == NONE;
1222     // assert lastItem == null;
1223 jsr166 1.89 }
1224     }
1225    
1226     /**
1227     * Called to notify the iterator that the queue is empty, or that it
1228     * has fallen hopelessly behind, so that it should abandon any
1229     * further iteration, except possibly to return one more element
1230     * from next(), as promised by returning true from hasNext().
1231     */
1232     void shutdown() {
1233 jsr166 1.95 // assert lock.getHoldCount() == 1;
1234 jsr166 1.91 cursor = NONE;
1235 jsr166 1.89 if (nextIndex >= 0)
1236 jsr166 1.91 nextIndex = REMOVED;
1237 jsr166 1.89 if (lastRet >= 0) {
1238 jsr166 1.91 lastRet = REMOVED;
1239 jsr166 1.61 lastItem = null;
1240 jsr166 1.89 }
1241 jsr166 1.91 prevTakeIndex = DETACHED;
1242 jsr166 1.89 // Don't set nextItem to null because we must continue to be
1243     // able to return it on next().
1244     //
1245     // Caller will unlink from itrs when convenient.
1246     }
1247    
1248     private int distance(int index, int prevTakeIndex, int length) {
1249     int distance = index - prevTakeIndex;
1250     if (distance < 0)
1251     distance += length;
1252     return distance;
1253     }
1254    
1255     /**
1256 jsr166 1.107 * Called whenever an interior remove (not at takeIndex) occurred.
1257 jsr166 1.89 *
1258 jsr166 1.90 * @return true if this iterator should be unlinked from itrs
1259 jsr166 1.89 */
1260 jsr166 1.90 boolean removedAt(int removedIndex) {
1261 jsr166 1.95 // assert lock.getHoldCount() == 1;
1262 jsr166 1.89 if (isDetached())
1263     return true;
1264    
1265     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1266     final int prevTakeIndex = this.prevTakeIndex;
1267     final int len = items.length;
1268 jsr166 1.112 // distance from prevTakeIndex to removedIndex
1269 jsr166 1.89 final int removedDistance =
1270 jsr166 1.112 len * (itrs.cycles - this.prevCycles
1271     + ((removedIndex < takeIndex) ? 1 : 0))
1272     + (removedIndex - prevTakeIndex);
1273     // assert itrs.cycles - this.prevCycles >= 0;
1274     // assert itrs.cycles - this.prevCycles <= 1;
1275     // assert removedDistance > 0;
1276     // assert removedIndex != takeIndex;
1277 jsr166 1.89 int cursor = this.cursor;
1278     if (cursor >= 0) {
1279     int x = distance(cursor, prevTakeIndex, len);
1280     if (x == removedDistance) {
1281 jsr166 1.90 if (cursor == putIndex)
1282 jsr166 1.91 this.cursor = cursor = NONE;
1283 jsr166 1.89 }
1284     else if (x > removedDistance) {
1285 jsr166 1.95 // assert cursor != prevTakeIndex;
1286 jsr166 1.89 this.cursor = cursor = dec(cursor);
1287 jsr166 1.71 }
1288 dl 1.5 }
1289 jsr166 1.89 int lastRet = this.lastRet;
1290     if (lastRet >= 0) {
1291     int x = distance(lastRet, prevTakeIndex, len);
1292     if (x == removedDistance)
1293 jsr166 1.91 this.lastRet = lastRet = REMOVED;
1294 jsr166 1.89 else if (x > removedDistance)
1295     this.lastRet = lastRet = dec(lastRet);
1296     }
1297     int nextIndex = this.nextIndex;
1298     if (nextIndex >= 0) {
1299     int x = distance(nextIndex, prevTakeIndex, len);
1300     if (x == removedDistance)
1301 jsr166 1.91 this.nextIndex = nextIndex = REMOVED;
1302 jsr166 1.89 else if (x > removedDistance)
1303     this.nextIndex = nextIndex = dec(nextIndex);
1304     }
1305 jsr166 1.112 if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1306 jsr166 1.91 this.prevTakeIndex = DETACHED;
1307 jsr166 1.89 return true;
1308     }
1309     return false;
1310     }
1311    
1312     /**
1313     * Called whenever takeIndex wraps around to zero.
1314     *
1315 jsr166 1.90 * @return true if this iterator should be unlinked from itrs
1316 jsr166 1.89 */
1317     boolean takeIndexWrapped() {
1318 jsr166 1.95 // assert lock.getHoldCount() == 1;
1319 jsr166 1.89 if (isDetached())
1320     return true;
1321     if (itrs.cycles - prevCycles > 1) {
1322     // All the elements that existed at the time of the last
1323     // operation are gone, so abandon further iteration.
1324     shutdown();
1325     return true;
1326     }
1327     return false;
1328 dl 1.5 }
1329 jsr166 1.89
1330     // /** Uncomment for debugging. */
1331     // public String toString() {
1332     // return ("cursor=" + cursor + " " +
1333     // "nextIndex=" + nextIndex + " " +
1334     // "lastRet=" + lastRet + " " +
1335     // "nextItem=" + nextItem + " " +
1336     // "lastItem=" + lastItem + " " +
1337     // "prevCycles=" + prevCycles + " " +
1338     // "prevTakeIndex=" + prevTakeIndex + " " +
1339     // "size()=" + size() + " " +
1340     // "remainingCapacity()=" + remainingCapacity());
1341     // }
1342 tim 1.1 }
1343 dl 1.100
1344 jsr166 1.105 /**
1345     * Returns a {@link Spliterator} over the elements in this queue.
1346     *
1347 jsr166 1.106 * <p>The returned spliterator is
1348     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1349     *
1350 jsr166 1.105 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1351     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1352     *
1353     * @implNote
1354     * The {@code Spliterator} implements {@code trySplit} to permit limited
1355     * parallelism.
1356     *
1357     * @return a {@code Spliterator} over the elements in this queue
1358     * @since 1.8
1359     */
1360 dl 1.103 public Spliterator<E> spliterator() {
1361 dl 1.102 return Spliterators.spliterator
1362 jsr166 1.124 (this, (Spliterator.ORDERED |
1363     Spliterator.NONNULL |
1364     Spliterator.CONCURRENT));
1365 dl 1.100 }
1366    
1367 jsr166 1.127 public void forEach(Consumer<? super E> action) {
1368     Objects.requireNonNull(action);
1369     final ReentrantLock lock = this.lock;
1370     lock.lock();
1371     try {
1372     if (count > 0) {
1373     final Object[] items = this.items;
1374 jsr166 1.128 for (int i = takeIndex, end = putIndex,
1375     to = (i < end) ? end : items.length;
1376     ; i = 0, to = end) {
1377     for (; i < to; i++)
1378     action.accept(itemAt(items, i));
1379     if (to == end) break;
1380     }
1381 jsr166 1.127 }
1382     } finally {
1383     lock.unlock();
1384     }
1385     }
1386    
1387 jsr166 1.128 /** debugging */
1388     void checkInvariants() {
1389     // meta-assertion
1390     // assert lock.isHeldByCurrentThread();
1391     try {
1392     int capacity = items.length;
1393     // assert capacity > 0;
1394     // assert takeIndex >= 0 && takeIndex < capacity;
1395     // assert putIndex >= 0 && putIndex < capacity;
1396     // assert count <= capacity;
1397     // assert takeIndex == putIndex || items[takeIndex] != null;
1398     // assert count == capacity || items[putIndex] == null;
1399     // assert takeIndex == putIndex || items[dec(putIndex, capacity)] != null;
1400     } catch (Throwable t) {
1401     System.err.printf("takeIndex=%d putIndex=%d capacity=%d%n",
1402     takeIndex, putIndex, items.length);
1403     System.err.printf("items=%s%n",
1404     Arrays.toString(items));
1405     throw t;
1406     }
1407     }
1408    
1409 tim 1.1 }