ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.12
Committed: Sat Jul 26 13:17:51 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.11: +39 -39 lines
Log Message:
Default compiler is now 2.2-ea. Some sources are not compatible with 2.0-ea.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.11 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 brian 1.7 * A bounded blocking queue backed by an array. The implementation is
13 dl 1.4 * a classic "bounded buffer", in which a fixed-sized array holds
14 dl 1.11 * elements inserted by producers and extracted by consumers. Once
15     * created, the capacity can not be increased. Attempts to offer an
16     * element to a full queue will result in the offer operation
17     * blocking; attempts to retrieve an element from an empty queue will
18 tim 1.12 * similarly block.
19 dl 1.11 *
20     * <p> This class supports an optional fairness policy for ordering
21     * threads blocked on an insertion or removal. By default, this
22     * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
23     * constructed with fairness set to <tt>true</tt> grants blocked
24     * threads access in FIFO order. Fairness generally substantially
25     * decreases throughput but reduces variablility and avoids
26     * starvation.
27 brian 1.7 *
28 dl 1.8 * @since 1.5
29     * @author Doug Lea
30     */
31 dl 1.5 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
32 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
33    
34 dl 1.8 /** The queued items */
35 tim 1.12 private transient final E[] items;
36 dl 1.8 /** items index for next take, poll or remove */
37 tim 1.12 private transient int takeIndex;
38 dl 1.8 /** items index for next put, offer, or add. */
39 tim 1.12 private transient int putIndex;
40 dl 1.8 /** Number of items in the queue */
41 dl 1.5 private int count;
42 tim 1.12
43 dl 1.5 /**
44     * An array used only during deserialization, to hold
45     * items read back in from the stream, and then used
46     * as "items" by readResolve via the private constructor.
47     */
48     private transient E[] deserializedItems;
49 tim 1.12
50 dl 1.5 /*
51     * Concurrency control via the classic two-condition algorithm
52     * found in any textbook.
53     */
54    
55 dl 1.11 /** Main lock guarding all access */
56     private final ReentrantLock lock;
57 dl 1.8 /** Condition wor waiting takes */
58 dl 1.11 private final Condition notEmpty;
59 dl 1.8 /** Condition for wiating puts */
60 dl 1.11 private final Condition notFull;
61 dl 1.5
62     // Internal helper methods
63    
64     /**
65     * Circularly increment i.
66     */
67     int inc(int i) {
68     return (++i == items.length)? 0 : i;
69     }
70    
71     /**
72 dl 1.9 * Insert element at current put position, advance, and signal.
73     * Call only when holding lock.
74 dl 1.5 */
75     private void insert(E x) {
76     items[putIndex] = x;
77     putIndex = inc(putIndex);
78     ++count;
79 dl 1.9 notEmpty.signal();
80 tim 1.1 }
81 tim 1.12
82 dl 1.5 /**
83 dl 1.9 * Extract element at current take position, advance, and signal.
84     * Call only when holding lock.
85 dl 1.5 */
86     private E extract() {
87     E x = items[takeIndex];
88     items[takeIndex] = null;
89     takeIndex = inc(takeIndex);
90     --count;
91 dl 1.9 notFull.signal();
92 dl 1.5 return x;
93     }
94    
95     /**
96 tim 1.12 * Utility for remove and iterator.remove: Delete item at position i.
97 dl 1.9 * Call only when holding lock.
98 dl 1.5 */
99     void removeAt(int i) {
100 dl 1.9 // if removing front item, just advance
101     if (i == takeIndex) {
102     items[takeIndex] = null;
103     takeIndex = inc(takeIndex);
104     }
105     else {
106     // slide over all others up through putIndex.
107     for (;;) {
108     int nexti = inc(i);
109     if (nexti != putIndex) {
110     items[i] = items[nexti];
111     i = nexti;
112     }
113     else {
114     items[i] = null;
115     putIndex = i;
116     break;
117     }
118 dl 1.5 }
119     }
120 dl 1.9 --count;
121     notFull.signal();
122 tim 1.1 }
123    
124     /**
125 tim 1.12 * Internal constructor also used by readResolve.
126 dl 1.5 * Sets all final fields, plus count.
127 tim 1.12 * @param cap the maximumSize
128 dl 1.5 * @param array the array to use or null if should create new one
129     * @param count the number of items in the array, where indices 0
130     * to count-1 hold items.
131     */
132 dl 1.11 private ArrayBlockingQueue(int cap, E[] array, int count, ReentrantLock lk) {
133 tim 1.12 if (cap <= 0)
134 dl 1.5 throw new IllegalArgumentException();
135     if (array == null)
136 tim 1.12 this.items = (E[]) new Object[cap];
137 dl 1.5 else
138     this.items = array;
139     this.putIndex = count;
140     this.count = count;
141 dl 1.11 lock = lk;
142     notEmpty = lock.newCondition();
143     notFull = lock.newCondition();
144 dl 1.5 }
145 tim 1.12
146 dl 1.5 /**
147 dl 1.11 * Creates a new ArrayBlockingQueue with the given (fixed) capacity
148     * and default access policy.
149 dl 1.5 * @param maximumSize the capacity
150     */
151     public ArrayBlockingQueue(int maximumSize) {
152 dl 1.11 this(maximumSize, null, 0, new ReentrantLock());
153 dl 1.5 }
154 dl 1.2
155 dl 1.5 /**
156 dl 1.11 * Creates a new ArrayBlockingQueue with the given (fixed) capacity.
157 dl 1.5 * @param maximumSize the capacity
158 dl 1.11 * @param fair true if queue access should use a fair policy
159     */
160     public ArrayBlockingQueue(int maximumSize, boolean fair) {
161     this(maximumSize, null, 0, new ReentrantLock(fair));
162 dl 1.5 }
163 dl 1.2
164 brian 1.7 /** Return the number of elements currently in the queue */
165 dl 1.5 public int size() {
166     lock.lock();
167     try {
168     return count;
169     }
170     finally {
171     lock.unlock();
172     }
173     }
174    
175 brian 1.7 /** Return the remaining capacity of the queue, which is the
176     * number of elements that can be inserted before the queue is
177     * full. */
178 dl 1.5 public int remainingCapacity() {
179     lock.lock();
180     try {
181     return items.length - count;
182     }
183     finally {
184     lock.unlock();
185 dl 1.2 }
186 dl 1.5 }
187 dl 1.2
188 brian 1.7
189     /** Insert a new element into the queue, blocking if the queue is full. */
190 dl 1.5 public void put(E x) throws InterruptedException {
191 dl 1.8 if (x == null) throw new NullPointerException();
192 dl 1.5 lock.lockInterruptibly();
193     try {
194     try {
195     while (count == items.length)
196     notFull.await();
197     }
198     catch (InterruptedException ie) {
199     notFull.signal(); // propagate to non-interrupted thread
200     throw ie;
201 dl 1.2 }
202 dl 1.5 insert(x);
203 dl 1.2 }
204 dl 1.5 finally {
205     lock.unlock();
206     }
207     }
208 dl 1.2
209 brian 1.7 /** Remove and return the first element from the queue, blocking
210 tim 1.12 * if the queue is empty.
211 dl 1.8 */
212 dl 1.5 public E take() throws InterruptedException {
213     lock.lockInterruptibly();
214     try {
215     try {
216     while (count == 0)
217     notEmpty.await();
218     }
219     catch (InterruptedException ie) {
220     notEmpty.signal(); // propagate to non-interrupted thread
221     throw ie;
222     }
223     E x = extract();
224     return x;
225 dl 1.2 }
226 dl 1.5 finally {
227     lock.unlock();
228 dl 1.2 }
229 dl 1.5 }
230    
231 brian 1.7 /** Attempt to insert a new element into the queue, but return
232     * immediately without inserting the element if the queue is full.
233 dl 1.8 * @return <tt>true</tt> if the element was inserted successfully,
234     * <tt>false</tt> otherwise
235 brian 1.7 */
236 dl 1.5 public boolean offer(E x) {
237 dl 1.8 if (x == null) throw new NullPointerException();
238 dl 1.5 lock.lock();
239     try {
240 tim 1.12 if (count == items.length)
241 dl 1.2 return false;
242 dl 1.5 else {
243     insert(x);
244     return true;
245     }
246     }
247     finally {
248 tim 1.12 lock.unlock();
249 dl 1.2 }
250 dl 1.5 }
251 dl 1.2
252 brian 1.7 /** Attempt to retrieve the first insert element from the queue,
253     * but return immediately if the queue is empty.
254 dl 1.8 * @return The first element of the queue if the queue is not
255     * empty, or <tt>null</tt> otherwise.
256 brian 1.7 */
257 dl 1.5 public E poll() {
258     lock.lock();
259     try {
260     if (count == 0)
261 dl 1.2 return null;
262 dl 1.5 E x = extract();
263 dl 1.2 return x;
264     }
265 dl 1.5 finally {
266 tim 1.12 lock.unlock();
267 dl 1.5 }
268     }
269 dl 1.2
270 brian 1.7 /** Attempt to insert a new element into the queue. If the queue
271     * is full, wait up to the specified amount of time before giving
272     * up.
273     * @param x the element to be inserted
274     * @param timeout how long to wait before giving up, in units of
275     * <tt>unit</tt>
276     * @param unit a TimeUnit determining how to interpret the timeout
277     * parameter
278     * @return <tt>true</tt> if the element was inserted successfully,
279     * <tt>false</tt> otherwise
280 dl 1.8 * @throws InterruptedException if interrupted while waiting
281 brian 1.7 */
282 dl 1.5 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
283 dl 1.8 if (x == null) throw new NullPointerException();
284 dl 1.5 lock.lockInterruptibly();
285     long nanos = unit.toNanos(timeout);
286     try {
287     for (;;) {
288     if (count != items.length) {
289     insert(x);
290     return true;
291     }
292     if (nanos <= 0)
293     return false;
294     try {
295     nanos = notFull.awaitNanos(nanos);
296     }
297     catch (InterruptedException ie) {
298     notFull.signal(); // propagate to non-interrupted thread
299     throw ie;
300     }
301     }
302     }
303     finally {
304     lock.unlock();
305 dl 1.2 }
306 dl 1.5 }
307 dl 1.2
308 brian 1.7 /**
309     * Attempt to retrieve the first insert element from the queue.
310     * If the queue is empty, wait up to the specified amount of time
311     * before giving up.
312     * @param timeout how long to wait before giving up, in units of
313     * <tt>unit</tt>
314     * @param unit a TimeUnit determining how to interpret the timeout
315     * parameter
316     * @return The first element of the queue if an item was
317     * successfully retrieved, or <tt>null</tt> otherwise.
318 dl 1.8 * @throws InterruptedException if interrupted while waiting
319 brian 1.7 *
320     */
321 dl 1.5 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
322     lock.lockInterruptibly();
323     long nanos = unit.toNanos(timeout);
324     try {
325 dl 1.2 for (;;) {
326 dl 1.5 if (count != 0) {
327     E x = extract();
328     return x;
329 dl 1.2 }
330 dl 1.5 if (nanos <= 0)
331     return null;
332     try {
333     nanos = notEmpty.awaitNanos(nanos);
334     }
335     catch (InterruptedException ie) {
336     notEmpty.signal(); // propagate to non-interrupted thread
337     throw ie;
338     }
339    
340 dl 1.2 }
341     }
342 dl 1.5 finally {
343     lock.unlock();
344     }
345     }
346 dl 1.2
347 brian 1.7 /** Return, but do not remove, the first element from the queue,
348     * if the queue is not empty. This will return the same result as
349     * <tt>poll</tt>, but will not remove it from the queue.
350     * @return The first element of the queue if the queue is not
351     * empty, or <tt>null</tt> otherwise.
352     */
353 dl 1.5 public E peek() {
354     lock.lock();
355     try {
356 dl 1.8 return (count == 0) ? null : items[takeIndex];
357 dl 1.5 }
358     finally {
359     lock.unlock();
360     }
361     }
362 brian 1.7
363    
364 dl 1.5 public boolean remove(Object x) {
365 dl 1.8 if (x == null) return false;
366 dl 1.5 lock.lock();
367     try {
368     int i = takeIndex;
369     int k = 0;
370     for (;;) {
371     if (k++ >= count)
372     return false;
373     if (x.equals(items[i])) {
374     removeAt(i);
375     return true;
376     }
377 dl 1.2 i = inc(i);
378 dl 1.5 }
379 tim 1.12
380 dl 1.2 }
381 dl 1.5 finally {
382     lock.unlock();
383     }
384     }
385 brian 1.7
386 dl 1.5 public boolean contains(Object x) {
387 dl 1.8 if (x == null) return false;
388 dl 1.5 lock.lock();
389     try {
390     int i = takeIndex;
391     int k = 0;
392     while (k++ < count) {
393 dl 1.2 if (x.equals(items[i]))
394     return true;
395 dl 1.5 i = inc(i);
396     }
397 dl 1.2 return false;
398     }
399 dl 1.5 finally {
400     lock.unlock();
401     }
402     }
403 brian 1.7
404 dl 1.5 public Object[] toArray() {
405     lock.lock();
406     try {
407 tim 1.12 E[] a = (E[]) new Object[count];
408 dl 1.5 int k = 0;
409     int i = takeIndex;
410     while (k < count) {
411 dl 1.2 a[k++] = items[i];
412 dl 1.5 i = inc(i);
413     }
414 dl 1.2 return a;
415     }
416 dl 1.5 finally {
417     lock.unlock();
418     }
419     }
420 brian 1.7
421 dl 1.5 public <T> T[] toArray(T[] a) {
422     lock.lock();
423     try {
424     if (a.length < count)
425     a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
426 tim 1.12
427 dl 1.5 int k = 0;
428     int i = takeIndex;
429     while (k < count) {
430 dl 1.2 a[k++] = (T)items[i];
431 dl 1.5 i = inc(i);
432     }
433     if (a.length > count)
434     a[count] = null;
435 dl 1.2 return a;
436     }
437 dl 1.5 finally {
438     lock.unlock();
439     }
440     }
441 dl 1.6
442     public String toString() {
443     lock.lock();
444     try {
445     return super.toString();
446     }
447     finally {
448     lock.unlock();
449     }
450     }
451 tim 1.12
452 brian 1.7 /**
453     * Returns an iterator over the elements in this queue in proper sequence.
454     *
455     * @return an iterator over the elements in this queue in proper sequence.
456     */
457 dl 1.5 public Iterator<E> iterator() {
458     lock.lock();
459     try {
460 dl 1.2 return new Itr();
461     }
462 dl 1.5 finally {
463     lock.unlock();
464     }
465     }
466 dl 1.8
467     /**
468     * Iterator for ArrayBlockingQueue
469     */
470 dl 1.5 private class Itr implements Iterator<E> {
471     /**
472     * Index of element to be returned by next,
473     * or a negative number if no such.
474     */
475 dl 1.8 private int nextIndex;
476 dl 1.2
477 tim 1.12 /**
478 dl 1.5 * nextItem holds on to item fields because once we claim
479     * that an element exists in hasNext(), we must return it in
480     * the following next() call even if it was in the process of
481     * being removed when hasNext() was called.
482     **/
483 dl 1.8 private E nextItem;
484 dl 1.5
485     /**
486     * Index of element returned by most recent call to next.
487     * Reset to -1 if this element is deleted by a call to remove.
488     */
489 dl 1.8 private int lastRet;
490 tim 1.12
491 dl 1.5 Itr() {
492     lastRet = -1;
493 tim 1.12 if (count == 0)
494 dl 1.5 nextIndex = -1;
495     else {
496     nextIndex = takeIndex;
497     nextItem = items[takeIndex];
498     }
499     }
500 tim 1.12
501 dl 1.5 public boolean hasNext() {
502     /*
503     * No sync. We can return true by mistake here
504     * only if this iterator passed across threads,
505     * which we don't support anyway.
506 dl 1.2 */
507 dl 1.5 return nextIndex >= 0;
508     }
509    
510     /**
511     * Check whether nextIndex is valied; if so setting nextItem.
512     * Stops iterator when either hits putIndex or sees null item.
513     */
514     private void checkNext() {
515     if (nextIndex == putIndex) {
516     nextIndex = -1;
517     nextItem = null;
518 dl 1.2 }
519 dl 1.5 else {
520     nextItem = items[nextIndex];
521     if (nextItem == null)
522     nextIndex = -1;
523 dl 1.2 }
524 dl 1.5 }
525 tim 1.12
526 dl 1.5 public E next() {
527     lock.lock();
528     try {
529     if (nextIndex < 0)
530 dl 1.2 throw new NoSuchElementException();
531 dl 1.5 lastRet = nextIndex;
532     E x = nextItem;
533     nextIndex = inc(nextIndex);
534     checkNext();
535     return x;
536     }
537     finally {
538 tim 1.12 lock.unlock();
539 dl 1.2 }
540 dl 1.5 }
541 tim 1.12
542 dl 1.5 public void remove() {
543     lock.lock();
544     try {
545 dl 1.2 int i = lastRet;
546     if (i == -1)
547     throw new IllegalStateException();
548     lastRet = -1;
549 tim 1.12
550 dl 1.9 int ti = takeIndex;
551 dl 1.2 removeAt(i);
552 dl 1.9 // back up cursor (reset to front if was first element)
553 tim 1.12 nextIndex = (i == ti) ? takeIndex : i;
554 dl 1.5 checkNext();
555 dl 1.2 }
556 dl 1.5 finally {
557     lock.unlock();
558     }
559     }
560     }
561 tim 1.12
562 dl 1.5 /**
563     * Save the state to a stream (that is, serialize it).
564     *
565     * @serialData The maximumSize is emitted (int), followed by all of
566     * its elements (each an <tt>E</tt>) in the proper order.
567 dl 1.8 * @param s the stream
568 dl 1.5 */
569     private void writeObject(java.io.ObjectOutputStream s)
570     throws java.io.IOException {
571 tim 1.12
572 dl 1.5 // Write out element count, and any hidden stuff
573     s.defaultWriteObject();
574     // Write out maximumSize == items length
575     s.writeInt(items.length);
576 tim 1.12
577 dl 1.5 // Write out all elements in the proper order.
578     int i = takeIndex;
579     int k = 0;
580     while (k++ < count) {
581     s.writeObject(items[i]);
582     i = inc(i);
583 dl 1.2 }
584 dl 1.5 }
585 tim 1.12
586 dl 1.5 /**
587     * Reconstitute the Queue instance from a stream (that is,
588     * deserialize it).
589 dl 1.8 * @param s the stream
590 dl 1.5 */
591     private void readObject(java.io.ObjectInputStream s)
592     throws java.io.IOException, ClassNotFoundException {
593     // Read in size, and any hidden stuff
594     s.defaultReadObject();
595     int size = count;
596 tim 1.12
597 dl 1.5 // Read in array length and allocate array
598     int arrayLength = s.readInt();
599 tim 1.12
600 dl 1.5 // We use deserializedItems here because "items" is final
601 tim 1.12 deserializedItems = (E[]) new Object[arrayLength];
602    
603 dl 1.5 // Read in all elements in the proper order into deserializedItems
604     for (int i = 0; i < size; i++)
605     deserializedItems[i] = (E)s.readObject();
606     }
607 tim 1.12
608 dl 1.5 /**
609     * Throw away the object created with readObject, and replace it
610     * with a usable ArrayBlockingQueue.
611 dl 1.8 * @return the ArrayBlockingQueue
612 dl 1.5 */
613     private Object readResolve() throws java.io.ObjectStreamException {
614     E[] array = deserializedItems;
615     deserializedItems = null;
616 dl 1.11 return new ArrayBlockingQueue(array.length, array, count, lock);
617 tim 1.1 }
618     }