ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.59
Committed: Fri Jul 23 16:35:02 2010 UTC (13 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.58: +9 -14 lines
Log Message:
hand-inline empty() and full() methods, to appease hotspot

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
 * array.  This queue orders elements FIFO (first-in-first-out).  The
 * <em>head</em> of the queue is that element that has been on the
 * queue the longest time.  The <em>tail</em> of the queue is that
 * element that has been on the queue the shortest time.  New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 *
 * <p>This is a classic &quot;bounded buffer&quot;, in which a
 * fixed-sized array holds elements inserted by producers and
 * extracted by consumers.  Once created, the capacity cannot be
 * increased.  Attempts to <tt>put</tt> an element into a full queue
 * will result in the operation blocking; attempts to <tt>take</tt> an
 * element from an empty queue will similarly block.
 *
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed.  However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order.  Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * Serialization ID.  This class relies on default serialization
     * even for the items array, which is default-serialized, even if
     * it is empty.  Otherwise it could not be declared final, which is
     * necessary here.
     */
    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items */
    final E[] items;
    /** items index for next take, poll or remove */
    int takeIndex;
    /** items index for next put, offer, or add. */
    int putIndex;
    /** Number of items in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increments i, wrapping to 0 at the end of the array.
     */
    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Utility for remove and iterator.remove: Delete item at position i.
     * Call only when holding lock, and only with the index of a slot
     * that currently holds a queued element.
     */
    void removeAt(int i) {
        final E[] items = this.items;
        // if removing front item, just advance
        if (i == takeIndex) {
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if <tt>false</tt> the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    @SuppressWarnings("unchecked") // the array only ever holds E instances
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = (E[]) new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if <tt>false</tt> the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
     *         <tt>c.size()</tt>, or less than 1.
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        if (capacity < c.size())
            throw new IllegalArgumentException();

        // Fill the array directly instead of calling the overridable add()
        // from a constructor.  The lock is held so that these plain-field
        // writes are made the same way as every other mutation.
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                // c yielded more elements than size() reported
                if (n == capacity)
                    throw new IllegalArgumentException();
                items[n++] = e;
            }
            count = n;
            putIndex = (n == capacity) ? 0 : n;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning <tt>true</tt> upon success and throwing an
     * <tt>IllegalStateException</tt> if this queue is full.
     *
     * @param e the element to add
     * @return <tt>true</tt> (as specified by {@link Collection#add})
     * @throws IllegalStateException if this queue is full
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return super.add(e);
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning <tt>true</tt> upon success and <tt>false</tt> if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * up to the specified wait time for space to become available if
     * the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            insert(e);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting while the
     * queue is empty.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if the queue is empty.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit how to interpret the <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapsed while the queue was still empty
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting <tt>remainingCapacity</tt>
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    public int remainingCapacity() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element <tt>e</tt> such
     * that <tt>o.equals(e)</tt>, if this queue contains one or more such
     * elements.
     * Returns <tt>true</tt> if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return <tt>true</tt> if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count)
                    return false;
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }

        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains the specified element.
     * More formally, returns <tt>true</tt> if and only if this queue contains
     * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>.
     *
     * @param o object to be checked for containment in this queue
     * @return <tt>true</tt> if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            while (k++ < count) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] a = new Object[count];
            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = items[i];
                i = inc(i);
            }
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * <tt>null</tt>.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose <tt>x</tt> is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of <tt>String</tt>:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that <tt>toArray(new Object[0])</tt> is identical in function to
     * <tt>toArray()</tt>.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked") // element stores are checked by the array at runtime
    public <T> T[] toArray(T[] a) {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (a.length < count)
                a = (T[])java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(),
                    count
                    );

            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = (T)items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the inherited string representation of this queue, computed
     * while holding the queue's lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex;
            int k = count;
            while (k-- > 0) {
                items[i] = null;
                i = inc(i);
            }
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        // The bounded form performs the same argument checks first.
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final int max = (maxElements < count) ? maxElements : count;
            int i = takeIndex;
            int n = 0;
            try {
                while (n < max) {
                    // The slot is cleared only after c accepted the element,
                    // so a failing add() leaves that element in the queue.
                    c.add(items[i]);
                    items[i] = null;
                    i = inc(i);
                    ++n;
                }
                return n;
            } finally {
                // Restore the queue invariants even if c.add() threw:
                // account for exactly the elements already handed over.
                if (n > 0) {
                    count -= n;
                    takeIndex = i;
                    notFull.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }


    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return new Itr();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue
     */
    private class Itr implements Iterator<E> {
        /**
         * Index of element to be returned by next,
         * or a negative number if no such.
         */
        private int nextIndex;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Index of element returned by most recent call to next.
         * Reset to -1 if this element is deleted by a call to remove.
         */
        private int lastRet;

        /**
         * Element returned by most recent call to next, kept so that
         * remove() can verify the slot at lastRet still holds it.
         * Reset to null by a call to remove.
         */
        private E lastItem;

        /** Constructed only by iterator(), which holds the lock. */
        Itr() {
            lastRet = -1;
            if (count == 0)
                nextIndex = -1;
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        public boolean hasNext() {
            /*
             * No sync. We can return true by mistake here
             * only if this iterator passed across threads,
             * which we don't support anyway.
             */
            return nextIndex >= 0;
        }

        /**
         * Checks whether nextIndex is valid; if so setting nextItem.
         * Stops iterator when either hits putIndex or sees null item.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            } else {
                nextItem = items[nextIndex];
                if (nextItem == null)
                    nextIndex = -1;
            }
        }

        public E next() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                if (nextIndex < 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = nextItem;
                lastItem = x;
                nextIndex = inc(nextIndex);
                checkNext();
                return x;
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            final ReentrantLock lock = ArrayBlockingQueue.this.lock;
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;
                E x = lastItem;
                lastItem = null;

                // Only remove if the element handed out by next() still
                // occupies its slot.  If the queue changed in the meantime
                // the slot may be empty or hold a different element, and
                // removeAt(i) would then delete the wrong element or
                // corrupt count/putIndex.
                if (x != null && items[i] == x) {
                    int ti = takeIndex;
                    removeAt(i);
                    // back up cursor (reset to front if was first element)
                    nextIndex = (i == ti) ? takeIndex : i;
                    checkNext();
                }
            } finally {
                lock.unlock();
            }
        }
    }
}