ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.73
Committed: Mon Oct 18 04:13:04 2010 UTC (13 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.72: +7 -4 lines
Log Message:
small comment improvements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13 * array. This queue orders elements FIFO (first-in-first-out). The
14 * <em>head</em> of the queue is that element that has been on the
15 * queue the longest time. The <em>tail</em> of the queue is that
16 * element that has been on the queue the shortest time. New elements
17 * are inserted at the tail of the queue, and the queue retrieval
18 * operations obtain elements at the head of the queue.
19 *
20 * <p>This is a classic &quot;bounded buffer&quot;, in which a
21 * fixed-sized array holds elements inserted by producers and
22 * extracted by consumers. Once created, the capacity cannot be
23 * changed. Attempts to {@code put} an element into a full queue
24 * will result in the operation blocking; attempts to {@code take} an
25 * element from an empty queue will similarly block.
26 *
27 * <p>This class supports an optional fairness policy for ordering
28 * waiting producer and consumer threads. By default, this ordering
29 * is not guaranteed. However, a queue constructed with fairness set
30 * to {@code true} grants threads access in FIFO order. Fairness
31 * generally decreases throughput but reduces variability and avoids
32 * starvation.
33 *
34 * <p>This class and its iterator implement all of the
35 * <em>optional</em> methods of the {@link Collection} and {@link
36 * Iterator} interfaces.
37 *
38 * <p>This class is a member of the
39 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40 * Java Collections Framework</a>.
41 *
42 * @since 1.5
43 * @author Doug Lea
44 * @param <E> the type of elements held in this collection
45 */
46 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
47 implements BlockingQueue<E>, java.io.Serializable {
48
49 /**
50 * Serialization ID. This class relies on default serialization
51 * even for the items array, which is default-serialized, even if
52 * it is empty. Otherwise it could not be declared final, which is
53 * necessary here.
54 */
55 private static final long serialVersionUID = -817911632652898426L;
56
57 /** The queued items */
58 final Object[] items;
59
60 /** items index for next take, poll, peek or remove */
61 int takeIndex;
62
63 /** items index for next put, offer, or add */
64 int putIndex;
65
66 /** Number of elements in the queue */
67 int count;
68
69 /*
70 * Concurrency control uses the classic two-condition algorithm
71 * found in any textbook.
72 */
73
74 /** Main lock guarding all access */
75 final ReentrantLock lock;
76 /** Condition for waiting takes */
77 private final Condition notEmpty;
78 /** Condition for waiting puts */
79 private final Condition notFull;
80
81 // Internal helper methods
82
83 /**
84 * Circularly increment i.
85 */
86 final int inc(int i) {
87 return (++i == items.length) ? 0 : i;
88 }
89
90 /**
91 * Circularly decrement i.
92 */
93 final int dec(int i) {
94 return ((i == 0) ? items.length : i) - 1;
95 }
96
97 @SuppressWarnings("unchecked")
98 static <E> E cast(Object item) {
99 return (E) item;
100 }
101
102 /**
103 * Returns item at index i.
104 */
105 final E itemAt(int i) {
106 return this.<E>cast(items[i]);
107 }
108
109 /**
110 * Throws NullPointerException if argument is null.
111 *
112 * @param v the element
113 */
114 private static void checkNotNull(Object v) {
115 if (v == null)
116 throw new NullPointerException();
117 }
118
119 /**
120 * Inserts element at current put position, advances, and signals.
121 * Call only when holding lock.
122 */
123 private void insert(E x) {
124 items[putIndex] = x;
125 putIndex = inc(putIndex);
126 ++count;
127 notEmpty.signal();
128 }
129
130 /**
131 * Extracts element at current take position, advances, and signals.
132 * Call only when holding lock.
133 */
134 private E extract() {
135 final Object[] items = this.items;
136 E x = this.<E>cast(items[takeIndex]);
137 items[takeIndex] = null;
138 takeIndex = inc(takeIndex);
139 --count;
140 notFull.signal();
141 return x;
142 }
143
144 /**
145 * Deletes item at position i.
146 * Utility for remove and iterator.remove.
147 * Call only when holding lock.
148 */
149 void removeAt(int i) {
150 final Object[] items = this.items;
151 // if removing front item, just advance
152 if (i == takeIndex) {
153 items[takeIndex] = null;
154 takeIndex = inc(takeIndex);
155 } else {
156 // slide over all others up through putIndex.
157 for (;;) {
158 int nexti = inc(i);
159 if (nexti != putIndex) {
160 items[i] = items[nexti];
161 i = nexti;
162 } else {
163 items[i] = null;
164 putIndex = i;
165 break;
166 }
167 }
168 }
169 --count;
170 notFull.signal();
171 }
172
173 /**
174 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
175 * capacity and default access policy.
176 *
177 * @param capacity the capacity of this queue
178 * @throws IllegalArgumentException if {@code capacity < 1}
179 */
180 public ArrayBlockingQueue(int capacity) {
181 this(capacity, false);
182 }
183
184 /**
185 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
186 * capacity and the specified access policy.
187 *
188 * @param capacity the capacity of this queue
189 * @param fair if {@code true} then queue accesses for threads blocked
190 * on insertion or removal, are processed in FIFO order;
191 * if {@code false} the access order is unspecified.
192 * @throws IllegalArgumentException if {@code capacity < 1}
193 */
194 public ArrayBlockingQueue(int capacity, boolean fair) {
195 if (capacity <= 0)
196 throw new IllegalArgumentException();
197 this.items = new Object[capacity];
198 lock = new ReentrantLock(fair);
199 notEmpty = lock.newCondition();
200 notFull = lock.newCondition();
201 }
202
203 /**
204 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
205 * capacity, the specified access policy and initially containing the
206 * elements of the given collection,
207 * added in traversal order of the collection's iterator.
208 *
209 * @param capacity the capacity of this queue
210 * @param fair if {@code true} then queue accesses for threads blocked
211 * on insertion or removal, are processed in FIFO order;
212 * if {@code false} the access order is unspecified.
213 * @param c the collection of elements to initially contain
214 * @throws IllegalArgumentException if {@code capacity} is less than
215 * {@code c.size()}, or less than 1.
216 * @throws NullPointerException if the specified collection or any
217 * of its elements are null
218 */
219 public ArrayBlockingQueue(int capacity, boolean fair,
220 Collection<? extends E> c) {
221 this(capacity, fair);
222
223 final ReentrantLock lock = this.lock;
224 lock.lock(); // Lock only for visibility, not mutual exclusion
225 try {
226 int i = 0;
227 try {
228 for (E e : c) {
229 checkNotNull(e);
230 items[i++] = e;
231 }
232 } catch (ArrayIndexOutOfBoundsException ex) {
233 throw new IllegalArgumentException();
234 }
235 count = i;
236 putIndex = (i == capacity) ? 0 : i;
237 } finally {
238 lock.unlock();
239 }
240 }
241
242 /**
243 * Inserts the specified element at the tail of this queue if it is
244 * possible to do so immediately without exceeding the queue's capacity,
245 * returning {@code true} upon success and throwing an
246 * {@code IllegalStateException} if this queue is full.
247 *
248 * @param e the element to add
249 * @return {@code true} (as specified by {@link Collection#add})
250 * @throws IllegalStateException if this queue is full
251 * @throws NullPointerException if the specified element is null
252 */
253 public boolean add(E e) {
254 return super.add(e);
255 }
256
257 /**
258 * Inserts the specified element at the tail of this queue if it is
259 * possible to do so immediately without exceeding the queue's capacity,
260 * returning {@code true} upon success and {@code false} if this queue
261 * is full. This method is generally preferable to method {@link #add},
262 * which can fail to insert an element only by throwing an exception.
263 *
264 * @throws NullPointerException if the specified element is null
265 */
266 public boolean offer(E e) {
267 checkNotNull(e);
268 final ReentrantLock lock = this.lock;
269 lock.lock();
270 try {
271 if (count == items.length)
272 return false;
273 else {
274 insert(e);
275 return true;
276 }
277 } finally {
278 lock.unlock();
279 }
280 }
281
282 /**
283 * Inserts the specified element at the tail of this queue, waiting
284 * for space to become available if the queue is full.
285 *
286 * @throws InterruptedException {@inheritDoc}
287 * @throws NullPointerException {@inheritDoc}
288 */
289 public void put(E e) throws InterruptedException {
290 checkNotNull(e);
291 final ReentrantLock lock = this.lock;
292 lock.lockInterruptibly();
293 try {
294 while (count == items.length)
295 notFull.await();
296 insert(e);
297 } finally {
298 lock.unlock();
299 }
300 }
301
302 /**
303 * Inserts the specified element at the tail of this queue, waiting
304 * up to the specified wait time for space to become available if
305 * the queue is full.
306 *
307 * @throws InterruptedException {@inheritDoc}
308 * @throws NullPointerException {@inheritDoc}
309 */
310 public boolean offer(E e, long timeout, TimeUnit unit)
311 throws InterruptedException {
312
313 checkNotNull(e);
314 long nanos = unit.toNanos(timeout);
315 final ReentrantLock lock = this.lock;
316 lock.lockInterruptibly();
317 try {
318 while (count == items.length) {
319 if (nanos <= 0)
320 return false;
321 nanos = notFull.awaitNanos(nanos);
322 }
323 insert(e);
324 return true;
325 } finally {
326 lock.unlock();
327 }
328 }
329
330 public E poll() {
331 final ReentrantLock lock = this.lock;
332 lock.lock();
333 try {
334 return (count == 0) ? null : extract();
335 } finally {
336 lock.unlock();
337 }
338 }
339
340 public E take() throws InterruptedException {
341 final ReentrantLock lock = this.lock;
342 lock.lockInterruptibly();
343 try {
344 while (count == 0)
345 notEmpty.await();
346 return extract();
347 } finally {
348 lock.unlock();
349 }
350 }
351
352 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
353 long nanos = unit.toNanos(timeout);
354 final ReentrantLock lock = this.lock;
355 lock.lockInterruptibly();
356 try {
357 while (count == 0) {
358 if (nanos <= 0)
359 return null;
360 nanos = notEmpty.awaitNanos(nanos);
361 }
362 return extract();
363 } finally {
364 lock.unlock();
365 }
366 }
367
368 public E peek() {
369 final ReentrantLock lock = this.lock;
370 lock.lock();
371 try {
372 return (count == 0) ? null : itemAt(takeIndex);
373 } finally {
374 lock.unlock();
375 }
376 }
377
378 // this doc comment is overridden to remove the reference to collections
379 // greater in size than Integer.MAX_VALUE
380 /**
381 * Returns the number of elements in this queue.
382 *
383 * @return the number of elements in this queue
384 */
385 public int size() {
386 final ReentrantLock lock = this.lock;
387 lock.lock();
388 try {
389 return count;
390 } finally {
391 lock.unlock();
392 }
393 }
394
395 // this doc comment is a modified copy of the inherited doc comment,
396 // without the reference to unlimited queues.
397 /**
398 * Returns the number of additional elements that this queue can ideally
399 * (in the absence of memory or resource constraints) accept without
400 * blocking. This is always equal to the initial capacity of this queue
401 * less the current {@code size} of this queue.
402 *
403 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
404 * an element will succeed by inspecting {@code remainingCapacity}
405 * because it may be the case that another thread is about to
406 * insert or remove an element.
407 */
408 public int remainingCapacity() {
409 final ReentrantLock lock = this.lock;
410 lock.lock();
411 try {
412 return items.length - count;
413 } finally {
414 lock.unlock();
415 }
416 }
417
418 /**
419 * Removes a single instance of the specified element from this queue,
420 * if it is present. More formally, removes an element {@code e} such
421 * that {@code o.equals(e)}, if this queue contains one or more such
422 * elements.
423 * Returns {@code true} if this queue contained the specified element
424 * (or equivalently, if this queue changed as a result of the call).
425 *
426 * <p>Removal of interior elements in circular array based queues
427 * is an intrinsically slow and disruptive operation, so should
428 * be undertaken only in exceptional circumstances, ideally
429 * only when the queue is known not to be accessible by other
430 * threads.
431 *
432 * @param o element to be removed from this queue, if present
433 * @return {@code true} if this queue changed as a result of the call
434 */
435 public boolean remove(Object o) {
436 if (o == null) return false;
437 final Object[] items = this.items;
438 final ReentrantLock lock = this.lock;
439 lock.lock();
440 try {
441 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
442 if (o.equals(items[i])) {
443 removeAt(i);
444 return true;
445 }
446 }
447 return false;
448 } finally {
449 lock.unlock();
450 }
451 }
452
453 /**
454 * Returns {@code true} if this queue contains the specified element.
455 * More formally, returns {@code true} if and only if this queue contains
456 * at least one element {@code e} such that {@code o.equals(e)}.
457 *
458 * @param o object to be checked for containment in this queue
459 * @return {@code true} if this queue contains the specified element
460 */
461 public boolean contains(Object o) {
462 if (o == null) return false;
463 final Object[] items = this.items;
464 final ReentrantLock lock = this.lock;
465 lock.lock();
466 try {
467 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
468 if (o.equals(items[i]))
469 return true;
470 return false;
471 } finally {
472 lock.unlock();
473 }
474 }
475
476 /**
477 * Returns an array containing all of the elements in this queue, in
478 * proper sequence.
479 *
480 * <p>The returned array will be "safe" in that no references to it are
481 * maintained by this queue. (In other words, this method must allocate
482 * a new array). The caller is thus free to modify the returned array.
483 *
484 * <p>This method acts as bridge between array-based and collection-based
485 * APIs.
486 *
487 * @return an array containing all of the elements in this queue
488 */
489 public Object[] toArray() {
490 final Object[] items = this.items;
491 final ReentrantLock lock = this.lock;
492 lock.lock();
493 try {
494 final int count = this.count;
495 Object[] a = new Object[count];
496 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
497 a[k] = items[i];
498 return a;
499 } finally {
500 lock.unlock();
501 }
502 }
503
504 /**
505 * Returns an array containing all of the elements in this queue, in
506 * proper sequence; the runtime type of the returned array is that of
507 * the specified array. If the queue fits in the specified array, it
508 * is returned therein. Otherwise, a new array is allocated with the
509 * runtime type of the specified array and the size of this queue.
510 *
511 * <p>If this queue fits in the specified array with room to spare
512 * (i.e., the array has more elements than this queue), the element in
513 * the array immediately following the end of the queue is set to
514 * {@code null}.
515 *
516 * <p>Like the {@link #toArray()} method, this method acts as bridge between
517 * array-based and collection-based APIs. Further, this method allows
518 * precise control over the runtime type of the output array, and may,
519 * under certain circumstances, be used to save allocation costs.
520 *
521 * <p>Suppose {@code x} is a queue known to contain only strings.
522 * The following code can be used to dump the queue into a newly
523 * allocated array of {@code String}:
524 *
525 * <pre>
526 * String[] y = x.toArray(new String[0]);</pre>
527 *
528 * Note that {@code toArray(new Object[0])} is identical in function to
529 * {@code toArray()}.
530 *
531 * @param a the array into which the elements of the queue are to
532 * be stored, if it is big enough; otherwise, a new array of the
533 * same runtime type is allocated for this purpose
534 * @return an array containing all of the elements in this queue
535 * @throws ArrayStoreException if the runtime type of the specified array
536 * is not a supertype of the runtime type of every element in
537 * this queue
538 * @throws NullPointerException if the specified array is null
539 */
540 @SuppressWarnings("unchecked")
541 public <T> T[] toArray(T[] a) {
542 final Object[] items = this.items;
543 final ReentrantLock lock = this.lock;
544 lock.lock();
545 try {
546 final int count = this.count;
547 final int len = a.length;
548 if (len < count)
549 a = (T[])java.lang.reflect.Array.newInstance(
550 a.getClass().getComponentType(), count);
551 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
552 a[k] = (T) items[i];
553 if (len > count)
554 a[count] = null;
555 return a;
556 } finally {
557 lock.unlock();
558 }
559 }
560
561 public String toString() {
562 final ReentrantLock lock = this.lock;
563 lock.lock();
564 try {
565 int k = count;
566 if (k == 0)
567 return "[]";
568
569 StringBuilder sb = new StringBuilder();
570 sb.append('[');
571 for (int i = takeIndex; ; i = inc(i)) {
572 Object e = items[i];
573 sb.append(e == this ? "(this Collection)" : e);
574 if (--k == 0)
575 return sb.append(']').toString();
576 sb.append(',').append(' ');
577 }
578 } finally {
579 lock.unlock();
580 }
581 }
582
583 /**
584 * Atomically removes all of the elements from this queue.
585 * The queue will be empty after this call returns.
586 */
587 public void clear() {
588 final Object[] items = this.items;
589 final ReentrantLock lock = this.lock;
590 lock.lock();
591 try {
592 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
593 items[i] = null;
594 count = 0;
595 putIndex = 0;
596 takeIndex = 0;
597 notFull.signalAll();
598 } finally {
599 lock.unlock();
600 }
601 }
602
603 /**
604 * @throws UnsupportedOperationException {@inheritDoc}
605 * @throws ClassCastException {@inheritDoc}
606 * @throws NullPointerException {@inheritDoc}
607 * @throws IllegalArgumentException {@inheritDoc}
608 */
609 public int drainTo(Collection<? super E> c) {
610 checkNotNull(c);
611 if (c == this)
612 throw new IllegalArgumentException();
613 final Object[] items = this.items;
614 final ReentrantLock lock = this.lock;
615 lock.lock();
616 try {
617 int i = takeIndex;
618 int n = 0;
619 int max = count;
620 while (n < max) {
621 c.add(this.<E>cast(items[i]));
622 items[i] = null;
623 i = inc(i);
624 ++n;
625 }
626 if (n > 0) {
627 count = 0;
628 putIndex = 0;
629 takeIndex = 0;
630 notFull.signalAll();
631 }
632 return n;
633 } finally {
634 lock.unlock();
635 }
636 }
637
638 /**
639 * @throws UnsupportedOperationException {@inheritDoc}
640 * @throws ClassCastException {@inheritDoc}
641 * @throws NullPointerException {@inheritDoc}
642 * @throws IllegalArgumentException {@inheritDoc}
643 */
644 public int drainTo(Collection<? super E> c, int maxElements) {
645 checkNotNull(c);
646 if (c == this)
647 throw new IllegalArgumentException();
648 if (maxElements <= 0)
649 return 0;
650 final Object[] items = this.items;
651 final ReentrantLock lock = this.lock;
652 lock.lock();
653 try {
654 int i = takeIndex;
655 int n = 0;
656 int sz = count;
657 int max = (maxElements < count) ? maxElements : count;
658 while (n < max) {
659 c.add(this.<E>cast(items[i]));
660 items[i] = null;
661 i = inc(i);
662 ++n;
663 }
664 if (n > 0) {
665 count -= n;
666 takeIndex = i;
667 notFull.signalAll();
668 }
669 return n;
670 } finally {
671 lock.unlock();
672 }
673 }
674
675 /**
676 * Returns an iterator over the elements in this queue in proper
677 * sequence. The returned {@code Iterator} is "weakly
678 * consistent" with respect to operations at the head and tail of
679 * the queue, and will never throw {@link
680 * ConcurrentModificationException}. It might return elements
681 * that existed upon construction of the iterator but have since
682 * been polled or taken, and might not return elements that have
683 * since been added. Further, no consistency guarantees are made
684 * with respect to "interior" removals occuring in concurrent
685 * invocations of {@link Collection#remove(Object)} or {@link
686 * Iterator#remove} occurring in other threads.
687 *
688 * <p>The returned iterator supports the optional {@link Iterator#remove}
689 * operation. However, removal of interior elements in circular
690 * array based queues is an intrinsically slow and disruptive
691 * operation, so should be undertaken only in exceptional
692 * circumstances, ideally only when the queue is known not to be
693 * accessible by other threads.
694 *
695 * @return an iterator over the elements in this queue in proper sequence
696 */
697 public Iterator<E> iterator() {
698 return new Itr();
699 }
700
701 /**
702 * Iterator for ArrayBlockingQueue. To maintain weak consistency
703 * with respect to puts and takes, we (1) read ahead one slot, so
704 * as to not report hasNext true but then not have an element to
705 * return (2) ensure that each array slot is traversed at most
706 * once (by tracking "remaining" elements); (3) skip over null
707 * slots, which can occur if takes race ahead of iterators.
708 * However, for circular array-based queues, we cannot rely on any
709 * well established definition of what it means to be weakly
710 * consistent with respect to interior removes since these may
711 * require slot overwrites in the process of sliding elements to
712 * cover gaps. So we settle for resiliency, operating on
713 * established apparent nexts, which may miss some elements that
714 * have moved between calls to next.
715 */
716 private class Itr implements Iterator<E> {
717 private int remaining; // Number of elements yet to be returned
718 private int nextIndex; // Index of element to be returned by next
719 private E nextItem; // Element to be returned by next call to next
720 private E lastItem; // Element returned by last call to next
721 private int lastRet; // Index of last element returned, or -1 if none
722
723 Itr() {
724 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
725 lock.lock();
726 try {
727 lastRet = -1;
728 if ((remaining = count) > 0)
729 nextItem = itemAt(nextIndex = takeIndex);
730 } finally {
731 lock.unlock();
732 }
733 }
734
735 public boolean hasNext() {
736 return remaining > 0;
737 }
738
739 public E next() {
740 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
741 lock.lock();
742 try {
743 if (remaining <= 0)
744 throw new NoSuchElementException();
745 lastRet = nextIndex;
746 E x = lastItem = nextItem;
747 while (--remaining > 0) {
748 if ((nextItem = itemAt(nextIndex = inc(nextIndex))) != null)
749 break;
750 }
751 return x;
752 } finally {
753 lock.unlock();
754 }
755 }
756
757 public void remove() {
758 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
759 lock.lock();
760 try {
761 int i = lastRet;
762 if (i == -1)
763 throw new IllegalStateException();
764 lastRet = -1;
765 E x = lastItem;
766 lastItem = null;
767 // only remove if item still at index
768 if (x == items[i]) {
769 boolean removingHead = (i == takeIndex);
770 removeAt(i);
771 if (!removingHead)
772 nextIndex = dec(nextIndex);
773 }
774 } finally {
775 lock.unlock();
776 }
777 }
778 }
779
780 }