ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.62
Committed: Sun Sep 26 17:52:54 2010 UTC (13 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.61: +2 -2 lines
Log Message:
wording clarification

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13 * array. This queue orders elements FIFO (first-in-first-out). The
14 * <em>head</em> of the queue is that element that has been on the
15 * queue the longest time. The <em>tail</em> of the queue is that
16 * element that has been on the queue the shortest time. New elements
17 * are inserted at the tail of the queue, and the queue retrieval
18 * operations obtain elements at the head of the queue.
19 *
20 * <p>This is a classic &quot;bounded buffer&quot;, in which a
21 * fixed-sized array holds elements inserted by producers and
22 * extracted by consumers. Once created, the capacity cannot be
23 * increased. Attempts to <tt>put</tt> an element into a full queue
24 * will result in the operation blocking; attempts to <tt>take</tt> an
25 * element from an empty queue will similarly block.
26 *
27 * <p> This class supports an optional fairness policy for ordering
28 * waiting producer and consumer threads. By default, this ordering
29 * is not guaranteed. However, a queue constructed with fairness set
30 * to <tt>true</tt> grants threads access in FIFO order. Fairness
31 * generally decreases throughput but reduces variability and avoids
32 * starvation.
33 *
34 * <p>This class and its iterator implement all of the
35 * <em>optional</em> methods of the {@link Collection} and {@link
36 * Iterator} interfaces.
37 *
38 * <p>This class is a member of the
39 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40 * Java Collections Framework</a>.
41 *
42 * @since 1.5
43 * @author Doug Lea
44 * @param <E> the type of elements held in this collection
45 */
46 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
47 implements BlockingQueue<E>, java.io.Serializable {
48
49 /**
50 * Serialization ID. This class relies on default serialization
51 * even for the items array, which is default-serialized, even if
52 * it is empty. Otherwise it could not be declared final, which is
53 * necessary here.
54 */
55 private static final long serialVersionUID = -817911632652898426L;
56
57 /** The queued items */
58 final E[] items;
59 /** items index for next take, poll or remove */
60 int takeIndex;
61 /** items index for next put, offer, or add. */
62 int putIndex;
63 /** Number of items in the queue */
64 int count;
65
66 /*
67 * Concurrency control uses the classic two-condition algorithm
68 * found in any textbook.
69 */
70
71 /** Main lock guarding all access */
72 final ReentrantLock lock;
73 /** Condition for waiting takes */
74 private final Condition notEmpty;
75 /** Condition for waiting puts */
76 private final Condition notFull;
77
78 // Internal helper methods
79
80 /**
81 * Circularly increment i.
82 */
83 final int inc(int i) {
84 return (++i == items.length) ? 0 : i;
85 }
86
87 /**
88 * Inserts element at current put position, advances, and signals.
89 * Call only when holding lock.
90 */
91 private void insert(E x) {
92 items[putIndex] = x;
93 putIndex = inc(putIndex);
94 ++count;
95 notEmpty.signal();
96 }
97
98 /**
99 * Extracts element at current take position, advances, and signals.
100 * Call only when holding lock.
101 */
102 private E extract() {
103 final E[] items = this.items;
104 E x = items[takeIndex];
105 items[takeIndex] = null;
106 takeIndex = inc(takeIndex);
107 --count;
108 notFull.signal();
109 return x;
110 }
111
112 /**
113 * Utility for remove and iterator.remove: Delete item at position i.
114 * Call only when holding lock.
115 */
116 void removeAt(int i) {
117 final E[] items = this.items;
118 // if removing front item, just advance
119 if (i == takeIndex) {
120 items[takeIndex] = null;
121 takeIndex = inc(takeIndex);
122 } else {
123 // slide over all others up through putIndex.
124 for (;;) {
125 int nexti = inc(i);
126 if (nexti != putIndex) {
127 items[i] = items[nexti];
128 i = nexti;
129 } else {
130 items[i] = null;
131 putIndex = i;
132 break;
133 }
134 }
135 }
136 --count;
137 notFull.signal();
138 }
139
/**
 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
 * capacity and the default (non-fair) access policy. Delegates to
 * the two-argument constructor with <tt>fair</tt> set to
 * <tt>false</tt>.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
150
151 /**
152 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
153 * capacity and the specified access policy.
154 *
155 * @param capacity the capacity of this queue
156 * @param fair if <tt>true</tt> then queue accesses for threads blocked
157 * on insertion or removal, are processed in FIFO order;
158 * if <tt>false</tt> the access order is unspecified.
159 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
160 */
161 public ArrayBlockingQueue(int capacity, boolean fair) {
162 if (capacity <= 0)
163 throw new IllegalArgumentException();
164 this.items = (E[]) new Object[capacity];
165 lock = new ReentrantLock(fair);
166 notEmpty = lock.newCondition();
167 notFull = lock.newCondition();
168 }
169
170 /**
171 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
172 * capacity, the specified access policy and initially containing the
173 * elements of the given collection,
174 * added in traversal order of the collection's iterator.
175 *
176 * @param capacity the capacity of this queue
177 * @param fair if <tt>true</tt> then queue accesses for threads blocked
178 * on insertion or removal, are processed in FIFO order;
179 * if <tt>false</tt> the access order is unspecified.
180 * @param c the collection of elements to initially contain
181 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
182 * <tt>c.size()</tt>, or less than 1.
183 * @throws NullPointerException if the specified collection or any
184 * of its elements are null
185 */
186 public ArrayBlockingQueue(int capacity, boolean fair,
187 Collection<? extends E> c) {
188 this(capacity, fair);
189 if (capacity < c.size())
190 throw new IllegalArgumentException();
191
192 for (E e : c)
193 add(e);
194 }
195
196 /**
197 * Inserts the specified element at the tail of this queue if it is
198 * possible to do so immediately without exceeding the queue's capacity,
199 * returning <tt>true</tt> upon success and throwing an
200 * <tt>IllegalStateException</tt> if this queue is full.
201 *
202 * @param e the element to add
203 * @return <tt>true</tt> (as specified by {@link Collection#add})
204 * @throws IllegalStateException if this queue is full
205 * @throws NullPointerException if the specified element is null
206 */
207 public boolean add(E e) {
208 return super.add(e);
209 }
210
211 /**
212 * Inserts the specified element at the tail of this queue if it is
213 * possible to do so immediately without exceeding the queue's capacity,
214 * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
215 * is full. This method is generally preferable to method {@link #add},
216 * which can fail to insert an element only by throwing an exception.
217 *
218 * @throws NullPointerException if the specified element is null
219 */
220 public boolean offer(E e) {
221 if (e == null) throw new NullPointerException();
222 final ReentrantLock lock = this.lock;
223 lock.lock();
224 try {
225 if (count == items.length)
226 return false;
227 else {
228 insert(e);
229 return true;
230 }
231 } finally {
232 lock.unlock();
233 }
234 }
235
236 /**
237 * Inserts the specified element at the tail of this queue, waiting
238 * for space to become available if the queue is full.
239 *
240 * @throws InterruptedException {@inheritDoc}
241 * @throws NullPointerException {@inheritDoc}
242 */
243 public void put(E e) throws InterruptedException {
244 if (e == null) throw new NullPointerException();
245 final ReentrantLock lock = this.lock;
246 lock.lockInterruptibly();
247 try {
248 while (count == items.length)
249 notFull.await();
250 insert(e);
251 } finally {
252 lock.unlock();
253 }
254 }
255
256 /**
257 * Inserts the specified element at the tail of this queue, waiting
258 * up to the specified wait time for space to become available if
259 * the queue is full.
260 *
261 * @throws InterruptedException {@inheritDoc}
262 * @throws NullPointerException {@inheritDoc}
263 */
264 public boolean offer(E e, long timeout, TimeUnit unit)
265 throws InterruptedException {
266
267 if (e == null) throw new NullPointerException();
268 long nanos = unit.toNanos(timeout);
269 final ReentrantLock lock = this.lock;
270 lock.lockInterruptibly();
271 try {
272 while (count == items.length) {
273 if (nanos <= 0)
274 return false;
275 nanos = notFull.awaitNanos(nanos);
276 }
277 insert(e);
278 return true;
279 } finally {
280 lock.unlock();
281 }
282 }
283
284 public E poll() {
285 final ReentrantLock lock = this.lock;
286 lock.lock();
287 try {
288 return (count == 0) ? null : extract();
289 } finally {
290 lock.unlock();
291 }
292 }
293
294 public E take() throws InterruptedException {
295 final ReentrantLock lock = this.lock;
296 lock.lockInterruptibly();
297 try {
298 while (count == 0)
299 notEmpty.await();
300 return extract();
301 } finally {
302 lock.unlock();
303 }
304 }
305
306 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
307 long nanos = unit.toNanos(timeout);
308 final ReentrantLock lock = this.lock;
309 lock.lockInterruptibly();
310 try {
311 while (count == 0) {
312 if (nanos <= 0)
313 return null;
314 nanos = notEmpty.awaitNanos(nanos);
315 }
316 return extract();
317 } finally {
318 lock.unlock();
319 }
320 }
321
322 public E peek() {
323 final ReentrantLock lock = this.lock;
324 lock.lock();
325 try {
326 return (count == 0) ? null : items[takeIndex];
327 } finally {
328 lock.unlock();
329 }
330 }
331
332 // this doc comment is overridden to remove the reference to collections
333 // greater in size than Integer.MAX_VALUE
334 /**
335 * Returns the number of elements in this queue.
336 *
337 * @return the number of elements in this queue
338 */
339 public int size() {
340 final ReentrantLock lock = this.lock;
341 lock.lock();
342 try {
343 return count;
344 } finally {
345 lock.unlock();
346 }
347 }
348
349 // this doc comment is a modified copy of the inherited doc comment,
350 // without the reference to unlimited queues.
351 /**
352 * Returns the number of additional elements that this queue can ideally
353 * (in the absence of memory or resource constraints) accept without
354 * blocking. This is always equal to the initial capacity of this queue
355 * less the current <tt>size</tt> of this queue.
356 *
357 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
358 * an element will succeed by inspecting <tt>remainingCapacity</tt>
359 * because it may be the case that another thread is about to
360 * insert or remove an element.
361 */
362 public int remainingCapacity() {
363 final ReentrantLock lock = this.lock;
364 lock.lock();
365 try {
366 return items.length - count;
367 } finally {
368 lock.unlock();
369 }
370 }
371
372 /**
373 * Removes a single instance of the specified element from this queue,
374 * if it is present. More formally, removes an element <tt>e</tt> such
375 * that <tt>o.equals(e)</tt>, if this queue contains one or more such
376 * elements.
377 * Returns <tt>true</tt> if this queue contained the specified element
378 * (or equivalently, if this queue changed as a result of the call).
379 *
380 * Removal of interior elements in circular array based queues
381 * is an intrinsically slow and disruptive operation, so should
382 * be undertaken only in exceptional circumstances, ideally
383 * only when the queue is known not to be accessible by other
384 * threads.
385 *
386 * @param o element to be removed from this queue, if present
387 * @return <tt>true</tt> if this queue changed as a result of the call
388 */
389 public boolean remove(Object o) {
390 if (o == null) return false;
391 final E[] items = this.items;
392 final ReentrantLock lock = this.lock;
393 lock.lock();
394 try {
395 int i = takeIndex;
396 int k = 0;
397 for (;;) {
398 if (k++ >= count)
399 return false;
400 if (o.equals(items[i])) {
401 removeAt(i);
402 return true;
403 }
404 i = inc(i);
405 }
406
407 } finally {
408 lock.unlock();
409 }
410 }
411
412 /**
413 * Returns <tt>true</tt> if this queue contains the specified element.
414 * More formally, returns <tt>true</tt> if and only if this queue contains
415 * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
416 *
417 * @param o object to be checked for containment in this queue
418 * @return <tt>true</tt> if this queue contains the specified element
419 */
420 public boolean contains(Object o) {
421 if (o == null) return false;
422 final E[] items = this.items;
423 final ReentrantLock lock = this.lock;
424 lock.lock();
425 try {
426 int i = takeIndex;
427 int k = 0;
428 while (k++ < count) {
429 if (o.equals(items[i]))
430 return true;
431 i = inc(i);
432 }
433 return false;
434 } finally {
435 lock.unlock();
436 }
437 }
438
439 /**
440 * Returns an array containing all of the elements in this queue, in
441 * proper sequence.
442 *
443 * <p>The returned array will be "safe" in that no references to it are
444 * maintained by this queue. (In other words, this method must allocate
445 * a new array). The caller is thus free to modify the returned array.
446 *
447 * <p>This method acts as bridge between array-based and collection-based
448 * APIs.
449 *
450 * @return an array containing all of the elements in this queue
451 */
452 public Object[] toArray() {
453 final E[] items = this.items;
454 final ReentrantLock lock = this.lock;
455 lock.lock();
456 try {
457 Object[] a = new Object[count];
458 int k = 0;
459 int i = takeIndex;
460 while (k < count) {
461 a[k++] = items[i];
462 i = inc(i);
463 }
464 return a;
465 } finally {
466 lock.unlock();
467 }
468 }
469
470 /**
471 * Returns an array containing all of the elements in this queue, in
472 * proper sequence; the runtime type of the returned array is that of
473 * the specified array. If the queue fits in the specified array, it
474 * is returned therein. Otherwise, a new array is allocated with the
475 * runtime type of the specified array and the size of this queue.
476 *
477 * <p>If this queue fits in the specified array with room to spare
478 * (i.e., the array has more elements than this queue), the element in
479 * the array immediately following the end of the queue is set to
480 * <tt>null</tt>.
481 *
482 * <p>Like the {@link #toArray()} method, this method acts as bridge between
483 * array-based and collection-based APIs. Further, this method allows
484 * precise control over the runtime type of the output array, and may,
485 * under certain circumstances, be used to save allocation costs.
486 *
487 * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
488 * The following code can be used to dump the queue into a newly
489 * allocated array of <tt>String</tt>:
490 *
491 * <pre>
492 * String[] y = x.toArray(new String[0]);</pre>
493 *
494 * Note that <tt>toArray(new Object[0])</tt> is identical in function to
495 * <tt>toArray()</tt>.
496 *
497 * @param a the array into which the elements of the queue are to
498 * be stored, if it is big enough; otherwise, a new array of the
499 * same runtime type is allocated for this purpose
500 * @return an array containing all of the elements in this queue
501 * @throws ArrayStoreException if the runtime type of the specified array
502 * is not a supertype of the runtime type of every element in
503 * this queue
504 * @throws NullPointerException if the specified array is null
505 */
506 public <T> T[] toArray(T[] a) {
507 final E[] items = this.items;
508 final ReentrantLock lock = this.lock;
509 lock.lock();
510 try {
511 if (a.length < count)
512 a = (T[])java.lang.reflect.Array.newInstance(
513 a.getClass().getComponentType(),
514 count
515 );
516
517 int k = 0;
518 int i = takeIndex;
519 while (k < count) {
520 a[k++] = (T)items[i];
521 i = inc(i);
522 }
523 if (a.length > count)
524 a[count] = null;
525 return a;
526 } finally {
527 lock.unlock();
528 }
529 }
530
531 public String toString() {
532 final ReentrantLock lock = this.lock;
533 lock.lock();
534 try {
535 return super.toString();
536 } finally {
537 lock.unlock();
538 }
539 }
540
541 /**
542 * Atomically removes all of the elements from this queue.
543 * The queue will be empty after this call returns.
544 */
545 public void clear() {
546 final E[] items = this.items;
547 final ReentrantLock lock = this.lock;
548 lock.lock();
549 try {
550 int i = takeIndex;
551 int k = count;
552 while (k-- > 0) {
553 items[i] = null;
554 i = inc(i);
555 }
556 count = 0;
557 putIndex = 0;
558 takeIndex = 0;
559 notFull.signalAll();
560 } finally {
561 lock.unlock();
562 }
563 }
564
565 /**
566 * @throws UnsupportedOperationException {@inheritDoc}
567 * @throws ClassCastException {@inheritDoc}
568 * @throws NullPointerException {@inheritDoc}
569 * @throws IllegalArgumentException {@inheritDoc}
570 */
571 public int drainTo(Collection<? super E> c) {
572 if (c == null)
573 throw new NullPointerException();
574 if (c == this)
575 throw new IllegalArgumentException();
576 final E[] items = this.items;
577 final ReentrantLock lock = this.lock;
578 lock.lock();
579 try {
580 int i = takeIndex;
581 int n = 0;
582 int max = count;
583 while (n < max) {
584 c.add(items[i]);
585 items[i] = null;
586 i = inc(i);
587 ++n;
588 }
589 if (n > 0) {
590 count = 0;
591 putIndex = 0;
592 takeIndex = 0;
593 notFull.signalAll();
594 }
595 return n;
596 } finally {
597 lock.unlock();
598 }
599 }
600
601 /**
602 * @throws UnsupportedOperationException {@inheritDoc}
603 * @throws ClassCastException {@inheritDoc}
604 * @throws NullPointerException {@inheritDoc}
605 * @throws IllegalArgumentException {@inheritDoc}
606 */
607 public int drainTo(Collection<? super E> c, int maxElements) {
608 if (c == null)
609 throw new NullPointerException();
610 if (c == this)
611 throw new IllegalArgumentException();
612 if (maxElements <= 0)
613 return 0;
614 final E[] items = this.items;
615 final ReentrantLock lock = this.lock;
616 lock.lock();
617 try {
618 int i = takeIndex;
619 int n = 0;
620 int sz = count;
621 int max = (maxElements < count) ? maxElements : count;
622 while (n < max) {
623 c.add(items[i]);
624 items[i] = null;
625 i = inc(i);
626 ++n;
627 }
628 if (n > 0) {
629 count -= n;
630 takeIndex = i;
631 notFull.signalAll();
632 }
633 return n;
634 } finally {
635 lock.unlock();
636 }
637 }
638
639
640 /**
641 * Returns an iterator over the elements in this queue in proper
642 * sequence. The returned <tt>Iterator</tt> is "weakly
643 * consistent" with respect to operations at the head and tail of
644 * the queue, and will never throw {@link
645 * ConcurrentModificationException}. It might return elements
646 * that existed upon construction of the iterator but have since
647 * been polled or taken, and might not return elements that have
648 * since been added. Further, no consistency guarantees are made
649 * with respect to "interior" removals occuring in concurrent
650 * invocations of {@link Collection#remove(Object)} or {@link
651 * Iterator#remove} occurring in other threads.
652 *
653 * The returned iterator supports the optional {@link Iterator#remove}
654 * operation. However, removal of interior elements in circular
655 * array based queues is an intrinsically slow and disruptive
656 * operation, so should be undertaken only in exceptional
657 * circumstances, ideally only when the queue is known not to be
658 * accessible by other threads.
659 *
660 * @return an iterator over the elements in this queue in proper sequence
661 */
662 public Iterator<E> iterator() {
663 final ReentrantLock lock = this.lock;
664 lock.lock();
665 try {
666 return new Itr();
667 } finally {
668 lock.unlock();
669 }
670 }
671
672 /**
673 * Iterator for ArrayBlockingQueue. To maintain weak consistency
674 * with respect to puts and takes, we (1) read ahead one slot, so
675 * as to not report hasNext true but then not have an element to
676 * return (2) ensure that each array slot is traversed at most
677 * once (by tracking "remaining" elements); (3) skip over null
678 * slots, which can occur if takes race ahead of iterators.
679 * However, for circular array-based queues, we cannot rely on any
680 * well established definition of what it means to be weakly
681 * consistent with respect to interior removes since these may
682 * require slot overwrites in the process of sliding elements to
683 * cover gaps. So we settle for resiliency, operating on
684 * established apparent nexts, which may miss some elements that
685 * have moved between calls to next.
686 */
687 private class Itr implements Iterator<E> {
688 private int remaining; // Number of elements yet to be returned
689 private int nextIndex; // Index of element to be returned by next
690 private E nextItem; // Element to be returned by next call to next
691 private E lastItem; // Element returned by last call to next
692 private int lastRet; // Index of last element returned, or -1 if none
693
694 Itr() {
695 lastRet = -1;
696 if ((remaining = count) > 0)
697 nextItem = items[nextIndex = takeIndex];
698 }
699
700 public boolean hasNext() {
701 return remaining > 0;
702 }
703
704 public E next() {
705 if (remaining <= 0)
706 throw new NoSuchElementException();
707 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
708 lock.lock();
709 try {
710 lastRet = nextIndex;
711 E x = lastItem = nextItem;
712 while (--remaining > 0) {
713 if ((nextItem = items[nextIndex = inc(nextIndex)]) != null)
714 break;
715 }
716 return x;
717 } finally {
718 lock.unlock();
719 }
720 }
721
722 public void remove() {
723 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
724 lock.lock();
725 try {
726 int i = lastRet;
727 if (i == -1)
728 throw new IllegalStateException();
729 lastRet = -1;
730 E x = lastItem;
731 lastItem = null;
732 if (x == items[i])
733 removeAt(i); // only remove if item still at index
734 } finally {
735 lock.unlock();
736 }
737 }
738 }
739
740 }