ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.8
Committed: Tue Jun 24 14:34:47 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.7: +35 -68 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8     import java.util.*;
9    
10     /**
11 brian 1.7 * A bounded blocking queue backed by an array. The implementation is
12 dl 1.4 * a classic "bounded buffer", in which a fixed-sized array holds
13 brian 1.7 * elements inserted by producers and extracted by
14     * consumers. Once created, the capacity can not be increased.
15     * Array-based queues typically have more predictable
16 dl 1.4 * performance than linked queues but lower throughput in most
17     * concurrent applications.
18 brian 1.7 *
19     * Attempts to put an element into a full queue will result
20     * in the put operation blocking; attempts to take an element from
21     * an empty queue will similarly block. Threads blocked on an insertion or
22     * removal will be serviced in FIFO order.
23 dl 1.8 * @since 1.5
24     * @author Doug Lea
25     */
26 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
27 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
28    
29 dl 1.8 /** The queued items */
30 dl 1.5 private transient final E[] items;
31 dl 1.8 /** items index for next take, poll or remove */
32     private transient int takeIndex;
33     /** items index for next put, offer, or add. */
34 dl 1.5 private transient int putIndex;
35 dl 1.8 /** Number of items in the queue */
36 dl 1.5 private int count;
37    
38     /**
39     * An array used only during deserialization, to hold
40     * items read back in from the stream, and then used
41     * as "items" by readResolve via the private constructor.
42     */
43     private transient E[] deserializedItems;
44    
45     /*
46     * Concurrency control via the classic two-condition algorithm
47     * found in any textbook.
48     */
49    
50 dl 1.8 /** Main lock guarding all access */
51 dl 1.5 private final FairReentrantLock lock = new FairReentrantLock();
52 dl 1.8 /** Condition for waiting takes */
53 dl 1.5 private final Condition notEmpty = lock.newCondition();
54 dl 1.8 /** Condition for waiting puts */
55 dl 1.5 private final Condition notFull = lock.newCondition();
56    
57     // Internal helper methods
58    
59     /**
60     * Circularly increment i.
61     */
62     int inc(int i) {
63     return (++i == items.length)? 0 : i;
64     }
65    
66     /**
67     * Insert element at current put position and advance.
68     */
69     private void insert(E x) {
70     items[putIndex] = x;
71     putIndex = inc(putIndex);
72     ++count;
73 tim 1.1 }
74 dl 1.2
75 dl 1.5 /**
76     * Extract element at current take position and advance.
77     */
78     private E extract() {
79     E x = items[takeIndex];
80     items[takeIndex] = null;
81     takeIndex = inc(takeIndex);
82     --count;
83     return x;
84     }
85    
86     /**
87     * Utility for remove and iterator.remove: Delete item at position
88     * i by sliding over all others up through putIndex.
89     */
90     void removeAt(int i) {
91     for (;;) {
92     int nexti = inc(i);
93     items[i] = items[nexti];
94     if (nexti != putIndex)
95     i = nexti;
96     else {
97     items[nexti] = null;
98     putIndex = i;
99     --count;
100     return;
101     }
102     }
103 tim 1.1 }
104    
/**
 * Internal constructor also used by readResolve.
 * Sets all final fields, plus count and putIndex.
 * @param cap the maximumSize
 * @param array the array to use or null if should create new one
 * @param count the number of items in the array, where indices 0
 * to count-1 hold items.
 * @throws IllegalArgumentException if cap is not positive, or if
 * count is negative or larger than the backing array
 */
