ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.29
Committed: Fri Sep 26 11:37:10 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.28: +1 -1 lines
Log Message:
Spellcheck

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.11 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.25 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13     * array. This queue orders elements FIFO (first-in-first-out). The
14     * <em>head</em> of the queue is that element that has been on the
15     * queue the longest time. The <em>tail</em> of the queue is that
16     * element that has been on the queue the shortest time. New elements
17     * are inserted at the tail of the queue, and the queue retrieval
18     * operations obtain elements at the head of the queue.
19 dholmes 1.13 *
20 tim 1.15 * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized
21 dholmes 1.13 * array holds
22 dl 1.11 * elements inserted by producers and extracted by consumers. Once
23     * created, the capacity can not be increased. Attempts to offer an
24     * element to a full queue will result in the offer operation
25     * blocking; attempts to retrieve an element from an empty queue will
26 tim 1.12 * similarly block.
27 dl 1.11 *
28     * <p> This class supports an optional fairness policy for ordering
29     * threads blocked on an insertion or removal. By default, this
30     * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
31     * constructed with fairness set to <tt>true</tt> grants blocked
32 dl 1.25 * threads access in FIFO order. Fairness generally decreases
33 dl 1.29 * throughput but reduces variability and avoids starvation.
34 brian 1.7 *
35 dl 1.26 * <p>This class implements all of the <em>optional</em> methods
36     * of the {@link Collection} and {@link Iterator} interfaces.
37     *
38 dl 1.8 * @since 1.5
39     * @author Doug Lea
40     */
41 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
42 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
43 dl 1.24 private static final long serialVersionUID = -817911632652898425L;
44 tim 1.1
45 dl 1.8 /** The queued items */
46 tim 1.12 private transient final E[] items;
47 dl 1.8 /** items index for next take, poll or remove */
48 tim 1.12 private transient int takeIndex;
49 dl 1.8 /** items index for next put, offer, or add. */
50 tim 1.12 private transient int putIndex;
51 dl 1.8 /** Number of items in the queue */
52 dl 1.5 private int count;
53 tim 1.12
54 dl 1.5 /**
55     * An array used only during deserialization, to hold
56     * items read back in from the stream, and then used
57     * as "items" by readResolve via the private constructor.
58     */
59     private transient E[] deserializedItems;
60 tim 1.12
61 dl 1.5 /*
62     * Concurrency control via the classic two-condition algorithm
63     * found in any textbook.
64     */
65    
66 dl 1.11 /** Main lock guarding all access */
67     private final ReentrantLock lock;
68 dholmes 1.13 /** Condition for waiting takes */
69 dl 1.11 private final Condition notEmpty;
70 dl 1.8 /** Condition for wiating puts */
71 dl 1.11 private final Condition notFull;
72 dl 1.5
73     // Internal helper methods
74    
75     /**
76     * Circularly increment i.
77     */
78     int inc(int i) {
79     return (++i == items.length)? 0 : i;
80     }
81    
82     /**
83 dl 1.9 * Insert element at current put position, advance, and signal.
84     * Call only when holding lock.
85 dl 1.5 */
86     private void insert(E x) {
87     items[putIndex] = x;
88     putIndex = inc(putIndex);
89     ++count;
90 dl 1.9 notEmpty.signal();
91 tim 1.1 }
92 tim 1.12
93 dl 1.5 /**
94 dl 1.9 * Extract element at current take position, advance, and signal.
95     * Call only when holding lock.
96 dl 1.5 */
97 dholmes 1.13 private E extract() {
98 dl 1.5 E x = items[takeIndex];
99     items[takeIndex] = null;
100     takeIndex = inc(takeIndex);
101     --count;
102 dl 1.9 notFull.signal();
103 dl 1.5 return x;
104     }
105    
106     /**
107 tim 1.12 * Utility for remove and iterator.remove: Delete item at position i.
108 dl 1.9 * Call only when holding lock.
109 dl 1.5 */
110     void removeAt(int i) {
111 dl 1.9 // if removing front item, just advance
112     if (i == takeIndex) {
113     items[takeIndex] = null;
114     takeIndex = inc(takeIndex);
115 tim 1.23 } else {
116 dl 1.9 // slide over all others up through putIndex.
117     for (;;) {
118     int nexti = inc(i);
119     if (nexti != putIndex) {
120     items[i] = items[nexti];
121     i = nexti;
122 tim 1.23 } else {
123 dl 1.9 items[i] = null;
124     putIndex = i;
125     break;
126     }
127 dl 1.5 }
128     }
129 dl 1.9 --count;
130     notFull.signal();
131 tim 1.1 }
132    
133     /**
134 tim 1.12 * Internal constructor also used by readResolve.
135 dl 1.5 * Sets all final fields, plus count.
136 tim 1.15 * @param cap the capacity
137 dl 1.5 * @param array the array to use or null if should create new one
138     * @param count the number of items in the array, where indices 0
139     * to count-1 hold items.
140 dholmes 1.16 * @param lk the lock to use with this queue
141 dl 1.5 */
142 tim 1.15 private ArrayBlockingQueue(int cap, E[] array, int count,
143 dholmes 1.13 ReentrantLock lk) {
144 tim 1.15 if (cap <= 0)
145 dl 1.5 throw new IllegalArgumentException();
146     if (array == null)
147 tim 1.12 this.items = (E[]) new Object[cap];
148 dl 1.5 else
149     this.items = array;
150     this.putIndex = count;
151     this.count = count;
152 dl 1.11 lock = lk;
153     notEmpty = lock.newCondition();
154     notFull = lock.newCondition();
155 dl 1.5 }
156 tim 1.12
157 dl 1.5 /**
158 dholmes 1.21 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
159 dholmes 1.13 * capacity and default access policy.
160     * @param capacity the capacity of this queue
161 dholmes 1.16 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
162 dl 1.5 */
163 dholmes 1.13 public ArrayBlockingQueue(int capacity) {
164     this(capacity, null, 0, new ReentrantLock());
165 dl 1.5 }
166 dl 1.2
167 dl 1.5 /**
168 dholmes 1.21 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
169 dholmes 1.13 * capacity and the specified access policy.
170     * @param capacity the capacity of this queue
171     * @param fair if <tt>true</tt> then queue accesses for threads blocked
172     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
173     * the access order is unspecified.
174 dholmes 1.16 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
175 dl 1.11 */
176 dholmes 1.13 public ArrayBlockingQueue(int capacity, boolean fair) {
177     this(capacity, null, 0, new ReentrantLock(fair));
178 dl 1.5 }
179    
180 dholmes 1.16 /**
181 dholmes 1.21 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
182     * capacity, the specified access policy and initially containing the
183 tim 1.17 * elements of the given collection,
184 dholmes 1.16 * added in traversal order of the collection's iterator.
185     * @param capacity the capacity of this queue
186     * @param fair if <tt>true</tt> then queue accesses for threads blocked
187     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
188     * the access order is unspecified.
189     * @param c the collection of elements to initially contain
190     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
191     * <tt>c.size()</tt>, or less than 1.
192     * @throws NullPointerException if <tt>c</tt> or any element within it
193     * is <tt>null</tt>
194     */
195 tim 1.20 public ArrayBlockingQueue(int capacity, boolean fair,
196 dholmes 1.18 Collection<? extends E> c) {
197 dholmes 1.16 this(capacity, null, 0, new ReentrantLock(fair));
198    
199     if (capacity < c.size())
200     throw new IllegalArgumentException();
201    
202 tim 1.20 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
203     add(it.next());
204 dholmes 1.16 }
205 dl 1.2
206 dholmes 1.13 /**
207 dholmes 1.27 * Inserts the specified element at the tail of this queue if possible,
208     * returning immediately if this queue is full.
209 dl 1.25 *
210 dl 1.28 * @param o the element to add.
211     * @return <tt>true</tt> if it was possible to add the element to
212     * this queue, else <tt>false</tt>
213 dl 1.25 * @throws NullPointerException if the specified element is <tt>null</tt>
214 dholmes 1.13 */
215 dholmes 1.21 public boolean offer(E o) {
216     if (o == null) throw new NullPointerException();
217 dl 1.5 lock.lock();
218     try {
219 tim 1.12 if (count == items.length)
220 dl 1.2 return false;
221 dl 1.5 else {
222 dholmes 1.21 insert(o);
223 dl 1.5 return true;
224     }
225 tim 1.23 } finally {
226 tim 1.12 lock.unlock();
227 dl 1.2 }
228 dl 1.5 }
229 dl 1.2
230 dholmes 1.13 /**
231 dholmes 1.27 * Inserts the specified element at the tail of this queue, waiting if
232 dholmes 1.13 * necessary up to the specified wait time for space to become available.
233 dl 1.28 * @param o the element to add
234     * @param timeout how long to wait before giving up, in units of
235     * <tt>unit</tt>
236     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
237     * <tt>timeout</tt> parameter
238     * @return <tt>true</tt> if successful, or <tt>false</tt> if
239     * the specified waiting time elapses before space is available.
240     * @throws InterruptedException if interrupted while waiting.
241 dl 1.25 * @throws NullPointerException if the specified element is <tt>null</tt>.
242 brian 1.7 */
243 dholmes 1.21 public boolean offer(E o, long timeout, TimeUnit unit)
244 dholmes 1.13 throws InterruptedException {
245 dl 1.2
246 dholmes 1.21 if (o == null) throw new NullPointerException();
247 dholmes 1.13
248 dl 1.5 lock.lockInterruptibly();
249     try {
250 dholmes 1.13 long nanos = unit.toNanos(timeout);
251 dl 1.5 for (;;) {
252     if (count != items.length) {
253 dholmes 1.21 insert(o);
254 dl 1.5 return true;
255     }
256     if (nanos <= 0)
257     return false;
258     try {
259     nanos = notFull.awaitNanos(nanos);
260 tim 1.23 } catch (InterruptedException ie) {
261 dl 1.5 notFull.signal(); // propagate to non-interrupted thread
262     throw ie;
263     }
264     }
265 tim 1.23 } finally {
266 dl 1.5 lock.unlock();
267 dl 1.2 }
268 dl 1.5 }
269 dl 1.2
270 dholmes 1.13
271     public E poll() {
272     lock.lock();
273     try {
274     if (count == 0)
275     return null;
276     E x = extract();
277     return x;
278 tim 1.23 } finally {
279 tim 1.15 lock.unlock();
280 dholmes 1.13 }
281     }
282    
283 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
284     lock.lockInterruptibly();
285     try {
286 dholmes 1.13 long nanos = unit.toNanos(timeout);
287 dl 1.2 for (;;) {
288 dl 1.5 if (count != 0) {
289     E x = extract();
290     return x;
291 dl 1.2 }
292 dl 1.5 if (nanos <= 0)
293     return null;
294     try {
295     nanos = notEmpty.awaitNanos(nanos);
296 tim 1.23 } catch (InterruptedException ie) {
297 dl 1.5 notEmpty.signal(); // propagate to non-interrupted thread
298     throw ie;
299     }
300    
301 dl 1.2 }
302 tim 1.23 } finally {
303 dl 1.5 lock.unlock();
304     }
305     }
306 dl 1.2
307 brian 1.7
308 dholmes 1.21 public boolean remove(Object o) {
309     if (o == null) return false;
310 dl 1.5 lock.lock();
311     try {
312     int i = takeIndex;
313     int k = 0;
314     for (;;) {
315     if (k++ >= count)
316     return false;
317 dholmes 1.21 if (o.equals(items[i])) {
318 dl 1.5 removeAt(i);
319     return true;
320     }
321 dl 1.2 i = inc(i);
322 dl 1.5 }
323 tim 1.12
324 tim 1.23 } finally {
325 dl 1.5 lock.unlock();
326     }
327     }
328 brian 1.7
329 dholmes 1.13 public E peek() {
330     lock.lock();
331     try {
332     return (count == 0) ? null : items[takeIndex];
333 tim 1.23 } finally {
334 dholmes 1.13 lock.unlock();
335     }
336     }
337    
338     public E take() throws InterruptedException {
339     lock.lockInterruptibly();
340     try {
341     try {
342     while (count == 0)
343     notEmpty.await();
344 tim 1.23 } catch (InterruptedException ie) {
345 dholmes 1.13 notEmpty.signal(); // propagate to non-interrupted thread
346     throw ie;
347     }
348     E x = extract();
349     return x;
350 tim 1.23 } finally {
351 dholmes 1.13 lock.unlock();
352     }
353     }
354    
355     /**
356 dholmes 1.21 * Adds the specified element to the tail of this queue, waiting if
357 dholmes 1.13 * necessary for space to become available.
358 dl 1.28 * @param o the element to add
359     * @throws InterruptedException if interrupted while waiting.
360 dl 1.25 * @throws NullPointerException if the specified element is <tt>null</tt>.
361 dholmes 1.13 */
362 dholmes 1.21 public void put(E o) throws InterruptedException {
363 dholmes 1.13
364 dholmes 1.21 if (o == null) throw new NullPointerException();
365 dholmes 1.13
366     lock.lockInterruptibly();
367     try {
368     try {
369     while (count == items.length)
370     notFull.await();
371 tim 1.23 } catch (InterruptedException ie) {
372 dholmes 1.13 notFull.signal(); // propagate to non-interrupted thread
373     throw ie;
374     }
375 dholmes 1.21 insert(o);
376 tim 1.23 } finally {
377 dholmes 1.13 lock.unlock();
378     }
379     }
380    
381     // this doc comment is overridden to remove the reference to collections
382     // greater in size than Integer.MAX_VALUE
383 tim 1.15 /**
384 dl 1.25 * Returns the number of elements in this queue.
385     *
386     * @return the number of elements in this queue.
387 dholmes 1.13 */
388     public int size() {
389     lock.lock();
390     try {
391     return count;
392 tim 1.23 } finally {
393 dholmes 1.13 lock.unlock();
394     }
395     }
396    
397     // this doc comment is a modified copy of the inherited doc comment,
398     // without the reference to unlimited queues.
399 tim 1.15 /**
400 dholmes 1.21 * Returns the number of elements that this queue can ideally (in
401 dholmes 1.13 * the absence of memory or resource constraints) accept without
402     * blocking. This is always equal to the initial capacity of this queue
403     * less the current <tt>size</tt> of this queue.
404     * <p>Note that you <em>cannot</em> always tell if
405     * an attempt to <tt>add</tt> an element will succeed by
406     * inspecting <tt>remainingCapacity</tt> because it may be the
407     * case that a waiting consumer is ready to <tt>take</tt> an
408     * element out of an otherwise full queue.
409     */
410     public int remainingCapacity() {
411     lock.lock();
412     try {
413     return items.length - count;
414 tim 1.23 } finally {
415 dholmes 1.13 lock.unlock();
416     }
417     }
418    
419    
420 dholmes 1.21 public boolean contains(Object o) {
421     if (o == null) return false;
422 dl 1.5 lock.lock();
423     try {
424     int i = takeIndex;
425     int k = 0;
426     while (k++ < count) {
427 dholmes 1.21 if (o.equals(items[i]))
428 dl 1.2 return true;
429 dl 1.5 i = inc(i);
430     }
431 dl 1.2 return false;
432 tim 1.23 } finally {
433 dl 1.5 lock.unlock();
434     }
435     }
436 brian 1.7
437 dl 1.5 public Object[] toArray() {
438     lock.lock();
439     try {
440 tim 1.12 E[] a = (E[]) new Object[count];
441 dl 1.5 int k = 0;
442     int i = takeIndex;
443     while (k < count) {
444 dl 1.2 a[k++] = items[i];
445 dl 1.5 i = inc(i);
446     }
447 dl 1.2 return a;
448 tim 1.23 } finally {
449 dl 1.5 lock.unlock();
450     }
451     }
452 brian 1.7
453 dl 1.5 public <T> T[] toArray(T[] a) {
454     lock.lock();
455     try {
456     if (a.length < count)
457 dholmes 1.16 a = (T[])java.lang.reflect.Array.newInstance(
458 tim 1.17 a.getClass().getComponentType(),
459 dholmes 1.16 count
460     );
461 tim 1.12
462 dl 1.5 int k = 0;
463     int i = takeIndex;
464     while (k < count) {
465 dl 1.2 a[k++] = (T)items[i];
466 dl 1.5 i = inc(i);
467     }
468     if (a.length > count)
469     a[count] = null;
470 dl 1.2 return a;
471 tim 1.23 } finally {
472 dl 1.5 lock.unlock();
473     }
474     }
475 dl 1.6
476     public String toString() {
477     lock.lock();
478     try {
479     return super.toString();
480 tim 1.23 } finally {
481 dl 1.6 lock.unlock();
482     }
483     }
484 tim 1.12
485 brian 1.7 /**
486     * Returns an iterator over the elements in this queue in proper sequence.
487 dl 1.22 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
488     * will never throw {@link java.util.ConcurrentModificationException},
489     * and guarantees to traverse elements as they existed upon
490     * construction of the iterator, and may (but is not guaranteed to)
491     * reflect any modifications subsequent to construction.
492 brian 1.7 *
493     * @return an iterator over the elements in this queue in proper sequence.
494     */
495 dl 1.5 public Iterator<E> iterator() {
496     lock.lock();
497     try {
498 dl 1.2 return new Itr();
499 tim 1.23 } finally {
500 dl 1.5 lock.unlock();
501     }
502     }
503 dl 1.8
504     /**
505     * Iterator for ArrayBlockingQueue
506     */
507 dl 1.5 private class Itr implements Iterator<E> {
508     /**
509     * Index of element to be returned by next,
510     * or a negative number if no such.
511     */
512 dl 1.8 private int nextIndex;
513 dl 1.2
514 tim 1.12 /**
515 dl 1.5 * nextItem holds on to item fields because once we claim
516     * that an element exists in hasNext(), we must return it in
517     * the following next() call even if it was in the process of
518     * being removed when hasNext() was called.
519     **/
520 dl 1.8 private E nextItem;
521 dl 1.5
522     /**
523     * Index of element returned by most recent call to next.
524     * Reset to -1 if this element is deleted by a call to remove.
525     */
526 dl 1.8 private int lastRet;
527 tim 1.12
528 dl 1.5 Itr() {
529     lastRet = -1;
530 tim 1.12 if (count == 0)
531 dl 1.5 nextIndex = -1;
532     else {
533     nextIndex = takeIndex;
534     nextItem = items[takeIndex];
535     }
536     }
537 tim 1.12
538 dl 1.5 public boolean hasNext() {
539     /*
540     * No sync. We can return true by mistake here
541     * only if this iterator passed across threads,
542     * which we don't support anyway.
543 dl 1.2 */
544 dl 1.5 return nextIndex >= 0;
545     }
546    
547     /**
548 dholmes 1.13 * Check whether nextIndex is valid; if so setting nextItem.
549 dl 1.5 * Stops iterator when either hits putIndex or sees null item.
550     */
551     private void checkNext() {
552     if (nextIndex == putIndex) {
553     nextIndex = -1;
554     nextItem = null;
555 tim 1.23 } else {
556 dl 1.5 nextItem = items[nextIndex];
557     if (nextItem == null)
558     nextIndex = -1;
559 dl 1.2 }
560 dl 1.5 }
561 tim 1.12
562 dl 1.5 public E next() {
563     lock.lock();
564     try {
565     if (nextIndex < 0)
566 dl 1.2 throw new NoSuchElementException();
567 dl 1.5 lastRet = nextIndex;
568     E x = nextItem;
569     nextIndex = inc(nextIndex);
570     checkNext();
571     return x;
572 tim 1.23 } finally {
573 tim 1.12 lock.unlock();
574 dl 1.2 }
575 dl 1.5 }
576 tim 1.12
577 dl 1.5 public void remove() {
578     lock.lock();
579     try {
580 dl 1.2 int i = lastRet;
581     if (i == -1)
582     throw new IllegalStateException();
583     lastRet = -1;
584 tim 1.12
585 dl 1.9 int ti = takeIndex;
586 dl 1.2 removeAt(i);
587 dl 1.9 // back up cursor (reset to front if was first element)
588 tim 1.12 nextIndex = (i == ti) ? takeIndex : i;
589 dl 1.5 checkNext();
590 tim 1.23 } finally {
591 dl 1.5 lock.unlock();
592     }
593     }
594     }
595 tim 1.12
596 dl 1.5 /**
597     * Save the state to a stream (that is, serialize it).
598     *
599     * @serialData The maximumSize is emitted (int), followed by all of
600     * its elements (each an <tt>E</tt>) in the proper order.
601 dl 1.8 * @param s the stream
602 dl 1.5 */
603     private void writeObject(java.io.ObjectOutputStream s)
604     throws java.io.IOException {
605 tim 1.12
606 dl 1.5 // Write out element count, and any hidden stuff
607     s.defaultWriteObject();
608     // Write out maximumSize == items length
609     s.writeInt(items.length);
610 tim 1.12
611 dl 1.5 // Write out all elements in the proper order.
612     int i = takeIndex;
613     int k = 0;
614     while (k++ < count) {
615     s.writeObject(items[i]);
616     i = inc(i);
617 dl 1.2 }
618 dl 1.5 }
619 tim 1.12
620 dl 1.5 /**
621 dholmes 1.13 * Reconstitute this queue instance from a stream (that is,
622 dl 1.5 * deserialize it).
623 dl 1.8 * @param s the stream
624 dl 1.5 */
625     private void readObject(java.io.ObjectInputStream s)
626     throws java.io.IOException, ClassNotFoundException {
627     // Read in size, and any hidden stuff
628     s.defaultReadObject();
629     int size = count;
630 tim 1.12
631 dl 1.5 // Read in array length and allocate array
632     int arrayLength = s.readInt();
633 tim 1.12
634 dl 1.5 // We use deserializedItems here because "items" is final
635 tim 1.12 deserializedItems = (E[]) new Object[arrayLength];
636    
637 dl 1.5 // Read in all elements in the proper order into deserializedItems
638     for (int i = 0; i < size; i++)
639     deserializedItems[i] = (E)s.readObject();
640     }
641 tim 1.12
642 dl 1.5 /**
643     * Throw away the object created with readObject, and replace it
644     * with a usable ArrayBlockingQueue.
645 dl 1.8 * @return the ArrayBlockingQueue
646 dl 1.5 */
647     private Object readResolve() throws java.io.ObjectStreamException {
648     E[] array = deserializedItems;
649     deserializedItems = null;
650 tim 1.17 return new ArrayBlockingQueue<E>(array.length, array, count, lock);
651 tim 1.1 }
652     }