ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.58
Committed: Mon Jul 12 20:15:19 2010 UTC (13 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.57: +27 -53 lines
Log Message:
cleaner Condition-handling

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.38 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.11 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.25 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13     * array. This queue orders elements FIFO (first-in-first-out). The
14     * <em>head</em> of the queue is that element that has been on the
15     * queue the longest time. The <em>tail</em> of the queue is that
16     * element that has been on the queue the shortest time. New elements
17     * are inserted at the tail of the queue, and the queue retrieval
18     * operations obtain elements at the head of the queue.
19 dholmes 1.13 *
20 dl 1.40 * <p>This is a classic &quot;bounded buffer&quot;, in which a
21     * fixed-sized array holds elements inserted by producers and
22     * extracted by consumers. Once created, the capacity cannot be
23 jsr166 1.50 * increased. Attempts to <tt>put</tt> an element into a full queue
24     * will result in the operation blocking; attempts to <tt>take</tt> an
25 dl 1.40 * element from an empty queue will similarly block.
26 dl 1.11 *
27     * <p> This class supports an optional fairness policy for ordering
28 dl 1.42 * waiting producer and consumer threads. By default, this ordering
29     * is not guaranteed. However, a queue constructed with fairness set
30     * to <tt>true</tt> grants threads access in FIFO order. Fairness
31     * generally decreases throughput but reduces variability and avoids
32     * starvation.
33 brian 1.7 *
34 dl 1.43 * <p>This class and its iterator implement all of the
35     * <em>optional</em> methods of the {@link Collection} and {@link
36 jsr166 1.47 * Iterator} interfaces.
37 dl 1.26 *
38 dl 1.41 * <p>This class is a member of the
39 jsr166 1.54 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40 dl 1.41 * Java Collections Framework</a>.
41     *
42 dl 1.8 * @since 1.5
43     * @author Doug Lea
44 dl 1.33 * @param <E> the type of elements held in this collection
45 dl 1.8 */
46 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
47 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
48    
49 dl 1.36 /**
50 dl 1.42 * Serialization ID. This class relies on default serialization
51     * even for the items array, which is default-serialized, even if
52     * it is empty. Otherwise it could not be declared final, which is
53 dl 1.36 * necessary here.
54     */
55     private static final long serialVersionUID = -817911632652898426L;
56    
57     /** The queued items */
58 jsr166 1.58 final E[] items;
59 dl 1.8 /** items index for next take, poll or remove */
60 jsr166 1.58 int takeIndex;
61 dl 1.8 /** items index for next put, offer, or add. */
62 jsr166 1.58 int putIndex;
63 dl 1.8 /** Number of items in the queue */
64 dl 1.5 private int count;
65 tim 1.12
66 dl 1.5 /*
67 dl 1.36 * Concurrency control uses the classic two-condition algorithm
68 dl 1.5 * found in any textbook.
69     */
70    
71 dl 1.11 /** Main lock guarding all access */
72 jsr166 1.58 final ReentrantLock lock;
73 dholmes 1.13 /** Condition for waiting takes */
74 dl 1.37 private final Condition notEmpty;
75 dl 1.35 /** Condition for waiting puts */
76 dl 1.37 private final Condition notFull;
77 dl 1.5
78 jsr166 1.58 /** Predicate for the notEmpty condition */
79     boolean empty() { return count == 0; }
80     /** Predicate for the notFull condition */
81     boolean full() { return count == items.length; }
82    
83 dl 1.5 // Internal helper methods
84    
85     /**
86     * Circularly increment i.
87     */
88 dl 1.40 final int inc(int i) {
89 jsr166 1.58 return (++i == items.length) ? 0 : i;
90 dl 1.5 }
91    
92     /**
93 jsr166 1.47 * Inserts element at current put position, advances, and signals.
94 dl 1.9 * Call only when holding lock.
95 dl 1.5 */
96     private void insert(E x) {
97     items[putIndex] = x;
98     putIndex = inc(putIndex);
99     ++count;
100 dl 1.9 notEmpty.signal();
101 tim 1.1 }
102 tim 1.12
103 dl 1.5 /**
104 jsr166 1.47 * Extracts element at current take position, advances, and signals.
105 dl 1.9 * Call only when holding lock.
106 dl 1.5 */
107 dholmes 1.13 private E extract() {
108 dl 1.36 final E[] items = this.items;
109 dl 1.5 E x = items[takeIndex];
110     items[takeIndex] = null;
111     takeIndex = inc(takeIndex);
112     --count;
113 dl 1.9 notFull.signal();
114 dl 1.5 return x;
115     }
116    
117     /**
118 tim 1.12 * Utility for remove and iterator.remove: Delete item at position i.
119 dl 1.9 * Call only when holding lock.
120 dl 1.5 */
121     void removeAt(int i) {
122 dl 1.36 final E[] items = this.items;
123 dl 1.9 // if removing front item, just advance
124     if (i == takeIndex) {
125     items[takeIndex] = null;
126     takeIndex = inc(takeIndex);
127 tim 1.23 } else {
128 dl 1.9 // slide over all others up through putIndex.
129     for (;;) {
130     int nexti = inc(i);
131     if (nexti != putIndex) {
132     items[i] = items[nexti];
133     i = nexti;
134 tim 1.23 } else {
135 dl 1.9 items[i] = null;
136     putIndex = i;
137     break;
138     }
139 dl 1.5 }
140     }
141 dl 1.9 --count;
142     notFull.signal();
143 tim 1.1 }
144    
145     /**
146 dholmes 1.21 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
147 dholmes 1.13 * capacity and default access policy.
148 jsr166 1.50 *
149 dholmes 1.13 * @param capacity the capacity of this queue
150 dholmes 1.16 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
151 dl 1.5 */
152 dholmes 1.13 public ArrayBlockingQueue(int capacity) {
153 dl 1.36 this(capacity, false);
154 dl 1.5 }
155 dl 1.2
156 dl 1.5 /**
157 dholmes 1.21 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
158 dholmes 1.13 * capacity and the specified access policy.
159 jsr166 1.50 *
160 dholmes 1.13 * @param capacity the capacity of this queue
161     * @param fair if <tt>true</tt> then queue accesses for threads blocked
162 jsr166 1.50 * on insertion or removal, are processed in FIFO order;
163     * if <tt>false</tt> the access order is unspecified.
164 dholmes 1.16 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
165 dl 1.11 */
166 dholmes 1.13 public ArrayBlockingQueue(int capacity, boolean fair) {
167 dl 1.36 if (capacity <= 0)
168     throw new IllegalArgumentException();
169     this.items = (E[]) new Object[capacity];
170     lock = new ReentrantLock(fair);
171     notEmpty = lock.newCondition();
172     notFull = lock.newCondition();
173 dl 1.5 }
174    
175 dholmes 1.16 /**
176 dholmes 1.21 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
177     * capacity, the specified access policy and initially containing the
178 tim 1.17 * elements of the given collection,
179 dholmes 1.16 * added in traversal order of the collection's iterator.
180 jsr166 1.50 *
181 dholmes 1.16 * @param capacity the capacity of this queue
182     * @param fair if <tt>true</tt> then queue accesses for threads blocked
183 jsr166 1.50 * on insertion or removal, are processed in FIFO order;
184     * if <tt>false</tt> the access order is unspecified.
185 dholmes 1.16 * @param c the collection of elements to initially contain
186     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
187 jsr166 1.50 * <tt>c.size()</tt>, or less than 1.
188     * @throws NullPointerException if the specified collection or any
189     * of its elements are null
190 dholmes 1.16 */
191 tim 1.20 public ArrayBlockingQueue(int capacity, boolean fair,
192 dholmes 1.18 Collection<? extends E> c) {
193 dl 1.36 this(capacity, fair);
194 dholmes 1.16 if (capacity < c.size())
195     throw new IllegalArgumentException();
196    
197 jsr166 1.57 for (E e : c)
198     add(e);
199 dholmes 1.16 }
200 dl 1.2
201 dholmes 1.13 /**
202 jsr166 1.50 * Inserts the specified element at the tail of this queue if it is
203     * possible to do so immediately without exceeding the queue's capacity,
204     * returning <tt>true</tt> upon success and throwing an
205     * <tt>IllegalStateException</tt> if this queue is full.
206 dl 1.25 *
207 jsr166 1.50 * @param e the element to add
208 jsr166 1.52 * @return <tt>true</tt> (as specified by {@link Collection#add})
209 jsr166 1.50 * @throws IllegalStateException if this queue is full
210     * @throws NullPointerException if the specified element is null
211     */
212     public boolean add(E e) {
213 jsr166 1.56 return super.add(e);
214 jsr166 1.50 }
215    
216     /**
217     * Inserts the specified element at the tail of this queue if it is
218     * possible to do so immediately without exceeding the queue's capacity,
219     * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
220     * is full. This method is generally preferable to method {@link #add},
221     * which can fail to insert an element only by throwing an exception.
222     *
223     * @throws NullPointerException if the specified element is null
224 dholmes 1.13 */
225 jsr166 1.49 public boolean offer(E e) {
226     if (e == null) throw new NullPointerException();
227 dl 1.36 final ReentrantLock lock = this.lock;
228 dl 1.5 lock.lock();
229     try {
230 jsr166 1.58 if (full())
231 dl 1.2 return false;
232 dl 1.5 else {
233 jsr166 1.49 insert(e);
234 dl 1.5 return true;
235     }
236 tim 1.23 } finally {
237 tim 1.12 lock.unlock();
238 dl 1.2 }
239 dl 1.5 }
240 dl 1.2
241 dholmes 1.13 /**
242 jsr166 1.50 * Inserts the specified element at the tail of this queue, waiting
243     * for space to become available if the queue is full.
244     *
245     * @throws InterruptedException {@inheritDoc}
246     * @throws NullPointerException {@inheritDoc}
247     */
248     public void put(E e) throws InterruptedException {
249     if (e == null) throw new NullPointerException();
250     final ReentrantLock lock = this.lock;
251     lock.lockInterruptibly();
252     try {
253 jsr166 1.58 while (full())
254     notFull.await();
255 jsr166 1.50 insert(e);
256     } finally {
257     lock.unlock();
258     }
259     }
260    
261     /**
262     * Inserts the specified element at the tail of this queue, waiting
263     * up to the specified wait time for space to become available if
264     * the queue is full.
265     *
266     * @throws InterruptedException {@inheritDoc}
267     * @throws NullPointerException {@inheritDoc}
268 brian 1.7 */
269 jsr166 1.49 public boolean offer(E e, long timeout, TimeUnit unit)
270 dholmes 1.13 throws InterruptedException {
271 dl 1.2
272 jsr166 1.49 if (e == null) throw new NullPointerException();
273 jsr166 1.56 long nanos = unit.toNanos(timeout);
274 dl 1.36 final ReentrantLock lock = this.lock;
275 dl 1.5 lock.lockInterruptibly();
276     try {
277 jsr166 1.58 while (full()) {
278 dl 1.5 if (nanos <= 0)
279     return false;
280 jsr166 1.58 nanos = notFull.awaitNanos(nanos);
281 dl 1.5 }
282 jsr166 1.58 insert(e);
283     return true;
284 tim 1.23 } finally {
285 dl 1.5 lock.unlock();
286 dl 1.2 }
287 dl 1.5 }
288 dl 1.2
289 dholmes 1.13 public E poll() {
290 dl 1.36 final ReentrantLock lock = this.lock;
291 dholmes 1.13 lock.lock();
292     try {
293 jsr166 1.58 return empty() ? null : extract();
294 tim 1.23 } finally {
295 tim 1.15 lock.unlock();
296 dholmes 1.13 }
297     }
298    
299 jsr166 1.50 public E take() throws InterruptedException {
300     final ReentrantLock lock = this.lock;
301     lock.lockInterruptibly();
302     try {
303 jsr166 1.58 while (empty())
304     notEmpty.await();
305     return extract();
306 jsr166 1.50 } finally {
307     lock.unlock();
308     }
309     }
310    
311 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
312 jsr166 1.56 long nanos = unit.toNanos(timeout);
313 dl 1.36 final ReentrantLock lock = this.lock;
314 dl 1.5 lock.lockInterruptibly();
315     try {
316 jsr166 1.58 while (empty()) {
317 dl 1.5 if (nanos <= 0)
318     return null;
319 jsr166 1.58 nanos = notEmpty.awaitNanos(nanos);
320 dl 1.2 }
321 jsr166 1.58 return extract();
322 tim 1.23 } finally {
323 dl 1.5 lock.unlock();
324     }
325     }
326 dl 1.2
327 dholmes 1.13 public E peek() {
328 dl 1.36 final ReentrantLock lock = this.lock;
329 dholmes 1.13 lock.lock();
330     try {
331 jsr166 1.58 return empty() ? null : items[takeIndex];
332 tim 1.23 } finally {
333 dholmes 1.13 lock.unlock();
334     }
335     }
336    
337     // this doc comment is overridden to remove the reference to collections
338     // greater in size than Integer.MAX_VALUE
339 tim 1.15 /**
340 dl 1.25 * Returns the number of elements in this queue.
341     *
342 jsr166 1.50 * @return the number of elements in this queue
343 dholmes 1.13 */
344     public int size() {
345 dl 1.36 final ReentrantLock lock = this.lock;
346 dholmes 1.13 lock.lock();
347     try {
348     return count;
349 tim 1.23 } finally {
350 dholmes 1.13 lock.unlock();
351     }
352     }
353    
354     // this doc comment is a modified copy of the inherited doc comment,
355     // without the reference to unlimited queues.
356 tim 1.15 /**
357 jsr166 1.48 * Returns the number of additional elements that this queue can ideally
358     * (in the absence of memory or resource constraints) accept without
359 dholmes 1.13 * blocking. This is always equal to the initial capacity of this queue
360     * less the current <tt>size</tt> of this queue.
361 jsr166 1.48 *
362     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
363     * an element will succeed by inspecting <tt>remainingCapacity</tt>
364     * because it may be the case that another thread is about to
365 jsr166 1.50 * insert or remove an element.
366 dholmes 1.13 */
367     public int remainingCapacity() {
368 dl 1.36 final ReentrantLock lock = this.lock;
369 dholmes 1.13 lock.lock();
370     try {
371     return items.length - count;
372 tim 1.23 } finally {
373 dholmes 1.13 lock.unlock();
374     }
375     }
376    
377 jsr166 1.50 /**
378     * Removes a single instance of the specified element from this queue,
379     * if it is present. More formally, removes an element <tt>e</tt> such
380     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
381     * elements.
382     * Returns <tt>true</tt> if this queue contained the specified element
383     * (or equivalently, if this queue changed as a result of the call).
384     *
385     * @param o element to be removed from this queue, if present
386     * @return <tt>true</tt> if this queue changed as a result of the call
387     */
388     public boolean remove(Object o) {
389     if (o == null) return false;
390     final E[] items = this.items;
391     final ReentrantLock lock = this.lock;
392     lock.lock();
393     try {
394     int i = takeIndex;
395     int k = 0;
396     for (;;) {
397     if (k++ >= count)
398     return false;
399     if (o.equals(items[i])) {
400     removeAt(i);
401     return true;
402     }
403     i = inc(i);
404     }
405    
406     } finally {
407     lock.unlock();
408     }
409     }
410 dholmes 1.13
411 jsr166 1.50 /**
412     * Returns <tt>true</tt> if this queue contains the specified element.
413     * More formally, returns <tt>true</tt> if and only if this queue contains
414     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
415     *
416     * @param o object to be checked for containment in this queue
417     * @return <tt>true</tt> if this queue contains the specified element
418     */
419 dholmes 1.21 public boolean contains(Object o) {
420     if (o == null) return false;
421 dl 1.36 final E[] items = this.items;
422     final ReentrantLock lock = this.lock;
423 dl 1.5 lock.lock();
424     try {
425     int i = takeIndex;
426     int k = 0;
427     while (k++ < count) {
428 dholmes 1.21 if (o.equals(items[i]))
429 dl 1.2 return true;
430 dl 1.5 i = inc(i);
431     }
432 dl 1.2 return false;
433 tim 1.23 } finally {
434 dl 1.5 lock.unlock();
435     }
436     }
437 brian 1.7
438 jsr166 1.50 /**
439     * Returns an array containing all of the elements in this queue, in
440     * proper sequence.
441     *
442     * <p>The returned array will be "safe" in that no references to it are
443     * maintained by this queue. (In other words, this method must allocate
444     * a new array). The caller is thus free to modify the returned array.
445 jsr166 1.51 *
446 jsr166 1.50 * <p>This method acts as bridge between array-based and collection-based
447     * APIs.
448     *
449     * @return an array containing all of the elements in this queue
450     */
451 dl 1.5 public Object[] toArray() {
452 dl 1.36 final E[] items = this.items;
453     final ReentrantLock lock = this.lock;
454 dl 1.5 lock.lock();
455     try {
456 dl 1.34 Object[] a = new Object[count];
457 dl 1.5 int k = 0;
458     int i = takeIndex;
459     while (k < count) {
460 dl 1.2 a[k++] = items[i];
461 dl 1.5 i = inc(i);
462     }
463 dl 1.2 return a;
464 tim 1.23 } finally {
465 dl 1.5 lock.unlock();
466     }
467     }
468 brian 1.7
469 jsr166 1.50 /**
470     * Returns an array containing all of the elements in this queue, in
471     * proper sequence; the runtime type of the returned array is that of
472     * the specified array. If the queue fits in the specified array, it
473     * is returned therein. Otherwise, a new array is allocated with the
474     * runtime type of the specified array and the size of this queue.
475     *
476     * <p>If this queue fits in the specified array with room to spare
477     * (i.e., the array has more elements than this queue), the element in
478     * the array immediately following the end of the queue is set to
479     * <tt>null</tt>.
480     *
481     * <p>Like the {@link #toArray()} method, this method acts as bridge between
482     * array-based and collection-based APIs. Further, this method allows
483     * precise control over the runtime type of the output array, and may,
484     * under certain circumstances, be used to save allocation costs.
485     *
486     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
487     * The following code can be used to dump the queue into a newly
488     * allocated array of <tt>String</tt>:
489     *
490     * <pre>
491     * String[] y = x.toArray(new String[0]);</pre>
492     *
493     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
494     * <tt>toArray()</tt>.
495     *
496     * @param a the array into which the elements of the queue are to
497     * be stored, if it is big enough; otherwise, a new array of the
498     * same runtime type is allocated for this purpose
499     * @return an array containing all of the elements in this queue
500     * @throws ArrayStoreException if the runtime type of the specified array
501     * is not a supertype of the runtime type of every element in
502     * this queue
503     * @throws NullPointerException if the specified array is null
504     */
505 dl 1.5 public <T> T[] toArray(T[] a) {
506 dl 1.36 final E[] items = this.items;
507     final ReentrantLock lock = this.lock;
508 dl 1.5 lock.lock();
509     try {
510     if (a.length < count)
511 dholmes 1.16 a = (T[])java.lang.reflect.Array.newInstance(
512 tim 1.17 a.getClass().getComponentType(),
513 dholmes 1.16 count
514     );
515 tim 1.12
516 dl 1.5 int k = 0;
517     int i = takeIndex;
518     while (k < count) {
519 dl 1.2 a[k++] = (T)items[i];
520 dl 1.5 i = inc(i);
521     }
522     if (a.length > count)
523     a[count] = null;
524 dl 1.2 return a;
525 tim 1.23 } finally {
526 dl 1.5 lock.unlock();
527     }
528     }
529 dl 1.6
530     public String toString() {
531 dl 1.36 final ReentrantLock lock = this.lock;
532 dl 1.6 lock.lock();
533     try {
534     return super.toString();
535 tim 1.23 } finally {
536 dl 1.6 lock.unlock();
537     }
538     }
539 tim 1.12
540 dl 1.44 /**
541     * Atomically removes all of the elements from this queue.
542     * The queue will be empty after this call returns.
543     */
544 dl 1.30 public void clear() {
545 dl 1.36 final E[] items = this.items;
546     final ReentrantLock lock = this.lock;
547 dl 1.30 lock.lock();
548     try {
549     int i = takeIndex;
550     int k = count;
551     while (k-- > 0) {
552     items[i] = null;
553     i = inc(i);
554     }
555     count = 0;
556     putIndex = 0;
557     takeIndex = 0;
558     notFull.signalAll();
559     } finally {
560     lock.unlock();
561     }
562     }
563    
564 jsr166 1.50 /**
565     * @throws UnsupportedOperationException {@inheritDoc}
566     * @throws ClassCastException {@inheritDoc}
567     * @throws NullPointerException {@inheritDoc}
568     * @throws IllegalArgumentException {@inheritDoc}
569     */
570 dl 1.30 public int drainTo(Collection<? super E> c) {
571     if (c == null)
572     throw new NullPointerException();
573     if (c == this)
574     throw new IllegalArgumentException();
575 dl 1.36 final E[] items = this.items;
576     final ReentrantLock lock = this.lock;
577 dl 1.30 lock.lock();
578     try {
579     int i = takeIndex;
580     int n = 0;
581     int max = count;
582     while (n < max) {
583     c.add(items[i]);
584     items[i] = null;
585     i = inc(i);
586     ++n;
587     }
588     if (n > 0) {
589     count = 0;
590     putIndex = 0;
591     takeIndex = 0;
592     notFull.signalAll();
593     }
594     return n;
595     } finally {
596     lock.unlock();
597     }
598     }
599    
600 jsr166 1.50 /**
601     * @throws UnsupportedOperationException {@inheritDoc}
602     * @throws ClassCastException {@inheritDoc}
603     * @throws NullPointerException {@inheritDoc}
604     * @throws IllegalArgumentException {@inheritDoc}
605     */
606 dl 1.30 public int drainTo(Collection<? super E> c, int maxElements) {
607     if (c == null)
608     throw new NullPointerException();
609     if (c == this)
610     throw new IllegalArgumentException();
611     if (maxElements <= 0)
612     return 0;
613 dl 1.36 final E[] items = this.items;
614     final ReentrantLock lock = this.lock;
615 dl 1.30 lock.lock();
616     try {
617     int i = takeIndex;
618     int n = 0;
619     int sz = count;
620 jsr166 1.58 int max = (maxElements < count) ? maxElements : count;
621 dl 1.30 while (n < max) {
622     c.add(items[i]);
623     items[i] = null;
624     i = inc(i);
625     ++n;
626     }
627     if (n > 0) {
628     count -= n;
629     takeIndex = i;
630     notFull.signalAll();
631     }
632     return n;
633     } finally {
634     lock.unlock();
635     }
636     }
637    
638    
639 brian 1.7 /**
640     * Returns an iterator over the elements in this queue in proper sequence.
641 dl 1.22 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
642 jsr166 1.47 * will never throw {@link ConcurrentModificationException},
643 dl 1.22 * and guarantees to traverse elements as they existed upon
644     * construction of the iterator, and may (but is not guaranteed to)
645     * reflect any modifications subsequent to construction.
646 brian 1.7 *
647 jsr166 1.50 * @return an iterator over the elements in this queue in proper sequence
648 brian 1.7 */
649 dl 1.5 public Iterator<E> iterator() {
650 dl 1.36 final ReentrantLock lock = this.lock;
651 dl 1.5 lock.lock();
652     try {
653 dl 1.2 return new Itr();
654 tim 1.23 } finally {
655 dl 1.5 lock.unlock();
656     }
657     }
658 dl 1.8
659     /**
660     * Iterator for ArrayBlockingQueue
661     */
662 dl 1.5 private class Itr implements Iterator<E> {
663     /**
664     * Index of element to be returned by next,
665     * or a negative number if no such.
666     */
667 dl 1.8 private int nextIndex;
668 dl 1.2
669 tim 1.12 /**
670 dl 1.5 * nextItem holds on to item fields because once we claim
671     * that an element exists in hasNext(), we must return it in
672     * the following next() call even if it was in the process of
673     * being removed when hasNext() was called.
674 jsr166 1.47 */
675 dl 1.8 private E nextItem;
676 dl 1.5
677     /**
678     * Index of element returned by most recent call to next.
679     * Reset to -1 if this element is deleted by a call to remove.
680     */
681 dl 1.8 private int lastRet;
682 tim 1.12
683 dl 1.5 Itr() {
684     lastRet = -1;
685 jsr166 1.58 if (empty())
686 dl 1.5 nextIndex = -1;
687     else {
688     nextIndex = takeIndex;
689     nextItem = items[takeIndex];
690     }
691     }
692 tim 1.12
693 dl 1.5 public boolean hasNext() {
694     /*
695     * No sync. We can return true by mistake here
696     * only if this iterator passed across threads,
697     * which we don't support anyway.
698 dl 1.2 */
699 dl 1.5 return nextIndex >= 0;
700     }
701    
702     /**
703 jsr166 1.47 * Checks whether nextIndex is valid; if so setting nextItem.
704 dl 1.5 * Stops iterator when either hits putIndex or sees null item.
705     */
706     private void checkNext() {
707     if (nextIndex == putIndex) {
708     nextIndex = -1;
709     nextItem = null;
710 tim 1.23 } else {
711 dl 1.5 nextItem = items[nextIndex];
712     if (nextItem == null)
713     nextIndex = -1;
714 dl 1.2 }
715 dl 1.5 }
716 tim 1.12
717 dl 1.5 public E next() {
718 dl 1.36 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
719 dl 1.5 lock.lock();
720     try {
721     if (nextIndex < 0)
722 dl 1.2 throw new NoSuchElementException();
723 dl 1.5 lastRet = nextIndex;
724     E x = nextItem;
725     nextIndex = inc(nextIndex);
726     checkNext();
727     return x;
728 tim 1.23 } finally {
729 tim 1.12 lock.unlock();
730 dl 1.2 }
731 dl 1.5 }
732 tim 1.12
733 dl 1.5 public void remove() {
734 dl 1.36 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
735 dl 1.5 lock.lock();
736     try {
737 dl 1.2 int i = lastRet;
738     if (i == -1)
739     throw new IllegalStateException();
740     lastRet = -1;
741 tim 1.12
742 dl 1.9 int ti = takeIndex;
743 dl 1.2 removeAt(i);
744 dl 1.9 // back up cursor (reset to front if was first element)
745 tim 1.12 nextIndex = (i == ti) ? takeIndex : i;
746 dl 1.5 checkNext();
747 tim 1.23 } finally {
748 dl 1.5 lock.unlock();
749     }
750     }
751 tim 1.1 }
752     }