ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.88
Committed: Sun Jul 3 02:46:10 2011 UTC (12 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.87: +8 -8 lines
Log Message:
rename some private methods

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.38 * Expert Group and released to the public domain, as explained at
4 jsr166 1.78 * http://creativecommons.org/publicdomain/zero/1.0/
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 jsr166 1.85 import java.util.concurrent.locks.Condition;
9     import java.util.concurrent.locks.ReentrantLock;
10     import java.util.AbstractQueue;
11     import java.util.Collection;
12     import java.util.Iterator;
13     import java.util.NoSuchElementException;
14 tim 1.1
15     /**
16 dl 1.25 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
17     * array. This queue orders elements FIFO (first-in-first-out). The
18     * <em>head</em> of the queue is that element that has been on the
19     * queue the longest time. The <em>tail</em> of the queue is that
20     * element that has been on the queue the shortest time. New elements
21     * are inserted at the tail of the queue, and the queue retrieval
22     * operations obtain elements at the head of the queue.
23 dholmes 1.13 *
24 dl 1.40 * <p>This is a classic &quot;bounded buffer&quot;, in which a
25     * fixed-sized array holds elements inserted by producers and
26     * extracted by consumers. Once created, the capacity cannot be
27 jsr166 1.69 * changed. Attempts to {@code put} an element into a full queue
28     * will result in the operation blocking; attempts to {@code take} an
29 dl 1.40 * element from an empty queue will similarly block.
30 dl 1.11 *
31 jsr166 1.72 * <p>This class supports an optional fairness policy for ordering
32 dl 1.42 * waiting producer and consumer threads. By default, this ordering
33     * is not guaranteed. However, a queue constructed with fairness set
34 jsr166 1.69 * to {@code true} grants threads access in FIFO order. Fairness
35 dl 1.42 * generally decreases throughput but reduces variability and avoids
36     * starvation.
37 brian 1.7 *
38 dl 1.43 * <p>This class and its iterator implement all of the
39     * <em>optional</em> methods of the {@link Collection} and {@link
40 jsr166 1.47 * Iterator} interfaces.
41 dl 1.26 *
42 dl 1.41 * <p>This class is a member of the
43 jsr166 1.54 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
44 dl 1.41 * Java Collections Framework</a>.
45     *
46 dl 1.8 * @since 1.5
47     * @author Doug Lea
48 dl 1.33 * @param <E> the type of elements held in this collection
49 dl 1.8 */
50 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
51 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
52    
53 dl 1.36 /**
54 dl 1.42 * Serialization ID. This class relies on default serialization
55     * even for the items array, which is default-serialized, even if
56     * it is empty. Otherwise it could not be declared final, which is
57 dl 1.36 * necessary here.
58     */
59     private static final long serialVersionUID = -817911632652898426L;
60    
61 jsr166 1.73 /** The queued items */
62 jsr166 1.68 final Object[] items;
63 jsr166 1.73
64     /** items index for next take, poll, peek or remove */
65 jsr166 1.58 int takeIndex;
66 jsr166 1.73
67     /** items index for next put, offer, or add */
68 jsr166 1.58 int putIndex;
69 jsr166 1.73
70     /** Number of elements in the queue */
71 jsr166 1.59 int count;
72 tim 1.12
73 dl 1.5 /*
74 dl 1.36 * Concurrency control uses the classic two-condition algorithm
75 dl 1.5 * found in any textbook.
76     */
77    
78 dl 1.11 /** Main lock guarding all access */
79 jsr166 1.58 final ReentrantLock lock;
80 jsr166 1.85
81 dholmes 1.13 /** Condition for waiting takes */
82 dl 1.37 private final Condition notEmpty;
83 jsr166 1.85
84 dl 1.35 /** Condition for waiting puts */
85 dl 1.37 private final Condition notFull;
86 dl 1.5
87     // Internal helper methods
88    
89     /**
90     * Circularly increment i.
91     */
92 dl 1.40 final int inc(int i) {
93 jsr166 1.58 return (++i == items.length) ? 0 : i;
94 dl 1.5 }
95    
96 jsr166 1.71 /**
97     * Circularly decrement i.
98     */
99     final int dec(int i) {
100     return ((i == 0) ? items.length : i) - 1;
101     }
102    
103 jsr166 1.68 /**
104     * Returns item at index i.
105     */
106     final E itemAt(int i) {
107 jsr166 1.84 @SuppressWarnings("unchecked") E x = (E) items[i];
108     return x;
109 jsr166 1.68 }
110    
111     /**
112     * Throws NullPointerException if argument is null.
113     *
114     * @param v the element
115     */
116     private static void checkNotNull(Object v) {
117     if (v == null)
118     throw new NullPointerException();
119     }
120    
121 dl 1.5 /**
122 jsr166 1.47 * Inserts element at current put position, advances, and signals.
123 dl 1.9 * Call only when holding lock.
124 dl 1.5 */
125 jsr166 1.88 private void enqueue(E x) {
126 dl 1.5 items[putIndex] = x;
127     putIndex = inc(putIndex);
128     ++count;
129 dl 1.9 notEmpty.signal();
130 tim 1.1 }
131 tim 1.12
132 dl 1.5 /**
133 jsr166 1.47 * Extracts element at current take position, advances, and signals.
134 dl 1.9 * Call only when holding lock.
135 dl 1.5 */
136 jsr166 1.88 private E dequeue() {
137 jsr166 1.68 final Object[] items = this.items;
138 jsr166 1.84 @SuppressWarnings("unchecked") E x = (E) items[takeIndex];
139 dl 1.5 items[takeIndex] = null;
140     takeIndex = inc(takeIndex);
141     --count;
142 dl 1.9 notFull.signal();
143 dl 1.5 return x;
144     }
145    
146     /**
147 jsr166 1.68 * Deletes item at position i.
148     * Utility for remove and iterator.remove.
149 dl 1.9 * Call only when holding lock.
150 dl 1.5 */
151     void removeAt(int i) {
152 jsr166 1.68 final Object[] items = this.items;
153 dl 1.9 // if removing front item, just advance
154     if (i == takeIndex) {
155     items[takeIndex] = null;
156     takeIndex = inc(takeIndex);
157 tim 1.23 } else {
158 dl 1.9 // slide over all others up through putIndex.
159     for (;;) {
160     int nexti = inc(i);
161     if (nexti != putIndex) {
162     items[i] = items[nexti];
163     i = nexti;
164 tim 1.23 } else {
165 dl 1.9 items[i] = null;
166     putIndex = i;
167     break;
168     }
169 dl 1.5 }
170     }
171 dl 1.9 --count;
172     notFull.signal();
173 tim 1.1 }
174    
/**
 * Constructs an {@code ArrayBlockingQueue} of the given (fixed)
 * capacity, using the default access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
185 dl 1.2
/**
 * Constructs an {@code ArrayBlockingQueue} of the given (fixed)
 * capacity, using the specified access policy.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity < 1)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    this.lock = new ReentrantLock(fair);
    this.notEmpty = this.lock.newCondition();
    this.notFull = this.lock.newCondition();
}
204    
/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity, the specified access policy and initially containing the
 * elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * <p>Overflow is detected with an explicit bounds check, so an
 * {@code ArrayIndexOutOfBoundsException} raised by the supplied
 * collection's own iterator propagates unchanged instead of being
 * reported as an {@code IllegalArgumentException}.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if {@code capacity} is less than
 *         {@code c.size()}, or less than 1.
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        for (E e : c) {
            // Null check comes first so that a null element is still
            // reported as NullPointerException even when the
            // collection is also too large.
            checkNotNull(e);
            if (i == capacity)
                throw new IllegalArgumentException(
                    "collection holds more than " + capacity + " elements");
            items[i++] = e;
        }
        count = i;
        // A completely filled array wraps the put position back to 0.
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
243 dl 1.2
244 dholmes 1.13 /**
245 jsr166 1.50 * Inserts the specified element at the tail of this queue if it is
246     * possible to do so immediately without exceeding the queue's capacity,
247 jsr166 1.69 * returning {@code true} upon success and throwing an
248     * {@code IllegalStateException} if this queue is full.
249 dl 1.25 *
250 jsr166 1.50 * @param e the element to add
251 jsr166 1.69 * @return {@code true} (as specified by {@link Collection#add})
252 jsr166 1.50 * @throws IllegalStateException if this queue is full
253     * @throws NullPointerException if the specified element is null
254     */
255     public boolean add(E e) {
256 jsr166 1.56 return super.add(e);
257 jsr166 1.50 }
258    
259     /**
260     * Inserts the specified element at the tail of this queue if it is
261     * possible to do so immediately without exceeding the queue's capacity,
262 jsr166 1.69 * returning {@code true} upon success and {@code false} if this queue
263 jsr166 1.50 * is full. This method is generally preferable to method {@link #add},
264     * which can fail to insert an element only by throwing an exception.
265     *
266     * @throws NullPointerException if the specified element is null
267 dholmes 1.13 */
268 jsr166 1.49 public boolean offer(E e) {
269 jsr166 1.68 checkNotNull(e);
270 dl 1.36 final ReentrantLock lock = this.lock;
271 dl 1.5 lock.lock();
272     try {
273 jsr166 1.59 if (count == items.length)
274 dl 1.2 return false;
275 dl 1.5 else {
276 jsr166 1.88 enqueue(e);
277 dl 1.5 return true;
278     }
279 tim 1.23 } finally {
280 tim 1.12 lock.unlock();
281 dl 1.2 }
282 dl 1.5 }
283 dl 1.2
284 dholmes 1.13 /**
285 jsr166 1.50 * Inserts the specified element at the tail of this queue, waiting
286     * for space to become available if the queue is full.
287     *
288     * @throws InterruptedException {@inheritDoc}
289     * @throws NullPointerException {@inheritDoc}
290     */
291     public void put(E e) throws InterruptedException {
292 jsr166 1.68 checkNotNull(e);
293 jsr166 1.50 final ReentrantLock lock = this.lock;
294     lock.lockInterruptibly();
295     try {
296 jsr166 1.59 while (count == items.length)
297 jsr166 1.58 notFull.await();
298 jsr166 1.88 enqueue(e);
299 jsr166 1.50 } finally {
300     lock.unlock();
301     }
302     }
303    
304     /**
305     * Inserts the specified element at the tail of this queue, waiting
306     * up to the specified wait time for space to become available if
307     * the queue is full.
308     *
309     * @throws InterruptedException {@inheritDoc}
310     * @throws NullPointerException {@inheritDoc}
311 brian 1.7 */
312 jsr166 1.49 public boolean offer(E e, long timeout, TimeUnit unit)
313 dholmes 1.13 throws InterruptedException {
314 dl 1.2
315 jsr166 1.68 checkNotNull(e);
316 jsr166 1.56 long nanos = unit.toNanos(timeout);
317 dl 1.36 final ReentrantLock lock = this.lock;
318 dl 1.5 lock.lockInterruptibly();
319     try {
320 jsr166 1.59 while (count == items.length) {
321 dl 1.5 if (nanos <= 0)
322     return false;
323 jsr166 1.58 nanos = notFull.awaitNanos(nanos);
324 dl 1.5 }
325 jsr166 1.88 enqueue(e);
326 jsr166 1.58 return true;
327 tim 1.23 } finally {
328 dl 1.5 lock.unlock();
329 dl 1.2 }
330 dl 1.5 }
331 dl 1.2
332 dholmes 1.13 public E poll() {
333 dl 1.36 final ReentrantLock lock = this.lock;
334 dholmes 1.13 lock.lock();
335     try {
336 jsr166 1.88 return (count == 0) ? null : dequeue();
337 tim 1.23 } finally {
338 tim 1.15 lock.unlock();
339 dholmes 1.13 }
340     }
341    
342 jsr166 1.50 public E take() throws InterruptedException {
343     final ReentrantLock lock = this.lock;
344     lock.lockInterruptibly();
345     try {
346 jsr166 1.59 while (count == 0)
347 jsr166 1.58 notEmpty.await();
348 jsr166 1.88 return dequeue();
349 jsr166 1.50 } finally {
350     lock.unlock();
351     }
352     }
353    
354 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
355 jsr166 1.56 long nanos = unit.toNanos(timeout);
356 dl 1.36 final ReentrantLock lock = this.lock;
357 dl 1.5 lock.lockInterruptibly();
358     try {
359 jsr166 1.59 while (count == 0) {
360 dl 1.5 if (nanos <= 0)
361     return null;
362 jsr166 1.58 nanos = notEmpty.awaitNanos(nanos);
363 dl 1.2 }
364 jsr166 1.88 return dequeue();
365 tim 1.23 } finally {
366 dl 1.5 lock.unlock();
367     }
368     }
369 dl 1.2
370 dholmes 1.13 public E peek() {
371 dl 1.36 final ReentrantLock lock = this.lock;
372 dholmes 1.13 lock.lock();
373     try {
374 jsr166 1.68 return (count == 0) ? null : itemAt(takeIndex);
375 tim 1.23 } finally {
376 dholmes 1.13 lock.unlock();
377     }
378     }
379    
380     // this doc comment is overridden to remove the reference to collections
381     // greater in size than Integer.MAX_VALUE
382 tim 1.15 /**
383 dl 1.25 * Returns the number of elements in this queue.
384     *
385 jsr166 1.50 * @return the number of elements in this queue
386 dholmes 1.13 */
387     public int size() {
388 dl 1.36 final ReentrantLock lock = this.lock;
389 dholmes 1.13 lock.lock();
390     try {
391     return count;
392 tim 1.23 } finally {
393 dholmes 1.13 lock.unlock();
394     }
395     }
396    
397     // this doc comment is a modified copy of the inherited doc comment,
398     // without the reference to unlimited queues.
399 tim 1.15 /**
400 jsr166 1.48 * Returns the number of additional elements that this queue can ideally
401     * (in the absence of memory or resource constraints) accept without
402 dholmes 1.13 * blocking. This is always equal to the initial capacity of this queue
403 jsr166 1.69 * less the current {@code size} of this queue.
404 jsr166 1.48 *
405     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
406 jsr166 1.69 * an element will succeed by inspecting {@code remainingCapacity}
407 jsr166 1.48 * because it may be the case that another thread is about to
408 jsr166 1.50 * insert or remove an element.
409 dholmes 1.13 */
410     public int remainingCapacity() {
411 dl 1.36 final ReentrantLock lock = this.lock;
412 dholmes 1.13 lock.lock();
413     try {
414     return items.length - count;
415 tim 1.23 } finally {
416 dholmes 1.13 lock.unlock();
417     }
418     }
419    
420 jsr166 1.50 /**
421     * Removes a single instance of the specified element from this queue,
422 jsr166 1.69 * if it is present. More formally, removes an element {@code e} such
423     * that {@code o.equals(e)}, if this queue contains one or more such
424 jsr166 1.50 * elements.
425 jsr166 1.69 * Returns {@code true} if this queue contained the specified element
426 jsr166 1.50 * (or equivalently, if this queue changed as a result of the call).
427     *
428 jsr166 1.64 * <p>Removal of interior elements in circular array based queues
429 dl 1.60 * is an intrinsically slow and disruptive operation, so should
430     * be undertaken only in exceptional circumstances, ideally
431     * only when the queue is known not to be accessible by other
432     * threads.
433     *
434 jsr166 1.50 * @param o element to be removed from this queue, if present
435 jsr166 1.69 * @return {@code true} if this queue changed as a result of the call
436 jsr166 1.50 */
437     public boolean remove(Object o) {
438     if (o == null) return false;
439 jsr166 1.68 final Object[] items = this.items;
440 jsr166 1.50 final ReentrantLock lock = this.lock;
441     lock.lock();
442     try {
443 jsr166 1.86 if (count > 0) {
444     final int putIndex = this.putIndex;
445     int i = takeIndex;
446     do {
447     if (o.equals(items[i])) {
448     removeAt(i);
449     return true;
450     }
451     } while ((i = inc(i)) != putIndex);
452 jsr166 1.50 }
453 jsr166 1.68 return false;
454 jsr166 1.50 } finally {
455     lock.unlock();
456     }
457     }
458 dholmes 1.13
459 jsr166 1.50 /**
460 jsr166 1.69 * Returns {@code true} if this queue contains the specified element.
461     * More formally, returns {@code true} if and only if this queue contains
462     * at least one element {@code e} such that {@code o.equals(e)}.
463 jsr166 1.50 *
464     * @param o object to be checked for containment in this queue
465 jsr166 1.69 * @return {@code true} if this queue contains the specified element
466 jsr166 1.50 */
467 dholmes 1.21 public boolean contains(Object o) {
468     if (o == null) return false;
469 jsr166 1.68 final Object[] items = this.items;
470 dl 1.36 final ReentrantLock lock = this.lock;
471 dl 1.5 lock.lock();
472     try {
473 jsr166 1.86 if (count > 0) {
474     final int putIndex = this.putIndex;
475     int i = takeIndex;
476     do {
477     if (o.equals(items[i]))
478     return true;
479     } while ((i = inc(i)) != putIndex);
480     }
481 dl 1.2 return false;
482 tim 1.23 } finally {
483 dl 1.5 lock.unlock();
484     }
485     }
486 brian 1.7
487 jsr166 1.50 /**
488     * Returns an array containing all of the elements in this queue, in
489     * proper sequence.
490     *
491     * <p>The returned array will be "safe" in that no references to it are
492     * maintained by this queue. (In other words, this method must allocate
493     * a new array). The caller is thus free to modify the returned array.
494 jsr166 1.51 *
495 jsr166 1.50 * <p>This method acts as bridge between array-based and collection-based
496     * APIs.
497     *
498     * @return an array containing all of the elements in this queue
499     */
500 dl 1.5 public Object[] toArray() {
501 jsr166 1.68 final Object[] items = this.items;
502 dl 1.36 final ReentrantLock lock = this.lock;
503 dl 1.5 lock.lock();
504     try {
505 jsr166 1.68 final int count = this.count;
506 dl 1.34 Object[] a = new Object[count];
507 jsr166 1.68 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
508     a[k] = items[i];
509 dl 1.2 return a;
510 tim 1.23 } finally {
511 dl 1.5 lock.unlock();
512     }
513     }
514 brian 1.7
515 jsr166 1.50 /**
516     * Returns an array containing all of the elements in this queue, in
517     * proper sequence; the runtime type of the returned array is that of
518     * the specified array. If the queue fits in the specified array, it
519     * is returned therein. Otherwise, a new array is allocated with the
520     * runtime type of the specified array and the size of this queue.
521     *
522     * <p>If this queue fits in the specified array with room to spare
523     * (i.e., the array has more elements than this queue), the element in
524     * the array immediately following the end of the queue is set to
525 jsr166 1.69 * {@code null}.
526 jsr166 1.50 *
527     * <p>Like the {@link #toArray()} method, this method acts as bridge between
528     * array-based and collection-based APIs. Further, this method allows
529     * precise control over the runtime type of the output array, and may,
530     * under certain circumstances, be used to save allocation costs.
531     *
532 jsr166 1.69 * <p>Suppose {@code x} is a queue known to contain only strings.
533 jsr166 1.50 * The following code can be used to dump the queue into a newly
534 jsr166 1.69 * allocated array of {@code String}:
535 jsr166 1.50 *
536 jsr166 1.80 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
537 jsr166 1.50 *
538 jsr166 1.69 * Note that {@code toArray(new Object[0])} is identical in function to
539     * {@code toArray()}.
540 jsr166 1.50 *
541     * @param a the array into which the elements of the queue are to
542     * be stored, if it is big enough; otherwise, a new array of the
543     * same runtime type is allocated for this purpose
544     * @return an array containing all of the elements in this queue
545     * @throws ArrayStoreException if the runtime type of the specified array
546     * is not a supertype of the runtime type of every element in
547     * this queue
548     * @throws NullPointerException if the specified array is null
549     */
550 jsr166 1.68 @SuppressWarnings("unchecked")
551 dl 1.5 public <T> T[] toArray(T[] a) {
552 jsr166 1.68 final Object[] items = this.items;
553 dl 1.36 final ReentrantLock lock = this.lock;
554 dl 1.5 lock.lock();
555     try {
556 jsr166 1.68 final int count = this.count;
557     final int len = a.length;
558     if (len < count)
559 dholmes 1.16 a = (T[])java.lang.reflect.Array.newInstance(
560 jsr166 1.68 a.getClass().getComponentType(), count);
561     for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
562     a[k] = (T) items[i];
563     if (len > count)
564 dl 1.5 a[count] = null;
565 dl 1.2 return a;
566 tim 1.23 } finally {
567 dl 1.5 lock.unlock();
568     }
569     }
570 dl 1.6
571     public String toString() {
572 dl 1.36 final ReentrantLock lock = this.lock;
573 dl 1.6 lock.lock();
574     try {
575 jsr166 1.68 int k = count;
576     if (k == 0)
577     return "[]";
578    
579     StringBuilder sb = new StringBuilder();
580     sb.append('[');
581     for (int i = takeIndex; ; i = inc(i)) {
582     Object e = items[i];
583     sb.append(e == this ? "(this Collection)" : e);
584     if (--k == 0)
585     return sb.append(']').toString();
586     sb.append(',').append(' ');
587     }
588 tim 1.23 } finally {
589 dl 1.6 lock.unlock();
590     }
591     }
592 tim 1.12
593 dl 1.44 /**
594     * Atomically removes all of the elements from this queue.
595     * The queue will be empty after this call returns.
596     */
597 dl 1.30 public void clear() {
598 jsr166 1.68 final Object[] items = this.items;
599 dl 1.36 final ReentrantLock lock = this.lock;
600 dl 1.30 lock.lock();
601     try {
602 jsr166 1.86 int k = count;
603     if (k > 0) {
604     final int putIndex = this.putIndex;
605     int i = takeIndex;
606     do {
607     items[i] = null;
608     } while ((i = inc(i)) != putIndex);
609     takeIndex = putIndex;
610     count = 0;
611     for (; k > 0 && lock.hasWaiters(notFull); k--)
612     notFull.signal();
613     }
614 dl 1.30 } finally {
615     lock.unlock();
616     }
617     }
618    
619 jsr166 1.50 /**
620     * @throws UnsupportedOperationException {@inheritDoc}
621     * @throws ClassCastException {@inheritDoc}
622     * @throws NullPointerException {@inheritDoc}
623     * @throws IllegalArgumentException {@inheritDoc}
624     */
625 dl 1.30 public int drainTo(Collection<? super E> c) {
626 jsr166 1.81 return drainTo(c, Integer.MAX_VALUE);
627 dl 1.30 }
628    
629 jsr166 1.50 /**
630     * @throws UnsupportedOperationException {@inheritDoc}
631     * @throws ClassCastException {@inheritDoc}
632     * @throws NullPointerException {@inheritDoc}
633     * @throws IllegalArgumentException {@inheritDoc}
634     */
635 dl 1.30 public int drainTo(Collection<? super E> c, int maxElements) {
636 jsr166 1.68 checkNotNull(c);
637 dl 1.30 if (c == this)
638     throw new IllegalArgumentException();
639     if (maxElements <= 0)
640     return 0;
641 jsr166 1.68 final Object[] items = this.items;
642 dl 1.36 final ReentrantLock lock = this.lock;
643 dl 1.30 lock.lock();
644     try {
645 jsr166 1.82 int n = Math.min(maxElements, count);
646     int take = takeIndex;
647     int i = 0;
648     try {
649     while (i < n) {
650 jsr166 1.84 @SuppressWarnings("unchecked") E x = (E) items[take];
651     c.add(x);
652 jsr166 1.82 items[take] = null;
653     take = inc(take);
654 jsr166 1.86 i++;
655 jsr166 1.82 }
656     return n;
657     } finally {
658     // Restore invariants even if c.add() threw
659 jsr166 1.83 count -= i;
660     takeIndex = take;
661     for (; i > 0 && lock.hasWaiters(notFull); i--)
662     notFull.signal();
663 dl 1.30 }
664     } finally {
665     lock.unlock();
666     }
667     }
668    
669 brian 1.7 /**
670 dl 1.75 * Returns an iterator over the elements in this queue in proper sequence.
671 jsr166 1.77 * The elements will be returned in order from first (head) to last (tail).
672     *
673 jsr166 1.87 * <p>The returned iterator is a "weakly consistent" iterator that
674 jsr166 1.77 * will never throw {@link java.util.ConcurrentModificationException
675 jsr166 1.87 * ConcurrentModificationException}, and guarantees to traverse
676     * elements as they existed upon construction of the iterator, and
677     * may (but is not guaranteed to) reflect any modifications
678     * subsequent to construction.
679 brian 1.7 *
680 jsr166 1.50 * @return an iterator over the elements in this queue in proper sequence
681 brian 1.7 */
682 dl 1.5 public Iterator<E> iterator() {
683 jsr166 1.68 return new Itr();
684 dl 1.5 }
685 dl 1.8
686     /**
687 dl 1.60 * Iterator for ArrayBlockingQueue. To maintain weak consistency
688     * with respect to puts and takes, we (1) read ahead one slot, so
689     * as to not report hasNext true but then not have an element to
690 dl 1.75 * return -- however we later recheck this slot to use the most
691     * current value; (2) ensure that each array slot is traversed at
692     * most once (by tracking "remaining" elements); (3) skip over
693     * null slots, which can occur if takes race ahead of iterators.
694 dl 1.60 * However, for circular array-based queues, we cannot rely on any
695     * well established definition of what it means to be weakly
696     * consistent with respect to interior removes since these may
697     * require slot overwrites in the process of sliding elements to
698     * cover gaps. So we settle for resiliency, operating on
699 jsr166 1.62 * established apparent nexts, which may miss some elements that
700 dl 1.66 * have moved between calls to next.
701 dl 1.8 */
702 dl 1.5 private class Itr implements Iterator<E> {
703 dl 1.60 private int remaining; // Number of elements yet to be returned
704     private int nextIndex; // Index of element to be returned by next
705     private E nextItem; // Element to be returned by next call to next
706     private E lastItem; // Element returned by last call to next
707     private int lastRet; // Index of last element returned, or -1 if none
708 tim 1.12
709 dl 1.66 Itr() {
710 jsr166 1.68 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
711     lock.lock();
712     try {
713     lastRet = -1;
714     if ((remaining = count) > 0)
715     nextItem = itemAt(nextIndex = takeIndex);
716     } finally {
717     lock.unlock();
718     }
719 dl 1.5 }
720 tim 1.12
721 dl 1.5 public boolean hasNext() {
722 dl 1.60 return remaining > 0;
723 dl 1.5 }
724 tim 1.12
725 dl 1.5 public E next() {
726 dl 1.66 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
727     lock.lock();
728     try {
729 dl 1.67 if (remaining <= 0)
730     throw new NoSuchElementException();
731 dl 1.66 lastRet = nextIndex;
732 dl 1.75 E x = itemAt(nextIndex); // check for fresher value
733     if (x == null) {
734     x = nextItem; // we are forced to report old value
735     lastItem = null; // but ensure remove fails
736 dl 1.66 }
737 dl 1.75 else
738     lastItem = x;
739     while (--remaining > 0 && // skip over nulls
740     (nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
741     ;
742 dl 1.66 return x;
743     } finally {
744     lock.unlock();
745 dl 1.2 }
746 dl 1.5 }
747 tim 1.12
748 dl 1.5 public void remove() {
749 dl 1.36 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
750 dl 1.5 lock.lock();
751     try {
752 dl 1.66 int i = lastRet;
753     if (i == -1)
754     throw new IllegalStateException();
755 dl 1.2 lastRet = -1;
756 jsr166 1.61 E x = lastItem;
757     lastItem = null;
758 jsr166 1.71 // only remove if item still at index
759 dl 1.75 if (x != null && x == items[i]) {
760 jsr166 1.71 boolean removingHead = (i == takeIndex);
761     removeAt(i);
762     if (!removingHead)
763     nextIndex = dec(nextIndex);
764     }
765 tim 1.23 } finally {
766 dl 1.5 lock.unlock();
767     }
768     }
769 tim 1.1 }
770 dl 1.60
771 tim 1.1 }