ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.54
Committed: Sun May 28 23:36:29 2006 UTC (18 years ago) by jsr166
Branch: MAIN
Changes since 1.53: +1 -1 lines
Log Message:
Location of Collections Guide has changed

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
 * array.  This queue orders elements FIFO (first-in-first-out).  The
 * <em>head</em> of the queue is that element that has been on the
 * queue the longest time.  The <em>tail</em> of the queue is that
 * element that has been on the queue the shortest time.  New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 *
 * <p>This is a classic &quot;bounded buffer&quot;, in which a
 * fixed-sized array holds elements inserted by producers and
 * extracted by consumers.  Once created, the capacity cannot be
 * increased.  Attempts to <tt>put</tt> an element into a full queue
 * will result in the operation blocking; attempts to <tt>take</tt> an
 * element from an empty queue will similarly block.
 *
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed.  However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order.  Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID.  This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty.  Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items */
    private final E[] items;
    /**
     * items index for next take, poll or remove.
     * Deliberately NOT transient: the items array is serialized as-is, so
     * the indices into it must travel with it, otherwise a deserialized
     * queue whose head is not at slot 0 would lose its elements.
     */
    private int takeIndex;
    /**
     * items index for next put, offer, or add.
     * Serialized for the same reason as {@link #takeIndex}.
     */
    private int putIndex;
    /** Number of items in the queue */
    private int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increments i: returns i + 1, wrapping to 0 at the end
     * of the items array.
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Inserts element at current put position, advances, and signals
     * one thread waiting on notEmpty.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals
     * one thread waiting on notFull.
     * Call only when holding lock.
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null; // drop the reference so the slot reads as empty
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Utility for remove and iterator.remove: Delete item at position i.
     * Call only when holding lock.
     */
    void removeAt(int i) {
        final E[] items = this.items;
        // if removing front item, just advance
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    // i is now the last occupied slot: clear it and make it
                    // the new insertion point
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Moves up to <tt>limit</tt> elements from the head of this queue into
     * <tt>c</tt>.  Shared implementation of both <tt>drainTo</tt> methods.
     * Acquires the lock itself.
     *
     * <p>If <tt>c.add</tt> throws, the elements already transferred stay in
     * <tt>c</tt>, the element whose transfer failed (and all later ones)
     * stay in this queue, and count/takeIndex are brought back in line with
     * the array contents before the exception propagates.
     *
     * @param c the collection to transfer elements into (non-null, not this)
     * @param limit the maximum number of elements to transfer (positive)
     * @return the number of elements transferred
     */
    private int drainElements(Collection<? super E> c, int limit) {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int max = (limit < count) ? limit : count;
            int i = takeIndex;
            int n = 0;
            try {
                while (n < max) {
                    c.add(items[i]);
                    // only reached when add succeeded, so a failed element
                    // is never dropped from the array
                    items[i] = null;
                    i = inc(i);
                    ++n;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (n > 0) {
                    count -= n;
                    takeIndex = i;
                    notFull.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if <tt>false</tt> the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    @SuppressWarnings("unchecked") // the array is private and only ever holds E
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if <tt>false</tt> the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
     *         <tt>c.size()</tt>, or less than 1.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (E e : c)
            add(e);
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning <tt>true</tt> upon success and throwing an
     * <tt>IllegalStateException</tt> if this queue is full.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws IllegalStateException if this queue is full
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return super.add(e);
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @param e the element to add
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     *         this queue is full
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @param e the element to add
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * up to the specified wait time for space to become available if
     * the queue is full.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     *         the wait time elapsed while the queue was still full
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != items.length) {
                    insert(e);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    // awaitNanos returns the remaining wait budget
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting until an
     * element becomes available if the queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapsed while the queue was still empty
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                if (count != 0)
                    return extract();
                if (nanos <= 0)
                    return null;
                try {
                    // awaitNanos returns the remaining wait budget
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting <tt>remainingCapacity</tt>
     * because it may be the case that another thread is about to
     * insert or remove an element.
     *
     * @return the capacity of this queue less its current size
     */
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element <tt>e</tt> such
     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
     * elements.
     * Returns <tt>true</tt> if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // walk the occupied slots from head to tail
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     * More formally, returns <tt>true</tt> if and only if this queue contains
     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] a = new Object[count];
            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = items[i];
                i = inc(i);
            }
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked") // a mismatch surfaces as ArrayStoreException, as documented
    public <T> T[] toArray(T[] a) {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (a.length < count)
                a = (T[]) java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(),
                    count);

            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = (T) items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the inherited string representation of this queue, computed
     * while holding the queue's lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = count;
            while (k-- > 0) {
                items[i] = null;
                i = inc(i);
            }
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to the
     * given collection.  If adding to <tt>c</tt> fails with an exception,
     * elements already added stay in <tt>c</tt> and the remaining elements
     * stay in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        return drainElements(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection.  If adding to <tt>c</tt>
     * fails with an exception, elements already added stay in <tt>c</tt>
     * and the remaining elements stay in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     *        nothing is transferred if this is not positive
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        return drainElements(c, maxElements);
    }


    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue
     */
    private class Itr implements Iterator<E> {
        /**
         * Index of element to be returned by next,
         * or a negative number if no such.
         */
        private int nextIndex;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Index of element returned by most recent call to next.
         * Reset to -1 if this element is deleted by a call to remove.
         */
        private int lastRet;

        /**
         * Element returned by most recent call to next; lets remove()
         * verify that slot lastRet still holds that very element.
         * Reset to null by remove().
         */
        private E lastItem;

        /**
         * Positions the iterator at the current head of the queue.
         * Created by iterator() while it holds the queue's lock.
         */
        Itr() {
            lastRet = -1;
            if (count == 0)
                nextIndex = -1;
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        /**
         * Returns <tt>true</tt> if a call to next() would return an element.
         */
        public boolean hasNext() {
            /*
             * No sync. We can return true by mistake here
             * only if this iterator passed across threads,
             * which we don't support anyway.
             */
            return nextIndex >= 0;
        }

        /**
         * Checks whether nextIndex is valid; if so setting nextItem.
         * Stops iterator when either hits putIndex or sees null item.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            } else {
                nextItem = items[nextIndex];
                if (nextItem == null)
                    nextIndex = -1;
            }
        }

        /**
         * Returns the element captured for this position and advances.
         *
         * @throws NoSuchElementException if the iteration has no more elements
         */
        public E next() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (nextIndex < 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = nextItem;
                lastItem = x;
                nextIndex = inc(nextIndex);
                checkNext();
                return x;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Removes from the queue the element last returned by next(),
         * provided that same object is still stored at the slot it was read
         * from.  If the queue has since changed so that the slot holds
         * something else (or nothing), the call leaves the queue untouched
         * rather than deleting an unrelated element.
         *
         * @throws IllegalStateException if next() has not been called, or
         *         remove() was already called after the last next()
         */
        public void remove() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;
                E x = lastItem;
                lastItem = null;

                // only remove if the item is still at that index
                if (x != null && x == items[i]) {
                    int ti = takeIndex;
                    removeAt(i);
                    // back up cursor (reset to front if was first element)
                    nextIndex = (i == ti) ? takeIndex : i;
                    checkNext();
                }
            } finally {
                lock.unlock();
            }
        }
    }
}