ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.13
Committed: Mon Jul 28 04:11:54 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.12: +162 -143 lines
Log Message:
Significant doc updates:
 - inherit comments where appropriate
 - ensure runtime exception comments inherited (overriding as needed)
 - consistent descriptions
 - introduce head and tail terminology

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.11 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dholmes 1.13 * A bounded {@link BlockingQueue blocking queue} backed by an array.
13     * This queue orders elements FIFO (first-in-first-out).
14     * The <em>head</em> of the queue is that element that has been on the
15     * queue the longest time.
16     * The <em>tail</em> of the queue is that element that has been on the
17     * queue the shortest time.
18     *
19     * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized
20     * array holds
21 dl 1.11 * elements inserted by producers and extracted by consumers. Once
22     * created, the capacity can not be increased. Attempts to offer an
23     * element to a full queue will result in the offer operation
24     * blocking; attempts to retrieve an element from an empty queue will
25 tim 1.12 * similarly block.
26 dl 1.11 *
27     * <p> This class supports an optional fairness policy for ordering
28     * threads blocked on an insertion or removal. By default, this
29     * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
30     * constructed with fairness set to <tt>true</tt> grants blocked
31     * threads access in FIFO order. Fairness generally substantially
32     * decreases throughput but reduces variablility and avoids
33     * starvation.
34 brian 1.7 *
35 dl 1.8 * @since 1.5
36     * @author Doug Lea
37     */
38 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
39 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
40    
41 dl 1.8 /** The queued items */
42 tim 1.12 private transient final E[] items;
43 dl 1.8 /** items index for next take, poll or remove */
44 tim 1.12 private transient int takeIndex;
45 dl 1.8 /** items index for next put, offer, or add. */
46 tim 1.12 private transient int putIndex;
47 dl 1.8 /** Number of items in the queue */
48 dl 1.5 private int count;
49 tim 1.12
50 dl 1.5 /**
51     * An array used only during deserialization, to hold
52     * items read back in from the stream, and then used
53     * as "items" by readResolve via the private constructor.
54     */
55     private transient E[] deserializedItems;
56 tim 1.12
57 dl 1.5 /*
58     * Concurrency control via the classic two-condition algorithm
59     * found in any textbook.
60     */
61    
62 dl 1.11 /** Main lock guarding all access */
63     private final ReentrantLock lock;
64 dholmes 1.13 /** Condition for waiting takes */
65 dl 1.11 private final Condition notEmpty;
66 dl 1.8 /** Condition for wiating puts */
67 dl 1.11 private final Condition notFull;
68 dl 1.5
69     // Internal helper methods
70    
71     /**
72     * Circularly increment i.
73     */
74     int inc(int i) {
75     return (++i == items.length)? 0 : i;
76     }
77    
78     /**
79 dl 1.9 * Insert element at current put position, advance, and signal.
80     * Call only when holding lock.
81 dl 1.5 */
82     private void insert(E x) {
83     items[putIndex] = x;
84     putIndex = inc(putIndex);
85     ++count;
86 dl 1.9 notEmpty.signal();
87 tim 1.1 }
88 tim 1.12
89 dl 1.5 /**
90 dl 1.9 * Extract element at current take position, advance, and signal.
91     * Call only when holding lock.
92 dl 1.5 */
93 dholmes 1.13 private E extract() {
94 dl 1.5 E x = items[takeIndex];
95     items[takeIndex] = null;
96     takeIndex = inc(takeIndex);
97     --count;
98 dl 1.9 notFull.signal();
99 dl 1.5 return x;
100     }
101    
102     /**
103 tim 1.12 * Utility for remove and iterator.remove: Delete item at position i.
104 dl 1.9 * Call only when holding lock.
105 dl 1.5 */
106     void removeAt(int i) {
107 dl 1.9 // if removing front item, just advance
108     if (i == takeIndex) {
109     items[takeIndex] = null;
110     takeIndex = inc(takeIndex);
111     }
112     else {
113     // slide over all others up through putIndex.
114     for (;;) {
115     int nexti = inc(i);
116     if (nexti != putIndex) {
117     items[i] = items[nexti];
118     i = nexti;
119     }
120     else {
121     items[i] = null;
122     putIndex = i;
123     break;
124     }
125 dl 1.5 }
126     }
127 dl 1.9 --count;
128     notFull.signal();
129 tim 1.1 }
130    
131     /**
132 tim 1.12 * Internal constructor also used by readResolve.
133 dl 1.5 * Sets all final fields, plus count.
134 dholmes 1.13 * @param cap the capacity
135 dl 1.5 * @param array the array to use or null if should create new one
136     * @param count the number of items in the array, where indices 0
137     * to count-1 hold items.
138     */
139 dholmes 1.13 private ArrayBlockingQueue(int cap, E[] array, int count,
140     ReentrantLock lk) {
141     if (cap <= 0)
142 dl 1.5 throw new IllegalArgumentException();
143     if (array == null)
144 tim 1.12 this.items = (E[]) new Object[cap];
145 dl 1.5 else
146     this.items = array;
147     this.putIndex = count;
148     this.count = count;
149 dl 1.11 lock = lk;
150     notEmpty = lock.newCondition();
151     notFull = lock.newCondition();
152 dl 1.5 }
153 tim 1.12
154 dl 1.5 /**
155 dholmes 1.13 * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
156     * capacity and default access policy.
157     * @param capacity the capacity of this queue
158 dl 1.5 */
159 dholmes 1.13 public ArrayBlockingQueue(int capacity) {
160     this(capacity, null, 0, new ReentrantLock());
161 dl 1.5 }
162 dl 1.2
163 dl 1.5 /**
164 dholmes 1.13 * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
165     * capacity and the specified access policy.
166     * @param capacity the capacity of this queue
167     * @param fair if <tt>true</tt> then queue accesses for threads blocked
168     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
169     * the access order is unspecified.
170 dl 1.11 */
171 dholmes 1.13 public ArrayBlockingQueue(int capacity, boolean fair) {
172     this(capacity, null, 0, new ReentrantLock(fair));
173 dl 1.5 }
174    
175 dl 1.2
176 dholmes 1.13 // Have to override just to update the javadoc for @throws
177 brian 1.7
178 dholmes 1.13 /**
179     * @throws IllegalStateException {@inheritDoc}
180     */
181     public boolean add(E x) {
182     return super.add(x);
183 dl 1.5 }
184 dl 1.2
185 dholmes 1.13 /**
186     * @throws IllegalStateException {@inheritDoc}
187 dl 1.8 */
188 dholmes 1.13 public boolean addAll(Collection c) {
189     return super.addAll(c);
190 dl 1.5 }
191    
192 dholmes 1.13 /**
193     * Add the specified element to the tail of this queue if possible,
194     * returning immediately if this queue is full.
195     *
196     * @throws NullPointerException {@inheritDoc}
197 brian 1.7 */
198 dl 1.5 public boolean offer(E x) {
199 dl 1.8 if (x == null) throw new NullPointerException();
200 dl 1.5 lock.lock();
201     try {
202 tim 1.12 if (count == items.length)
203 dl 1.2 return false;
204 dl 1.5 else {
205     insert(x);
206     return true;
207     }
208     }
209     finally {
210 tim 1.12 lock.unlock();
211 dl 1.2 }
212 dl 1.5 }
213 dl 1.2
214 dholmes 1.13 /**
215     * Add the specified element to the tail of this queue, waiting if
216     * necessary up to the specified wait time for space to become available.
217     * @throws NullPointerException {@inheritDoc}
218 brian 1.7 */
219 dholmes 1.13 public boolean offer(E x, long timeout, TimeUnit unit)
220     throws InterruptedException {
221 dl 1.2
222 dl 1.8 if (x == null) throw new NullPointerException();
223 dholmes 1.13
224 dl 1.5 lock.lockInterruptibly();
225     try {
226 dholmes 1.13 long nanos = unit.toNanos(timeout);
227 dl 1.5 for (;;) {
228     if (count != items.length) {
229     insert(x);
230     return true;
231     }
232     if (nanos <= 0)
233     return false;
234     try {
235     nanos = notFull.awaitNanos(nanos);
236     }
237     catch (InterruptedException ie) {
238     notFull.signal(); // propagate to non-interrupted thread
239     throw ie;
240     }
241     }
242     }
243     finally {
244     lock.unlock();
245 dl 1.2 }
246 dl 1.5 }
247 dl 1.2
248 dholmes 1.13
249     public E poll() {
250     lock.lock();
251     try {
252     if (count == 0)
253     return null;
254     E x = extract();
255     return x;
256     }
257     finally {
258     lock.unlock();
259     }
260     }
261    
262 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
263     lock.lockInterruptibly();
264     try {
265 dholmes 1.13 long nanos = unit.toNanos(timeout);
266 dl 1.2 for (;;) {
267 dl 1.5 if (count != 0) {
268     E x = extract();
269     return x;
270 dl 1.2 }
271 dl 1.5 if (nanos <= 0)
272     return null;
273     try {
274     nanos = notEmpty.awaitNanos(nanos);
275     }
276     catch (InterruptedException ie) {
277     notEmpty.signal(); // propagate to non-interrupted thread
278     throw ie;
279     }
280    
281 dl 1.2 }
282     }
283 dl 1.5 finally {
284     lock.unlock();
285     }
286     }
287 dl 1.2
288 brian 1.7
289 dl 1.5 public boolean remove(Object x) {
290 dl 1.8 if (x == null) return false;
291 dl 1.5 lock.lock();
292     try {
293     int i = takeIndex;
294     int k = 0;
295     for (;;) {
296     if (k++ >= count)
297     return false;
298     if (x.equals(items[i])) {
299     removeAt(i);
300     return true;
301     }
302 dl 1.2 i = inc(i);
303 dl 1.5 }
304 tim 1.12
305 dl 1.2 }
306 dl 1.5 finally {
307     lock.unlock();
308     }
309     }
310 brian 1.7
311 dholmes 1.13 public E peek() {
312     lock.lock();
313     try {
314     return (count == 0) ? null : items[takeIndex];
315     }
316     finally {
317     lock.unlock();
318     }
319     }
320    
321     public E take() throws InterruptedException {
322     lock.lockInterruptibly();
323     try {
324     try {
325     while (count == 0)
326     notEmpty.await();
327     }
328     catch (InterruptedException ie) {
329     notEmpty.signal(); // propagate to non-interrupted thread
330     throw ie;
331     }
332     E x = extract();
333     return x;
334     }
335     finally {
336     lock.unlock();
337     }
338     }
339    
340     /**
341     * Add the specified element to the tail of this queue, waiting if
342     * necessary for space to become available.
343     * @throws NullPointerException {@inheritDoc}
344     */
345     public void put(E x) throws InterruptedException {
346    
347     if (x == null) throw new NullPointerException();
348    
349     lock.lockInterruptibly();
350     try {
351     try {
352     while (count == items.length)
353     notFull.await();
354     }
355     catch (InterruptedException ie) {
356     notFull.signal(); // propagate to non-interrupted thread
357     throw ie;
358     }
359     insert(x);
360     }
361     finally {
362     lock.unlock();
363     }
364     }
365    
366     // this doc comment is overridden to remove the reference to collections
367     // greater in size than Integer.MAX_VALUE
368     /**
369     * Return the number of elements in this collection.
370     */
371     public int size() {
372     lock.lock();
373     try {
374     return count;
375     }
376     finally {
377     lock.unlock();
378     }
379     }
380    
381     // this doc comment is a modified copy of the inherited doc comment,
382     // without the reference to unlimited queues.
383     /**
384     * Return the number of elements that this queue can ideally (in
385     * the absence of memory or resource constraints) accept without
386     * blocking. This is always equal to the initial capacity of this queue
387     * less the current <tt>size</tt> of this queue.
388     * <p>Note that you <em>cannot</em> always tell if
389     * an attempt to <tt>add</tt> an element will succeed by
390     * inspecting <tt>remainingCapacity</tt> because it may be the
391     * case that a waiting consumer is ready to <tt>take</tt> an
392     * element out of an otherwise full queue.
393     */
394     public int remainingCapacity() {
395     lock.lock();
396     try {
397     return items.length - count;
398     }
399     finally {
400     lock.unlock();
401     }
402     }
403    
404    
405 dl 1.5 public boolean contains(Object x) {
406 dl 1.8 if (x == null) return false;
407 dl 1.5 lock.lock();
408     try {
409     int i = takeIndex;
410     int k = 0;
411     while (k++ < count) {
412 dl 1.2 if (x.equals(items[i]))
413     return true;
414 dl 1.5 i = inc(i);
415     }
416 dl 1.2 return false;
417     }
418 dl 1.5 finally {
419     lock.unlock();
420     }
421     }
422 brian 1.7
423 dl 1.5 public Object[] toArray() {
424     lock.lock();
425     try {
426 tim 1.12 E[] a = (E[]) new Object[count];
427 dl 1.5 int k = 0;
428     int i = takeIndex;
429     while (k < count) {
430 dl 1.2 a[k++] = items[i];
431 dl 1.5 i = inc(i);
432     }
433 dl 1.2 return a;
434     }
435 dl 1.5 finally {
436     lock.unlock();
437     }
438     }
439 brian 1.7
440 dl 1.5 public <T> T[] toArray(T[] a) {
441     lock.lock();
442     try {
443     if (a.length < count)
444     a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
445 tim 1.12
446 dl 1.5 int k = 0;
447     int i = takeIndex;
448     while (k < count) {
449 dl 1.2 a[k++] = (T)items[i];
450 dl 1.5 i = inc(i);
451     }
452     if (a.length > count)
453     a[count] = null;
454 dl 1.2 return a;
455     }
456 dl 1.5 finally {
457     lock.unlock();
458     }
459     }
460 dl 1.6
461     public String toString() {
462     lock.lock();
463     try {
464     return super.toString();
465     }
466     finally {
467     lock.unlock();
468     }
469     }
470 tim 1.12
471 brian 1.7 /**
472     * Returns an iterator over the elements in this queue in proper sequence.
473     *
474     * @return an iterator over the elements in this queue in proper sequence.
475     */
476 dl 1.5 public Iterator<E> iterator() {
477     lock.lock();
478     try {
479 dl 1.2 return new Itr();
480     }
481 dl 1.5 finally {
482     lock.unlock();
483     }
484     }
485 dl 1.8
486     /**
487     * Iterator for ArrayBlockingQueue
488     */
489 dl 1.5 private class Itr implements Iterator<E> {
490     /**
491     * Index of element to be returned by next,
492     * or a negative number if no such.
493     */
494 dl 1.8 private int nextIndex;
495 dl 1.2
496 tim 1.12 /**
497 dl 1.5 * nextItem holds on to item fields because once we claim
498     * that an element exists in hasNext(), we must return it in
499     * the following next() call even if it was in the process of
500     * being removed when hasNext() was called.
501     **/
502 dl 1.8 private E nextItem;
503 dl 1.5
504     /**
505     * Index of element returned by most recent call to next.
506     * Reset to -1 if this element is deleted by a call to remove.
507     */
508 dl 1.8 private int lastRet;
509 tim 1.12
510 dl 1.5 Itr() {
511     lastRet = -1;
512 tim 1.12 if (count == 0)
513 dl 1.5 nextIndex = -1;
514     else {
515     nextIndex = takeIndex;
516     nextItem = items[takeIndex];
517     }
518     }
519 tim 1.12
520 dl 1.5 public boolean hasNext() {
521     /*
522     * No sync. We can return true by mistake here
523     * only if this iterator passed across threads,
524     * which we don't support anyway.
525 dl 1.2 */
526 dl 1.5 return nextIndex >= 0;
527     }
528    
529     /**
530 dholmes 1.13 * Check whether nextIndex is valid; if so setting nextItem.
531 dl 1.5 * Stops iterator when either hits putIndex or sees null item.
532     */
533     private void checkNext() {
534     if (nextIndex == putIndex) {
535     nextIndex = -1;
536     nextItem = null;
537 dl 1.2 }
538 dl 1.5 else {
539     nextItem = items[nextIndex];
540     if (nextItem == null)
541     nextIndex = -1;
542 dl 1.2 }
543 dl 1.5 }
544 tim 1.12
545 dl 1.5 public E next() {
546     lock.lock();
547     try {
548     if (nextIndex < 0)
549 dl 1.2 throw new NoSuchElementException();
550 dl 1.5 lastRet = nextIndex;
551     E x = nextItem;
552     nextIndex = inc(nextIndex);
553     checkNext();
554     return x;
555     }
556     finally {
557 tim 1.12 lock.unlock();
558 dl 1.2 }
559 dl 1.5 }
560 tim 1.12
561 dl 1.5 public void remove() {
562     lock.lock();
563     try {
564 dl 1.2 int i = lastRet;
565     if (i == -1)
566     throw new IllegalStateException();
567     lastRet = -1;
568 tim 1.12
569 dl 1.9 int ti = takeIndex;
570 dl 1.2 removeAt(i);
571 dl 1.9 // back up cursor (reset to front if was first element)
572 tim 1.12 nextIndex = (i == ti) ? takeIndex : i;
573 dl 1.5 checkNext();
574 dl 1.2 }
575 dl 1.5 finally {
576     lock.unlock();
577     }
578     }
579     }
580 tim 1.12
581 dl 1.5 /**
582     * Save the state to a stream (that is, serialize it).
583     *
584     * @serialData The maximumSize is emitted (int), followed by all of
585     * its elements (each an <tt>E</tt>) in the proper order.
586 dl 1.8 * @param s the stream
587 dl 1.5 */
588     private void writeObject(java.io.ObjectOutputStream s)
589     throws java.io.IOException {
590 tim 1.12
591 dl 1.5 // Write out element count, and any hidden stuff
592     s.defaultWriteObject();
593     // Write out maximumSize == items length
594     s.writeInt(items.length);
595 tim 1.12
596 dl 1.5 // Write out all elements in the proper order.
597     int i = takeIndex;
598     int k = 0;
599     while (k++ < count) {
600     s.writeObject(items[i]);
601     i = inc(i);
602 dl 1.2 }
603 dl 1.5 }
604 tim 1.12
605 dl 1.5 /**
606 dholmes 1.13 * Reconstitute this queue instance from a stream (that is,
607 dl 1.5 * deserialize it).
608 dl 1.8 * @param s the stream
609 dl 1.5 */
610     private void readObject(java.io.ObjectInputStream s)
611     throws java.io.IOException, ClassNotFoundException {
612     // Read in size, and any hidden stuff
613     s.defaultReadObject();
614     int size = count;
615 tim 1.12
616 dl 1.5 // Read in array length and allocate array
617     int arrayLength = s.readInt();
618 tim 1.12
619 dl 1.5 // We use deserializedItems here because "items" is final
620 tim 1.12 deserializedItems = (E[]) new Object[arrayLength];
621    
622 dl 1.5 // Read in all elements in the proper order into deserializedItems
623     for (int i = 0; i < size; i++)
624     deserializedItems[i] = (E)s.readObject();
625     }
626 tim 1.12
627 dl 1.5 /**
628     * Throw away the object created with readObject, and replace it
629     * with a usable ArrayBlockingQueue.
630 dl 1.8 * @return the ArrayBlockingQueue
631 dl 1.5 */
632     private Object readResolve() throws java.io.ObjectStreamException {
633     E[] array = deserializedItems;
634     deserializedItems = null;
635 dl 1.11 return new ArrayBlockingQueue(array.length, array, count, lock);
636 tim 1.1 }
637     }