ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.23
Committed: Fri Aug 8 20:05:07 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.22: +24 -48 lines
Log Message:
Scrunched catch, finally, else clauses.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an array.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time.
 *
 * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized
 * array holds elements inserted by producers and extracted by
 * consumers. Once created, the capacity can not be increased.
 * Attempts to <tt>put</tt> an element into a full queue will result
 * in the operation blocking; attempts to <tt>take</tt> an element
 * from an empty queue will similarly block. The untimed
 * <tt>offer</tt> and <tt>poll</tt> methods never block: they return
 * <tt>false</tt> and <tt>null</tt> respectively instead.
 *
 * <p> This class supports an optional fairness policy for ordering
 * threads blocked on an insertion or removal. By default, this
 * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
 * constructed with fairness set to <tt>true</tt> grants blocked
 * threads access in FIFO order. Fairness generally substantially
 * decreases throughput but reduces variability and avoids
 * starvation.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** The queued items; used as a circular buffer. */
    private transient final E[] items;
    /** items index for next take, poll or remove */
    private transient int takeIndex;
    /** items index for next put, offer, or add. */
    private transient int putIndex;
    /** Number of items in the queue */
    private int count;

    /**
     * An array used only during deserialization, to hold
     * items read back in from the stream, and then used
     * as "items" by readResolve via the private constructor.
     */
    private transient E[] deserializedItems;

    /*
     * Concurrency control via the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increment i.
     *
     * @param i an index into <tt>items</tt>
     * @return <tt>i + 1</tt>, or 0 if that would equal <tt>items.length</tt>
     */
    int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Insert element at current put position, advance, and signal.
     * Call only when holding lock.
     *
     * @param x the element to store
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extract element at current take position, advance, and signal.
     * Call only when holding lock.
     *
     * @return the element that was stored at the take position
     */
    private E extract() {
        E x = items[takeIndex];
        items[takeIndex] = null;   // drop the reference so the slot does not pin the element
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Utility for remove and iterator.remove: Delete item at position i.
     * Call only when holding lock.
     *
     * @param i index into <tt>items</tt> of the element to delete
     */
    void removeAt(int i) {
        // if removing front item, just advance
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    // i is now the last occupied slot: clear it and make
                    // it the new insertion point.
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Internal constructor also used by readResolve.
     * Sets all final fields, plus count.
     * @param cap the capacity
     * @param array the array to use or null if should create new one
     * @param count the number of items in the array, where indices 0
     * to count-1 hold items.
     * @param lk the lock to use with this queue
     * @throws IllegalArgumentException if <tt>cap</tt> is less than 1
     */
    private ArrayBlockingQueue(int cap, E[] array, int count,
                               ReentrantLock lk) {
        if (cap <= 0)
            throw new IllegalArgumentException();
        if (array == null)
            this.items = (E[]) new Object[cap];
        else
            this.items = array;
        // The next free slot follows the last item. When the array is
        // full that slot wraps around to index 0; leaving it equal to
        // items.length would make the next insert index out of bounds
        // and would keep the iterator from ever reaching putIndex.
        this.putIndex = (count == this.items.length) ? 0 : count;
        this.count = count;
        lock = lk;
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and default access policy.
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, null, 0, new ReentrantLock());
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        this(capacity, null, 0, new ReentrantLock(fair));
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
     * <tt>c.size()</tt>, or less than 1.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, null, 0, new ReentrantLock(fair));

        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

    // Have to override just to update the javadoc

    /**
     * Adds the specified element to the tail of this queue.
     * @param o the element to add
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Adds all of the elements in the specified collection to this queue.
     * The behavior of this operation is undefined if
     * the specified collection is modified while the operation is in
     * progress. (This implies that the behavior of this call is undefined if
     * the specified collection is this queue, and this queue is nonempty.)
     * <p>
     * This implementation iterates over the specified collection, and adds
     * each object returned by the iterator to this queue's tail, in turn.
     * @param c the collection whose elements are to be added
     * @return the result of the inherited <tt>addAll</tt>
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    /**
     * Adds the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     * the queue was full
     * @throws NullPointerException if <tt>o</tt> is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        lock.lock();
        try {
            if (count == items.length)
                return false;
            insert(o);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the unit in which <tt>timeout</tt> is expressed
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     * the wait time elapsed while the queue was still full
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>o</tt> is <tt>null</tt>
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new NullPointerException();

        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != items.length) {
                    insert(o);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, returning
     * immediately if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        lock.lock();
        try {
            if (count == 0)
                return null;
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary up to the specified wait time for an element to
     * become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit the unit in which <tt>timeout</tt> is expressed
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed while the queue was still empty
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != 0)
                    return extract();
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present. More formally, removes the element
     * <tt>e</tt> closest to the head such that <tt>o.equals(e)</tt>,
     * if the queue contains one or more such elements. Returns
     * <tt>true</tt> if the queue contained the specified element (or
     * equivalently, if the queue changed as a result of the call).
     * Because this queue never holds <tt>null</tt>, a <tt>null</tt>
     * argument always yields <tt>false</tt>.
     *
     * <p>This implementation scans the array from the head while
     * holding the queue lock and closes the gap left by the removed
     * element.
     *
     * @param o the element to remove, if present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count)
                    return false;
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary for an element to become available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>o</tt> is <tt>null</tt>
     */
    public void put(E o) throws InterruptedException {

        if (o == null) throw new NullPointerException();

        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection, in FIFO order.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements && count > 0) {
                // Add before removing, so that an element is not lost
                // if the target collection rejects it.
                c.add(items[takeIndex]);
                extract();
                ++n;
            }
            return n;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this collection.
     *
     * @return the number of elements currently queued
     */
    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     *
     * @return the capacity minus the current size
     */
    public int remainingCapacity() {
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains an element equal to
     * the specified object. A <tt>null</tt> argument always yields
     * <tt>false</tt>.
     *
     * @param o the object to look for
     * @return <tt>true</tt> if an equal element is queued
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            while (k++ < count) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a newly allocated array holding the elements of this
     * queue in head-to-tail order.
     *
     * @return an array of the queued elements
     */
    public Object[] toArray() {
        lock.lock();
        try {
            Object[] a = new Object[count];
            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = items[i];
                i = inc(i);
            }
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Copies the elements of this queue, in head-to-tail order, into
     * the given array if it is large enough, or otherwise into a new
     * array of the same component type. If the array used has room to
     * spare, the slot immediately after the last element is set to
     * <tt>null</tt>.
     *
     * @param a the array to fill if it is big enough
     * @return the array holding the queued elements
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            if (a.length < count)
                a = (T[]) java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(),
                    count
                    );

            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = (T) items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the inherited string form of this queue, computed while
     * holding the queue lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue
     */
    private class Itr implements Iterator<E> {
        /**
         * Index of element to be returned by next,
         * or a negative number if no such.
         */
        private int nextIndex;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         **/
        private E nextItem;

        /**
         * Index of element returned by most recent call to next.
         * Reset to -1 if this element is deleted by a call to remove.
         */
        private int lastRet;

        /**
         * Positions the iterator at the head of the queue.
         * Constructed by iterator() while the queue lock is held.
         */
        Itr() {
            lastRet = -1;
            if (count == 0)
                nextIndex = -1;
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        public boolean hasNext() {
            /*
             * No sync. We can return true by mistake here
             * only if this iterator passed across threads,
             * which we don't support anyway.
             */
            return nextIndex >= 0;
        }

        /**
         * Check whether nextIndex is valid; if so setting nextItem.
         * Stops iterator when either hits putIndex or sees null item.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            } else {
                nextItem = items[nextIndex];
                if (nextItem == null)
                    nextIndex = -1;
            }
        }

        public E next() {
            lock.lock();
            try {
                if (nextIndex < 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = nextItem;
                nextIndex = inc(nextIndex);
                checkNext();
                return x;
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;

                // NOTE(review): i is the slot recorded by next(); if another
                // thread has since removed that element, removeAt acts on
                // whatever the slot holds now. Confirm whether callers rely
                // on this.
                int ti = takeIndex;
                removeAt(i);
                // back up cursor (reset to front if was first element)
                nextIndex = (i == ti) ? takeIndex : i;
                checkNext();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The maximumSize is emitted (int), followed by all of
     * its elements (each an <tt>E</tt>) in the proper order.
     * @param s the stream
     * @throws java.io.IOException if writing to the stream fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        // Hold the lock for the whole write so that the count emitted by
        // defaultWriteObject matches the number of elements that follow.
        lock.lock();
        try {
            // Write out element count, and any hidden stuff
            s.defaultWriteObject();
            // Write out maximumSize == items length
            s.writeInt(items.length);

            // Write out all elements in the proper order.
            int i = takeIndex;
            int k = 0;
            while (k++ < count) {
                s.writeObject(items[i]);
                i = inc(i);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     * @throws java.io.IOException if reading from the stream fails
     * @throws java.io.InvalidObjectException if the stream describes a
     * non-positive capacity or an element count outside the capacity
     * @throws ClassNotFoundException if the class of a serialized
     * element cannot be found
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in size, and any hidden stuff
        s.defaultReadObject();
        int size = count;

        // Read in array length and allocate array
        int arrayLength = s.readInt();

        // Reject inconsistent streams up front, instead of failing later
        // with an array allocation or indexing error.
        if (arrayLength <= 0 || size < 0 || size > arrayLength)
            throw new java.io.InvalidObjectException(
                "Invalid capacity or element count");

        // We use deserializedItems here because "items" is final
        deserializedItems = (E[]) new Object[arrayLength];

        // Read in all elements in the proper order into deserializedItems
        for (int i = 0; i < size; i++)
            deserializedItems[i] = (E) s.readObject();
    }

    /**
     * Throw away the object created with readObject, and replace it
     * with a usable ArrayBlockingQueue.
     * @return the ArrayBlockingQueue
     */
    private Object readResolve() throws java.io.ObjectStreamException {
        E[] array = deserializedItems;
        deserializedItems = null;
        return new ArrayBlockingQueue<E>(array.length, array, count, lock);
    }
}