ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.64
Committed: Sun Sep 26 18:09:48 2010 UTC (13 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.63: +1 -1 lines
Log Message:
Add <p>

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.38 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.11 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.25 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13     * array. This queue orders elements FIFO (first-in-first-out). The
14     * <em>head</em> of the queue is that element that has been on the
15     * queue the longest time. The <em>tail</em> of the queue is that
16     * element that has been on the queue the shortest time. New elements
17     * are inserted at the tail of the queue, and the queue retrieval
18     * operations obtain elements at the head of the queue.
19 dholmes 1.13 *
20 dl 1.40 * <p>This is a classic &quot;bounded buffer&quot;, in which a
21     * fixed-sized array holds elements inserted by producers and
22     * extracted by consumers. Once created, the capacity cannot be
23 jsr166 1.50 * increased. Attempts to <tt>put</tt> an element into a full queue
24     * will result in the operation blocking; attempts to <tt>take</tt> an
25 dl 1.40 * element from an empty queue will similarly block.
26 dl 1.11 *
27     * <p> This class supports an optional fairness policy for ordering
28 dl 1.42 * waiting producer and consumer threads. By default, this ordering
29     * is not guaranteed. However, a queue constructed with fairness set
30     * to <tt>true</tt> grants threads access in FIFO order. Fairness
31     * generally decreases throughput but reduces variability and avoids
32     * starvation.
33 brian 1.7 *
34 dl 1.43 * <p>This class and its iterator implement all of the
35     * <em>optional</em> methods of the {@link Collection} and {@link
36 jsr166 1.47 * Iterator} interfaces.
37 dl 1.26 *
38 dl 1.41 * <p>This class is a member of the
39 jsr166 1.54 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40 dl 1.41 * Java Collections Framework</a>.
41     *
42 dl 1.8 * @since 1.5
43     * @author Doug Lea
44 dl 1.33 * @param <E> the type of elements held in this collection
45 dl 1.8 */
46 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
47 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
48    
49 dl 1.36 /**
50 dl 1.42 * Serialization ID. This class relies on default serialization
51     * even for the items array, which is default-serialized, even if
52     * it is empty. Otherwise it could not be declared final, which is
53 dl 1.36 * necessary here.
54     */
55     private static final long serialVersionUID = -817911632652898426L;
56    
57     /** The queued items */
58 jsr166 1.58 final E[] items;
59 dl 1.8 /** items index for next take, poll or remove */
60 jsr166 1.58 int takeIndex;
61 dl 1.8 /** items index for next put, offer, or add. */
62 jsr166 1.58 int putIndex;
63 dl 1.8 /** Number of items in the queue */
64 jsr166 1.59 int count;
65 tim 1.12
66 dl 1.5 /*
67 dl 1.36 * Concurrency control uses the classic two-condition algorithm
68 dl 1.5 * found in any textbook.
69     */
70    
71 dl 1.11 /** Main lock guarding all access */
72 jsr166 1.58 final ReentrantLock lock;
73 dholmes 1.13 /** Condition for waiting takes */
74 dl 1.37 private final Condition notEmpty;
75 dl 1.35 /** Condition for waiting puts */
76 dl 1.37 private final Condition notFull;
77 dl 1.5
78     // Internal helper methods
79    
80     /**
81     * Circularly increment i.
82     */
83 dl 1.40 final int inc(int i) {
84 jsr166 1.58 return (++i == items.length) ? 0 : i;
85 dl 1.5 }
86    
87     /**
88 jsr166 1.47 * Inserts element at current put position, advances, and signals.
89 dl 1.9 * Call only when holding lock.
90 dl 1.5 */
91     private void insert(E x) {
92     items[putIndex] = x;
93     putIndex = inc(putIndex);
94     ++count;
95 dl 1.9 notEmpty.signal();
96 tim 1.1 }
97 tim 1.12
98 dl 1.5 /**
99 jsr166 1.47 * Extracts element at current take position, advances, and signals.
100 dl 1.9 * Call only when holding lock.
101 dl 1.5 */
102 dholmes 1.13 private E extract() {
103 dl 1.36 final E[] items = this.items;
104 dl 1.5 E x = items[takeIndex];
105     items[takeIndex] = null;
106     takeIndex = inc(takeIndex);
107     --count;
108 dl 1.9 notFull.signal();
109 dl 1.5 return x;
110     }
111    
112     /**
113 tim 1.12 * Utility for remove and iterator.remove: Delete item at position i.
114 dl 1.9 * Call only when holding lock.
115 dl 1.5 */
116     void removeAt(int i) {
117 dl 1.36 final E[] items = this.items;
118 dl 1.9 // if removing front item, just advance
119     if (i == takeIndex) {
120     items[takeIndex] = null;
121     takeIndex = inc(takeIndex);
122 tim 1.23 } else {
123 dl 1.9 // slide over all others up through putIndex.
124     for (;;) {
125     int nexti = inc(i);
126     if (nexti != putIndex) {
127     items[i] = items[nexti];
128     i = nexti;
129 tim 1.23 } else {
130 dl 1.9 items[i] = null;
131     putIndex = i;
132     break;
133     }
134 dl 1.5 }
135     }
136 dl 1.9 --count;
137     notFull.signal();
138 tim 1.1 }
139    
140     /**
141 dholmes 1.21 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
142 dholmes 1.13 * capacity and default access policy.
143 jsr166 1.50 *
144 dholmes 1.13 * @param capacity the capacity of this queue
145 dholmes 1.16 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
146 dl 1.5 */
147 dholmes 1.13 public ArrayBlockingQueue(int capacity) {
148 dl 1.36 this(capacity, false);
149 dl 1.5 }
150 dl 1.2
151 dl 1.5 /**
152 dholmes 1.21 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
153 dholmes 1.13 * capacity and the specified access policy.
154 jsr166 1.50 *
155 dholmes 1.13 * @param capacity the capacity of this queue
156     * @param fair if <tt>true</tt> then queue accesses for threads blocked
157 jsr166 1.50 * on insertion or removal, are processed in FIFO order;
158     * if <tt>false</tt> the access order is unspecified.
159 dholmes 1.16 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
160 dl 1.11 */
161 dholmes 1.13 public ArrayBlockingQueue(int capacity, boolean fair) {
162 dl 1.36 if (capacity <= 0)
163     throw new IllegalArgumentException();
164     this.items = (E[]) new Object[capacity];
165     lock = new ReentrantLock(fair);
166     notEmpty = lock.newCondition();
167     notFull = lock.newCondition();
168 dl 1.5 }
169    
170 dholmes 1.16 /**
171 dholmes 1.21 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
172     * capacity, the specified access policy and initially containing the
173 tim 1.17 * elements of the given collection,
174 dholmes 1.16 * added in traversal order of the collection's iterator.
175 jsr166 1.50 *
176 dholmes 1.16 * @param capacity the capacity of this queue
177     * @param fair if <tt>true</tt> then queue accesses for threads blocked
178 jsr166 1.50 * on insertion or removal, are processed in FIFO order;
179     * if <tt>false</tt> the access order is unspecified.
180 dholmes 1.16 * @param c the collection of elements to initially contain
181     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
182 jsr166 1.50 * <tt>c.size()</tt>, or less than 1.
183     * @throws NullPointerException if the specified collection or any
184     * of its elements are null
185 dholmes 1.16 */
186 tim 1.20 public ArrayBlockingQueue(int capacity, boolean fair,
187 dholmes 1.18 Collection<? extends E> c) {
188 dl 1.36 this(capacity, fair);
189 dholmes 1.16 if (capacity < c.size())
190     throw new IllegalArgumentException();
191    
192 jsr166 1.57 for (E e : c)
193     add(e);
194 dholmes 1.16 }
195 dl 1.2
196 dholmes 1.13 /**
197 jsr166 1.50 * Inserts the specified element at the tail of this queue if it is
198     * possible to do so immediately without exceeding the queue's capacity,
199     * returning <tt>true</tt> upon success and throwing an
200     * <tt>IllegalStateException</tt> if this queue is full.
201 dl 1.25 *
202 jsr166 1.50 * @param e the element to add
203 jsr166 1.52 * @return <tt>true</tt> (as specified by {@link Collection#add})
204 jsr166 1.50 * @throws IllegalStateException if this queue is full
205     * @throws NullPointerException if the specified element is null
206     */
207     public boolean add(E e) {
208 jsr166 1.56 return super.add(e);
209 jsr166 1.50 }
210    
211     /**
212     * Inserts the specified element at the tail of this queue if it is
213     * possible to do so immediately without exceeding the queue's capacity,
214     * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
215     * is full. This method is generally preferable to method {@link #add},
216     * which can fail to insert an element only by throwing an exception.
217     *
218     * @throws NullPointerException if the specified element is null
219 dholmes 1.13 */
220 jsr166 1.49 public boolean offer(E e) {
221     if (e == null) throw new NullPointerException();
222 dl 1.36 final ReentrantLock lock = this.lock;
223 dl 1.5 lock.lock();
224     try {
225 jsr166 1.59 if (count == items.length)
226 dl 1.2 return false;
227 dl 1.5 else {
228 jsr166 1.49 insert(e);
229 dl 1.5 return true;
230     }
231 tim 1.23 } finally {
232 tim 1.12 lock.unlock();
233 dl 1.2 }
234 dl 1.5 }
235 dl 1.2
236 dholmes 1.13 /**
237 jsr166 1.50 * Inserts the specified element at the tail of this queue, waiting
238     * for space to become available if the queue is full.
239     *
240     * @throws InterruptedException {@inheritDoc}
241     * @throws NullPointerException {@inheritDoc}
242     */
243     public void put(E e) throws InterruptedException {
244     if (e == null) throw new NullPointerException();
245     final ReentrantLock lock = this.lock;
246     lock.lockInterruptibly();
247     try {
248 jsr166 1.59 while (count == items.length)
249 jsr166 1.58 notFull.await();
250 jsr166 1.50 insert(e);
251     } finally {
252     lock.unlock();
253     }
254     }
255    
256     /**
257     * Inserts the specified element at the tail of this queue, waiting
258     * up to the specified wait time for space to become available if
259     * the queue is full.
260     *
261     * @throws InterruptedException {@inheritDoc}
262     * @throws NullPointerException {@inheritDoc}
263 brian 1.7 */
264 jsr166 1.49 public boolean offer(E e, long timeout, TimeUnit unit)
265 dholmes 1.13 throws InterruptedException {
266 dl 1.2
267 jsr166 1.49 if (e == null) throw new NullPointerException();
268 jsr166 1.56 long nanos = unit.toNanos(timeout);
269 dl 1.36 final ReentrantLock lock = this.lock;
270 dl 1.5 lock.lockInterruptibly();
271     try {
272 jsr166 1.59 while (count == items.length) {
273 dl 1.5 if (nanos <= 0)
274     return false;
275 jsr166 1.58 nanos = notFull.awaitNanos(nanos);
276 dl 1.5 }
277 jsr166 1.58 insert(e);
278     return true;
279 tim 1.23 } finally {
280 dl 1.5 lock.unlock();
281 dl 1.2 }
282 dl 1.5 }
283 dl 1.2
284 dholmes 1.13 public E poll() {
285 dl 1.36 final ReentrantLock lock = this.lock;
286 dholmes 1.13 lock.lock();
287     try {
288 jsr166 1.59 return (count == 0) ? null : extract();
289 tim 1.23 } finally {
290 tim 1.15 lock.unlock();
291 dholmes 1.13 }
292     }
293    
294 jsr166 1.50 public E take() throws InterruptedException {
295     final ReentrantLock lock = this.lock;
296     lock.lockInterruptibly();
297     try {
298 jsr166 1.59 while (count == 0)
299 jsr166 1.58 notEmpty.await();
300     return extract();
301 jsr166 1.50 } finally {
302     lock.unlock();
303     }
304     }
305    
306 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
307 jsr166 1.56 long nanos = unit.toNanos(timeout);
308 dl 1.36 final ReentrantLock lock = this.lock;
309 dl 1.5 lock.lockInterruptibly();
310     try {
311 jsr166 1.59 while (count == 0) {
312 dl 1.5 if (nanos <= 0)
313     return null;
314 jsr166 1.58 nanos = notEmpty.awaitNanos(nanos);
315 dl 1.2 }
316 jsr166 1.58 return extract();
317 tim 1.23 } finally {
318 dl 1.5 lock.unlock();
319     }
320     }
321 dl 1.2
322 dholmes 1.13 public E peek() {
323 dl 1.36 final ReentrantLock lock = this.lock;
324 dholmes 1.13 lock.lock();
325     try {
326 jsr166 1.59 return (count == 0) ? null : items[takeIndex];
327 tim 1.23 } finally {
328 dholmes 1.13 lock.unlock();
329     }
330     }
331    
332     // this doc comment is overridden to remove the reference to collections
333     // greater in size than Integer.MAX_VALUE
334 tim 1.15 /**
335 dl 1.25 * Returns the number of elements in this queue.
336     *
337 jsr166 1.50 * @return the number of elements in this queue
338 dholmes 1.13 */
339     public int size() {
340 dl 1.36 final ReentrantLock lock = this.lock;
341 dholmes 1.13 lock.lock();
342     try {
343     return count;
344 tim 1.23 } finally {
345 dholmes 1.13 lock.unlock();
346     }
347     }
348    
349     // this doc comment is a modified copy of the inherited doc comment,
350     // without the reference to unlimited queues.
351 tim 1.15 /**
352 jsr166 1.48 * Returns the number of additional elements that this queue can ideally
353     * (in the absence of memory or resource constraints) accept without
354 dholmes 1.13 * blocking. This is always equal to the initial capacity of this queue
355     * less the current <tt>size</tt> of this queue.
356 jsr166 1.48 *
357     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
358     * an element will succeed by inspecting <tt>remainingCapacity</tt>
359     * because it may be the case that another thread is about to
360 jsr166 1.50 * insert or remove an element.
361 dholmes 1.13 */
362     public int remainingCapacity() {
363 dl 1.36 final ReentrantLock lock = this.lock;
364 dholmes 1.13 lock.lock();
365     try {
366     return items.length - count;
367 tim 1.23 } finally {
368 dholmes 1.13 lock.unlock();
369     }
370     }
371    
372 jsr166 1.50 /**
373     * Removes a single instance of the specified element from this queue,
374     * if it is present. More formally, removes an element <tt>e</tt> such
375     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
376     * elements.
377     * Returns <tt>true</tt> if this queue contained the specified element
378     * (or equivalently, if this queue changed as a result of the call).
379     *
380 jsr166 1.64 * <p>Removal of interior elements in circular array based queues
381 dl 1.60 * is an intrinsically slow and disruptive operation, so should
382     * be undertaken only in exceptional circumstances, ideally
383     * only when the queue is known not to be accessible by other
384     * threads.
385     *
386 jsr166 1.50 * @param o element to be removed from this queue, if present
387     * @return <tt>true</tt> if this queue changed as a result of the call
388     */
389     public boolean remove(Object o) {
390     if (o == null) return false;
391     final E[] items = this.items;
392     final ReentrantLock lock = this.lock;
393     lock.lock();
394     try {
395     int i = takeIndex;
396     int k = 0;
397     for (;;) {
398     if (k++ >= count)
399     return false;
400     if (o.equals(items[i])) {
401     removeAt(i);
402     return true;
403     }
404     i = inc(i);
405     }
406    
407     } finally {
408     lock.unlock();
409     }
410     }
411 dholmes 1.13
412 jsr166 1.50 /**
413     * Returns <tt>true</tt> if this queue contains the specified element.
414     * More formally, returns <tt>true</tt> if and only if this queue contains
415     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
416     *
417     * @param o object to be checked for containment in this queue
418     * @return <tt>true</tt> if this queue contains the specified element
419     */
420 dholmes 1.21 public boolean contains(Object o) {
421     if (o == null) return false;
422 dl 1.36 final E[] items = this.items;
423     final ReentrantLock lock = this.lock;
424 dl 1.5 lock.lock();
425     try {
426     int i = takeIndex;
427     int k = 0;
428     while (k++ < count) {
429 dholmes 1.21 if (o.equals(items[i]))
430 dl 1.2 return true;
431 dl 1.5 i = inc(i);
432     }
433 dl 1.2 return false;
434 tim 1.23 } finally {
435 dl 1.5 lock.unlock();
436     }
437     }
438 brian 1.7
439 jsr166 1.50 /**
440     * Returns an array containing all of the elements in this queue, in
441     * proper sequence.
442     *
443     * <p>The returned array will be "safe" in that no references to it are
444     * maintained by this queue. (In other words, this method must allocate
445     * a new array). The caller is thus free to modify the returned array.
446 jsr166 1.51 *
447 jsr166 1.50 * <p>This method acts as bridge between array-based and collection-based
448     * APIs.
449     *
450     * @return an array containing all of the elements in this queue
451     */
452 dl 1.5 public Object[] toArray() {
453 dl 1.36 final E[] items = this.items;
454     final ReentrantLock lock = this.lock;
455 dl 1.5 lock.lock();
456     try {
457 dl 1.34 Object[] a = new Object[count];
458 dl 1.5 int k = 0;
459     int i = takeIndex;
460     while (k < count) {
461 dl 1.2 a[k++] = items[i];
462 dl 1.5 i = inc(i);
463     }
464 dl 1.2 return a;
465 tim 1.23 } finally {
466 dl 1.5 lock.unlock();
467     }
468     }
469 brian 1.7
470 jsr166 1.50 /**
471     * Returns an array containing all of the elements in this queue, in
472     * proper sequence; the runtime type of the returned array is that of
473     * the specified array. If the queue fits in the specified array, it
474     * is returned therein. Otherwise, a new array is allocated with the
475     * runtime type of the specified array and the size of this queue.
476     *
477     * <p>If this queue fits in the specified array with room to spare
478     * (i.e., the array has more elements than this queue), the element in
479     * the array immediately following the end of the queue is set to
480     * <tt>null</tt>.
481     *
482     * <p>Like the {@link #toArray()} method, this method acts as bridge between
483     * array-based and collection-based APIs. Further, this method allows
484     * precise control over the runtime type of the output array, and may,
485     * under certain circumstances, be used to save allocation costs.
486     *
487     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
488     * The following code can be used to dump the queue into a newly
489     * allocated array of <tt>String</tt>:
490     *
491     * <pre>
492     * String[] y = x.toArray(new String[0]);</pre>
493     *
494     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
495     * <tt>toArray()</tt>.
496     *
497     * @param a the array into which the elements of the queue are to
498     * be stored, if it is big enough; otherwise, a new array of the
499     * same runtime type is allocated for this purpose
500     * @return an array containing all of the elements in this queue
501     * @throws ArrayStoreException if the runtime type of the specified array
502     * is not a supertype of the runtime type of every element in
503     * this queue
504     * @throws NullPointerException if the specified array is null
505     */
506 dl 1.5 public <T> T[] toArray(T[] a) {
507 dl 1.36 final E[] items = this.items;
508     final ReentrantLock lock = this.lock;
509 dl 1.5 lock.lock();
510     try {
511     if (a.length < count)
512 dholmes 1.16 a = (T[])java.lang.reflect.Array.newInstance(
513 tim 1.17 a.getClass().getComponentType(),
514 dholmes 1.16 count
515     );
516 tim 1.12
517 dl 1.5 int k = 0;
518     int i = takeIndex;
519     while (k < count) {
520 dl 1.2 a[k++] = (T)items[i];
521 dl 1.5 i = inc(i);
522     }
523     if (a.length > count)
524     a[count] = null;
525 dl 1.2 return a;
526 tim 1.23 } finally {
527 dl 1.5 lock.unlock();
528     }
529     }
530 dl 1.6
531     public String toString() {
532 dl 1.36 final ReentrantLock lock = this.lock;
533 dl 1.6 lock.lock();
534     try {
535     return super.toString();
536 tim 1.23 } finally {
537 dl 1.6 lock.unlock();
538     }
539     }
540 tim 1.12
541 dl 1.44 /**
542     * Atomically removes all of the elements from this queue.
543     * The queue will be empty after this call returns.
544     */
545 dl 1.30 public void clear() {
546 dl 1.36 final E[] items = this.items;
547     final ReentrantLock lock = this.lock;
548 dl 1.30 lock.lock();
549     try {
550     int i = takeIndex;
551     int k = count;
552     while (k-- > 0) {
553     items[i] = null;
554     i = inc(i);
555     }
556     count = 0;
557     putIndex = 0;
558     takeIndex = 0;
559     notFull.signalAll();
560     } finally {
561     lock.unlock();
562     }
563     }
564    
565 jsr166 1.50 /**
566     * @throws UnsupportedOperationException {@inheritDoc}
567     * @throws ClassCastException {@inheritDoc}
568     * @throws NullPointerException {@inheritDoc}
569     * @throws IllegalArgumentException {@inheritDoc}
570     */
571 dl 1.30 public int drainTo(Collection<? super E> c) {
572     if (c == null)
573     throw new NullPointerException();
574     if (c == this)
575     throw new IllegalArgumentException();
576 dl 1.36 final E[] items = this.items;
577     final ReentrantLock lock = this.lock;
578 dl 1.30 lock.lock();
579     try {
580     int i = takeIndex;
581     int n = 0;
582     int max = count;
583     while (n < max) {
584     c.add(items[i]);
585     items[i] = null;
586     i = inc(i);
587     ++n;
588     }
589     if (n > 0) {
590     count = 0;
591     putIndex = 0;
592     takeIndex = 0;
593     notFull.signalAll();
594     }
595     return n;
596     } finally {
597     lock.unlock();
598     }
599     }
600    
601 jsr166 1.50 /**
602     * @throws UnsupportedOperationException {@inheritDoc}
603     * @throws ClassCastException {@inheritDoc}
604     * @throws NullPointerException {@inheritDoc}
605     * @throws IllegalArgumentException {@inheritDoc}
606     */
607 dl 1.30 public int drainTo(Collection<? super E> c, int maxElements) {
608     if (c == null)
609     throw new NullPointerException();
610     if (c == this)
611     throw new IllegalArgumentException();
612     if (maxElements <= 0)
613     return 0;
614 dl 1.36 final E[] items = this.items;
615     final ReentrantLock lock = this.lock;
616 dl 1.30 lock.lock();
617     try {
618     int i = takeIndex;
619     int n = 0;
620     int sz = count;
621 jsr166 1.58 int max = (maxElements < count) ? maxElements : count;
622 dl 1.30 while (n < max) {
623     c.add(items[i]);
624     items[i] = null;
625     i = inc(i);
626     ++n;
627     }
628     if (n > 0) {
629     count -= n;
630     takeIndex = i;
631     notFull.signalAll();
632     }
633     return n;
634     } finally {
635     lock.unlock();
636     }
637     }
638    
639    
640 brian 1.7 /**
641 dl 1.60 * Returns an iterator over the elements in this queue in proper
642     * sequence. The returned <tt>Iterator</tt> is "weakly
643     * consistent" with respect to operations at the head and tail of
644     * the queue, and will never throw {@link
645     * ConcurrentModificationException}. It might return elements
646     * that existed upon construction of the iterator but have since
647     * been polled or taken, and might not return elements that have
648     * since been added. Further, no consistency guarantees are made
649     * with respect to "interior" removals occuring in concurrent
650     * invocations of {@link Collection#remove(Object)} or {@link
651     * Iterator#remove} occurring in other threads.
652     *
653 jsr166 1.63 * <p>The returned iterator supports the optional {@link Iterator#remove}
654 dl 1.60 * operation. However, removal of interior elements in circular
655     * array based queues is an intrinsically slow and disruptive
656     * operation, so should be undertaken only in exceptional
657     * circumstances, ideally only when the queue is known not to be
658     * accessible by other threads.
659 brian 1.7 *
660 jsr166 1.50 * @return an iterator over the elements in this queue in proper sequence
661 brian 1.7 */
662 dl 1.5 public Iterator<E> iterator() {
663 dl 1.36 final ReentrantLock lock = this.lock;
664 dl 1.5 lock.lock();
665     try {
666 dl 1.2 return new Itr();
667 tim 1.23 } finally {
668 dl 1.5 lock.unlock();
669     }
670     }
671 dl 1.8
672     /**
673 dl 1.60 * Iterator for ArrayBlockingQueue. To maintain weak consistency
674     * with respect to puts and takes, we (1) read ahead one slot, so
675     * as to not report hasNext true but then not have an element to
676     * return (2) ensure that each array slot is traversed at most
677     * once (by tracking "remaining" elements); (3) skip over null
678     * slots, which can occur if takes race ahead of iterators.
679     * However, for circular array-based queues, we cannot rely on any
680     * well established definition of what it means to be weakly
681     * consistent with respect to interior removes since these may
682     * require slot overwrites in the process of sliding elements to
683     * cover gaps. So we settle for resiliency, operating on
684 jsr166 1.62 * established apparent nexts, which may miss some elements that
685     * have moved between calls to next.
686 dl 1.8 */
687 dl 1.5 private class Itr implements Iterator<E> {
688 dl 1.60 private int remaining; // Number of elements yet to be returned
689     private int nextIndex; // Index of element to be returned by next
690     private E nextItem; // Element to be returned by next call to next
691     private E lastItem; // Element returned by last call to next
692     private int lastRet; // Index of last element returned, or -1 if none
693 tim 1.12
694 dl 1.5 Itr() {
695     lastRet = -1;
696 dl 1.60 if ((remaining = count) > 0)
697     nextItem = items[nextIndex = takeIndex];
698 dl 1.5 }
699 tim 1.12
700 dl 1.5 public boolean hasNext() {
701 dl 1.60 return remaining > 0;
702 dl 1.5 }
703 tim 1.12
704 dl 1.5 public E next() {
705 dl 1.60 if (remaining <= 0)
706     throw new NoSuchElementException();
707 dl 1.36 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
708 dl 1.5 lock.lock();
709     try {
710     lastRet = nextIndex;
711 dl 1.60 E x = lastItem = nextItem;
712     while (--remaining > 0) {
713     if ((nextItem = items[nextIndex = inc(nextIndex)]) != null)
714     break;
715     }
716 dl 1.5 return x;
717 tim 1.23 } finally {
718 tim 1.12 lock.unlock();
719 dl 1.2 }
720 dl 1.5 }
721 tim 1.12
722 dl 1.5 public void remove() {
723 dl 1.36 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
724 dl 1.5 lock.lock();
725     try {
726 dl 1.2 int i = lastRet;
727     if (i == -1)
728     throw new IllegalStateException();
729     lastRet = -1;
730 jsr166 1.61 E x = lastItem;
731     lastItem = null;
732     if (x == items[i])
733 dl 1.60 removeAt(i); // only remove if item still at index
734 tim 1.23 } finally {
735 dl 1.5 lock.unlock();
736     }
737     }
738 tim 1.1 }
739 dl 1.60
740 tim 1.1 }