private ArrayBlockingQueue(int cap, E[] array, int count) {
    if (cap <= 0)
        throw new IllegalArgumentException();
    if (array == null)
        // Generic arrays cannot be created directly; the array never
        // escapes typed as E[], so the unchecked cast is safe.
        this.items = (E[]) new Object[cap];
    else
        this.items = array;
    if (count < 0 || count > this.items.length)
        throw new IllegalArgumentException();
    // When the array is completely full the next put position wraps
    // around to slot 0; using count itself would index past the end.
    this.putIndex = (count == this.items.length) ? 0 : count;
    this.count = count;
}
123    
124     /**
125     * Creates a new ArrayBlockingQueue with the given (fixed) capacity.
126     * @param maximumSize the capacity
127     */
128     public ArrayBlockingQueue(int maximumSize) {
129     this(maximumSize, null, 0);
130     }
131 dl 1.2
132 dl 1.5 /**
133     * Creates a new ArrayBlockingQueue with the given (fixed)
134     * capacity, and initially contianing the given elements, added in
135     * iterator traversal order.
136     * @param maximumSize the capacity
137     * @param initialElements the items to hold initially.
138     * @throws IllegalArgumentException if the given capacity is
139     * less than the number of initialElements.
140     */
141     public ArrayBlockingQueue(int maximumSize, Collection<E> initialElements) {
142     this(maximumSize, null, 0);
143     int n = 0;
144     for (Iterator<E> it = initialElements.iterator(); it.hasNext();) {
145     if (++n >= items.length)
146 dl 1.2 throw new IllegalArgumentException();
147 dl 1.5 items[n] = it.next();
148 dl 1.2 }
149 dl 1.5 putIndex = count = n;
150     }
151 dl 1.2
152 brian 1.7 /** Return the number of elements currently in the queue */
153 dl 1.5 public int size() {
154     lock.lock();
155     try {
156     return count;
157     }
158     finally {
159     lock.unlock();
160     }
161     }
162    
163 brian 1.7 /** Return the remaining capacity of the queue, which is the
164     * number of elements that can be inserted before the queue is
165     * full. */
166 dl 1.5 public int remainingCapacity() {
167     lock.lock();
168     try {
169     return items.length - count;
170     }
171     finally {
172     lock.unlock();
173 dl 1.2 }
174 dl 1.5 }
175 dl 1.2
176 brian 1.7
177     /** Insert a new element into the queue, blocking if the queue is full. */
178 dl 1.5 public void put(E x) throws InterruptedException {
179 dl 1.8 if (x == null) throw new NullPointerException();
180 dl 1.5 lock.lockInterruptibly();
181     try {
182     try {
183     while (count == items.length)
184     notFull.await();
185     }
186     catch (InterruptedException ie) {
187     notFull.signal(); // propagate to non-interrupted thread
188     throw ie;
189 dl 1.2 }
190 dl 1.5 insert(x);
191     notEmpty.signal();
192 dl 1.2 }
193 dl 1.5 finally {
194     lock.unlock();
195     }
196     }
197 dl 1.2
198 brian 1.7 /** Remove and return the first element from the queue, blocking
199 dl 1.8 * if the queue is empty.
200     */
201 dl 1.5 public E take() throws InterruptedException {
202     lock.lockInterruptibly();
203     try {
204     try {
205     while (count == 0)
206     notEmpty.await();
207     }
208     catch (InterruptedException ie) {
209     notEmpty.signal(); // propagate to non-interrupted thread
210     throw ie;
211     }
212     E x = extract();
213     notFull.signal();
214     return x;
215 dl 1.2 }
216 dl 1.5 finally {
217     lock.unlock();
218 dl 1.2 }
219 dl 1.5 }
220    
221 brian 1.7 /** Attempt to insert a new element into the queue, but return
222     * immediately without inserting the element if the queue is full.
223 dl 1.8 * @return <tt>true</tt> if the element was inserted successfully,
224     * <tt>false</tt> otherwise
225 brian 1.7 */
226 dl 1.5 public boolean offer(E x) {
227 dl 1.8 if (x == null) throw new NullPointerException();
228 dl 1.5 lock.lock();
229     try {
230     if (count == items.length)
231 dl 1.2 return false;
232 dl 1.5 else {
233     insert(x);
234     notEmpty.signal();
235     return true;
236     }
237     }
238     finally {
239     lock.unlock();
240 dl 1.2 }
241 dl 1.5 }
242 dl 1.2
243 brian 1.7 /** Attempt to retrieve the first inserted element from the queue,
244     * but return immediately if the queue is empty.
245 dl 1.8 * @return The first element of the queue if the queue is not
246     * empty, or <tt>null</tt> otherwise.
247 brian 1.7 */
248 dl 1.5 public E poll() {
249     lock.lock();
250     try {
251     if (count == 0)
252 dl 1.2 return null;
253 dl 1.5 E x = extract();
254     notFull.signal();
255 dl 1.2 return x;
256     }
257 dl 1.5 finally {
258     lock.unlock();
259     }
260     }
261 dl 1.2
262 brian 1.7 /** Attempt to insert a new element into the queue. If the queue
263     * is full, wait up to the specified amount of time before giving
264     * up.
265     * @param x the element to be inserted
266     * @param timeout how long to wait before giving up, in units of
267     * <tt>unit</tt>
268     * @param unit a TimeUnit determining how to interpret the timeout
269     * parameter
270     * @return <tt>true</tt> if the element was inserted successfully,
271     * <tt>false</tt> otherwise
272 dl 1.8 * @throws InterruptedException if interrupted while waiting
273 brian 1.7 */
274 dl 1.5 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
275 dl 1.8 if (x == null) throw new NullPointerException();
276 dl 1.5 lock.lockInterruptibly();
277     long nanos = unit.toNanos(timeout);
278     try {
279     for (;;) {
280     if (count != items.length) {
281     insert(x);
282     notEmpty.signal();
283     return true;
284     }
285     if (nanos <= 0)
286     return false;
287     try {
288     nanos = notFull.awaitNanos(nanos);
289     }
290     catch (InterruptedException ie) {
291     notFull.signal(); // propagate to non-interrupted thread
292     throw ie;
293     }
294     }
295     }
296     finally {
297     lock.unlock();
298 dl 1.2 }
299 dl 1.5 }
300 dl 1.2
301 brian 1.7 /**
302     * Attempt to retrieve the first insert element from the queue.
303     * If the queue is empty, wait up to the specified amount of time
304     * before giving up.
305     * @param timeout how long to wait before giving up, in units of
306     * <tt>unit</tt>
307     * @param unit a TimeUnit determining how to interpret the timeout
308     * parameter
309     * @return The first element of the queue if an item was
310     * successfully retrieved, or <tt>null</tt> otherwise.
311 dl 1.8 * @throws InterruptedException if interrupted while waiting
312 brian 1.7 *
313     */
314 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
315     lock.lockInterruptibly();
316     long nanos = unit.toNanos(timeout);
317     try {
318 dl 1.2 for (;;) {
319 dl 1.5 if (count != 0) {
320     E x = extract();
321     notFull.signal();
322     return x;
323 dl 1.2 }
324 dl 1.5 if (nanos <= 0)
325     return null;
326     try {
327     nanos = notEmpty.awaitNanos(nanos);
328     }
329     catch (InterruptedException ie) {
330     notEmpty.signal(); // propagate to non-interrupted thread
331     throw ie;
332     }
333    
334 dl 1.2 }
335     }
336 dl 1.5 finally {
337     lock.unlock();
338     }
339     }
340 dl 1.2
341 brian 1.7 /** Return, but do not remove, the first element from the queue,
342     * if the queue is not empty. This will return the same result as
343     * <tt>poll</tt>, but will not remove it from the queue.
344     * @return The first element of the queue if the queue is not
345     * empty, or <tt>null</tt> otherwise.
346     */
347 dl 1.5 public E peek() {
348     lock.lock();
349     try {
350 dl 1.8 return (count == 0) ? null : items[takeIndex];
351 dl 1.5 }
352     finally {
353     lock.unlock();
354     }
355     }
356 brian 1.7
357    
358 dl 1.5 public boolean remove(Object x) {
359 dl 1.8 if (x == null) return false;
360 dl 1.5 lock.lock();
361     try {
362     int i = takeIndex;
363     int k = 0;
364     for (;;) {
365     if (k++ >= count)
366     return false;
367     if (x.equals(items[i])) {
368     removeAt(i);
369     return true;
370     }
371 dl 1.2 i = inc(i);
372 dl 1.5 }
373 dl 1.2
374     }
375 dl 1.5 finally {
376     lock.unlock();
377     }
378     }
379 brian 1.7
380 dl 1.5 public boolean contains(Object x) {
381 dl 1.8 if (x == null) return false;
382 dl 1.5 lock.lock();
383     try {
384     int i = takeIndex;
385     int k = 0;
386     while (k++ < count) {
387 dl 1.2 if (x.equals(items[i]))
388     return true;
389 dl 1.5 i = inc(i);
390     }
391 dl 1.2 return false;
392     }
393 dl 1.5 finally {
394     lock.unlock();
395     }
396     }
397 brian 1.7
398 dl 1.5 public Object[] toArray() {
399     lock.lock();
400     try {
401     E[] a = new E[count];
402     int k = 0;
403     int i = takeIndex;
404     while (k < count) {
405 dl 1.2 a[k++] = items[i];
406 dl 1.5 i = inc(i);
407     }
408 dl 1.2 return a;
409     }
410 dl 1.5 finally {
411     lock.unlock();
412     }
413     }
414 brian 1.7
415 dl 1.5 public <T> T[] toArray(T[] a) {
416     lock.lock();
417     try {
418     if (a.length < count)
419     a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
420    
421     int k = 0;
422     int i = takeIndex;
423     while (k < count) {
424 dl 1.2 a[k++] = (T)items[i];
425 dl 1.5 i = inc(i);
426     }
427     if (a.length > count)
428     a[count] = null;
429 dl 1.2 return a;
430     }
431 dl 1.5 finally {
432     lock.unlock();
433     }
434     }
435 dl 1.6
436     public String toString() {
437     lock.lock();
438     try {
439     return super.toString();
440     }
441     finally {
442     lock.unlock();
443     }
444     }
445 dl 1.5
446 brian 1.7 /**
447     * Returns an iterator over the elements in this queue in proper sequence.
448     *
449     * @return an iterator over the elements in this queue in proper sequence.
450     */
451 dl 1.5 public Iterator<E> iterator() {
452     lock.lock();
453     try {
454 dl 1.2 return new Itr();
455     }
456 dl 1.5 finally {
457     lock.unlock();
458     }
459     }
460 dl 1.8
461     /**
462     * Iterator for ArrayBlockingQueue
463     */
464 dl 1.5 private class Itr implements Iterator<E> {
465     /**
466     * Index of element to be returned by next,
467     * or a negative number if no such.
468     */
469 dl 1.8 private int nextIndex;
470 dl 1.2
471 dl 1.5 /**
472     * nextItem holds on to item fields because once we claim
473     * that an element exists in hasNext(), we must return it in
474     * the following next() call even if it was in the process of
475     * being removed when hasNext() was called.
476     **/
477 dl 1.8 private E nextItem;
478 dl 1.5
479     /**
480     * Index of element returned by most recent call to next.
481     * Reset to -1 if this element is deleted by a call to remove.
482     */
483 dl 1.8 private int lastRet;
484 dl 1.5
485     Itr() {
486     lastRet = -1;
487     if (count == 0)
488     nextIndex = -1;
489     else {
490     nextIndex = takeIndex;
491     nextItem = items[takeIndex];
492     }
493     }
494    
495     public boolean hasNext() {
496     /*
497     * No sync. We can return true by mistake here
498     * only if this iterator passed across threads,
499     * which we don't support anyway.
500 dl 1.2 */
501 dl 1.5 return nextIndex >= 0;
502     }
503    
504     /**
505     * Check whether nextIndex is valid; if so, set nextItem.
506     * Stops iterator when either hits putIndex or sees null item.
507     */
508     private void checkNext() {
509     if (nextIndex == putIndex) {
510     nextIndex = -1;
511     nextItem = null;
512 dl 1.2 }
513 dl 1.5 else {
514     nextItem = items[nextIndex];
515     if (nextItem == null)
516     nextIndex = -1;
517 dl 1.2 }
518 dl 1.5 }
519    
520     public E next() {
521     lock.lock();
522     try {
523     if (nextIndex < 0)
524 dl 1.2 throw new NoSuchElementException();
525 dl 1.5 lastRet = nextIndex;
526     E x = nextItem;
527     nextIndex = inc(nextIndex);
528     checkNext();
529     return x;
530     }
531     finally {
532     lock.unlock();
533 dl 1.2 }
534 dl 1.5 }
535    
536     public void remove() {
537     lock.lock();
538     try {
539 dl 1.2 int i = lastRet;
540     if (i == -1)
541     throw new IllegalStateException();
542     lastRet = -1;
543    
544 dl 1.5 nextIndex = i; // back up cursor
545 dl 1.2 removeAt(i);
546 dl 1.5 checkNext();
547 dl 1.2 }
548 dl 1.5 finally {
549     lock.unlock();
550     }
551     }
552     }
553 dl 1.2
554 dl 1.5 /**
555     * Save the state to a stream (that is, serialize it).
556     *
557     * @serialData The maximumSize is emitted (int), followed by all of
558     * its elements (each an <tt>E</tt>) in the proper order.
559 dl 1.8 * @param s the stream
560 dl 1.5 */
561     private void writeObject(java.io.ObjectOutputStream s)
562     throws java.io.IOException {
563    
564     // Write out element count, and any hidden stuff
565     s.defaultWriteObject();
566     // Write out maximumSize == items length
567     s.writeInt(items.length);
568 dl 1.2
569 dl 1.5 // Write out all elements in the proper order.
570     int i = takeIndex;
571     int k = 0;
572     while (k++ < count) {
573     s.writeObject(items[i]);
574     i = inc(i);
575 dl 1.2 }
576 dl 1.5 }
577    
578     /**
579     * Reconstitute the Queue instance from a stream (that is,
580     * deserialize it).
581 dl 1.8 * @param s the stream
582 dl 1.5 */
583     private void readObject(java.io.ObjectInputStream s)
584     throws java.io.IOException, ClassNotFoundException {
585     // Read in size, and any hidden stuff
586     s.defaultReadObject();
587     int size = count;
588    
589     // Read in array length and allocate array
590     int arrayLength = s.readInt();
591    
592     // We use deserializedItems here because "items" is final
593     deserializedItems = new E[arrayLength];
594 dl 1.2
595 dl 1.5 // Read in all elements in the proper order into deserializedItems
596     for (int i = 0; i < size; i++)
597     deserializedItems[i] = (E)s.readObject();
598     }
599    
600     /**
601     * Throw away the object created with readObject, and replace it
602     * with a usable ArrayBlockingQueue.
603 dl 1.8 * @return the ArrayBlockingQueue
604 dl 1.5 */
605     private Object readResolve() throws java.io.ObjectStreamException {
606     E[] array = deserializedItems;
607     deserializedItems = null;
608     return new ArrayBlockingQueue(array.length, array, count);
609 tim 1.1 }
610     }