ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.68
Committed: Thu Sep 30 00:24:20 2010 UTC (13 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.67: +100 -75 lines
Log Message:
fix javac warnings; optimize copy constructor, toString

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13 * array. This queue orders elements FIFO (first-in-first-out). The
14 * <em>head</em> of the queue is that element that has been on the
15 * queue the longest time. The <em>tail</em> of the queue is that
16 * element that has been on the queue the shortest time. New elements
17 * are inserted at the tail of the queue, and the queue retrieval
18 * operations obtain elements at the head of the queue.
19 *
20 * <p>This is a classic &quot;bounded buffer&quot;, in which a
21 * fixed-sized array holds elements inserted by producers and
22 * extracted by consumers. Once created, the capacity cannot be
23 * increased. Attempts to <tt>put</tt> an element into a full queue
24 * will result in the operation blocking; attempts to <tt>take</tt> an
25 * element from an empty queue will similarly block.
26 *
27 * <p> This class supports an optional fairness policy for ordering
28 * waiting producer and consumer threads. By default, this ordering
29 * is not guaranteed. However, a queue constructed with fairness set
30 * to <tt>true</tt> grants threads access in FIFO order. Fairness
31 * generally decreases throughput but reduces variability and avoids
32 * starvation.
33 *
34 * <p>This class and its iterator implement all of the
35 * <em>optional</em> methods of the {@link Collection} and {@link
36 * Iterator} interfaces.
37 *
38 * <p>This class is a member of the
39 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40 * Java Collections Framework</a>.
41 *
42 * @since 1.5
43 * @author Doug Lea
44 * @param <E> the type of elements held in this collection
45 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID. This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty. Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items. */
    final Object[] items;
    /** items index for next take, poll or remove. */
    int takeIndex;
    /** items index for next put, offer, or add. */
    int putIndex;
    /** Number of items in the queue. */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access. */
    final ReentrantLock lock;
    /** Condition for waiting takes. */
    private final Condition notEmpty;
    /** Condition for waiting puts. */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increments i.
     *
     * @param i an index into {@code items}
     * @return the index following {@code i}, wrapping to 0 at the end
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Circularly decrements i.
     *
     * @param i an index into {@code items}
     * @return the index preceding {@code i}, wrapping to the last slot
     *         when {@code i} is 0
     */
    final int dec(int i) {
        return ((i == 0) ? items.length : i) - 1;
    }

    /**
     * Performs an unchecked cast of an array slot to the element type.
     *
     * @param item the object to cast
     * @return {@code item}, viewed as an {@code E}
     */
    @SuppressWarnings("unchecked")
    static <E> E cast(Object item) {
        return (E) item;
    }

    /**
     * Returns item at index i.
     *
     * @param i an index into {@code items}
     * @return the element stored in that slot (possibly {@code null})
     */
    final E itemAt(int i) {
        return ArrayBlockingQueue.<E>cast(items[i]);
    }

    /**
     * Throws NullPointerException if argument is null.
     *
     * @param v the element
     */
    private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E extract() {
        final Object[] items = this.items;
        E x = ArrayBlockingQueue.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Deletes item at position i.
     * Utility for remove and iterator.remove.
     * Call only when holding lock.
     */
    void removeAt(int i) {
        final Object[] items = this.items;
        // if removing front item, just advance
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Transfers up to {@code max} elements from the head of this queue
     * into {@code c}. Call only when holding lock, and only with
     * {@code max <= count}.
     *
     * <p>The bookkeeping is done in a {@code finally} block so that the
     * queue stays consistent even if {@code c.add} throws part-way
     * through: elements already handed over are accounted for, and the
     * element whose {@code add} failed remains at the head of the queue.
     *
     * @param c the collection to transfer elements into
     * @param max the maximum number of elements to transfer
     * @return the number of elements transferred
     */
    private int drainLocked(Collection<? super E> c, int max) {
        final Object[] items = this.items;
        int i = takeIndex;
        int n = 0;
        try {
            while (n < max) {
                c.add(ArrayBlockingQueue.<E>cast(items[i]));
                items[i] = null;
                i = inc(i);
                ++n;
            }
            return n;
        } finally {
            // Restore invariants even if c.add() threw.
            if (n > 0) {
                count -= n;
                takeIndex = i;
                notFull.signalAll();
            }
        }
    }

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if {@code capacity} is less than
     *         {@code c.size()}, or less than 1.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            for (E e : c) {
                checkNotNull(e);
                // Explicit overflow check rather than relying on a caught
                // ArrayIndexOutOfBoundsException from the array store.
                if (i == capacity)
                    throw new IllegalArgumentException();
                items[i++] = e;
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and throwing an
     * {@code IllegalStateException} if this queue is full.
     *
     * @param e the element to add
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws IllegalStateException if this queue is full
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean add(E e) {
        return super.add(e);
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    @Override
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * up to the specified wait time for space to become available if
     * the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    @Override
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return the head of this queue, or {@code null} if the wait time
     *         elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    @Override
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : itemAt(takeIndex);
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    @Override
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    @Override
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * <p>Removal of interior elements in circular array based queues
     * is an intrinsically slow and disruptive operation, so should
     * be undertaken only in exceptional circumstances, ideally
     * only when the queue is known not to be accessible by other
     * threads.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    @Override
    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    @Override
    public boolean contains(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
                if (o.equals(items[i]))
                    return true;
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    @Override
    public Object[] toArray() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int count = this.count;
            Object[] a = new Object[count];
            for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
                a[k] = items[i];
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int count = this.count;
            final int len = a.length;
            if (len < count)
                a = (T[])java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(), count);
            for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
                a[k] = (T) items[i];
            if (len > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a string representation of this queue: its elements in
     * head-to-tail order, enclosed in square brackets and separated by
     * {@code ", "}. An element that is this queue itself is rendered as
     * {@code "(this Collection)"}.
     *
     * @return a string representation of this queue
     */
    @Override
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int k = count;
            if (k == 0)
                return "[]";

            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (int i = takeIndex; ; i = inc(i)) {
                Object e = items[i];
                sb.append(e == this ? "(this Collection)" : e);
                if (--k == 0)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    @Override
    public void clear() {
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
                items[i] = null;
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        checkNotNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return drainLocked(c, count);
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        checkNotNull(c);
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return drainLocked(c, Math.min(maxElements, count));
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper
     * sequence.  The returned {@code Iterator} is "weakly
     * consistent" with respect to operations at the head and tail of
     * the queue, and will never throw {@link
     * ConcurrentModificationException}.  It might return elements
     * that existed upon construction of the iterator but have since
     * been polled or taken, and might not return elements that have
     * since been added.  Further, no consistency guarantees are made
     * with respect to "interior" removals occurring in concurrent
     * invocations of {@link Collection#remove(Object)} or {@link
     * Iterator#remove} occurring in other threads.
     *
     * <p>The returned iterator supports the optional {@link Iterator#remove}
     * operation. However, removal of interior elements in circular
     * array based queues is an intrinsically slow and disruptive
     * operation, so should be undertaken only in exceptional
     * circumstances, ideally only when the queue is known not to be
     * accessible by other threads.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    @Override
    public Iterator<E> iterator() {
        return new Itr();
    }

    /**
     * Iterator for ArrayBlockingQueue.  To maintain weak consistency
     * with respect to puts and takes, we (1) read ahead one slot, so
     * as to not report hasNext true but then not have an element to
     * return (2) ensure that each array slot is traversed at most
     * once (by tracking "remaining" elements); (3) skip over null
     * slots, which can occur if takes race ahead of iterators.
     * However, for circular array-based queues, we cannot rely on any
     * well established definition of what it means to be weakly
     * consistent with respect to interior removes since these may
     * require slot overwrites in the process of sliding elements to
     * cover gaps. So we settle for resiliency, operating on
     * established apparent nexts, which may miss some elements that
     * have moved between calls to next.
     */
    private class Itr implements Iterator<E> {
        private int remaining; // Number of elements yet to be returned
        private int nextIndex; // Index of element to be returned by next
        private E nextItem;    // Element to be returned by next call to next
        private E lastItem;    // Element returned by last call to next
        private int lastRet;   // Index of last element returned, or -1 if none

        Itr() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                lastRet = -1;
                if ((remaining = count) > 0)
                    nextItem = itemAt(nextIndex = takeIndex);
            } finally {
                lock.unlock();
            }
        }

        @Override
        public boolean hasNext() {
            return remaining > 0;
        }

        @Override
        public E next() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (remaining <= 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = lastItem = nextItem;
                // Read ahead to the next non-null slot, if any remain.
                while (--remaining > 0) {
                    if ((nextItem = itemAt(nextIndex = inc(nextIndex))) != null)
                        break;
                }
                return x;
            } finally {
                lock.unlock();
            }
        }

        @Override
        public void remove() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;
                E x = lastItem;
                lastItem = null;
                // only remove if item still at index
                if (x != null && x == items[i]) {
                    boolean removingHead = (i == takeIndex);
                    removeAt(i);
                    // An interior removal slides every later element down
                    // one slot, including the one we already read ahead;
                    // pull nextIndex back so that traversal neither skips
                    // an element nor records a wrong lastRet.
                    if (!removingHead)
                        nextIndex = dec(nextIndex);
                }
            } finally {
                lock.unlock();
            }
        }
    }

}