ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.151
Committed: Mon Oct 1 00:10:53 2018 UTC (5 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.150: +1 -1 lines
Log Message:
update to using jdk11 by default, except link to jdk10 javadocs;
sync @docRoot references in javadoc with upstream

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.38 * Expert Group and released to the public domain, as explained at
4 jsr166 1.78 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.110
9     import java.lang.ref.WeakReference;
10 jsr166 1.125 import java.util.AbstractQueue;
11 jsr166 1.120 import java.util.Arrays;
12 jsr166 1.85 import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15 jsr166 1.119 import java.util.Objects;
16 jsr166 1.110 import java.util.Spliterator;
17 dl 1.102 import java.util.Spliterators;
18 jsr166 1.110 import java.util.concurrent.locks.Condition;
19     import java.util.concurrent.locks.ReentrantLock;
20 jsr166 1.127 import java.util.function.Consumer;
21 jsr166 1.135 import java.util.function.Predicate;
22 tim 1.1
23     /**
24 dl 1.25 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
25     * array. This queue orders elements FIFO (first-in-first-out). The
26     * <em>head</em> of the queue is that element that has been on the
27     * queue the longest time. The <em>tail</em> of the queue is that
28     * element that has been on the queue the shortest time. New elements
29     * are inserted at the tail of the queue, and the queue retrieval
30     * operations obtain elements at the head of the queue.
31 dholmes 1.13 *
32 dl 1.40 * <p>This is a classic &quot;bounded buffer&quot;, in which a
33     * fixed-sized array holds elements inserted by producers and
34     * extracted by consumers. Once created, the capacity cannot be
35 jsr166 1.69 * changed. Attempts to {@code put} an element into a full queue
36     * will result in the operation blocking; attempts to {@code take} an
37 dl 1.40 * element from an empty queue will similarly block.
38 dl 1.11 *
39 jsr166 1.72 * <p>This class supports an optional fairness policy for ordering
40 dl 1.42 * waiting producer and consumer threads. By default, this ordering
41     * is not guaranteed. However, a queue constructed with fairness set
42 jsr166 1.69 * to {@code true} grants threads access in FIFO order. Fairness
43 dl 1.42 * generally decreases throughput but reduces variability and avoids
44     * starvation.
45 brian 1.7 *
46 jsr166 1.145 * <p>This class and its iterator implement all of the <em>optional</em>
47     * methods of the {@link Collection} and {@link Iterator} interfaces.
48 dl 1.26 *
49 dl 1.41 * <p>This class is a member of the
50 jsr166 1.151 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
51 dl 1.41 * Java Collections Framework</a>.
52     *
53 dl 1.8 * @since 1.5
54     * @author Doug Lea
55 jsr166 1.109 * @param <E> the type of elements held in this queue
56 dl 1.8 */
57 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
58 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
59    
60 jsr166 1.138 /*
61     * Much of the implementation mechanics, especially the unusual
62     * nested loops, are shared and co-maintained with ArrayDeque.
63     */
64    
65 dl 1.36 /**
66 dl 1.42 * Serialization ID. This class relies on default serialization
67     * even for the items array, which is default-serialized, even if
68     * it is empty. Otherwise it could not be declared final, which is
69 dl 1.36 * necessary here.
70     */
71     private static final long serialVersionUID = -817911632652898426L;
72    
73 jsr166 1.73 /** The queued items */
74 jsr166 1.68 final Object[] items;
75 jsr166 1.73
76     /** items index for next take, poll, peek or remove */
77 jsr166 1.58 int takeIndex;
78 jsr166 1.73
79     /** items index for next put, offer, or add */
80 jsr166 1.58 int putIndex;
81 jsr166 1.73
82     /** Number of elements in the queue */
83 jsr166 1.59 int count;
84 tim 1.12
85 dl 1.5 /*
86 dl 1.36 * Concurrency control uses the classic two-condition algorithm
87 dl 1.5 * found in any textbook.
88     */
89    
90 dl 1.11 /** Main lock guarding all access */
91 jsr166 1.58 final ReentrantLock lock;
92 jsr166 1.85
93 dholmes 1.13 /** Condition for waiting takes */
94 dl 1.37 private final Condition notEmpty;
95 jsr166 1.85
96 dl 1.35 /** Condition for waiting puts */
97 dl 1.37 private final Condition notFull;
98 dl 1.5
99 jsr166 1.89 /**
100     * Shared state for currently active iterators, or null if there
101     * are known not to be any. Allows queue operations to update
102     * iterator state.
103     */
104 jsr166 1.121 transient Itrs itrs;
105 jsr166 1.89
106 dl 1.5 // Internal helper methods
107    
108     /**
109 jsr166 1.135 * Increments i, mod modulus.
110     * Precondition and postcondition: 0 <= i < modulus.
111 jsr166 1.71 */
112 jsr166 1.135 static final int inc(int i, int modulus) {
113     if (++i >= modulus) i = 0;
114     return i;
115 jsr166 1.71 }
116    
117 jsr166 1.68 /**
118 jsr166 1.128 * Decrements i, mod modulus.
119     * Precondition and postcondition: 0 <= i < modulus.
120     */
121     static final int dec(int i, int modulus) {
122     if (--i < 0) i = modulus - 1;
123     return i;
124     }
125    
126     /**
127 jsr166 1.68 * Returns item at index i.
128     */
129 jsr166 1.96 @SuppressWarnings("unchecked")
130 jsr166 1.68 final E itemAt(int i) {
131 jsr166 1.96 return (E) items[i];
132 jsr166 1.68 }
133    
134     /**
135 jsr166 1.128 * Returns element at array index i.
136     * This is a slight abuse of generics, accepted by javac.
137     */
138     @SuppressWarnings("unchecked")
139 jsr166 1.132 static <E> E itemAt(Object[] items, int i) {
140 jsr166 1.131 return (E) items[i];
141 jsr166 1.128 }
142    
143     /**
144 jsr166 1.47 * Inserts element at current put position, advances, and signals.
145 dl 1.9 * Call only when holding lock.
146 dl 1.5 */
147 jsr166 1.141 private void enqueue(E e) {
148 jsr166 1.137 // assert lock.isHeldByCurrentThread();
149 jsr166 1.95 // assert lock.getHoldCount() == 1;
150     // assert items[putIndex] == null;
151 dl 1.100 final Object[] items = this.items;
152 jsr166 1.141 items[putIndex] = e;
153 jsr166 1.116 if (++putIndex == items.length) putIndex = 0;
154 jsr166 1.89 count++;
155 dl 1.9 notEmpty.signal();
156 jsr166 1.128 // checkInvariants();
157 tim 1.1 }
158 tim 1.12
159 dl 1.5 /**
160 jsr166 1.47 * Extracts element at current take position, advances, and signals.
161 dl 1.9 * Call only when holding lock.
162 dl 1.5 */
163 jsr166 1.88 private E dequeue() {
164 jsr166 1.137 // assert lock.isHeldByCurrentThread();
165 jsr166 1.95 // assert lock.getHoldCount() == 1;
166     // assert items[takeIndex] != null;
167 jsr166 1.68 final Object[] items = this.items;
168 jsr166 1.97 @SuppressWarnings("unchecked")
169 jsr166 1.141 E e = (E) items[takeIndex];
170 dl 1.5 items[takeIndex] = null;
171 jsr166 1.116 if (++takeIndex == items.length) takeIndex = 0;
172 jsr166 1.89 count--;
173     if (itrs != null)
174     itrs.elementDequeued();
175 dl 1.9 notFull.signal();
176 jsr166 1.128 // checkInvariants();
177 jsr166 1.141 return e;
178 dl 1.5 }
179    
180     /**
181 jsr166 1.90 * Deletes item at array index removeIndex.
182 jsr166 1.89 * Utility for remove(Object) and iterator.remove.
183 dl 1.9 * Call only when holding lock.
184 dl 1.5 */
185 jsr166 1.90 void removeAt(final int removeIndex) {
186 jsr166 1.137 // assert lock.isHeldByCurrentThread();
187 jsr166 1.95 // assert lock.getHoldCount() == 1;
188     // assert items[removeIndex] != null;
189     // assert removeIndex >= 0 && removeIndex < items.length;
190 jsr166 1.68 final Object[] items = this.items;
191 jsr166 1.90 if (removeIndex == takeIndex) {
192 jsr166 1.89 // removing front item; just advance
193 dl 1.9 items[takeIndex] = null;
194 jsr166 1.116 if (++takeIndex == items.length) takeIndex = 0;
195 jsr166 1.90 count--;
196 jsr166 1.89 if (itrs != null)
197     itrs.elementDequeued();
198 tim 1.23 } else {
199 jsr166 1.89 // an "interior" remove
200 jsr166 1.90
201 dl 1.9 // slide over all others up through putIndex.
202 jsr166 1.115 for (int i = removeIndex, putIndex = this.putIndex;;) {
203     int pred = i;
204     if (++i == items.length) i = 0;
205     if (i == putIndex) {
206     items[pred] = null;
207     this.putIndex = pred;
208 dl 1.9 break;
209     }
210 jsr166 1.115 items[pred] = items[i];
211 dl 1.5 }
212 jsr166 1.90 count--;
213     if (itrs != null)
214     itrs.removedAt(removeIndex);
215 dl 1.5 }
216 dl 1.9 notFull.signal();
217 jsr166 1.134 // checkInvariants();
218 tim 1.1 }
219    
/**
 * Constructs an {@code ArrayBlockingQueue} of the given (fixed)
 * capacity using the default access policy, by delegating to the
 * two-argument constructor with {@code fair == false}.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
230 dl 1.2
/**
 * Constructs an {@code ArrayBlockingQueue} of the given (fixed)
 * capacity with the specified access policy. Allocates the backing
 * array and creates the lock together with its two conditions.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity < 1) {
        throw new IllegalArgumentException();
    }
    this.items = new Object[capacity];
    this.lock = new ReentrantLock(fair);
    this.notEmpty = this.lock.newCondition();
    this.notFull = this.lock.newCondition();
}
249    
/**
 * Constructs an {@code ArrayBlockingQueue} of the given (fixed)
 * capacity with the specified access policy, pre-populated with the
 * elements of the given collection in the traversal order of that
 * collection's iterator.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock mainLock = this.lock;
    mainLock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        final Object[] buffer = this.items;
        int filled = 0;
        try {
            final Iterator<? extends E> source = c.iterator();
            while (source.hasNext()) {
                // The null check runs before the array store, so a null
                // element is reported even when the array is already full.
                buffer[filled] = Objects.requireNonNull(source.next());
                filled++;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            // more elements than capacity
            throw new IllegalArgumentException();
        }
        count = filled;
        if (filled == capacity) {
            putIndex = 0;
        } else {
            putIndex = filled;
        }
        // checkInvariants();
    } finally {
        mainLock.unlock();
    }
}
288 dl 1.2
289 dholmes 1.13 /**
290 jsr166 1.50 * Inserts the specified element at the tail of this queue if it is
291     * possible to do so immediately without exceeding the queue's capacity,
292 jsr166 1.69 * returning {@code true} upon success and throwing an
293     * {@code IllegalStateException} if this queue is full.
294 dl 1.25 *
295 jsr166 1.50 * @param e the element to add
296 jsr166 1.69 * @return {@code true} (as specified by {@link Collection#add})
297 jsr166 1.50 * @throws IllegalStateException if this queue is full
298     * @throws NullPointerException if the specified element is null
299     */
300     public boolean add(E e) {
301 jsr166 1.56 return super.add(e);
302 jsr166 1.50 }
303    
304     /**
305     * Inserts the specified element at the tail of this queue if it is
306     * possible to do so immediately without exceeding the queue's capacity,
307 jsr166 1.69 * returning {@code true} upon success and {@code false} if this queue
308 jsr166 1.50 * is full. This method is generally preferable to method {@link #add},
309     * which can fail to insert an element only by throwing an exception.
310     *
311     * @throws NullPointerException if the specified element is null
312 dholmes 1.13 */
313 jsr166 1.49 public boolean offer(E e) {
314 jsr166 1.119 Objects.requireNonNull(e);
315 dl 1.36 final ReentrantLock lock = this.lock;
316 dl 1.5 lock.lock();
317     try {
318 jsr166 1.59 if (count == items.length)
319 dl 1.2 return false;
320 dl 1.5 else {
321 jsr166 1.88 enqueue(e);
322 dl 1.5 return true;
323     }
324 tim 1.23 } finally {
325 tim 1.12 lock.unlock();
326 dl 1.2 }
327 dl 1.5 }
328 dl 1.2
329 dholmes 1.13 /**
330 jsr166 1.50 * Inserts the specified element at the tail of this queue, waiting
331     * for space to become available if the queue is full.
332     *
333     * @throws InterruptedException {@inheritDoc}
334     * @throws NullPointerException {@inheritDoc}
335     */
336     public void put(E e) throws InterruptedException {
337 jsr166 1.119 Objects.requireNonNull(e);
338 jsr166 1.50 final ReentrantLock lock = this.lock;
339     lock.lockInterruptibly();
340     try {
341 jsr166 1.59 while (count == items.length)
342 jsr166 1.58 notFull.await();
343 jsr166 1.88 enqueue(e);
344 jsr166 1.50 } finally {
345     lock.unlock();
346     }
347     }
348    
349     /**
350     * Inserts the specified element at the tail of this queue, waiting
351     * up to the specified wait time for space to become available if
352     * the queue is full.
353     *
354     * @throws InterruptedException {@inheritDoc}
355     * @throws NullPointerException {@inheritDoc}
356 brian 1.7 */
357 jsr166 1.49 public boolean offer(E e, long timeout, TimeUnit unit)
358 dholmes 1.13 throws InterruptedException {
359 dl 1.2
360 jsr166 1.119 Objects.requireNonNull(e);
361 jsr166 1.56 long nanos = unit.toNanos(timeout);
362 dl 1.36 final ReentrantLock lock = this.lock;
363 dl 1.5 lock.lockInterruptibly();
364     try {
365 jsr166 1.59 while (count == items.length) {
366 jsr166 1.126 if (nanos <= 0L)
367 dl 1.5 return false;
368 jsr166 1.58 nanos = notFull.awaitNanos(nanos);
369 dl 1.5 }
370 jsr166 1.88 enqueue(e);
371 jsr166 1.58 return true;
372 tim 1.23 } finally {
373 jsr166 1.134 // checkInvariants();
374 dl 1.5 lock.unlock();
375 dl 1.2 }
376 dl 1.5 }
377 dl 1.2
378 dholmes 1.13 public E poll() {
379 dl 1.36 final ReentrantLock lock = this.lock;
380 dholmes 1.13 lock.lock();
381     try {
382 jsr166 1.88 return (count == 0) ? null : dequeue();
383 tim 1.23 } finally {
384 tim 1.15 lock.unlock();
385 dholmes 1.13 }
386     }
387    
388 jsr166 1.50 public E take() throws InterruptedException {
389     final ReentrantLock lock = this.lock;
390     lock.lockInterruptibly();
391     try {
392 jsr166 1.59 while (count == 0)
393 jsr166 1.58 notEmpty.await();
394 jsr166 1.88 return dequeue();
395 jsr166 1.50 } finally {
396     lock.unlock();
397     }
398     }
399    
400 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
401 jsr166 1.56 long nanos = unit.toNanos(timeout);
402 dl 1.36 final ReentrantLock lock = this.lock;
403 dl 1.5 lock.lockInterruptibly();
404     try {
405 jsr166 1.59 while (count == 0) {
406 jsr166 1.126 if (nanos <= 0L)
407 dl 1.5 return null;
408 jsr166 1.58 nanos = notEmpty.awaitNanos(nanos);
409 dl 1.2 }
410 jsr166 1.88 return dequeue();
411 tim 1.23 } finally {
412 jsr166 1.131 // checkInvariants();
413 dl 1.5 lock.unlock();
414     }
415     }
416 dl 1.2
417 dholmes 1.13 public E peek() {
418 dl 1.36 final ReentrantLock lock = this.lock;
419 dholmes 1.13 lock.lock();
420     try {
421 jsr166 1.99 return itemAt(takeIndex); // null when queue is empty
422 tim 1.23 } finally {
423 dholmes 1.13 lock.unlock();
424     }
425     }
426    
427     // this doc comment is overridden to remove the reference to collections
428     // greater in size than Integer.MAX_VALUE
429 tim 1.15 /**
430 dl 1.25 * Returns the number of elements in this queue.
431     *
432 jsr166 1.50 * @return the number of elements in this queue
433 dholmes 1.13 */
434     public int size() {
435 dl 1.36 final ReentrantLock lock = this.lock;
436 dholmes 1.13 lock.lock();
437     try {
438     return count;
439 tim 1.23 } finally {
440 dholmes 1.13 lock.unlock();
441     }
442     }
443    
444     // this doc comment is a modified copy of the inherited doc comment,
445     // without the reference to unlimited queues.
446 tim 1.15 /**
447 jsr166 1.48 * Returns the number of additional elements that this queue can ideally
448     * (in the absence of memory or resource constraints) accept without
449 dholmes 1.13 * blocking. This is always equal to the initial capacity of this queue
450 jsr166 1.69 * less the current {@code size} of this queue.
451 jsr166 1.48 *
452     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
453 jsr166 1.69 * an element will succeed by inspecting {@code remainingCapacity}
454 jsr166 1.48 * because it may be the case that another thread is about to
455 jsr166 1.50 * insert or remove an element.
456 dholmes 1.13 */
457     public int remainingCapacity() {
458 dl 1.36 final ReentrantLock lock = this.lock;
459 dholmes 1.13 lock.lock();
460     try {
461     return items.length - count;
462 tim 1.23 } finally {
463 dholmes 1.13 lock.unlock();
464     }
465     }
466    
467 jsr166 1.50 /**
468     * Removes a single instance of the specified element from this queue,
469 jsr166 1.69 * if it is present. More formally, removes an element {@code e} such
470     * that {@code o.equals(e)}, if this queue contains one or more such
471 jsr166 1.50 * elements.
472 jsr166 1.69 * Returns {@code true} if this queue contained the specified element
473 jsr166 1.50 * (or equivalently, if this queue changed as a result of the call).
474     *
475 jsr166 1.64 * <p>Removal of interior elements in circular array based queues
476 dl 1.60 * is an intrinsically slow and disruptive operation, so should
477     * be undertaken only in exceptional circumstances, ideally
478     * only when the queue is known not to be accessible by other
479     * threads.
480     *
481 jsr166 1.50 * @param o element to be removed from this queue, if present
482 jsr166 1.69 * @return {@code true} if this queue changed as a result of the call
483 jsr166 1.50 */
484     public boolean remove(Object o) {
485     if (o == null) return false;
486     final ReentrantLock lock = this.lock;
487     lock.lock();
488     try {
489 jsr166 1.86 if (count > 0) {
490 jsr166 1.114 final Object[] items = this.items;
491 jsr166 1.130 for (int i = takeIndex, end = putIndex,
492     to = (i < end) ? end : items.length;
493     ; i = 0, to = end) {
494     for (; i < to; i++)
495     if (o.equals(items[i])) {
496     removeAt(i);
497     return true;
498     }
499     if (to == end) break;
500     }
501 jsr166 1.50 }
502 jsr166 1.68 return false;
503 jsr166 1.50 } finally {
504 jsr166 1.131 // checkInvariants();
505 jsr166 1.50 lock.unlock();
506     }
507     }
508 dholmes 1.13
509 jsr166 1.50 /**
510 jsr166 1.69 * Returns {@code true} if this queue contains the specified element.
511     * More formally, returns {@code true} if and only if this queue contains
512     * at least one element {@code e} such that {@code o.equals(e)}.
513 jsr166 1.50 *
514     * @param o object to be checked for containment in this queue
515 jsr166 1.69 * @return {@code true} if this queue contains the specified element
516 jsr166 1.50 */
517 dholmes 1.21 public boolean contains(Object o) {
518     if (o == null) return false;
519 dl 1.36 final ReentrantLock lock = this.lock;
520 dl 1.5 lock.lock();
521     try {
522 jsr166 1.86 if (count > 0) {
523 jsr166 1.114 final Object[] items = this.items;
524 jsr166 1.129 for (int i = takeIndex, end = putIndex,
525     to = (i < end) ? end : items.length;
526     ; i = 0, to = end) {
527     for (; i < to; i++)
528     if (o.equals(items[i]))
529     return true;
530     if (to == end) break;
531     }
532 jsr166 1.86 }
533 dl 1.2 return false;
534 tim 1.23 } finally {
535 jsr166 1.131 // checkInvariants();
536 dl 1.5 lock.unlock();
537     }
538     }
539 brian 1.7
540 jsr166 1.50 /**
541     * Returns an array containing all of the elements in this queue, in
542     * proper sequence.
543     *
544     * <p>The returned array will be "safe" in that no references to it are
545     * maintained by this queue. (In other words, this method must allocate
546     * a new array). The caller is thus free to modify the returned array.
547 jsr166 1.51 *
548 jsr166 1.50 * <p>This method acts as bridge between array-based and collection-based
549     * APIs.
550     *
551     * @return an array containing all of the elements in this queue
552     */
553 dl 1.5 public Object[] toArray() {
554 dl 1.36 final ReentrantLock lock = this.lock;
555 dl 1.5 lock.lock();
556     try {
557 jsr166 1.120 final Object[] items = this.items;
558     final int end = takeIndex + count;
559     final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
560     if (end != putIndex)
561     System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
562     return a;
563 tim 1.23 } finally {
564 dl 1.5 lock.unlock();
565     }
566     }
567 brian 1.7
568 jsr166 1.50 /**
569     * Returns an array containing all of the elements in this queue, in
570     * proper sequence; the runtime type of the returned array is that of
571     * the specified array. If the queue fits in the specified array, it
572     * is returned therein. Otherwise, a new array is allocated with the
573     * runtime type of the specified array and the size of this queue.
574     *
575     * <p>If this queue fits in the specified array with room to spare
576     * (i.e., the array has more elements than this queue), the element in
577     * the array immediately following the end of the queue is set to
578 jsr166 1.69 * {@code null}.
579 jsr166 1.50 *
580     * <p>Like the {@link #toArray()} method, this method acts as bridge between
581     * array-based and collection-based APIs. Further, this method allows
582     * precise control over the runtime type of the output array, and may,
583     * under certain circumstances, be used to save allocation costs.
584     *
585 jsr166 1.69 * <p>Suppose {@code x} is a queue known to contain only strings.
586 jsr166 1.50 * The following code can be used to dump the queue into a newly
587 jsr166 1.69 * allocated array of {@code String}:
588 jsr166 1.50 *
589 jsr166 1.117 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
590 jsr166 1.50 *
591 jsr166 1.69 * Note that {@code toArray(new Object[0])} is identical in function to
592     * {@code toArray()}.
593 jsr166 1.50 *
594     * @param a the array into which the elements of the queue are to
595     * be stored, if it is big enough; otherwise, a new array of the
596     * same runtime type is allocated for this purpose
597     * @return an array containing all of the elements in this queue
598     * @throws ArrayStoreException if the runtime type of the specified array
599     * is not a supertype of the runtime type of every element in
600     * this queue
601     * @throws NullPointerException if the specified array is null
602     */
603 jsr166 1.68 @SuppressWarnings("unchecked")
604 dl 1.5 public <T> T[] toArray(T[] a) {
605 dl 1.36 final ReentrantLock lock = this.lock;
606 dl 1.5 lock.lock();
607     try {
608 jsr166 1.120 final Object[] items = this.items;
609 jsr166 1.68 final int count = this.count;
610 jsr166 1.120 final int firstLeg = Math.min(items.length - takeIndex, count);
611     if (a.length < count) {
612     a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
613     a.getClass());
614     } else {
615     System.arraycopy(items, takeIndex, a, 0, firstLeg);
616     if (a.length > count)
617     a[count] = null;
618     }
619     if (firstLeg < count)
620     System.arraycopy(items, 0, a, firstLeg, putIndex);
621     return a;
622 tim 1.23 } finally {
623 dl 1.5 lock.unlock();
624     }
625     }
626 dl 1.6
627     public String toString() {
628 jsr166 1.122 return Helpers.collectionToString(this);
629 dl 1.6 }
630 tim 1.12
631 dl 1.44 /**
632     * Atomically removes all of the elements from this queue.
633     * The queue will be empty after this call returns.
634     */
635 dl 1.30 public void clear() {
636 dl 1.36 final ReentrantLock lock = this.lock;
637 dl 1.30 lock.lock();
638     try {
639 jsr166 1.131 int k;
640     if ((k = count) > 0) {
641     circularClear(items, takeIndex, putIndex);
642 jsr166 1.86 takeIndex = putIndex;
643     count = 0;
644 jsr166 1.89 if (itrs != null)
645     itrs.queueIsEmpty();
646 jsr166 1.86 for (; k > 0 && lock.hasWaiters(notFull); k--)
647     notFull.signal();
648     }
649 dl 1.30 } finally {
650 jsr166 1.131 // checkInvariants();
651 dl 1.30 lock.unlock();
652     }
653     }
654    
655 jsr166 1.50 /**
656 jsr166 1.131 * Nulls out slots starting at array index i, upto index end.
657 jsr166 1.142 * Condition i == end means "full" - the entire array is cleared.
658 jsr166 1.131 */
659     private static void circularClear(Object[] items, int i, int end) {
660 jsr166 1.142 // assert 0 <= i && i < items.length;
661     // assert 0 <= end && end < items.length;
662 jsr166 1.131 for (int to = (i < end) ? end : items.length;
663     ; i = 0, to = end) {
664 jsr166 1.142 for (; i < to; i++) items[i] = null;
665 jsr166 1.131 if (to == end) break;
666     }
667     }
668    
669     /**
670 jsr166 1.50 * @throws UnsupportedOperationException {@inheritDoc}
671     * @throws ClassCastException {@inheritDoc}
672     * @throws NullPointerException {@inheritDoc}
673     * @throws IllegalArgumentException {@inheritDoc}
674     */
675 dl 1.30 public int drainTo(Collection<? super E> c) {
676 jsr166 1.81 return drainTo(c, Integer.MAX_VALUE);
677 dl 1.30 }
678    
679 jsr166 1.50 /**
680     * @throws UnsupportedOperationException {@inheritDoc}
681     * @throws ClassCastException {@inheritDoc}
682     * @throws NullPointerException {@inheritDoc}
683     * @throws IllegalArgumentException {@inheritDoc}
684     */
685 dl 1.30 public int drainTo(Collection<? super E> c, int maxElements) {
686 jsr166 1.119 Objects.requireNonNull(c);
687 dl 1.30 if (c == this)
688     throw new IllegalArgumentException();
689     if (maxElements <= 0)
690     return 0;
691 jsr166 1.68 final Object[] items = this.items;
692 dl 1.36 final ReentrantLock lock = this.lock;
693 dl 1.30 lock.lock();
694     try {
695 jsr166 1.82 int n = Math.min(maxElements, count);
696     int take = takeIndex;
697     int i = 0;
698     try {
699     while (i < n) {
700 jsr166 1.97 @SuppressWarnings("unchecked")
701 jsr166 1.141 E e = (E) items[take];
702     c.add(e);
703 jsr166 1.82 items[take] = null;
704 jsr166 1.116 if (++take == items.length) take = 0;
705 jsr166 1.86 i++;
706 jsr166 1.82 }
707     return n;
708     } finally {
709     // Restore invariants even if c.add() threw
710 jsr166 1.89 if (i > 0) {
711     count -= i;
712     takeIndex = take;
713     if (itrs != null) {
714     if (count == 0)
715     itrs.queueIsEmpty();
716     else if (i > take)
717     itrs.takeIndexWrapped();
718     }
719     for (; i > 0 && lock.hasWaiters(notFull); i--)
720     notFull.signal();
721     }
722 dl 1.30 }
723     } finally {
724 jsr166 1.134 // checkInvariants();
725 dl 1.30 lock.unlock();
726     }
727     }
728    
729 brian 1.7 /**
730 dl 1.75 * Returns an iterator over the elements in this queue in proper sequence.
731 jsr166 1.77 * The elements will be returned in order from first (head) to last (tail).
732     *
733 jsr166 1.106 * <p>The returned iterator is
734     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
735 brian 1.7 *
736 jsr166 1.50 * @return an iterator over the elements in this queue in proper sequence
737 brian 1.7 */
738 dl 1.5 public Iterator<E> iterator() {
739 jsr166 1.68 return new Itr();
740 dl 1.5 }
741 dl 1.8
742     /**
743 jsr166 1.94 * Shared data between iterators and their queue, allowing queue
744 jsr166 1.89 * modifications to update iterators when elements are removed.
745     *
746 jsr166 1.94 * This adds a lot of complexity for the sake of correctly
747     * handling some uncommon operations, but the combination of
748     * circular-arrays and supporting interior removes (i.e., those
749     * not at head) would cause iterators to sometimes lose their
750     * places and/or (re)report elements they shouldn't. To avoid
751     * this, when a queue has one or more iterators, it keeps iterator
752     * state consistent by:
753     *
754     * (1) keeping track of the number of "cycles", that is, the
755     * number of times takeIndex has wrapped around to 0.
756     * (2) notifying all iterators via the callback removedAt whenever
757     * an interior element is removed (and thus other elements may
758     * be shifted).
759     *
760     * These suffice to eliminate iterator inconsistencies, but
761     * unfortunately add the secondary responsibility of maintaining
762     * the list of iterators. We track all active iterators in a
763     * simple linked list (accessed only when the queue's lock is
764     * held) of weak references to Itr. The list is cleaned up using
765     * 3 different mechanisms:
766 jsr166 1.89 *
767     * (1) Whenever a new iterator is created, do some O(1) checking for
768     * stale list elements.
769     *
770     * (2) Whenever takeIndex wraps around to 0, check for iterators
771     * that have been unused for more than one wrap-around cycle.
772     *
773     * (3) Whenever the queue becomes empty, all iterators are notified
774     * and this entire data structure is discarded.
775     *
776 jsr166 1.94 * So in addition to the removedAt callback that is necessary for
777     * correctness, iterators have the shutdown and takeIndexWrapped
778     * callbacks that help remove stale iterators from the list.
779     *
780     * Whenever a list element is examined, it is expunged if either
781     * the GC has determined that the iterator is discarded, or if the
782     * iterator reports that it is "detached" (does not need any
783     * further state updates). Overhead is maximal when takeIndex
784     * never advances, iterators are discarded before they are
785     * exhausted, and all removals are interior removes, in which case
786     * all stale iterators are discovered by the GC. But even in this
787     * case we don't increase the amortized complexity.
788     *
789     * Care must be taken to keep list sweeping methods from
790     * reentrantly invoking another such method, causing subtle
791     * corruption bugs.
792 jsr166 1.89 */
793     class Itrs {
794    
795     /**
796     * Node in a linked list of weak iterator references.
797     */
798     private class Node extends WeakReference<Itr> {
799     Node next;
800    
801     Node(Itr iterator, Node next) {
802     super(iterator);
803     this.next = next;
804     }
805     }
806    
807     /** Incremented whenever takeIndex wraps around to 0 */
808 jsr166 1.108 int cycles;
809 jsr166 1.89
810     /** Linked list of weak iterator references */
811 jsr166 1.93 private Node head;
812 jsr166 1.89
813     /** Used to expunge stale iterators */
814 jsr166 1.108 private Node sweeper;
815 jsr166 1.89
816     private static final int SHORT_SWEEP_PROBES = 4;
817     private static final int LONG_SWEEP_PROBES = 16;
818    
819 jsr166 1.93 Itrs(Itr initial) {
820     register(initial);
821     }
822    
823 jsr166 1.89 /**
824     * Sweeps itrs, looking for and expunging stale iterators.
825     * If at least one was found, tries harder to find more.
826 jsr166 1.94 * Called only from iterating thread.
827 jsr166 1.89 *
828     * @param tryHarder whether to start in try-harder mode, because
829     * there is known to be at least one iterator to collect
830     */
831     void doSomeSweeping(boolean tryHarder) {
832 jsr166 1.137 // assert lock.isHeldByCurrentThread();
833 jsr166 1.95 // assert head != null;
834 jsr166 1.89 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
835     Node o, p;
836     final Node sweeper = this.sweeper;
837 jsr166 1.93 boolean passedGo; // to limit search to one full sweep
838 jsr166 1.89
839 jsr166 1.93 if (sweeper == null) {
840     o = null;
841     p = head;
842     passedGo = true;
843     } else {
844 jsr166 1.89 o = sweeper;
845     p = o.next;
846 jsr166 1.93 passedGo = false;
847 jsr166 1.89 }
848    
849 jsr166 1.93 for (; probes > 0; probes--) {
850     if (p == null) {
851     if (passedGo)
852     break;
853     o = null;
854     p = head;
855     passedGo = true;
856     }
857 jsr166 1.89 final Itr it = p.get();
858     final Node next = p.next;
859     if (it == null || it.isDetached()) {
860     // found a discarded/exhausted iterator
861     probes = LONG_SWEEP_PROBES; // "try harder"
862     // unlink p
863     p.clear();
864     p.next = null;
865 jsr166 1.93 if (o == null) {
866 jsr166 1.89 head = next;
867 jsr166 1.93 if (next == null) {
868     // We've run out of iterators to track; retire
869     itrs = null;
870     return;
871     }
872     }
873 jsr166 1.89 else
874     o.next = next;
875     } else {
876     o = p;
877     }
878     p = next;
879     }
880    
881     this.sweeper = (p == null) ? null : o;
882     }
883    
884     /**
885     * Adds a new iterator to the linked list of tracked iterators.
886     */
887     void register(Itr itr) {
888 jsr166 1.137 // assert lock.isHeldByCurrentThread();
889 jsr166 1.89 head = new Node(itr, head);
890     }
891    
892     /**
893     * Called whenever takeIndex wraps around to 0.
894     *
895     * Notifies all iterators, and expunges any that are now stale.
896     */
897     void takeIndexWrapped() {
898 jsr166 1.137 // assert lock.isHeldByCurrentThread();
899 jsr166 1.89 cycles++;
900     for (Node o = null, p = head; p != null;) {
901     final Itr it = p.get();
902     final Node next = p.next;
903     if (it == null || it.takeIndexWrapped()) {
904     // unlink p
905 jsr166 1.95 // assert it == null || it.isDetached();
906 jsr166 1.89 p.clear();
907     p.next = null;
908     if (o == null)
909     head = next;
910     else
911     o.next = next;
912     } else {
913     o = p;
914     }
915     p = next;
916     }
917     if (head == null) // no more iterators to track
918     itrs = null;
919     }
920    
921     /**
922 jsr166 1.107 * Called whenever an interior remove (not at takeIndex) occurred.
923 jsr166 1.93 *
924     * Notifies all iterators, and expunges any that are now stale.
925 jsr166 1.89 */
926 jsr166 1.90 void removedAt(int removedIndex) {
927 jsr166 1.89 for (Node o = null, p = head; p != null;) {
928     final Itr it = p.get();
929     final Node next = p.next;
930 jsr166 1.90 if (it == null || it.removedAt(removedIndex)) {
931 jsr166 1.89 // unlink p
932 jsr166 1.95 // assert it == null || it.isDetached();
933 jsr166 1.89 p.clear();
934     p.next = null;
935     if (o == null)
936     head = next;
937     else
938     o.next = next;
939     } else {
940     o = p;
941     }
942     p = next;
943     }
944     if (head == null) // no more iterators to track
945     itrs = null;
946     }
947    
948     /**
949     * Called whenever the queue becomes empty.
950     *
951     * Notifies all active iterators that the queue is empty,
952     * clears all weak refs, and unlinks the itrs datastructure.
953     */
954     void queueIsEmpty() {
955 jsr166 1.137 // assert lock.isHeldByCurrentThread();
956 jsr166 1.89 for (Node p = head; p != null; p = p.next) {
957     Itr it = p.get();
958     if (it != null) {
959     p.clear();
960     it.shutdown();
961     }
962     }
963     head = null;
964     itrs = null;
965     }
966    
967     /**
968 jsr166 1.90 * Called whenever an element has been dequeued (at takeIndex).
969 jsr166 1.89 */
970     void elementDequeued() {
971 jsr166 1.137 // assert lock.isHeldByCurrentThread();
972 jsr166 1.89 if (count == 0)
973     queueIsEmpty();
974     else if (takeIndex == 0)
975     takeIndexWrapped();
976     }
977     }
978    
979     /**
980     * Iterator for ArrayBlockingQueue.
981     *
982     * To maintain weak consistency with respect to puts and takes, we
983     * read ahead one slot, so as to not report hasNext true but then
984     * not have an element to return.
985     *
986     * We switch into "detached" mode (allowing prompt unlinking from
987     * itrs without help from the GC) when all indices are negative, or
988     * when hasNext returns false for the first time. This allows the
989     * iterator to track concurrent updates completely accurately,
990     * except for the corner case of the user calling Iterator.remove()
991     * after hasNext() returned false. Even in this case, we ensure
992     * that we don't remove the wrong element by keeping track of the
993     * expected element to remove, in lastItem. Yes, we may fail to
994     * remove lastItem from the queue if it moved due to an interleaved
995     * interior remove while in detached mode.
996 jsr166 1.144 *
997     * Method forEachRemaining, added in Java 8, is treated similarly
998     * to hasNext returning false, in that we switch to detached mode,
999     * but we regard it as an even stronger request to "close" this
1000     * iteration, and don't bother supporting subsequent remove().
1001 dl 1.8 */
1002 dl 1.5 private class Itr implements Iterator<E> {
1003 jsr166 1.91 /** Index to look for new nextItem; NONE at end */
1004 jsr166 1.89 private int cursor;
1005    
1006     /** Element to be returned by next call to next(); null if none */
1007     private E nextItem;
1008    
1009 jsr166 1.91 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
1010 jsr166 1.89 private int nextIndex;
1011    
1012     /** Last element returned; null if none or not detached. */
1013     private E lastItem;
1014    
1015 jsr166 1.91 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1016 jsr166 1.89 private int lastRet;
1017    
1018 jsr166 1.91 /** Previous value of takeIndex, or DETACHED when detached */
1019 jsr166 1.89 private int prevTakeIndex;
1020    
1021     /** Previous value of iters.cycles */
1022     private int prevCycles;
1023 tim 1.12
1024 jsr166 1.91 /** Special index value indicating "not available" or "undefined" */
1025     private static final int NONE = -1;
1026    
1027     /**
1028     * Special index value indicating "removed elsewhere", that is,
1029     * removed by some operation other than a call to this.remove().
1030     */
1031     private static final int REMOVED = -2;
1032    
1033     /** Special value for prevTakeIndex indicating "detached mode" */
1034     private static final int DETACHED = -3;
1035    
1036 dl 1.66 Itr() {
1037 jsr166 1.91 lastRet = NONE;
1038 jsr166 1.68 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1039     lock.lock();
1040     try {
1041 jsr166 1.89 if (count == 0) {
1042 jsr166 1.95 // assert itrs == null;
1043 jsr166 1.91 cursor = NONE;
1044     nextIndex = NONE;
1045     prevTakeIndex = DETACHED;
1046 jsr166 1.89 } else {
1047     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1048     prevTakeIndex = takeIndex;
1049 jsr166 1.68 nextItem = itemAt(nextIndex = takeIndex);
1050 jsr166 1.89 cursor = incCursor(takeIndex);
1051     if (itrs == null) {
1052 jsr166 1.93 itrs = new Itrs(this);
1053 jsr166 1.89 } else {
1054     itrs.register(this); // in this order
1055     itrs.doSomeSweeping(false);
1056     }
1057     prevCycles = itrs.cycles;
1058 jsr166 1.95 // assert takeIndex >= 0;
1059     // assert prevTakeIndex == takeIndex;
1060     // assert nextIndex >= 0;
1061     // assert nextItem != null;
1062 jsr166 1.89 }
1063 jsr166 1.68 } finally {
1064     lock.unlock();
1065     }
1066 dl 1.5 }
1067 tim 1.12
1068 jsr166 1.89 boolean isDetached() {
1069 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1070 jsr166 1.89 return prevTakeIndex < 0;
1071     }
1072    
1073     private int incCursor(int index) {
1074 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1075 jsr166 1.116 if (++index == items.length) index = 0;
1076     if (index == putIndex) index = NONE;
1077 jsr166 1.89 return index;
1078     }
1079    
1080     /**
1081     * Returns true if index is invalidated by the given number of
1082     * dequeues, starting from prevTakeIndex.
1083     */
1084     private boolean invalidated(int index, int prevTakeIndex,
1085     long dequeues, int length) {
1086     if (index < 0)
1087     return false;
1088     int distance = index - prevTakeIndex;
1089     if (distance < 0)
1090     distance += length;
1091     return dequeues > distance;
1092     }
1093    
1094     /**
1095     * Adjusts indices to incorporate all dequeues since the last
1096     * operation on this iterator. Call only from iterating thread.
1097     */
1098     private void incorporateDequeues() {
1099 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1100 jsr166 1.95 // assert itrs != null;
1101     // assert !isDetached();
1102     // assert count > 0;
1103 jsr166 1.89
1104     final int cycles = itrs.cycles;
1105     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1106     final int prevCycles = this.prevCycles;
1107     final int prevTakeIndex = this.prevTakeIndex;
1108    
1109     if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1110     final int len = items.length;
1111     // how far takeIndex has advanced since the previous
1112     // operation of this iterator
1113 jsr166 1.150 long dequeues = (long) (cycles - prevCycles) * len
1114 jsr166 1.89 + (takeIndex - prevTakeIndex);
1115    
1116     // Check indices for invalidation
1117     if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1118 jsr166 1.91 lastRet = REMOVED;
1119 jsr166 1.89 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1120 jsr166 1.91 nextIndex = REMOVED;
1121 jsr166 1.89 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1122     cursor = takeIndex;
1123    
1124     if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1125     detach();
1126     else {
1127     this.prevCycles = cycles;
1128     this.prevTakeIndex = takeIndex;
1129     }
1130     }
1131     }
1132    
1133     /**
1134     * Called when itrs should stop tracking this iterator, either
1135     * because there are no more indices to update (cursor < 0 &&
1136     * nextIndex < 0 && lastRet < 0) or as a special exception, when
1137     * lastRet >= 0, because hasNext() is about to return false for the
1138     * first time. Call only from iterating thread.
1139     */
1140     private void detach() {
1141     // Switch to detached mode
1142 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1143 jsr166 1.95 // assert cursor == NONE;
1144     // assert nextIndex < 0;
1145     // assert lastRet < 0 || nextItem == null;
1146     // assert lastRet < 0 ^ lastItem != null;
1147 jsr166 1.89 if (prevTakeIndex >= 0) {
1148 jsr166 1.95 // assert itrs != null;
1149 jsr166 1.91 prevTakeIndex = DETACHED;
1150 jsr166 1.89 // try to unlink from itrs (but not too hard)
1151     itrs.doSomeSweeping(true);
1152     }
1153     }
1154    
1155     /**
1156     * For performance reasons, we would like not to acquire a lock in
1157     * hasNext in the common case. To allow for this, we only access
1158     * fields (i.e. nextItem) that are not modified by update operations
1159     * triggered by queue modifications.
1160     */
1161 dl 1.5 public boolean hasNext() {
1162 jsr166 1.89 if (nextItem != null)
1163     return true;
1164     noNext();
1165     return false;
1166     }
1167    
1168     private void noNext() {
1169     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1170     lock.lock();
1171     try {
1172 jsr166 1.95 // assert cursor == NONE;
1173     // assert nextIndex == NONE;
1174 jsr166 1.89 if (!isDetached()) {
1175 jsr166 1.95 // assert lastRet >= 0;
1176 jsr166 1.89 incorporateDequeues(); // might update lastRet
1177     if (lastRet >= 0) {
1178     lastItem = itemAt(lastRet);
1179 jsr166 1.95 // assert lastItem != null;
1180 jsr166 1.89 detach();
1181     }
1182     }
1183 jsr166 1.95 // assert isDetached();
1184     // assert lastRet < 0 ^ lastItem != null;
1185 jsr166 1.89 } finally {
1186     lock.unlock();
1187     }
1188 dl 1.5 }
1189 tim 1.12
1190 dl 1.5 public E next() {
1191 jsr166 1.141 final E e = nextItem;
1192     if (e == null)
1193 jsr166 1.89 throw new NoSuchElementException();
1194 dl 1.66 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1195     lock.lock();
1196     try {
1197 jsr166 1.92 if (!isDetached())
1198 jsr166 1.89 incorporateDequeues();
1199 jsr166 1.95 // assert nextIndex != NONE;
1200     // assert lastItem == null;
1201 dl 1.66 lastRet = nextIndex;
1202 jsr166 1.89 final int cursor = this.cursor;
1203     if (cursor >= 0) {
1204     nextItem = itemAt(nextIndex = cursor);
1205 jsr166 1.95 // assert nextItem != null;
1206 jsr166 1.89 this.cursor = incCursor(cursor);
1207     } else {
1208 jsr166 1.91 nextIndex = NONE;
1209 jsr166 1.89 nextItem = null;
1210 jsr166 1.146 if (lastRet == REMOVED) detach();
1211 jsr166 1.89 }
1212 dl 1.66 } finally {
1213     lock.unlock();
1214 dl 1.2 }
1215 jsr166 1.141 return e;
1216 dl 1.5 }
1217 tim 1.12
1218 jsr166 1.136 public void forEachRemaining(Consumer<? super E> action) {
1219     Objects.requireNonNull(action);
1220     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1221     lock.lock();
1222     try {
1223 jsr166 1.141 final E e = nextItem;
1224     if (e == null) return;
1225 jsr166 1.136 if (!isDetached())
1226     incorporateDequeues();
1227 jsr166 1.141 action.accept(e);
1228 jsr166 1.136 if (isDetached() || cursor < 0) return;
1229     final Object[] items = ArrayBlockingQueue.this.items;
1230     for (int i = cursor, end = putIndex,
1231     to = (i < end) ? end : items.length;
1232     ; i = 0, to = end) {
1233     for (; i < to; i++)
1234     action.accept(itemAt(items, i));
1235     if (to == end) break;
1236     }
1237     } finally {
1238     // Calling forEachRemaining is a strong hint that this
1239     // iteration is surely over; supporting remove() after
1240     // forEachRemaining() is more trouble than it's worth
1241 jsr166 1.140 cursor = nextIndex = lastRet = NONE;
1242 jsr166 1.136 nextItem = lastItem = null;
1243     detach();
1244     lock.unlock();
1245     }
1246     }
1247    
1248 dl 1.5 public void remove() {
1249 dl 1.36 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1250 dl 1.5 lock.lock();
1251 jsr166 1.137 // assert lock.getHoldCount() == 1;
1252 dl 1.5 try {
1253 jsr166 1.92 if (!isDetached())
1254     incorporateDequeues(); // might update lastRet or detach
1255     final int lastRet = this.lastRet;
1256     this.lastRet = NONE;
1257 jsr166 1.89 if (lastRet >= 0) {
1258 jsr166 1.92 if (!isDetached())
1259     removeAt(lastRet);
1260     else {
1261     final E lastItem = this.lastItem;
1262 jsr166 1.95 // assert lastItem != null;
1263 jsr166 1.92 this.lastItem = null;
1264 jsr166 1.89 if (itemAt(lastRet) == lastItem)
1265     removeAt(lastRet);
1266     }
1267 jsr166 1.91 } else if (lastRet == NONE)
1268 dl 1.66 throw new IllegalStateException();
1269 jsr166 1.91 // else lastRet == REMOVED and the last returned element was
1270 jsr166 1.89 // previously asynchronously removed via an operation other
1271     // than this.remove(), so nothing to do.
1272    
1273     if (cursor < 0 && nextIndex < 0)
1274     detach();
1275     } finally {
1276     lock.unlock();
1277 jsr166 1.95 // assert lastRet == NONE;
1278     // assert lastItem == null;
1279 jsr166 1.89 }
1280     }
1281    
1282     /**
1283     * Called to notify the iterator that the queue is empty, or that it
1284     * has fallen hopelessly behind, so that it should abandon any
1285     * further iteration, except possibly to return one more element
1286     * from next(), as promised by returning true from hasNext().
1287     */
1288     void shutdown() {
1289 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1290 jsr166 1.91 cursor = NONE;
1291 jsr166 1.89 if (nextIndex >= 0)
1292 jsr166 1.91 nextIndex = REMOVED;
1293 jsr166 1.89 if (lastRet >= 0) {
1294 jsr166 1.91 lastRet = REMOVED;
1295 jsr166 1.61 lastItem = null;
1296 jsr166 1.89 }
1297 jsr166 1.91 prevTakeIndex = DETACHED;
1298 jsr166 1.89 // Don't set nextItem to null because we must continue to be
1299     // able to return it on next().
1300     //
1301     // Caller will unlink from itrs when convenient.
1302     }
1303    
1304     private int distance(int index, int prevTakeIndex, int length) {
1305     int distance = index - prevTakeIndex;
1306     if (distance < 0)
1307     distance += length;
1308     return distance;
1309     }
1310    
1311     /**
1312 jsr166 1.107 * Called whenever an interior remove (not at takeIndex) occurred.
1313 jsr166 1.89 *
1314 jsr166 1.90 * @return true if this iterator should be unlinked from itrs
1315 jsr166 1.89 */
1316 jsr166 1.90 boolean removedAt(int removedIndex) {
1317 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1318 jsr166 1.89 if (isDetached())
1319     return true;
1320    
1321     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1322     final int prevTakeIndex = this.prevTakeIndex;
1323     final int len = items.length;
1324 jsr166 1.112 // distance from prevTakeIndex to removedIndex
1325 jsr166 1.89 final int removedDistance =
1326 jsr166 1.112 len * (itrs.cycles - this.prevCycles
1327     + ((removedIndex < takeIndex) ? 1 : 0))
1328     + (removedIndex - prevTakeIndex);
1329     // assert itrs.cycles - this.prevCycles >= 0;
1330     // assert itrs.cycles - this.prevCycles <= 1;
1331     // assert removedDistance > 0;
1332     // assert removedIndex != takeIndex;
1333 jsr166 1.89 int cursor = this.cursor;
1334     if (cursor >= 0) {
1335     int x = distance(cursor, prevTakeIndex, len);
1336     if (x == removedDistance) {
1337 jsr166 1.90 if (cursor == putIndex)
1338 jsr166 1.91 this.cursor = cursor = NONE;
1339 jsr166 1.89 }
1340     else if (x > removedDistance) {
1341 jsr166 1.95 // assert cursor != prevTakeIndex;
1342 jsr166 1.135 this.cursor = cursor = dec(cursor, len);
1343 jsr166 1.71 }
1344 dl 1.5 }
1345 jsr166 1.89 int lastRet = this.lastRet;
1346     if (lastRet >= 0) {
1347     int x = distance(lastRet, prevTakeIndex, len);
1348     if (x == removedDistance)
1349 jsr166 1.91 this.lastRet = lastRet = REMOVED;
1350 jsr166 1.89 else if (x > removedDistance)
1351 jsr166 1.135 this.lastRet = lastRet = dec(lastRet, len);
1352 jsr166 1.89 }
1353     int nextIndex = this.nextIndex;
1354     if (nextIndex >= 0) {
1355     int x = distance(nextIndex, prevTakeIndex, len);
1356     if (x == removedDistance)
1357 jsr166 1.91 this.nextIndex = nextIndex = REMOVED;
1358 jsr166 1.89 else if (x > removedDistance)
1359 jsr166 1.135 this.nextIndex = nextIndex = dec(nextIndex, len);
1360 jsr166 1.89 }
1361 jsr166 1.112 if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1362 jsr166 1.91 this.prevTakeIndex = DETACHED;
1363 jsr166 1.89 return true;
1364     }
1365     return false;
1366     }
1367    
1368     /**
1369     * Called whenever takeIndex wraps around to zero.
1370     *
1371 jsr166 1.90 * @return true if this iterator should be unlinked from itrs
1372 jsr166 1.89 */
1373     boolean takeIndexWrapped() {
1374 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1375 jsr166 1.89 if (isDetached())
1376     return true;
1377     if (itrs.cycles - prevCycles > 1) {
1378     // All the elements that existed at the time of the last
1379     // operation are gone, so abandon further iteration.
1380     shutdown();
1381     return true;
1382     }
1383     return false;
1384 dl 1.5 }
1385 jsr166 1.89
1386     // /** Uncomment for debugging. */
1387     // public String toString() {
1388     // return ("cursor=" + cursor + " " +
1389     // "nextIndex=" + nextIndex + " " +
1390     // "lastRet=" + lastRet + " " +
1391     // "nextItem=" + nextItem + " " +
1392     // "lastItem=" + lastItem + " " +
1393     // "prevCycles=" + prevCycles + " " +
1394     // "prevTakeIndex=" + prevTakeIndex + " " +
1395     // "size()=" + size() + " " +
1396     // "remainingCapacity()=" + remainingCapacity());
1397     // }
1398 tim 1.1 }
1399 dl 1.100
1400 jsr166 1.105 /**
1401     * Returns a {@link Spliterator} over the elements in this queue.
1402     *
1403 jsr166 1.106 * <p>The returned spliterator is
1404     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1405     *
1406 jsr166 1.105 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1407     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1408     *
1409     * @implNote
1410     * The {@code Spliterator} implements {@code trySplit} to permit limited
1411     * parallelism.
1412     *
1413     * @return a {@code Spliterator} over the elements in this queue
1414     * @since 1.8
1415     */
1416 dl 1.103 public Spliterator<E> spliterator() {
1417 dl 1.102 return Spliterators.spliterator
1418 jsr166 1.124 (this, (Spliterator.ORDERED |
1419     Spliterator.NONNULL |
1420     Spliterator.CONCURRENT));
1421 dl 1.100 }
1422    
1423 jsr166 1.143 /**
1424     * @throws NullPointerException {@inheritDoc}
1425     */
1426 jsr166 1.127 public void forEach(Consumer<? super E> action) {
1427     Objects.requireNonNull(action);
1428     final ReentrantLock lock = this.lock;
1429     lock.lock();
1430     try {
1431     if (count > 0) {
1432     final Object[] items = this.items;
1433 jsr166 1.128 for (int i = takeIndex, end = putIndex,
1434     to = (i < end) ? end : items.length;
1435     ; i = 0, to = end) {
1436     for (; i < to; i++)
1437     action.accept(itemAt(items, i));
1438     if (to == end) break;
1439     }
1440 jsr166 1.127 }
1441     } finally {
1442 jsr166 1.131 // checkInvariants();
1443 jsr166 1.127 lock.unlock();
1444     }
1445     }
1446    
1447 jsr166 1.135 /**
1448     * @throws NullPointerException {@inheritDoc}
1449     */
1450     public boolean removeIf(Predicate<? super E> filter) {
1451     Objects.requireNonNull(filter);
1452     return bulkRemove(filter);
1453     }
1454    
1455     /**
1456     * @throws NullPointerException {@inheritDoc}
1457     */
1458     public boolean removeAll(Collection<?> c) {
1459     Objects.requireNonNull(c);
1460     return bulkRemove(e -> c.contains(e));
1461     }
1462    
1463     /**
1464     * @throws NullPointerException {@inheritDoc}
1465     */
1466     public boolean retainAll(Collection<?> c) {
1467     Objects.requireNonNull(c);
1468     return bulkRemove(e -> !c.contains(e));
1469     }
1470    
1471     /** Implementation of bulk remove methods. */
1472     private boolean bulkRemove(Predicate<? super E> filter) {
1473     final ReentrantLock lock = this.lock;
1474     lock.lock();
1475     try {
1476     if (itrs == null) { // check for active iterators
1477     if (count > 0) {
1478     final Object[] items = this.items;
1479     // Optimize for initial run of survivors
1480     for (int i = takeIndex, end = putIndex,
1481     to = (i < end) ? end : items.length;
1482     ; i = 0, to = end) {
1483     for (; i < to; i++)
1484     if (filter.test(itemAt(items, i)))
1485 jsr166 1.137 return bulkRemoveModified(filter, i);
1486 jsr166 1.135 if (to == end) break;
1487     }
1488     }
1489     return false;
1490     }
1491     } finally {
1492     // checkInvariants();
1493     lock.unlock();
1494     }
1495     // Active iterators are too hairy!
1496     // Punting (for now) to the slow n^2 algorithm ...
1497     return super.removeIf(filter);
1498     }
1499    
1500 jsr166 1.137 // A tiny bit set implementation
1501    
1502     private static long[] nBits(int n) {
1503     return new long[((n - 1) >> 6) + 1];
1504     }
1505     private static void setBit(long[] bits, int i) {
1506     bits[i >> 6] |= 1L << i;
1507     }
1508     private static boolean isClear(long[] bits, int i) {
1509     return (bits[i >> 6] & (1L << i)) == 0;
1510     }
1511    
1512     /**
1513     * Returns circular distance from i to j, disambiguating i == j to
1514     * items.length; never returns 0.
1515     */
1516     private int distanceNonEmpty(int i, int j) {
1517     if ((j -= i) <= 0) j += items.length;
1518     return j;
1519     }
1520    
1521 jsr166 1.135 /**
1522     * Helper for bulkRemove, in case of at least one deletion.
1523 jsr166 1.137 * Tolerate predicates that reentrantly access the collection for
1524     * read (but not write), so traverse once to find elements to
1525     * delete, a second pass to physically expunge.
1526     *
1527     * @param beg valid index of first element to be deleted
1528 jsr166 1.135 */
1529     private boolean bulkRemoveModified(
1530 jsr166 1.137 Predicate<? super E> filter, final int beg) {
1531     final Object[] es = items;
1532 jsr166 1.135 final int capacity = items.length;
1533     final int end = putIndex;
1534 jsr166 1.137 final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
1535     deathRow[0] = 1L; // set bit 0
1536     for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1537     ; i = 0, to = end, k -= capacity) {
1538     for (; i < to; i++)
1539     if (filter.test(itemAt(es, i)))
1540     setBit(deathRow, i - k);
1541     if (to == end) break;
1542     }
1543     // a two-finger traversal, with hare i reading, tortoise w writing
1544     int w = beg;
1545     for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1546     ; w = 0) { // w rejoins i on second leg
1547     // In this loop, i and w are on the same leg, with i > w
1548     for (; i < to; i++)
1549     if (isClear(deathRow, i - k))
1550     es[w++] = es[i];
1551     if (to == end) break;
1552     // In this loop, w is on the first leg, i on the second
1553     for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
1554     if (isClear(deathRow, i - k))
1555     es[w++] = es[i];
1556     if (i >= to) {
1557     if (w == capacity) w = 0; // "corner" case
1558     break;
1559 jsr166 1.135 }
1560     }
1561 jsr166 1.137 count -= distanceNonEmpty(w, end);
1562     circularClear(es, putIndex = w, end);
1563     // checkInvariants();
1564     return true;
1565 jsr166 1.135 }
1566    
1567 jsr166 1.128 /** debugging */
1568     void checkInvariants() {
1569 jsr166 1.134 // meta-assertions
1570 jsr166 1.128 // assert lock.isHeldByCurrentThread();
1571 jsr166 1.149 if (!invariantsSatisfied()) {
1572     String detail = String.format(
1573     "takeIndex=%d putIndex=%d count=%d capacity=%d items=%s",
1574     takeIndex, putIndex, count, items.length,
1575     Arrays.toString(items));
1576     System.err.println(detail);
1577     throw new AssertionError(detail);
1578 jsr166 1.128 }
1579     }
1580    
1581 jsr166 1.149 private boolean invariantsSatisfied() {
1582     // Unlike ArrayDeque, we have a count field but no spare slot.
1583     // We prefer ArrayDeque's strategy (and the names of its fields!),
1584     // but our field layout is baked into the serial form, and so is
1585     // too annoying to change.
1586     //
1587     // putIndex == takeIndex must be disambiguated by checking count.
1588     int capacity = items.length;
1589     return capacity > 0
1590     && items.getClass() == Object[].class
1591     && (takeIndex | putIndex | count) >= 0
1592     && takeIndex < capacity
1593     && putIndex < capacity
1594     && count <= capacity
1595     && (putIndex - takeIndex - count) % capacity == 0
1596     && (count == 0 || items[takeIndex] != null)
1597     && (count == capacity || items[putIndex] == null)
1598     && (count == 0 || items[dec(putIndex, capacity)] != null);
1599     }
1600    
1601 jsr166 1.148 /**
1602 jsr166 1.149 * Reconstitutes this queue from a stream (that is, deserializes it).
1603 jsr166 1.148 *
1604 jsr166 1.149 * @param s the stream
1605 jsr166 1.148 * @throws ClassNotFoundException if the class of a serialized object
1606     * could not be found
1607     * @throws java.io.InvalidObjectException if invariants are violated
1608     * @throws java.io.IOException if an I/O error occurs
1609     */
1610     private void readObject(java.io.ObjectInputStream s)
1611     throws java.io.IOException, ClassNotFoundException {
1612    
1613     // Read in items array and various fields
1614     s.defaultReadObject();
1615    
1616 jsr166 1.149 if (!invariantsSatisfied())
1617 jsr166 1.148 throw new java.io.InvalidObjectException("invariants violated");
1618     }
1619 tim 1.1 }