ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.71
Committed: Sun Oct 10 04:29:39 2010 UTC (13 years, 7 months ago) by jsr166
Branch: MAIN
Changes since 1.70: +14 -2 lines
Log Message:
Update iterator index on iterator.remove()

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13 * array. This queue orders elements FIFO (first-in-first-out). The
14 * <em>head</em> of the queue is that element that has been on the
15 * queue the longest time. The <em>tail</em> of the queue is that
16 * element that has been on the queue the shortest time. New elements
17 * are inserted at the tail of the queue, and the queue retrieval
18 * operations obtain elements at the head of the queue.
19 *
20 * <p>This is a classic &quot;bounded buffer&quot;, in which a
21 * fixed-sized array holds elements inserted by producers and
22 * extracted by consumers. Once created, the capacity cannot be
23 * changed. Attempts to {@code put} an element into a full queue
24 * will result in the operation blocking; attempts to {@code take} an
25 * element from an empty queue will similarly block.
26 *
27 * <p> This class supports an optional fairness policy for ordering
28 * waiting producer and consumer threads. By default, this ordering
29 * is not guaranteed. However, a queue constructed with fairness set
30 * to {@code true} grants threads access in FIFO order. Fairness
31 * generally decreases throughput but reduces variability and avoids
32 * starvation.
33 *
34 * <p>This class and its iterator implement all of the
35 * <em>optional</em> methods of the {@link Collection} and {@link
36 * Iterator} interfaces.
37 *
38 * <p>This class is a member of the
39 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
40 * Java Collections Framework</a>.
41 *
42 * @since 1.5
43 * @author Doug Lea
44 * @param <E> the type of elements held in this collection
45 */
46 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
47 implements BlockingQueue<E>, java.io.Serializable {
48
49 /**
50 * Serialization ID. This class relies on default serialization
51 * even for the items array, which is default-serialized, even if
52 * it is empty. Otherwise it could not be declared final, which is
53 * necessary here.
54 */
55 private static final long serialVersionUID = -817911632652898426L;
56
57 /** The queued items */
58 final Object[] items;
59 /** items index for next take, poll or remove */
60 int takeIndex;
61 /** items index for next put, offer, or add. */
62 int putIndex;
63 /** Number of items in the queue */
64 int count;
65
66 /*
67 * Concurrency control uses the classic two-condition algorithm
68 * found in any textbook.
69 */
70
71 /** Main lock guarding all access */
72 final ReentrantLock lock;
73 /** Condition for waiting takes */
74 private final Condition notEmpty;
75 /** Condition for waiting puts */
76 private final Condition notFull;
77
78 // Internal helper methods
79
80 /**
81 * Circularly increment i.
82 */
83 final int inc(int i) {
84 return (++i == items.length) ? 0 : i;
85 }
86
87 /**
88 * Circularly decrement i.
89 */
90 final int dec(int i) {
91 return ((i == 0) ? items.length : i) - 1;
92 }
93
94 @SuppressWarnings("unchecked")
95 static <E> E cast(Object item) {
96 return (E) item;
97 }
98
99 /**
100 * Returns item at index i.
101 */
102 final E itemAt(int i) {
103 return this.<E>cast(items[i]);
104 }
105
106 /**
107 * Throws NullPointerException if argument is null.
108 *
109 * @param v the element
110 */
111 private static void checkNotNull(Object v) {
112 if (v == null)
113 throw new NullPointerException();
114 }
115
116 /**
117 * Inserts element at current put position, advances, and signals.
118 * Call only when holding lock.
119 */
120 private void insert(E x) {
121 items[putIndex] = x;
122 putIndex = inc(putIndex);
123 ++count;
124 notEmpty.signal();
125 }
126
127 /**
128 * Extracts element at current take position, advances, and signals.
129 * Call only when holding lock.
130 */
131 private E extract() {
132 final Object[] items = this.items;
133 E x = this.<E>cast(items[takeIndex]);
134 items[takeIndex] = null;
135 takeIndex = inc(takeIndex);
136 --count;
137 notFull.signal();
138 return x;
139 }
140
141 /**
142 * Deletes item at position i.
143 * Utility for remove and iterator.remove.
144 * Call only when holding lock.
145 */
146 void removeAt(int i) {
147 final Object[] items = this.items;
148 // if removing front item, just advance
149 if (i == takeIndex) {
150 items[takeIndex] = null;
151 takeIndex = inc(takeIndex);
152 } else {
153 // slide over all others up through putIndex.
154 for (;;) {
155 int nexti = inc(i);
156 if (nexti != putIndex) {
157 items[i] = items[nexti];
158 i = nexti;
159 } else {
160 items[i] = null;
161 putIndex = i;
162 break;
163 }
164 }
165 }
166 --count;
167 notFull.signal();
168 }
169
170 /**
171 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
172 * capacity and default access policy.
173 *
174 * @param capacity the capacity of this queue
175 * @throws IllegalArgumentException if {@code capacity < 1}
176 */
177 public ArrayBlockingQueue(int capacity) {
178 this(capacity, false);
179 }
180
181 /**
182 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
183 * capacity and the specified access policy.
184 *
185 * @param capacity the capacity of this queue
186 * @param fair if {@code true} then queue accesses for threads blocked
187 * on insertion or removal, are processed in FIFO order;
188 * if {@code false} the access order is unspecified.
189 * @throws IllegalArgumentException if {@code capacity < 1}
190 */
191 public ArrayBlockingQueue(int capacity, boolean fair) {
192 if (capacity <= 0)
193 throw new IllegalArgumentException();
194 this.items = new Object[capacity];
195 lock = new ReentrantLock(fair);
196 notEmpty = lock.newCondition();
197 notFull = lock.newCondition();
198 }
199
200 /**
201 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
202 * capacity, the specified access policy and initially containing the
203 * elements of the given collection,
204 * added in traversal order of the collection's iterator.
205 *
206 * @param capacity the capacity of this queue
207 * @param fair if {@code true} then queue accesses for threads blocked
208 * on insertion or removal, are processed in FIFO order;
209 * if {@code false} the access order is unspecified.
210 * @param c the collection of elements to initially contain
211 * @throws IllegalArgumentException if {@code capacity} is less than
212 * {@code c.size()}, or less than 1.
213 * @throws NullPointerException if the specified collection or any
214 * of its elements are null
215 */
216 public ArrayBlockingQueue(int capacity, boolean fair,
217 Collection<? extends E> c) {
218 this(capacity, fair);
219
220 final ReentrantLock lock = this.lock;
221 lock.lock(); // Lock only for visibility, not mutual exclusion
222 try {
223 int i = 0;
224 try {
225 for (E e : c) {
226 checkNotNull(e);
227 items[i++] = e;
228 }
229 } catch (ArrayIndexOutOfBoundsException ex) {
230 throw new IllegalArgumentException();
231 }
232 count = i;
233 putIndex = (i == capacity) ? 0 : i;
234 } finally {
235 lock.unlock();
236 }
237 }
238
239 /**
240 * Inserts the specified element at the tail of this queue if it is
241 * possible to do so immediately without exceeding the queue's capacity,
242 * returning {@code true} upon success and throwing an
243 * {@code IllegalStateException} if this queue is full.
244 *
245 * @param e the element to add
246 * @return {@code true} (as specified by {@link Collection#add})
247 * @throws IllegalStateException if this queue is full
248 * @throws NullPointerException if the specified element is null
249 */
250 public boolean add(E e) {
251 return super.add(e);
252 }
253
254 /**
255 * Inserts the specified element at the tail of this queue if it is
256 * possible to do so immediately without exceeding the queue's capacity,
257 * returning {@code true} upon success and {@code false} if this queue
258 * is full. This method is generally preferable to method {@link #add},
259 * which can fail to insert an element only by throwing an exception.
260 *
261 * @throws NullPointerException if the specified element is null
262 */
263 public boolean offer(E e) {
264 checkNotNull(e);
265 final ReentrantLock lock = this.lock;
266 lock.lock();
267 try {
268 if (count == items.length)
269 return false;
270 else {
271 insert(e);
272 return true;
273 }
274 } finally {
275 lock.unlock();
276 }
277 }
278
279 /**
280 * Inserts the specified element at the tail of this queue, waiting
281 * for space to become available if the queue is full.
282 *
283 * @throws InterruptedException {@inheritDoc}
284 * @throws NullPointerException {@inheritDoc}
285 */
286 public void put(E e) throws InterruptedException {
287 checkNotNull(e);
288 final ReentrantLock lock = this.lock;
289 lock.lockInterruptibly();
290 try {
291 while (count == items.length)
292 notFull.await();
293 insert(e);
294 } finally {
295 lock.unlock();
296 }
297 }
298
299 /**
300 * Inserts the specified element at the tail of this queue, waiting
301 * up to the specified wait time for space to become available if
302 * the queue is full.
303 *
304 * @throws InterruptedException {@inheritDoc}
305 * @throws NullPointerException {@inheritDoc}
306 */
307 public boolean offer(E e, long timeout, TimeUnit unit)
308 throws InterruptedException {
309
310 checkNotNull(e);
311 long nanos = unit.toNanos(timeout);
312 final ReentrantLock lock = this.lock;
313 lock.lockInterruptibly();
314 try {
315 while (count == items.length) {
316 if (nanos <= 0)
317 return false;
318 nanos = notFull.awaitNanos(nanos);
319 }
320 insert(e);
321 return true;
322 } finally {
323 lock.unlock();
324 }
325 }
326
327 public E poll() {
328 final ReentrantLock lock = this.lock;
329 lock.lock();
330 try {
331 return (count == 0) ? null : extract();
332 } finally {
333 lock.unlock();
334 }
335 }
336
337 public E take() throws InterruptedException {
338 final ReentrantLock lock = this.lock;
339 lock.lockInterruptibly();
340 try {
341 while (count == 0)
342 notEmpty.await();
343 return extract();
344 } finally {
345 lock.unlock();
346 }
347 }
348
349 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
350 long nanos = unit.toNanos(timeout);
351 final ReentrantLock lock = this.lock;
352 lock.lockInterruptibly();
353 try {
354 while (count == 0) {
355 if (nanos <= 0)
356 return null;
357 nanos = notEmpty.awaitNanos(nanos);
358 }
359 return extract();
360 } finally {
361 lock.unlock();
362 }
363 }
364
365 public E peek() {
366 final ReentrantLock lock = this.lock;
367 lock.lock();
368 try {
369 return (count == 0) ? null : itemAt(takeIndex);
370 } finally {
371 lock.unlock();
372 }
373 }
374
375 // this doc comment is overridden to remove the reference to collections
376 // greater in size than Integer.MAX_VALUE
377 /**
378 * Returns the number of elements in this queue.
379 *
380 * @return the number of elements in this queue
381 */
382 public int size() {
383 final ReentrantLock lock = this.lock;
384 lock.lock();
385 try {
386 return count;
387 } finally {
388 lock.unlock();
389 }
390 }
391
392 // this doc comment is a modified copy of the inherited doc comment,
393 // without the reference to unlimited queues.
394 /**
395 * Returns the number of additional elements that this queue can ideally
396 * (in the absence of memory or resource constraints) accept without
397 * blocking. This is always equal to the initial capacity of this queue
398 * less the current {@code size} of this queue.
399 *
400 * <p>Note that you <em>cannot</em> always tell if an attempt to insert
401 * an element will succeed by inspecting {@code remainingCapacity}
402 * because it may be the case that another thread is about to
403 * insert or remove an element.
404 */
405 public int remainingCapacity() {
406 final ReentrantLock lock = this.lock;
407 lock.lock();
408 try {
409 return items.length - count;
410 } finally {
411 lock.unlock();
412 }
413 }
414
415 /**
416 * Removes a single instance of the specified element from this queue,
417 * if it is present. More formally, removes an element {@code e} such
418 * that {@code o.equals(e)}, if this queue contains one or more such
419 * elements.
420 * Returns {@code true} if this queue contained the specified element
421 * (or equivalently, if this queue changed as a result of the call).
422 *
423 * <p>Removal of interior elements in circular array based queues
424 * is an intrinsically slow and disruptive operation, so should
425 * be undertaken only in exceptional circumstances, ideally
426 * only when the queue is known not to be accessible by other
427 * threads.
428 *
429 * @param o element to be removed from this queue, if present
430 * @return {@code true} if this queue changed as a result of the call
431 */
432 public boolean remove(Object o) {
433 if (o == null) return false;
434 final Object[] items = this.items;
435 final ReentrantLock lock = this.lock;
436 lock.lock();
437 try {
438 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
439 if (o.equals(items[i])) {
440 removeAt(i);
441 return true;
442 }
443 }
444 return false;
445 } finally {
446 lock.unlock();
447 }
448 }
449
450 /**
451 * Returns {@code true} if this queue contains the specified element.
452 * More formally, returns {@code true} if and only if this queue contains
453 * at least one element {@code e} such that {@code o.equals(e)}.
454 *
455 * @param o object to be checked for containment in this queue
456 * @return {@code true} if this queue contains the specified element
457 */
458 public boolean contains(Object o) {
459 if (o == null) return false;
460 final Object[] items = this.items;
461 final ReentrantLock lock = this.lock;
462 lock.lock();
463 try {
464 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
465 if (o.equals(items[i]))
466 return true;
467 return false;
468 } finally {
469 lock.unlock();
470 }
471 }
472
473 /**
474 * Returns an array containing all of the elements in this queue, in
475 * proper sequence.
476 *
477 * <p>The returned array will be "safe" in that no references to it are
478 * maintained by this queue. (In other words, this method must allocate
479 * a new array). The caller is thus free to modify the returned array.
480 *
481 * <p>This method acts as bridge between array-based and collection-based
482 * APIs.
483 *
484 * @return an array containing all of the elements in this queue
485 */
486 public Object[] toArray() {
487 final Object[] items = this.items;
488 final ReentrantLock lock = this.lock;
489 lock.lock();
490 try {
491 final int count = this.count;
492 Object[] a = new Object[count];
493 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
494 a[k] = items[i];
495 return a;
496 } finally {
497 lock.unlock();
498 }
499 }
500
501 /**
502 * Returns an array containing all of the elements in this queue, in
503 * proper sequence; the runtime type of the returned array is that of
504 * the specified array. If the queue fits in the specified array, it
505 * is returned therein. Otherwise, a new array is allocated with the
506 * runtime type of the specified array and the size of this queue.
507 *
508 * <p>If this queue fits in the specified array with room to spare
509 * (i.e., the array has more elements than this queue), the element in
510 * the array immediately following the end of the queue is set to
511 * {@code null}.
512 *
513 * <p>Like the {@link #toArray()} method, this method acts as bridge between
514 * array-based and collection-based APIs. Further, this method allows
515 * precise control over the runtime type of the output array, and may,
516 * under certain circumstances, be used to save allocation costs.
517 *
518 * <p>Suppose {@code x} is a queue known to contain only strings.
519 * The following code can be used to dump the queue into a newly
520 * allocated array of {@code String}:
521 *
522 * <pre>
523 * String[] y = x.toArray(new String[0]);</pre>
524 *
525 * Note that {@code toArray(new Object[0])} is identical in function to
526 * {@code toArray()}.
527 *
528 * @param a the array into which the elements of the queue are to
529 * be stored, if it is big enough; otherwise, a new array of the
530 * same runtime type is allocated for this purpose
531 * @return an array containing all of the elements in this queue
532 * @throws ArrayStoreException if the runtime type of the specified array
533 * is not a supertype of the runtime type of every element in
534 * this queue
535 * @throws NullPointerException if the specified array is null
536 */
537 @SuppressWarnings("unchecked")
538 public <T> T[] toArray(T[] a) {
539 final Object[] items = this.items;
540 final ReentrantLock lock = this.lock;
541 lock.lock();
542 try {
543 final int count = this.count;
544 final int len = a.length;
545 if (len < count)
546 a = (T[])java.lang.reflect.Array.newInstance(
547 a.getClass().getComponentType(), count);
548 for (int i = takeIndex, k = 0; k < count; i = inc(i), k++)
549 a[k] = (T) items[i];
550 if (len > count)
551 a[count] = null;
552 return a;
553 } finally {
554 lock.unlock();
555 }
556 }
557
558 public String toString() {
559 final ReentrantLock lock = this.lock;
560 lock.lock();
561 try {
562 int k = count;
563 if (k == 0)
564 return "[]";
565
566 StringBuilder sb = new StringBuilder();
567 sb.append('[');
568 for (int i = takeIndex; ; i = inc(i)) {
569 Object e = items[i];
570 sb.append(e == this ? "(this Collection)" : e);
571 if (--k == 0)
572 return sb.append(']').toString();
573 sb.append(',').append(' ');
574 }
575 } finally {
576 lock.unlock();
577 }
578 }
579
580 /**
581 * Atomically removes all of the elements from this queue.
582 * The queue will be empty after this call returns.
583 */
584 public void clear() {
585 final Object[] items = this.items;
586 final ReentrantLock lock = this.lock;
587 lock.lock();
588 try {
589 for (int i = takeIndex, k = count; k > 0; i = inc(i), k--)
590 items[i] = null;
591 count = 0;
592 putIndex = 0;
593 takeIndex = 0;
594 notFull.signalAll();
595 } finally {
596 lock.unlock();
597 }
598 }
599
600 /**
601 * @throws UnsupportedOperationException {@inheritDoc}
602 * @throws ClassCastException {@inheritDoc}
603 * @throws NullPointerException {@inheritDoc}
604 * @throws IllegalArgumentException {@inheritDoc}
605 */
606 public int drainTo(Collection<? super E> c) {
607 checkNotNull(c);
608 if (c == this)
609 throw new IllegalArgumentException();
610 final Object[] items = this.items;
611 final ReentrantLock lock = this.lock;
612 lock.lock();
613 try {
614 int i = takeIndex;
615 int n = 0;
616 int max = count;
617 while (n < max) {
618 c.add(this.<E>cast(items[i]));
619 items[i] = null;
620 i = inc(i);
621 ++n;
622 }
623 if (n > 0) {
624 count = 0;
625 putIndex = 0;
626 takeIndex = 0;
627 notFull.signalAll();
628 }
629 return n;
630 } finally {
631 lock.unlock();
632 }
633 }
634
635 /**
636 * @throws UnsupportedOperationException {@inheritDoc}
637 * @throws ClassCastException {@inheritDoc}
638 * @throws NullPointerException {@inheritDoc}
639 * @throws IllegalArgumentException {@inheritDoc}
640 */
641 public int drainTo(Collection<? super E> c, int maxElements) {
642 checkNotNull(c);
643 if (c == this)
644 throw new IllegalArgumentException();
645 if (maxElements <= 0)
646 return 0;
647 final Object[] items = this.items;
648 final ReentrantLock lock = this.lock;
649 lock.lock();
650 try {
651 int i = takeIndex;
652 int n = 0;
653 int sz = count;
654 int max = (maxElements < count) ? maxElements : count;
655 while (n < max) {
656 c.add(this.<E>cast(items[i]));
657 items[i] = null;
658 i = inc(i);
659 ++n;
660 }
661 if (n > 0) {
662 count -= n;
663 takeIndex = i;
664 notFull.signalAll();
665 }
666 return n;
667 } finally {
668 lock.unlock();
669 }
670 }
671
672 /**
673 * Returns an iterator over the elements in this queue in proper
674 * sequence. The returned {@code Iterator} is "weakly
675 * consistent" with respect to operations at the head and tail of
676 * the queue, and will never throw {@link
677 * ConcurrentModificationException}. It might return elements
678 * that existed upon construction of the iterator but have since
679 * been polled or taken, and might not return elements that have
680 * since been added. Further, no consistency guarantees are made
681 * with respect to "interior" removals occuring in concurrent
682 * invocations of {@link Collection#remove(Object)} or {@link
683 * Iterator#remove} occurring in other threads.
684 *
685 * <p>The returned iterator supports the optional {@link Iterator#remove}
686 * operation. However, removal of interior elements in circular
687 * array based queues is an intrinsically slow and disruptive
688 * operation, so should be undertaken only in exceptional
689 * circumstances, ideally only when the queue is known not to be
690 * accessible by other threads.
691 *
692 * @return an iterator over the elements in this queue in proper sequence
693 */
694 public Iterator<E> iterator() {
695 return new Itr();
696 }
697
698 /**
699 * Iterator for ArrayBlockingQueue. To maintain weak consistency
700 * with respect to puts and takes, we (1) read ahead one slot, so
701 * as to not report hasNext true but then not have an element to
702 * return (2) ensure that each array slot is traversed at most
703 * once (by tracking "remaining" elements); (3) skip over null
704 * slots, which can occur if takes race ahead of iterators.
705 * However, for circular array-based queues, we cannot rely on any
706 * well established definition of what it means to be weakly
707 * consistent with respect to interior removes since these may
708 * require slot overwrites in the process of sliding elements to
709 * cover gaps. So we settle for resiliency, operating on
710 * established apparent nexts, which may miss some elements that
711 * have moved between calls to next.
712 */
713 private class Itr implements Iterator<E> {
714 private int remaining; // Number of elements yet to be returned
715 private int nextIndex; // Index of element to be returned by next
716 private E nextItem; // Element to be returned by next call to next
717 private E lastItem; // Element returned by last call to next
718 private int lastRet; // Index of last element returned, or -1 if none
719
720 Itr() {
721 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
722 lock.lock();
723 try {
724 lastRet = -1;
725 if ((remaining = count) > 0)
726 nextItem = itemAt(nextIndex = takeIndex);
727 } finally {
728 lock.unlock();
729 }
730 }
731
732 public boolean hasNext() {
733 return remaining > 0;
734 }
735
736 public E next() {
737 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
738 lock.lock();
739 try {
740 if (remaining <= 0)
741 throw new NoSuchElementException();
742 lastRet = nextIndex;
743 E x = lastItem = nextItem;
744 while (--remaining > 0) {
745 if ((nextItem = itemAt(nextIndex = inc(nextIndex))) != null)
746 break;
747 }
748 return x;
749 } finally {
750 lock.unlock();
751 }
752 }
753
754 public void remove() {
755 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
756 lock.lock();
757 try {
758 int i = lastRet;
759 if (i == -1)
760 throw new IllegalStateException();
761 lastRet = -1;
762 E x = lastItem;
763 lastItem = null;
764 // only remove if item still at index
765 if (x == items[i]) {
766 boolean removingHead = (i == takeIndex);
767 removeAt(i);
768 if (!removingHead)
769 nextIndex = dec(nextIndex);
770 }
771 } finally {
772 lock.unlock();
773 }
774 }
775 }
776
777 }