ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.152
Committed: Thu Oct 17 01:51:37 2019 UTC (4 years, 7 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.151: +3 -0 lines
Log Message:
8232230: Suppress warnings on non-serializable non-transient instance fields in java.util.concurrent

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.38 * Expert Group and released to the public domain, as explained at
4 jsr166 1.78 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.110
9     import java.lang.ref.WeakReference;
10 jsr166 1.125 import java.util.AbstractQueue;
11 jsr166 1.120 import java.util.Arrays;
12 jsr166 1.85 import java.util.Collection;
13     import java.util.Iterator;
14     import java.util.NoSuchElementException;
15 jsr166 1.119 import java.util.Objects;
16 jsr166 1.110 import java.util.Spliterator;
17 dl 1.102 import java.util.Spliterators;
18 jsr166 1.110 import java.util.concurrent.locks.Condition;
19     import java.util.concurrent.locks.ReentrantLock;
20 jsr166 1.127 import java.util.function.Consumer;
21 jsr166 1.135 import java.util.function.Predicate;
22 tim 1.1
23     /**
24 dl 1.25 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
25     * array. This queue orders elements FIFO (first-in-first-out). The
26     * <em>head</em> of the queue is that element that has been on the
27     * queue the longest time. The <em>tail</em> of the queue is that
28     * element that has been on the queue the shortest time. New elements
29     * are inserted at the tail of the queue, and the queue retrieval
30     * operations obtain elements at the head of the queue.
31 dholmes 1.13 *
32 dl 1.40 * <p>This is a classic &quot;bounded buffer&quot;, in which a
33     * fixed-sized array holds elements inserted by producers and
34     * extracted by consumers. Once created, the capacity cannot be
35 jsr166 1.69 * changed. Attempts to {@code put} an element into a full queue
36     * will result in the operation blocking; attempts to {@code take} an
37 dl 1.40 * element from an empty queue will similarly block.
38 dl 1.11 *
39 jsr166 1.72 * <p>This class supports an optional fairness policy for ordering
40 dl 1.42 * waiting producer and consumer threads. By default, this ordering
41     * is not guaranteed. However, a queue constructed with fairness set
42 jsr166 1.69 * to {@code true} grants threads access in FIFO order. Fairness
43 dl 1.42 * generally decreases throughput but reduces variability and avoids
44     * starvation.
45 brian 1.7 *
46 jsr166 1.145 * <p>This class and its iterator implement all of the <em>optional</em>
47     * methods of the {@link Collection} and {@link Iterator} interfaces.
48 dl 1.26 *
49 dl 1.41 * <p>This class is a member of the
50 jsr166 1.151 * <a href="{@docRoot}/java.base/java/util/package-summary.html#CollectionsFramework">
51 dl 1.41 * Java Collections Framework</a>.
52     *
53 dl 1.8 * @since 1.5
54     * @author Doug Lea
55 jsr166 1.109 * @param <E> the type of elements held in this queue
56 dl 1.8 */
57 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
58 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
59    
60 jsr166 1.138 /*
61     * Much of the implementation mechanics, especially the unusual
62     * nested loops, are shared and co-maintained with ArrayDeque.
63     */
64    
65 dl 1.36 /**
66 dl 1.42 * Serialization ID. This class relies on default serialization
67     * even for the items array, which is default-serialized, even if
68     * it is empty. Otherwise it could not be declared final, which is
69 dl 1.36 * necessary here.
70     */
71     private static final long serialVersionUID = -817911632652898426L;
72    
73 jsr166 1.73 /** The queued items */
74 jsr166 1.152 @SuppressWarnings("serial") // Conditionally serializable
75 jsr166 1.68 final Object[] items;
76 jsr166 1.73
77     /** items index for next take, poll, peek or remove */
78 jsr166 1.58 int takeIndex;
79 jsr166 1.73
80     /** items index for next put, offer, or add */
81 jsr166 1.58 int putIndex;
82 jsr166 1.73
83     /** Number of elements in the queue */
84 jsr166 1.59 int count;
85 tim 1.12
86 dl 1.5 /*
87 dl 1.36 * Concurrency control uses the classic two-condition algorithm
88 dl 1.5 * found in any textbook.
89     */
90    
91 dl 1.11 /** Main lock guarding all access */
92 jsr166 1.58 final ReentrantLock lock;
93 jsr166 1.85
94 dholmes 1.13 /** Condition for waiting takes */
95 jsr166 1.152 @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
96 dl 1.37 private final Condition notEmpty;
97 jsr166 1.85
98 dl 1.35 /** Condition for waiting puts */
99 jsr166 1.152 @SuppressWarnings("serial") // Classes implementing Condition may be serializable.
100 dl 1.37 private final Condition notFull;
101 dl 1.5
102 jsr166 1.89 /**
103     * Shared state for currently active iterators, or null if there
104     * are known not to be any. Allows queue operations to update
105     * iterator state.
106     */
107 jsr166 1.121 transient Itrs itrs;
108 jsr166 1.89
109 dl 1.5 // Internal helper methods
110    
111     /**
112 jsr166 1.135 * Increments i, mod modulus.
113     * Precondition and postcondition: 0 <= i < modulus.
114 jsr166 1.71 */
115 jsr166 1.135 static final int inc(int i, int modulus) {
116     if (++i >= modulus) i = 0;
117     return i;
118 jsr166 1.71 }
119    
120 jsr166 1.68 /**
121 jsr166 1.128 * Decrements i, mod modulus.
122     * Precondition and postcondition: 0 <= i < modulus.
123     */
124     static final int dec(int i, int modulus) {
125     if (--i < 0) i = modulus - 1;
126     return i;
127     }
128    
129     /**
130 jsr166 1.68 * Returns item at index i.
131     */
132 jsr166 1.96 @SuppressWarnings("unchecked")
133 jsr166 1.68 final E itemAt(int i) {
134 jsr166 1.96 return (E) items[i];
135 jsr166 1.68 }
136    
137     /**
138 jsr166 1.128 * Returns element at array index i.
139     * This is a slight abuse of generics, accepted by javac.
140     */
141     @SuppressWarnings("unchecked")
142 jsr166 1.132 static <E> E itemAt(Object[] items, int i) {
143 jsr166 1.131 return (E) items[i];
144 jsr166 1.128 }
145    
146     /**
147 jsr166 1.47 * Inserts element at current put position, advances, and signals.
148 dl 1.9 * Call only when holding lock.
149 dl 1.5 */
150 jsr166 1.141 private void enqueue(E e) {
151 jsr166 1.137 // assert lock.isHeldByCurrentThread();
152 jsr166 1.95 // assert lock.getHoldCount() == 1;
153     // assert items[putIndex] == null;
154 dl 1.100 final Object[] items = this.items;
155 jsr166 1.141 items[putIndex] = e;
156 jsr166 1.116 if (++putIndex == items.length) putIndex = 0;
157 jsr166 1.89 count++;
158 dl 1.9 notEmpty.signal();
159 jsr166 1.128 // checkInvariants();
160 tim 1.1 }
161 tim 1.12
162 dl 1.5 /**
163 jsr166 1.47 * Extracts element at current take position, advances, and signals.
164 dl 1.9 * Call only when holding lock.
165 dl 1.5 */
166 jsr166 1.88 private E dequeue() {
167 jsr166 1.137 // assert lock.isHeldByCurrentThread();
168 jsr166 1.95 // assert lock.getHoldCount() == 1;
169     // assert items[takeIndex] != null;
170 jsr166 1.68 final Object[] items = this.items;
171 jsr166 1.97 @SuppressWarnings("unchecked")
172 jsr166 1.141 E e = (E) items[takeIndex];
173 dl 1.5 items[takeIndex] = null;
174 jsr166 1.116 if (++takeIndex == items.length) takeIndex = 0;
175 jsr166 1.89 count--;
176     if (itrs != null)
177     itrs.elementDequeued();
178 dl 1.9 notFull.signal();
179 jsr166 1.128 // checkInvariants();
180 jsr166 1.141 return e;
181 dl 1.5 }
182    
183     /**
184 jsr166 1.90 * Deletes item at array index removeIndex.
185 jsr166 1.89 * Utility for remove(Object) and iterator.remove.
186 dl 1.9 * Call only when holding lock.
187 dl 1.5 */
188 jsr166 1.90 void removeAt(final int removeIndex) {
189 jsr166 1.137 // assert lock.isHeldByCurrentThread();
190 jsr166 1.95 // assert lock.getHoldCount() == 1;
191     // assert items[removeIndex] != null;
192     // assert removeIndex >= 0 && removeIndex < items.length;
193 jsr166 1.68 final Object[] items = this.items;
194 jsr166 1.90 if (removeIndex == takeIndex) {
195 jsr166 1.89 // removing front item; just advance
196 dl 1.9 items[takeIndex] = null;
197 jsr166 1.116 if (++takeIndex == items.length) takeIndex = 0;
198 jsr166 1.90 count--;
199 jsr166 1.89 if (itrs != null)
200     itrs.elementDequeued();
201 tim 1.23 } else {
202 jsr166 1.89 // an "interior" remove
203 jsr166 1.90
204 dl 1.9 // slide over all others up through putIndex.
205 jsr166 1.115 for (int i = removeIndex, putIndex = this.putIndex;;) {
206     int pred = i;
207     if (++i == items.length) i = 0;
208     if (i == putIndex) {
209     items[pred] = null;
210     this.putIndex = pred;
211 dl 1.9 break;
212     }
213 jsr166 1.115 items[pred] = items[i];
214 dl 1.5 }
215 jsr166 1.90 count--;
216     if (itrs != null)
217     itrs.removedAt(removeIndex);
218 dl 1.5 }
219 dl 1.9 notFull.signal();
220 jsr166 1.134 // checkInvariants();
221 tim 1.1 }
222    
/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and the default access policy, by delegating to the
 * two-argument constructor with {@code fair} set to {@code false}.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
233 dl 1.2
/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and the specified access policy.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        // Report the rejected value so the failure is diagnosable.
        throw new IllegalArgumentException(
            "capacity must be at least 1: " + capacity);
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    // Both conditions are bound to the single main lock.
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
252    
/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity, the specified access policy and initially containing the
 * elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        final Object[] items = this.items;
        int i = 0;
        try {
            for (E e : c)
                items[i++] = Objects.requireNonNull(e);
        } catch (ArrayIndexOutOfBoundsException ex) {
            // Running off the end of the array means the collection
            // produced more elements than the queue can hold. Keep the
            // original exception as the cause and say what went wrong.
            throw new IllegalArgumentException(
                "capacity " + capacity
                + " is too small to hold the given collection", ex);
        }
        count = i;
        // A completely filled array wraps the put position back to 0.
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
291 dl 1.2
292 dholmes 1.13 /**
293 jsr166 1.50 * Inserts the specified element at the tail of this queue if it is
294     * possible to do so immediately without exceeding the queue's capacity,
295 jsr166 1.69 * returning {@code true} upon success and throwing an
296     * {@code IllegalStateException} if this queue is full.
297 dl 1.25 *
298 jsr166 1.50 * @param e the element to add
299 jsr166 1.69 * @return {@code true} (as specified by {@link Collection#add})
300 jsr166 1.50 * @throws IllegalStateException if this queue is full
301     * @throws NullPointerException if the specified element is null
302     */
303     public boolean add(E e) {
304 jsr166 1.56 return super.add(e);
305 jsr166 1.50 }
306    
307     /**
308     * Inserts the specified element at the tail of this queue if it is
309     * possible to do so immediately without exceeding the queue's capacity,
310 jsr166 1.69 * returning {@code true} upon success and {@code false} if this queue
311 jsr166 1.50 * is full. This method is generally preferable to method {@link #add},
312     * which can fail to insert an element only by throwing an exception.
313     *
314     * @throws NullPointerException if the specified element is null
315 dholmes 1.13 */
316 jsr166 1.49 public boolean offer(E e) {
317 jsr166 1.119 Objects.requireNonNull(e);
318 dl 1.36 final ReentrantLock lock = this.lock;
319 dl 1.5 lock.lock();
320     try {
321 jsr166 1.59 if (count == items.length)
322 dl 1.2 return false;
323 dl 1.5 else {
324 jsr166 1.88 enqueue(e);
325 dl 1.5 return true;
326     }
327 tim 1.23 } finally {
328 tim 1.12 lock.unlock();
329 dl 1.2 }
330 dl 1.5 }
331 dl 1.2
332 dholmes 1.13 /**
333 jsr166 1.50 * Inserts the specified element at the tail of this queue, waiting
334     * for space to become available if the queue is full.
335     *
336     * @throws InterruptedException {@inheritDoc}
337     * @throws NullPointerException {@inheritDoc}
338     */
339     public void put(E e) throws InterruptedException {
340 jsr166 1.119 Objects.requireNonNull(e);
341 jsr166 1.50 final ReentrantLock lock = this.lock;
342     lock.lockInterruptibly();
343     try {
344 jsr166 1.59 while (count == items.length)
345 jsr166 1.58 notFull.await();
346 jsr166 1.88 enqueue(e);
347 jsr166 1.50 } finally {
348     lock.unlock();
349     }
350     }
351    
352     /**
353     * Inserts the specified element at the tail of this queue, waiting
354     * up to the specified wait time for space to become available if
355     * the queue is full.
356     *
357     * @throws InterruptedException {@inheritDoc}
358     * @throws NullPointerException {@inheritDoc}
359 brian 1.7 */
360 jsr166 1.49 public boolean offer(E e, long timeout, TimeUnit unit)
361 dholmes 1.13 throws InterruptedException {
362 dl 1.2
363 jsr166 1.119 Objects.requireNonNull(e);
364 jsr166 1.56 long nanos = unit.toNanos(timeout);
365 dl 1.36 final ReentrantLock lock = this.lock;
366 dl 1.5 lock.lockInterruptibly();
367     try {
368 jsr166 1.59 while (count == items.length) {
369 jsr166 1.126 if (nanos <= 0L)
370 dl 1.5 return false;
371 jsr166 1.58 nanos = notFull.awaitNanos(nanos);
372 dl 1.5 }
373 jsr166 1.88 enqueue(e);
374 jsr166 1.58 return true;
375 tim 1.23 } finally {
376 jsr166 1.134 // checkInvariants();
377 dl 1.5 lock.unlock();
378 dl 1.2 }
379 dl 1.5 }
380 dl 1.2
381 dholmes 1.13 public E poll() {
382 dl 1.36 final ReentrantLock lock = this.lock;
383 dholmes 1.13 lock.lock();
384     try {
385 jsr166 1.88 return (count == 0) ? null : dequeue();
386 tim 1.23 } finally {
387 tim 1.15 lock.unlock();
388 dholmes 1.13 }
389     }
390    
391 jsr166 1.50 public E take() throws InterruptedException {
392     final ReentrantLock lock = this.lock;
393     lock.lockInterruptibly();
394     try {
395 jsr166 1.59 while (count == 0)
396 jsr166 1.58 notEmpty.await();
397 jsr166 1.88 return dequeue();
398 jsr166 1.50 } finally {
399     lock.unlock();
400     }
401     }
402    
403 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
404 jsr166 1.56 long nanos = unit.toNanos(timeout);
405 dl 1.36 final ReentrantLock lock = this.lock;
406 dl 1.5 lock.lockInterruptibly();
407     try {
408 jsr166 1.59 while (count == 0) {
409 jsr166 1.126 if (nanos <= 0L)
410 dl 1.5 return null;
411 jsr166 1.58 nanos = notEmpty.awaitNanos(nanos);
412 dl 1.2 }
413 jsr166 1.88 return dequeue();
414 tim 1.23 } finally {
415 jsr166 1.131 // checkInvariants();
416 dl 1.5 lock.unlock();
417     }
418     }
419 dl 1.2
420 dholmes 1.13 public E peek() {
421 dl 1.36 final ReentrantLock lock = this.lock;
422 dholmes 1.13 lock.lock();
423     try {
424 jsr166 1.99 return itemAt(takeIndex); // null when queue is empty
425 tim 1.23 } finally {
426 dholmes 1.13 lock.unlock();
427     }
428     }
429    
430     // this doc comment is overridden to remove the reference to collections
431     // greater in size than Integer.MAX_VALUE
432 tim 1.15 /**
433 dl 1.25 * Returns the number of elements in this queue.
434     *
435 jsr166 1.50 * @return the number of elements in this queue
436 dholmes 1.13 */
437     public int size() {
438 dl 1.36 final ReentrantLock lock = this.lock;
439 dholmes 1.13 lock.lock();
440     try {
441     return count;
442 tim 1.23 } finally {
443 dholmes 1.13 lock.unlock();
444     }
445     }
446    
447     // this doc comment is a modified copy of the inherited doc comment,
448     // without the reference to unlimited queues.
449 tim 1.15 /**
450 jsr166 1.48 * Returns the number of additional elements that this queue can ideally
451     * (in the absence of memory or resource constraints) accept without
452 dholmes 1.13 * blocking. This is always equal to the initial capacity of this queue
453 jsr166 1.69 * less the current {@code size} of this queue.
454 jsr166 1.48 *
455     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
456 jsr166 1.69 * an element will succeed by inspecting {@code remainingCapacity}
457 jsr166 1.48 * because it may be the case that another thread is about to
458 jsr166 1.50 * insert or remove an element.
459 dholmes 1.13 */
460     public int remainingCapacity() {
461 dl 1.36 final ReentrantLock lock = this.lock;
462 dholmes 1.13 lock.lock();
463     try {
464     return items.length - count;
465 tim 1.23 } finally {
466 dholmes 1.13 lock.unlock();
467     }
468     }
469    
470 jsr166 1.50 /**
471     * Removes a single instance of the specified element from this queue,
472 jsr166 1.69 * if it is present. More formally, removes an element {@code e} such
473     * that {@code o.equals(e)}, if this queue contains one or more such
474 jsr166 1.50 * elements.
475 jsr166 1.69 * Returns {@code true} if this queue contained the specified element
476 jsr166 1.50 * (or equivalently, if this queue changed as a result of the call).
477     *
478 jsr166 1.64 * <p>Removal of interior elements in circular array based queues
479 dl 1.60 * is an intrinsically slow and disruptive operation, so should
480     * be undertaken only in exceptional circumstances, ideally
481     * only when the queue is known not to be accessible by other
482     * threads.
483     *
484 jsr166 1.50 * @param o element to be removed from this queue, if present
485 jsr166 1.69 * @return {@code true} if this queue changed as a result of the call
486 jsr166 1.50 */
487     public boolean remove(Object o) {
488     if (o == null) return false;
489     final ReentrantLock lock = this.lock;
490     lock.lock();
491     try {
492 jsr166 1.86 if (count > 0) {
493 jsr166 1.114 final Object[] items = this.items;
494 jsr166 1.130 for (int i = takeIndex, end = putIndex,
495     to = (i < end) ? end : items.length;
496     ; i = 0, to = end) {
497     for (; i < to; i++)
498     if (o.equals(items[i])) {
499     removeAt(i);
500     return true;
501     }
502     if (to == end) break;
503     }
504 jsr166 1.50 }
505 jsr166 1.68 return false;
506 jsr166 1.50 } finally {
507 jsr166 1.131 // checkInvariants();
508 jsr166 1.50 lock.unlock();
509     }
510     }
511 dholmes 1.13
512 jsr166 1.50 /**
513 jsr166 1.69 * Returns {@code true} if this queue contains the specified element.
514     * More formally, returns {@code true} if and only if this queue contains
515     * at least one element {@code e} such that {@code o.equals(e)}.
516 jsr166 1.50 *
517     * @param o object to be checked for containment in this queue
518 jsr166 1.69 * @return {@code true} if this queue contains the specified element
519 jsr166 1.50 */
520 dholmes 1.21 public boolean contains(Object o) {
521     if (o == null) return false;
522 dl 1.36 final ReentrantLock lock = this.lock;
523 dl 1.5 lock.lock();
524     try {
525 jsr166 1.86 if (count > 0) {
526 jsr166 1.114 final Object[] items = this.items;
527 jsr166 1.129 for (int i = takeIndex, end = putIndex,
528     to = (i < end) ? end : items.length;
529     ; i = 0, to = end) {
530     for (; i < to; i++)
531     if (o.equals(items[i]))
532     return true;
533     if (to == end) break;
534     }
535 jsr166 1.86 }
536 dl 1.2 return false;
537 tim 1.23 } finally {
538 jsr166 1.131 // checkInvariants();
539 dl 1.5 lock.unlock();
540     }
541     }
542 brian 1.7
543 jsr166 1.50 /**
544     * Returns an array containing all of the elements in this queue, in
545     * proper sequence.
546     *
547     * <p>The returned array will be "safe" in that no references to it are
548     * maintained by this queue. (In other words, this method must allocate
549     * a new array). The caller is thus free to modify the returned array.
550 jsr166 1.51 *
551 jsr166 1.50 * <p>This method acts as bridge between array-based and collection-based
552     * APIs.
553     *
554     * @return an array containing all of the elements in this queue
555     */
556 dl 1.5 public Object[] toArray() {
557 dl 1.36 final ReentrantLock lock = this.lock;
558 dl 1.5 lock.lock();
559     try {
560 jsr166 1.120 final Object[] items = this.items;
561     final int end = takeIndex + count;
562     final Object[] a = Arrays.copyOfRange(items, takeIndex, end);
563     if (end != putIndex)
564     System.arraycopy(items, 0, a, items.length - takeIndex, putIndex);
565     return a;
566 tim 1.23 } finally {
567 dl 1.5 lock.unlock();
568     }
569     }
570 brian 1.7
571 jsr166 1.50 /**
572     * Returns an array containing all of the elements in this queue, in
573     * proper sequence; the runtime type of the returned array is that of
574     * the specified array. If the queue fits in the specified array, it
575     * is returned therein. Otherwise, a new array is allocated with the
576     * runtime type of the specified array and the size of this queue.
577     *
578     * <p>If this queue fits in the specified array with room to spare
579     * (i.e., the array has more elements than this queue), the element in
580     * the array immediately following the end of the queue is set to
581 jsr166 1.69 * {@code null}.
582 jsr166 1.50 *
583     * <p>Like the {@link #toArray()} method, this method acts as bridge between
584     * array-based and collection-based APIs. Further, this method allows
585     * precise control over the runtime type of the output array, and may,
586     * under certain circumstances, be used to save allocation costs.
587     *
588 jsr166 1.69 * <p>Suppose {@code x} is a queue known to contain only strings.
589 jsr166 1.50 * The following code can be used to dump the queue into a newly
590 jsr166 1.69 * allocated array of {@code String}:
591 jsr166 1.50 *
592 jsr166 1.117 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
593 jsr166 1.50 *
594 jsr166 1.69 * Note that {@code toArray(new Object[0])} is identical in function to
595     * {@code toArray()}.
596 jsr166 1.50 *
597     * @param a the array into which the elements of the queue are to
598     * be stored, if it is big enough; otherwise, a new array of the
599     * same runtime type is allocated for this purpose
600     * @return an array containing all of the elements in this queue
601     * @throws ArrayStoreException if the runtime type of the specified array
602     * is not a supertype of the runtime type of every element in
603     * this queue
604     * @throws NullPointerException if the specified array is null
605     */
606 jsr166 1.68 @SuppressWarnings("unchecked")
607 dl 1.5 public <T> T[] toArray(T[] a) {
608 dl 1.36 final ReentrantLock lock = this.lock;
609 dl 1.5 lock.lock();
610     try {
611 jsr166 1.120 final Object[] items = this.items;
612 jsr166 1.68 final int count = this.count;
613 jsr166 1.120 final int firstLeg = Math.min(items.length - takeIndex, count);
614     if (a.length < count) {
615     a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count,
616     a.getClass());
617     } else {
618     System.arraycopy(items, takeIndex, a, 0, firstLeg);
619     if (a.length > count)
620     a[count] = null;
621     }
622     if (firstLeg < count)
623     System.arraycopy(items, 0, a, firstLeg, putIndex);
624     return a;
625 tim 1.23 } finally {
626 dl 1.5 lock.unlock();
627     }
628     }
629 dl 1.6
630     public String toString() {
631 jsr166 1.122 return Helpers.collectionToString(this);
632 dl 1.6 }
633 tim 1.12
634 dl 1.44 /**
635     * Atomically removes all of the elements from this queue.
636     * The queue will be empty after this call returns.
637     */
638 dl 1.30 public void clear() {
639 dl 1.36 final ReentrantLock lock = this.lock;
640 dl 1.30 lock.lock();
641     try {
642 jsr166 1.131 int k;
643     if ((k = count) > 0) {
644     circularClear(items, takeIndex, putIndex);
645 jsr166 1.86 takeIndex = putIndex;
646     count = 0;
647 jsr166 1.89 if (itrs != null)
648     itrs.queueIsEmpty();
649 jsr166 1.86 for (; k > 0 && lock.hasWaiters(notFull); k--)
650     notFull.signal();
651     }
652 dl 1.30 } finally {
653 jsr166 1.131 // checkInvariants();
654 dl 1.30 lock.unlock();
655     }
656     }
657    
658 jsr166 1.50 /**
659 jsr166 1.131 * Nulls out slots starting at array index i, upto index end.
660 jsr166 1.142 * Condition i == end means "full" - the entire array is cleared.
661 jsr166 1.131 */
662     private static void circularClear(Object[] items, int i, int end) {
663 jsr166 1.142 // assert 0 <= i && i < items.length;
664     // assert 0 <= end && end < items.length;
665 jsr166 1.131 for (int to = (i < end) ? end : items.length;
666     ; i = 0, to = end) {
667 jsr166 1.142 for (; i < to; i++) items[i] = null;
668 jsr166 1.131 if (to == end) break;
669     }
670     }
671    
672     /**
673 jsr166 1.50 * @throws UnsupportedOperationException {@inheritDoc}
674     * @throws ClassCastException {@inheritDoc}
675     * @throws NullPointerException {@inheritDoc}
676     * @throws IllegalArgumentException {@inheritDoc}
677     */
678 dl 1.30 public int drainTo(Collection<? super E> c) {
679 jsr166 1.81 return drainTo(c, Integer.MAX_VALUE);
680 dl 1.30 }
681    
682 jsr166 1.50 /**
683     * @throws UnsupportedOperationException {@inheritDoc}
684     * @throws ClassCastException {@inheritDoc}
685     * @throws NullPointerException {@inheritDoc}
686     * @throws IllegalArgumentException {@inheritDoc}
687     */
688 dl 1.30 public int drainTo(Collection<? super E> c, int maxElements) {
689 jsr166 1.119 Objects.requireNonNull(c);
690 dl 1.30 if (c == this)
691     throw new IllegalArgumentException();
692     if (maxElements <= 0)
693     return 0;
694 jsr166 1.68 final Object[] items = this.items;
695 dl 1.36 final ReentrantLock lock = this.lock;
696 dl 1.30 lock.lock();
697     try {
698 jsr166 1.82 int n = Math.min(maxElements, count);
699     int take = takeIndex;
700     int i = 0;
701     try {
702     while (i < n) {
703 jsr166 1.97 @SuppressWarnings("unchecked")
704 jsr166 1.141 E e = (E) items[take];
705     c.add(e);
706 jsr166 1.82 items[take] = null;
707 jsr166 1.116 if (++take == items.length) take = 0;
708 jsr166 1.86 i++;
709 jsr166 1.82 }
710     return n;
711     } finally {
712     // Restore invariants even if c.add() threw
713 jsr166 1.89 if (i > 0) {
714     count -= i;
715     takeIndex = take;
716     if (itrs != null) {
717     if (count == 0)
718     itrs.queueIsEmpty();
719     else if (i > take)
720     itrs.takeIndexWrapped();
721     }
722     for (; i > 0 && lock.hasWaiters(notFull); i--)
723     notFull.signal();
724     }
725 dl 1.30 }
726     } finally {
727 jsr166 1.134 // checkInvariants();
728 dl 1.30 lock.unlock();
729     }
730     }
731    
732 brian 1.7 /**
733 dl 1.75 * Returns an iterator over the elements in this queue in proper sequence.
734 jsr166 1.77 * The elements will be returned in order from first (head) to last (tail).
735     *
736 jsr166 1.106 * <p>The returned iterator is
737     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
738 brian 1.7 *
739 jsr166 1.50 * @return an iterator over the elements in this queue in proper sequence
740 brian 1.7 */
741 dl 1.5 public Iterator<E> iterator() {
742 jsr166 1.68 return new Itr();
743 dl 1.5 }
744 dl 1.8
745     /**
746 jsr166 1.94 * Shared data between iterators and their queue, allowing queue
747 jsr166 1.89 * modifications to update iterators when elements are removed.
748     *
749 jsr166 1.94 * This adds a lot of complexity for the sake of correctly
750     * handling some uncommon operations, but the combination of
751     * circular-arrays and supporting interior removes (i.e., those
752     * not at head) would cause iterators to sometimes lose their
753     * places and/or (re)report elements they shouldn't. To avoid
754     * this, when a queue has one or more iterators, it keeps iterator
755     * state consistent by:
756     *
757     * (1) keeping track of the number of "cycles", that is, the
758     * number of times takeIndex has wrapped around to 0.
759     * (2) notifying all iterators via the callback removedAt whenever
760     * an interior element is removed (and thus other elements may
761     * be shifted).
762     *
763     * These suffice to eliminate iterator inconsistencies, but
764     * unfortunately add the secondary responsibility of maintaining
765     * the list of iterators. We track all active iterators in a
766     * simple linked list (accessed only when the queue's lock is
767     * held) of weak references to Itr. The list is cleaned up using
768     * 3 different mechanisms:
769 jsr166 1.89 *
770     * (1) Whenever a new iterator is created, do some O(1) checking for
771     * stale list elements.
772     *
773     * (2) Whenever takeIndex wraps around to 0, check for iterators
774     * that have been unused for more than one wrap-around cycle.
775     *
776     * (3) Whenever the queue becomes empty, all iterators are notified
777     * and this entire data structure is discarded.
778     *
779 jsr166 1.94 * So in addition to the removedAt callback that is necessary for
780     * correctness, iterators have the shutdown and takeIndexWrapped
781     * callbacks that help remove stale iterators from the list.
782     *
783     * Whenever a list element is examined, it is expunged if either
784     * the GC has determined that the iterator is discarded, or if the
785     * iterator reports that it is "detached" (does not need any
786     * further state updates). Overhead is maximal when takeIndex
787     * never advances, iterators are discarded before they are
788     * exhausted, and all removals are interior removes, in which case
789     * all stale iterators are discovered by the GC. But even in this
790     * case we don't increase the amortized complexity.
791     *
792     * Care must be taken to keep list sweeping methods from
793     * reentrantly invoking another such method, causing subtle
794     * corruption bugs.
795 jsr166 1.89 */
796     class Itrs {
797    
798     /**
799     * Node in a linked list of weak iterator references.
800     */
801     private class Node extends WeakReference<Itr> {
802     Node next;
803    
804     Node(Itr iterator, Node next) {
805     super(iterator);
806     this.next = next;
807     }
808     }
809    
810     /** Incremented whenever takeIndex wraps around to 0 */
811 jsr166 1.108 int cycles;
812 jsr166 1.89
813     /** Linked list of weak iterator references */
814 jsr166 1.93 private Node head;
815 jsr166 1.89
816     /** Used to expunge stale iterators */
817 jsr166 1.108 private Node sweeper;
818 jsr166 1.89
819     private static final int SHORT_SWEEP_PROBES = 4;
820     private static final int LONG_SWEEP_PROBES = 16;
821    
/**
 * Creates the tracking list with a single initial iterator.
 */
Itrs(Itr initial) {
    // head is still null here, so this is the same as register(initial)
    head = new Node(initial, head);
}
825    
826 jsr166 1.89 /**
827     * Sweeps itrs, looking for and expunging stale iterators.
828     * If at least one was found, tries harder to find more.
829 jsr166 1.94 * Called only from iterating thread.
830 jsr166 1.89 *
831     * @param tryHarder whether to start in try-harder mode, because
832     * there is known to be at least one iterator to collect
833     */
834     void doSomeSweeping(boolean tryHarder) {
835 jsr166 1.137 // assert lock.isHeldByCurrentThread();
836 jsr166 1.95 // assert head != null;
837 jsr166 1.89 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
838     Node o, p;
839     final Node sweeper = this.sweeper;
840 jsr166 1.93 boolean passedGo; // to limit search to one full sweep
841 jsr166 1.89
842 jsr166 1.93 if (sweeper == null) {
843     o = null;
844     p = head;
845     passedGo = true;
846     } else {
847 jsr166 1.89 o = sweeper;
848     p = o.next;
849 jsr166 1.93 passedGo = false;
850 jsr166 1.89 }
851    
852 jsr166 1.93 for (; probes > 0; probes--) {
853     if (p == null) {
854     if (passedGo)
855     break;
856     o = null;
857     p = head;
858     passedGo = true;
859     }
860 jsr166 1.89 final Itr it = p.get();
861     final Node next = p.next;
862     if (it == null || it.isDetached()) {
863     // found a discarded/exhausted iterator
864     probes = LONG_SWEEP_PROBES; // "try harder"
865     // unlink p
866     p.clear();
867     p.next = null;
868 jsr166 1.93 if (o == null) {
869 jsr166 1.89 head = next;
870 jsr166 1.93 if (next == null) {
871     // We've run out of iterators to track; retire
872     itrs = null;
873     return;
874     }
875     }
876 jsr166 1.89 else
877     o.next = next;
878     } else {
879     o = p;
880     }
881     p = next;
882     }
883    
884     this.sweeper = (p == null) ? null : o;
885     }
886    
887     /**
888     * Adds a new iterator to the linked list of tracked iterators.
889     */
890     void register(Itr itr) {
891 jsr166 1.137 // assert lock.isHeldByCurrentThread();
892 jsr166 1.89 head = new Node(itr, head);
893     }
894    
895     /**
896     * Called whenever takeIndex wraps around to 0.
897     *
898     * Notifies all iterators, and expunges any that are now stale.
899     */
900     void takeIndexWrapped() {
901 jsr166 1.137 // assert lock.isHeldByCurrentThread();
902 jsr166 1.89 cycles++;
903     for (Node o = null, p = head; p != null;) {
904     final Itr it = p.get();
905     final Node next = p.next;
906     if (it == null || it.takeIndexWrapped()) {
907     // unlink p
908 jsr166 1.95 // assert it == null || it.isDetached();
909 jsr166 1.89 p.clear();
910     p.next = null;
911     if (o == null)
912     head = next;
913     else
914     o.next = next;
915     } else {
916     o = p;
917     }
918     p = next;
919     }
920     if (head == null) // no more iterators to track
921     itrs = null;
922     }
923    
924     /**
925 jsr166 1.107 * Called whenever an interior remove (not at takeIndex) occurred.
926 jsr166 1.93 *
927     * Notifies all iterators, and expunges any that are now stale.
928 jsr166 1.89 */
929 jsr166 1.90 void removedAt(int removedIndex) {
930 jsr166 1.89 for (Node o = null, p = head; p != null;) {
931     final Itr it = p.get();
932     final Node next = p.next;
933 jsr166 1.90 if (it == null || it.removedAt(removedIndex)) {
934 jsr166 1.89 // unlink p
935 jsr166 1.95 // assert it == null || it.isDetached();
936 jsr166 1.89 p.clear();
937     p.next = null;
938     if (o == null)
939     head = next;
940     else
941     o.next = next;
942     } else {
943     o = p;
944     }
945     p = next;
946     }
947     if (head == null) // no more iterators to track
948     itrs = null;
949     }
950    
951     /**
952     * Called whenever the queue becomes empty.
953     *
954     * Notifies all active iterators that the queue is empty,
955     * clears all weak refs, and unlinks the itrs datastructure.
956     */
957     void queueIsEmpty() {
958 jsr166 1.137 // assert lock.isHeldByCurrentThread();
959 jsr166 1.89 for (Node p = head; p != null; p = p.next) {
960     Itr it = p.get();
961     if (it != null) {
962     p.clear();
963     it.shutdown();
964     }
965     }
966     head = null;
967     itrs = null;
968     }
969    
970     /**
971 jsr166 1.90 * Called whenever an element has been dequeued (at takeIndex).
972 jsr166 1.89 */
973     void elementDequeued() {
974 jsr166 1.137 // assert lock.isHeldByCurrentThread();
975 jsr166 1.89 if (count == 0)
976     queueIsEmpty();
977     else if (takeIndex == 0)
978     takeIndexWrapped();
979     }
980     }
981    
982     /**
983     * Iterator for ArrayBlockingQueue.
984     *
985     * To maintain weak consistency with respect to puts and takes, we
986     * read ahead one slot, so as to not report hasNext true but then
987     * not have an element to return.
988     *
989     * We switch into "detached" mode (allowing prompt unlinking from
990     * itrs without help from the GC) when all indices are negative, or
991     * when hasNext returns false for the first time. This allows the
992     * iterator to track concurrent updates completely accurately,
993     * except for the corner case of the user calling Iterator.remove()
994     * after hasNext() returned false. Even in this case, we ensure
995     * that we don't remove the wrong element by keeping track of the
996     * expected element to remove, in lastItem. Yes, we may fail to
997     * remove lastItem from the queue if it moved due to an interleaved
998     * interior remove while in detached mode.
999 jsr166 1.144 *
1000     * Method forEachRemaining, added in Java 8, is treated similarly
1001     * to hasNext returning false, in that we switch to detached mode,
1002     * but we regard it as an even stronger request to "close" this
1003     * iteration, and don't bother supporting subsequent remove().
1004 dl 1.8 */
1005 dl 1.5 private class Itr implements Iterator<E> {
1006 jsr166 1.91 /** Index to look for new nextItem; NONE at end */
1007 jsr166 1.89 private int cursor;
1008    
1009     /** Element to be returned by next call to next(); null if none */
1010     private E nextItem;
1011    
1012 jsr166 1.91 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
1013 jsr166 1.89 private int nextIndex;
1014    
1015     /** Last element returned; null if none or not detached. */
1016     private E lastItem;
1017    
1018 jsr166 1.91 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
1019 jsr166 1.89 private int lastRet;
1020    
1021 jsr166 1.91 /** Previous value of takeIndex, or DETACHED when detached */
1022 jsr166 1.89 private int prevTakeIndex;
1023    
1024     /** Previous value of itrs.cycles */
1025     private int prevCycles;
1026 tim 1.12
1027 jsr166 1.91 /** Special index value indicating "not available" or "undefined" */
1028     private static final int NONE = -1;
1029    
1030     /**
1031     * Special index value indicating "removed elsewhere", that is,
1032     * removed by some operation other than a call to this.remove().
1033     */
1034     private static final int REMOVED = -2;
1035    
1036     /** Special value for prevTakeIndex indicating "detached mode" */
1037     private static final int DETACHED = -3;
1038    
/**
 * Creates an iterator positioned at the current head of the queue.
 * Under the queue lock, an empty queue yields an iterator that is
 * born detached; otherwise the first element is read ahead and the
 * iterator is registered with itrs (creating it if necessary).
 */
Itr() {
    lastRet = NONE;
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
        if (count != 0) {
            final int start = ArrayBlockingQueue.this.takeIndex;
            prevTakeIndex = start;
            nextIndex = start;
            nextItem = itemAt(start);
            cursor = incCursor(start);
            if (itrs != null) {
                itrs.register(this); // in this order
                itrs.doSomeSweeping(false);
            } else {
                itrs = new Itrs(this);
            }
            prevCycles = itrs.cycles;
            // assert start >= 0;
            // assert prevTakeIndex == start;
            // assert nextIndex >= 0;
            // assert nextItem != null;
        } else {
            // assert itrs == null;
            cursor = NONE;
            nextIndex = NONE;
            prevTakeIndex = DETACHED;
        }
    } finally {
        lock.unlock();
    }
}
1070 tim 1.12
1071 jsr166 1.89 boolean isDetached() {
1072 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1073 jsr166 1.89 return prevTakeIndex < 0;
1074     }
1075    
1076     private int incCursor(int index) {
1077 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1078 jsr166 1.116 if (++index == items.length) index = 0;
1079     if (index == putIndex) index = NONE;
1080 jsr166 1.89 return index;
1081     }
1082    
1083     /**
1084     * Returns true if index is invalidated by the given number of
1085     * dequeues, starting from prevTakeIndex.
1086     */
1087     private boolean invalidated(int index, int prevTakeIndex,
1088     long dequeues, int length) {
1089     if (index < 0)
1090     return false;
1091     int distance = index - prevTakeIndex;
1092     if (distance < 0)
1093     distance += length;
1094     return dequeues > distance;
1095     }
1096    
1097     /**
1098     * Adjusts indices to incorporate all dequeues since the last
1099     * operation on this iterator. Call only from iterating thread.
1100     */
1101     private void incorporateDequeues() {
1102 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1103 jsr166 1.95 // assert itrs != null;
1104     // assert !isDetached();
1105     // assert count > 0;
1106 jsr166 1.89
1107     final int cycles = itrs.cycles;
1108     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1109     final int prevCycles = this.prevCycles;
1110     final int prevTakeIndex = this.prevTakeIndex;
1111    
1112     if (cycles != prevCycles || takeIndex != prevTakeIndex) {
1113     final int len = items.length;
1114     // how far takeIndex has advanced since the previous
1115     // operation of this iterator
1116 jsr166 1.150 long dequeues = (long) (cycles - prevCycles) * len
1117 jsr166 1.89 + (takeIndex - prevTakeIndex);
1118    
1119     // Check indices for invalidation
1120     if (invalidated(lastRet, prevTakeIndex, dequeues, len))
1121 jsr166 1.91 lastRet = REMOVED;
1122 jsr166 1.89 if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
1123 jsr166 1.91 nextIndex = REMOVED;
1124 jsr166 1.89 if (invalidated(cursor, prevTakeIndex, dequeues, len))
1125     cursor = takeIndex;
1126    
1127     if (cursor < 0 && nextIndex < 0 && lastRet < 0)
1128     detach();
1129     else {
1130     this.prevCycles = cycles;
1131     this.prevTakeIndex = takeIndex;
1132     }
1133     }
1134     }
1135    
1136     /**
1137     * Called when itrs should stop tracking this iterator, either
1138     * because there are no more indices to update (cursor < 0 &&
1139     * nextIndex < 0 && lastRet < 0) or as a special exception, when
1140     * lastRet >= 0, because hasNext() is about to return false for the
1141     * first time. Call only from iterating thread.
1142     */
1143     private void detach() {
1144     // Switch to detached mode
1145 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1146 jsr166 1.95 // assert cursor == NONE;
1147     // assert nextIndex < 0;
1148     // assert lastRet < 0 || nextItem == null;
1149     // assert lastRet < 0 ^ lastItem != null;
1150 jsr166 1.89 if (prevTakeIndex >= 0) {
1151 jsr166 1.95 // assert itrs != null;
1152 jsr166 1.91 prevTakeIndex = DETACHED;
1153 jsr166 1.89 // try to unlink from itrs (but not too hard)
1154     itrs.doSomeSweeping(true);
1155     }
1156     }
1157    
1158     /**
1159     * For performance reasons, we would like not to acquire a lock in
1160     * hasNext in the common case. To allow for this, we only access
1161     * fields (i.e. nextItem) that are not modified by update operations
1162     * triggered by queue modifications.
1163     */
1164 dl 1.5 public boolean hasNext() {
1165 jsr166 1.89 if (nextItem != null)
1166     return true;
1167     noNext();
1168     return false;
1169     }
1170    
1171     private void noNext() {
1172     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1173     lock.lock();
1174     try {
1175 jsr166 1.95 // assert cursor == NONE;
1176     // assert nextIndex == NONE;
1177 jsr166 1.89 if (!isDetached()) {
1178 jsr166 1.95 // assert lastRet >= 0;
1179 jsr166 1.89 incorporateDequeues(); // might update lastRet
1180     if (lastRet >= 0) {
1181     lastItem = itemAt(lastRet);
1182 jsr166 1.95 // assert lastItem != null;
1183 jsr166 1.89 detach();
1184     }
1185     }
1186 jsr166 1.95 // assert isDetached();
1187     // assert lastRet < 0 ^ lastItem != null;
1188 jsr166 1.89 } finally {
1189     lock.unlock();
1190     }
1191 dl 1.5 }
1192 tim 1.12
1193 dl 1.5 public E next() {
1194 jsr166 1.141 final E e = nextItem;
1195     if (e == null)
1196 jsr166 1.89 throw new NoSuchElementException();
1197 dl 1.66 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1198     lock.lock();
1199     try {
1200 jsr166 1.92 if (!isDetached())
1201 jsr166 1.89 incorporateDequeues();
1202 jsr166 1.95 // assert nextIndex != NONE;
1203     // assert lastItem == null;
1204 dl 1.66 lastRet = nextIndex;
1205 jsr166 1.89 final int cursor = this.cursor;
1206     if (cursor >= 0) {
1207     nextItem = itemAt(nextIndex = cursor);
1208 jsr166 1.95 // assert nextItem != null;
1209 jsr166 1.89 this.cursor = incCursor(cursor);
1210     } else {
1211 jsr166 1.91 nextIndex = NONE;
1212 jsr166 1.89 nextItem = null;
1213 jsr166 1.146 if (lastRet == REMOVED) detach();
1214 jsr166 1.89 }
1215 dl 1.66 } finally {
1216     lock.unlock();
1217 dl 1.2 }
1218 jsr166 1.141 return e;
1219 dl 1.5 }
1220 tim 1.12
1221 jsr166 1.136 public void forEachRemaining(Consumer<? super E> action) {
1222     Objects.requireNonNull(action);
1223     final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1224     lock.lock();
1225     try {
1226 jsr166 1.141 final E e = nextItem;
1227     if (e == null) return;
1228 jsr166 1.136 if (!isDetached())
1229     incorporateDequeues();
1230 jsr166 1.141 action.accept(e);
1231 jsr166 1.136 if (isDetached() || cursor < 0) return;
1232     final Object[] items = ArrayBlockingQueue.this.items;
1233     for (int i = cursor, end = putIndex,
1234     to = (i < end) ? end : items.length;
1235     ; i = 0, to = end) {
1236     for (; i < to; i++)
1237     action.accept(itemAt(items, i));
1238     if (to == end) break;
1239     }
1240     } finally {
1241     // Calling forEachRemaining is a strong hint that this
1242     // iteration is surely over; supporting remove() after
1243     // forEachRemaining() is more trouble than it's worth
1244 jsr166 1.140 cursor = nextIndex = lastRet = NONE;
1245 jsr166 1.136 nextItem = lastItem = null;
1246     detach();
1247     lock.unlock();
1248     }
1249     }
1250    
1251 dl 1.5 public void remove() {
1252 dl 1.36 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
1253 dl 1.5 lock.lock();
1254 jsr166 1.137 // assert lock.getHoldCount() == 1;
1255 dl 1.5 try {
1256 jsr166 1.92 if (!isDetached())
1257     incorporateDequeues(); // might update lastRet or detach
1258     final int lastRet = this.lastRet;
1259     this.lastRet = NONE;
1260 jsr166 1.89 if (lastRet >= 0) {
1261 jsr166 1.92 if (!isDetached())
1262     removeAt(lastRet);
1263     else {
1264     final E lastItem = this.lastItem;
1265 jsr166 1.95 // assert lastItem != null;
1266 jsr166 1.92 this.lastItem = null;
1267 jsr166 1.89 if (itemAt(lastRet) == lastItem)
1268     removeAt(lastRet);
1269     }
1270 jsr166 1.91 } else if (lastRet == NONE)
1271 dl 1.66 throw new IllegalStateException();
1272 jsr166 1.91 // else lastRet == REMOVED and the last returned element was
1273 jsr166 1.89 // previously asynchronously removed via an operation other
1274     // than this.remove(), so nothing to do.
1275    
1276     if (cursor < 0 && nextIndex < 0)
1277     detach();
1278     } finally {
1279     lock.unlock();
1280 jsr166 1.95 // assert lastRet == NONE;
1281     // assert lastItem == null;
1282 jsr166 1.89 }
1283     }
1284    
1285     /**
1286     * Called to notify the iterator that the queue is empty, or that it
1287     * has fallen hopelessly behind, so that it should abandon any
1288     * further iteration, except possibly to return one more element
1289     * from next(), as promised by returning true from hasNext().
1290     */
1291     void shutdown() {
1292 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1293 jsr166 1.91 cursor = NONE;
1294 jsr166 1.89 if (nextIndex >= 0)
1295 jsr166 1.91 nextIndex = REMOVED;
1296 jsr166 1.89 if (lastRet >= 0) {
1297 jsr166 1.91 lastRet = REMOVED;
1298 jsr166 1.61 lastItem = null;
1299 jsr166 1.89 }
1300 jsr166 1.91 prevTakeIndex = DETACHED;
1301 jsr166 1.89 // Don't set nextItem to null because we must continue to be
1302     // able to return it on next().
1303     //
1304     // Caller will unlink from itrs when convenient.
1305     }
1306    
1307     private int distance(int index, int prevTakeIndex, int length) {
1308     int distance = index - prevTakeIndex;
1309     if (distance < 0)
1310     distance += length;
1311     return distance;
1312     }
1313    
1314     /**
1315 jsr166 1.107 * Called whenever an interior remove (not at takeIndex) occurred.
1316 jsr166 1.89 *
1317 jsr166 1.90 * @return true if this iterator should be unlinked from itrs
1318 jsr166 1.89 */
1319 jsr166 1.90 boolean removedAt(int removedIndex) {
1320 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1321 jsr166 1.89 if (isDetached())
1322     return true;
1323    
1324     final int takeIndex = ArrayBlockingQueue.this.takeIndex;
1325     final int prevTakeIndex = this.prevTakeIndex;
1326     final int len = items.length;
1327 jsr166 1.112 // distance from prevTakeIndex to removedIndex
1328 jsr166 1.89 final int removedDistance =
1329 jsr166 1.112 len * (itrs.cycles - this.prevCycles
1330     + ((removedIndex < takeIndex) ? 1 : 0))
1331     + (removedIndex - prevTakeIndex);
1332     // assert itrs.cycles - this.prevCycles >= 0;
1333     // assert itrs.cycles - this.prevCycles <= 1;
1334     // assert removedDistance > 0;
1335     // assert removedIndex != takeIndex;
1336 jsr166 1.89 int cursor = this.cursor;
1337     if (cursor >= 0) {
1338     int x = distance(cursor, prevTakeIndex, len);
1339     if (x == removedDistance) {
1340 jsr166 1.90 if (cursor == putIndex)
1341 jsr166 1.91 this.cursor = cursor = NONE;
1342 jsr166 1.89 }
1343     else if (x > removedDistance) {
1344 jsr166 1.95 // assert cursor != prevTakeIndex;
1345 jsr166 1.135 this.cursor = cursor = dec(cursor, len);
1346 jsr166 1.71 }
1347 dl 1.5 }
1348 jsr166 1.89 int lastRet = this.lastRet;
1349     if (lastRet >= 0) {
1350     int x = distance(lastRet, prevTakeIndex, len);
1351     if (x == removedDistance)
1352 jsr166 1.91 this.lastRet = lastRet = REMOVED;
1353 jsr166 1.89 else if (x > removedDistance)
1354 jsr166 1.135 this.lastRet = lastRet = dec(lastRet, len);
1355 jsr166 1.89 }
1356     int nextIndex = this.nextIndex;
1357     if (nextIndex >= 0) {
1358     int x = distance(nextIndex, prevTakeIndex, len);
1359     if (x == removedDistance)
1360 jsr166 1.91 this.nextIndex = nextIndex = REMOVED;
1361 jsr166 1.89 else if (x > removedDistance)
1362 jsr166 1.135 this.nextIndex = nextIndex = dec(nextIndex, len);
1363 jsr166 1.89 }
1364 jsr166 1.112 if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
1365 jsr166 1.91 this.prevTakeIndex = DETACHED;
1366 jsr166 1.89 return true;
1367     }
1368     return false;
1369     }
1370    
1371     /**
1372     * Called whenever takeIndex wraps around to zero.
1373     *
1374 jsr166 1.90 * @return true if this iterator should be unlinked from itrs
1375 jsr166 1.89 */
1376     boolean takeIndexWrapped() {
1377 jsr166 1.137 // assert lock.isHeldByCurrentThread();
1378 jsr166 1.89 if (isDetached())
1379     return true;
1380     if (itrs.cycles - prevCycles > 1) {
1381     // All the elements that existed at the time of the last
1382     // operation are gone, so abandon further iteration.
1383     shutdown();
1384     return true;
1385     }
1386     return false;
1387 dl 1.5 }
1388 jsr166 1.89
1389     // /** Uncomment for debugging. */
1390     // public String toString() {
1391     // return ("cursor=" + cursor + " " +
1392     // "nextIndex=" + nextIndex + " " +
1393     // "lastRet=" + lastRet + " " +
1394     // "nextItem=" + nextItem + " " +
1395     // "lastItem=" + lastItem + " " +
1396     // "prevCycles=" + prevCycles + " " +
1397     // "prevTakeIndex=" + prevTakeIndex + " " +
1398     // "size()=" + size() + " " +
1399     // "remainingCapacity()=" + remainingCapacity());
1400     // }
1401 tim 1.1 }
1402 dl 1.100
1403 jsr166 1.105 /**
1404     * Returns a {@link Spliterator} over the elements in this queue.
1405     *
1406 jsr166 1.106 * <p>The returned spliterator is
1407     * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
1408     *
1409 jsr166 1.105 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
1410     * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
1411     *
1412     * @implNote
1413     * The {@code Spliterator} implements {@code trySplit} to permit limited
1414     * parallelism.
1415     *
1416     * @return a {@code Spliterator} over the elements in this queue
1417     * @since 1.8
1418     */
1419 dl 1.103 public Spliterator<E> spliterator() {
1420 dl 1.102 return Spliterators.spliterator
1421 jsr166 1.124 (this, (Spliterator.ORDERED |
1422     Spliterator.NONNULL |
1423     Spliterator.CONCURRENT));
1424 dl 1.100 }
1425    
1426 jsr166 1.143 /**
1427     * @throws NullPointerException {@inheritDoc}
1428     */
1429 jsr166 1.127 public void forEach(Consumer<? super E> action) {
1430     Objects.requireNonNull(action);
1431     final ReentrantLock lock = this.lock;
1432     lock.lock();
1433     try {
1434     if (count > 0) {
1435     final Object[] items = this.items;
1436 jsr166 1.128 for (int i = takeIndex, end = putIndex,
1437     to = (i < end) ? end : items.length;
1438     ; i = 0, to = end) {
1439     for (; i < to; i++)
1440     action.accept(itemAt(items, i));
1441     if (to == end) break;
1442     }
1443 jsr166 1.127 }
1444     } finally {
1445 jsr166 1.131 // checkInvariants();
1446 jsr166 1.127 lock.unlock();
1447     }
1448     }
1449    
1450 jsr166 1.135 /**
1451     * @throws NullPointerException {@inheritDoc}
1452     */
1453     public boolean removeIf(Predicate<? super E> filter) {
1454     Objects.requireNonNull(filter);
1455     return bulkRemove(filter);
1456     }
1457    
1458     /**
1459     * @throws NullPointerException {@inheritDoc}
1460     */
1461     public boolean removeAll(Collection<?> c) {
1462     Objects.requireNonNull(c);
1463     return bulkRemove(e -> c.contains(e));
1464     }
1465    
1466     /**
1467     * @throws NullPointerException {@inheritDoc}
1468     */
1469     public boolean retainAll(Collection<?> c) {
1470     Objects.requireNonNull(c);
1471     return bulkRemove(e -> !c.contains(e));
1472     }
1473    
1474     /** Implementation of bulk remove methods. */
1475     private boolean bulkRemove(Predicate<? super E> filter) {
1476     final ReentrantLock lock = this.lock;
1477     lock.lock();
1478     try {
1479     if (itrs == null) { // check for active iterators
1480     if (count > 0) {
1481     final Object[] items = this.items;
1482     // Optimize for initial run of survivors
1483     for (int i = takeIndex, end = putIndex,
1484     to = (i < end) ? end : items.length;
1485     ; i = 0, to = end) {
1486     for (; i < to; i++)
1487     if (filter.test(itemAt(items, i)))
1488 jsr166 1.137 return bulkRemoveModified(filter, i);
1489 jsr166 1.135 if (to == end) break;
1490     }
1491     }
1492     return false;
1493     }
1494     } finally {
1495     // checkInvariants();
1496     lock.unlock();
1497     }
1498     // Active iterators are too hairy!
1499     // Punting (for now) to the slow n^2 algorithm ...
1500     return super.removeIf(filter);
1501     }
1502    
1503 jsr166 1.137 // A tiny bit set implementation
1504    
1505     private static long[] nBits(int n) {
1506     return new long[((n - 1) >> 6) + 1];
1507     }
1508     private static void setBit(long[] bits, int i) {
1509     bits[i >> 6] |= 1L << i;
1510     }
1511     private static boolean isClear(long[] bits, int i) {
1512     return (bits[i >> 6] & (1L << i)) == 0;
1513     }
1514    
1515     /**
1516     * Returns circular distance from i to j, disambiguating i == j to
1517     * items.length; never returns 0.
1518     */
1519     private int distanceNonEmpty(int i, int j) {
1520     if ((j -= i) <= 0) j += items.length;
1521     return j;
1522     }
1523    
1524 jsr166 1.135 /**
1525     * Helper for bulkRemove, in case of at least one deletion.
1526 jsr166 1.137 * Tolerate predicates that reentrantly access the collection for
1527     * read (but not write), so traverse once to find elements to
1528     * delete, a second pass to physically expunge.
1529     *
1530     * @param beg valid index of first element to be deleted
1531 jsr166 1.135 */
1532     private boolean bulkRemoveModified(
1533 jsr166 1.137 Predicate<? super E> filter, final int beg) {
1534     final Object[] es = items;
1535 jsr166 1.135 final int capacity = items.length;
1536     final int end = putIndex;
1537 jsr166 1.137 final long[] deathRow = nBits(distanceNonEmpty(beg, putIndex));
1538     deathRow[0] = 1L; // set bit 0
1539     for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1540     ; i = 0, to = end, k -= capacity) {
1541     for (; i < to; i++)
1542     if (filter.test(itemAt(es, i)))
1543     setBit(deathRow, i - k);
1544     if (to == end) break;
1545     }
1546     // a two-finger traversal, with hare i reading, tortoise w writing
1547     int w = beg;
1548     for (int i = beg + 1, to = (i <= end) ? end : es.length, k = beg;
1549     ; w = 0) { // w rejoins i on second leg
1550     // In this loop, i and w are on the same leg, with i > w
1551     for (; i < to; i++)
1552     if (isClear(deathRow, i - k))
1553     es[w++] = es[i];
1554     if (to == end) break;
1555     // In this loop, w is on the first leg, i on the second
1556     for (i = 0, to = end, k -= capacity; i < to && w < capacity; i++)
1557     if (isClear(deathRow, i - k))
1558     es[w++] = es[i];
1559     if (i >= to) {
1560     if (w == capacity) w = 0; // "corner" case
1561     break;
1562 jsr166 1.135 }
1563     }
1564 jsr166 1.137 count -= distanceNonEmpty(w, end);
1565     circularClear(es, putIndex = w, end);
1566     // checkInvariants();
1567     return true;
1568 jsr166 1.135 }
1569    
1570 jsr166 1.128 /** debugging */
1571     void checkInvariants() {
1572 jsr166 1.134 // meta-assertions
1573 jsr166 1.128 // assert lock.isHeldByCurrentThread();
1574 jsr166 1.149 if (!invariantsSatisfied()) {
1575     String detail = String.format(
1576     "takeIndex=%d putIndex=%d count=%d capacity=%d items=%s",
1577     takeIndex, putIndex, count, items.length,
1578     Arrays.toString(items));
1579     System.err.println(detail);
1580     throw new AssertionError(detail);
1581 jsr166 1.128 }
1582     }
1583    
1584 jsr166 1.149 private boolean invariantsSatisfied() {
1585     // Unlike ArrayDeque, we have a count field but no spare slot.
1586     // We prefer ArrayDeque's strategy (and the names of its fields!),
1587     // but our field layout is baked into the serial form, and so is
1588     // too annoying to change.
1589     //
1590     // putIndex == takeIndex must be disambiguated by checking count.
1591     int capacity = items.length;
1592     return capacity > 0
1593     && items.getClass() == Object[].class
1594     && (takeIndex | putIndex | count) >= 0
1595     && takeIndex < capacity
1596     && putIndex < capacity
1597     && count <= capacity
1598     && (putIndex - takeIndex - count) % capacity == 0
1599     && (count == 0 || items[takeIndex] != null)
1600     && (count == capacity || items[putIndex] == null)
1601     && (count == 0 || items[dec(putIndex, capacity)] != null);
1602     }
1603    
1604 jsr166 1.148 /**
1605 jsr166 1.149 * Reconstitutes this queue from a stream (that is, deserializes it).
1606 jsr166 1.148 *
1607 jsr166 1.149 * @param s the stream
1608 jsr166 1.148 * @throws ClassNotFoundException if the class of a serialized object
1609     * could not be found
1610     * @throws java.io.InvalidObjectException if invariants are violated
1611     * @throws java.io.IOException if an I/O error occurs
1612     */
1613     private void readObject(java.io.ObjectInputStream s)
1614     throws java.io.IOException, ClassNotFoundException {
1615    
1616     // Read in items array and various fields
1617     s.defaultReadObject();
1618    
1619 jsr166 1.149 if (!invariantsSatisfied())
1620 jsr166 1.148 throw new java.io.InvalidObjectException("invariants violated");
1621     }
1622 tim 1.1 }