ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.36
Committed: Tue Dec 23 19:38:09 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.35: +49 -99 lines
Log Message:
cache finals across volatiles; avoid readResolve; doc improvments; timed invokeAll interleaves

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13 * array. This queue orders elements FIFO (first-in-first-out). The
14 * <em>head</em> of the queue is that element that has been on the
15 * queue the longest time. The <em>tail</em> of the queue is that
16 * element that has been on the queue the shortest time. New elements
17 * are inserted at the tail of the queue, and the queue retrieval
18 * operations obtain elements at the head of the queue.
19 *
20 * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized
21 * array holds
22 * elements inserted by producers and extracted by consumers. Once
23 * created, the capacity can not be increased. Attempts to offer an
24 * element to a full queue will result in the offer operation
25 * blocking; attempts to retrieve an element from an empty queue will
26 * similarly block.
27 *
28 * <p> This class supports an optional fairness policy for ordering
29 * threads blocked on an insertion or removal. By default, this
30 * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
31 * constructed with fairness set to <tt>true</tt> grants blocked
32 * threads access in FIFO order. Fairness generally decreases
33 * throughput but reduces variability and avoids starvation.
34 *
35 * <p>This class implements all of the <em>optional</em> methods
36 * of the {@link Collection} and {@link Iterator} interfaces.
37 *
38 * @since 1.5
39 * @author Doug Lea
40 * @param <E> the type of elements held in this collection
41 */
42 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
43 implements BlockingQueue<E>, java.io.Serializable {
44
45 /**
46 * Seialization ID. This class relies on default serialzation even
47 * for the items array is default-serialized, even if it is
48 * empty. Otherwise it could not be declared final, which is
49 * necessary here.
50 */
51 private static final long serialVersionUID = -817911632652898426L;
52
53 /** The queued items */
54 private final E[] items;
55 /** items index for next take, poll or remove */
56 private transient int takeIndex;
57 /** items index for next put, offer, or add. */
58 private transient int putIndex;
59 /** Number of items in the queue */
60 private int count;
61
62 /*
63 * Concurrency control uses the classic two-condition algorithm
64 * found in any textbook.
65 */
66
67 /** Main lock guarding all access */
68 private final ReentrantLock lock;
69 /** Condition for waiting takes */
70 private final ReentrantLock.ConditionObject notEmpty;
71 /** Condition for waiting puts */
72 private final ReentrantLock.ConditionObject notFull;
73
74 // Internal helper methods
75
76 /**
77 * Circularly increment i.
78 */
79 int inc(int i) {
80 return (++i == items.length)? 0 : i;
81 }
82
83 /**
84 * Insert element at current put position, advance, and signal.
85 * Call only when holding lock.
86 */
87 private void insert(E x) {
88 items[putIndex] = x;
89 putIndex = inc(putIndex);
90 ++count;
91 notEmpty.signal();
92 }
93
94 /**
95 * Extract element at current take position, advance, and signal.
96 * Call only when holding lock.
97 */
98 private E extract() {
99 final E[] items = this.items;
100 E x = items[takeIndex];
101 items[takeIndex] = null;
102 takeIndex = inc(takeIndex);
103 --count;
104 notFull.signal();
105 return x;
106 }
107
108 /**
109 * Utility for remove and iterator.remove: Delete item at position i.
110 * Call only when holding lock.
111 */
112 void removeAt(int i) {
113 final E[] items = this.items;
114 // if removing front item, just advance
115 if (i == takeIndex) {
116 items[takeIndex] = null;
117 takeIndex = inc(takeIndex);
118 } else {
119 // slide over all others up through putIndex.
120 for (;;) {
121 int nexti = inc(i);
122 if (nexti != putIndex) {
123 items[i] = items[nexti];
124 i = nexti;
125 } else {
126 items[i] = null;
127 putIndex = i;
128 break;
129 }
130 }
131 }
132 --count;
133 notFull.signal();
134 }
135
136 /**
137 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
138 * capacity and default access policy.
139 * @param capacity the capacity of this queue
140 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
141 */
142 public ArrayBlockingQueue(int capacity) {
143 this(capacity, false);
144 }
145
146 /**
147 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
148 * capacity and the specified access policy.
149 * @param capacity the capacity of this queue
150 * @param fair if <tt>true</tt> then queue accesses for threads blocked
151 * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
152 * the access order is unspecified.
153 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
154 */
155 public ArrayBlockingQueue(int capacity, boolean fair) {
156 if (capacity <= 0)
157 throw new IllegalArgumentException();
158 this.items = (E[]) new Object[capacity];
159 lock = new ReentrantLock(fair);
160 notEmpty = lock.newCondition();
161 notFull = lock.newCondition();
162 }
163
164 /**
165 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
166 * capacity, the specified access policy and initially containing the
167 * elements of the given collection,
168 * added in traversal order of the collection's iterator.
169 * @param capacity the capacity of this queue
170 * @param fair if <tt>true</tt> then queue accesses for threads blocked
171 * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
172 * the access order is unspecified.
173 * @param c the collection of elements to initially contain
174 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
175 * <tt>c.size()</tt>, or less than 1.
176 * @throws NullPointerException if <tt>c</tt> or any element within it
177 * is <tt>null</tt>
178 */
179 public ArrayBlockingQueue(int capacity, boolean fair,
180 Collection<? extends E> c) {
181 this(capacity, fair);
182 if (capacity < c.size())
183 throw new IllegalArgumentException();
184
185 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
186 add(it.next());
187 }
188
189 /**
190 * Inserts the specified element at the tail of this queue if possible,
191 * returning immediately if this queue is full.
192 *
193 * @param o the element to add.
194 * @return <tt>true</tt> if it was possible to add the element to
195 * this queue, else <tt>false</tt>
196 * @throws NullPointerException if the specified element is <tt>null</tt>
197 */
198 public boolean offer(E o) {
199 if (o == null) throw new NullPointerException();
200 final ReentrantLock lock = this.lock;
201 lock.lock();
202 try {
203 if (count == items.length)
204 return false;
205 else {
206 insert(o);
207 return true;
208 }
209 } finally {
210 lock.unlock();
211 }
212 }
213
214 /**
215 * Inserts the specified element at the tail of this queue, waiting if
216 * necessary up to the specified wait time for space to become available.
217 * @param o the element to add
218 * @param timeout how long to wait before giving up, in units of
219 * <tt>unit</tt>
220 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
221 * <tt>timeout</tt> parameter
222 * @return <tt>true</tt> if successful, or <tt>false</tt> if
223 * the specified waiting time elapses before space is available.
224 * @throws InterruptedException if interrupted while waiting.
225 * @throws NullPointerException if the specified element is <tt>null</tt>.
226 */
227 public boolean offer(E o, long timeout, TimeUnit unit)
228 throws InterruptedException {
229
230 if (o == null) throw new NullPointerException();
231 final ReentrantLock lock = this.lock;
232 lock.lockInterruptibly();
233 try {
234 long nanos = unit.toNanos(timeout);
235 for (;;) {
236 if (count != items.length) {
237 insert(o);
238 return true;
239 }
240 if (nanos <= 0)
241 return false;
242 try {
243 nanos = notFull.awaitNanos(nanos);
244 } catch (InterruptedException ie) {
245 notFull.signal(); // propagate to non-interrupted thread
246 throw ie;
247 }
248 }
249 } finally {
250 lock.unlock();
251 }
252 }
253
254
255 public E poll() {
256 final ReentrantLock lock = this.lock;
257 lock.lock();
258 try {
259 if (count == 0)
260 return null;
261 E x = extract();
262 return x;
263 } finally {
264 lock.unlock();
265 }
266 }
267
268 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
269 final ReentrantLock lock = this.lock;
270 lock.lockInterruptibly();
271 try {
272 long nanos = unit.toNanos(timeout);
273 for (;;) {
274 if (count != 0) {
275 E x = extract();
276 return x;
277 }
278 if (nanos <= 0)
279 return null;
280 try {
281 nanos = notEmpty.awaitNanos(nanos);
282 } catch (InterruptedException ie) {
283 notEmpty.signal(); // propagate to non-interrupted thread
284 throw ie;
285 }
286
287 }
288 } finally {
289 lock.unlock();
290 }
291 }
292
293
294 public boolean remove(Object o) {
295 if (o == null) return false;
296 final E[] items = this.items;
297 final ReentrantLock lock = this.lock;
298 lock.lock();
299 try {
300 int i = takeIndex;
301 int k = 0;
302 for (;;) {
303 if (k++ >= count)
304 return false;
305 if (o.equals(items[i])) {
306 removeAt(i);
307 return true;
308 }
309 i = inc(i);
310 }
311
312 } finally {
313 lock.unlock();
314 }
315 }
316
317 public E peek() {
318 final ReentrantLock lock = this.lock;
319 lock.lock();
320 try {
321 return (count == 0) ? null : items[takeIndex];
322 } finally {
323 lock.unlock();
324 }
325 }
326
327 public E take() throws InterruptedException {
328 final ReentrantLock lock = this.lock;
329 lock.lockInterruptibly();
330 try {
331 try {
332 while (count == 0)
333 notEmpty.await();
334 } catch (InterruptedException ie) {
335 notEmpty.signal(); // propagate to non-interrupted thread
336 throw ie;
337 }
338 E x = extract();
339 return x;
340 } finally {
341 lock.unlock();
342 }
343 }
344
345 /**
346 * Adds the specified element to the tail of this queue, waiting if
347 * necessary for space to become available.
348 * @param o the element to add
349 * @throws InterruptedException if interrupted while waiting.
350 * @throws NullPointerException if the specified element is <tt>null</tt>.
351 */
352 public void put(E o) throws InterruptedException {
353 if (o == null) throw new NullPointerException();
354 final E[] items = this.items;
355 final ReentrantLock lock = this.lock;
356 lock.lockInterruptibly();
357 try {
358 try {
359 while (count == items.length)
360 notFull.await();
361 } catch (InterruptedException ie) {
362 notFull.signal(); // propagate to non-interrupted thread
363 throw ie;
364 }
365 insert(o);
366 } finally {
367 lock.unlock();
368 }
369 }
370
371 // this doc comment is overridden to remove the reference to collections
372 // greater in size than Integer.MAX_VALUE
373 /**
374 * Returns the number of elements in this queue.
375 *
376 * @return the number of elements in this queue.
377 */
378 public int size() {
379 final ReentrantLock lock = this.lock;
380 lock.lock();
381 try {
382 return count;
383 } finally {
384 lock.unlock();
385 }
386 }
387
388 // this doc comment is a modified copy of the inherited doc comment,
389 // without the reference to unlimited queues.
390 /**
391 * Returns the number of elements that this queue can ideally (in
392 * the absence of memory or resource constraints) accept without
393 * blocking. This is always equal to the initial capacity of this queue
394 * less the current <tt>size</tt> of this queue.
395 * <p>Note that you <em>cannot</em> always tell if
396 * an attempt to <tt>add</tt> an element will succeed by
397 * inspecting <tt>remainingCapacity</tt> because it may be the
398 * case that a waiting consumer is ready to <tt>take</tt> an
399 * element out of an otherwise full queue.
400 */
401 public int remainingCapacity() {
402 final ReentrantLock lock = this.lock;
403 lock.lock();
404 try {
405 return items.length - count;
406 } finally {
407 lock.unlock();
408 }
409 }
410
411
412 public boolean contains(Object o) {
413 if (o == null) return false;
414 final E[] items = this.items;
415 final ReentrantLock lock = this.lock;
416 lock.lock();
417 try {
418 int i = takeIndex;
419 int k = 0;
420 while (k++ < count) {
421 if (o.equals(items[i]))
422 return true;
423 i = inc(i);
424 }
425 return false;
426 } finally {
427 lock.unlock();
428 }
429 }
430
431 public Object[] toArray() {
432 final E[] items = this.items;
433 final ReentrantLock lock = this.lock;
434 lock.lock();
435 try {
436 Object[] a = new Object[count];
437 int k = 0;
438 int i = takeIndex;
439 while (k < count) {
440 a[k++] = items[i];
441 i = inc(i);
442 }
443 return a;
444 } finally {
445 lock.unlock();
446 }
447 }
448
449 public <T> T[] toArray(T[] a) {
450 final E[] items = this.items;
451 final ReentrantLock lock = this.lock;
452 lock.lock();
453 try {
454 if (a.length < count)
455 a = (T[])java.lang.reflect.Array.newInstance(
456 a.getClass().getComponentType(),
457 count
458 );
459
460 int k = 0;
461 int i = takeIndex;
462 while (k < count) {
463 a[k++] = (T)items[i];
464 i = inc(i);
465 }
466 if (a.length > count)
467 a[count] = null;
468 return a;
469 } finally {
470 lock.unlock();
471 }
472 }
473
474 public String toString() {
475 final ReentrantLock lock = this.lock;
476 lock.lock();
477 try {
478 return super.toString();
479 } finally {
480 lock.unlock();
481 }
482 }
483
484
485 public void clear() {
486 final E[] items = this.items;
487 final ReentrantLock lock = this.lock;
488 lock.lock();
489 try {
490 int i = takeIndex;
491 int k = count;
492 while (k-- > 0) {
493 items[i] = null;
494 i = inc(i);
495 }
496 count = 0;
497 putIndex = 0;
498 takeIndex = 0;
499 notFull.signalAll();
500 } finally {
501 lock.unlock();
502 }
503 }
504
505 public int drainTo(Collection<? super E> c) {
506 if (c == null)
507 throw new NullPointerException();
508 if (c == this)
509 throw new IllegalArgumentException();
510 final E[] items = this.items;
511 final ReentrantLock lock = this.lock;
512 lock.lock();
513 try {
514 int i = takeIndex;
515 int n = 0;
516 int max = count;
517 while (n < max) {
518 c.add(items[i]);
519 items[i] = null;
520 i = inc(i);
521 ++n;
522 }
523 if (n > 0) {
524 count = 0;
525 putIndex = 0;
526 takeIndex = 0;
527 notFull.signalAll();
528 }
529 return n;
530 } finally {
531 lock.unlock();
532 }
533 }
534
535
536 public int drainTo(Collection<? super E> c, int maxElements) {
537 if (c == null)
538 throw new NullPointerException();
539 if (c == this)
540 throw new IllegalArgumentException();
541 if (maxElements <= 0)
542 return 0;
543 final E[] items = this.items;
544 final ReentrantLock lock = this.lock;
545 lock.lock();
546 try {
547 int i = takeIndex;
548 int n = 0;
549 int sz = count;
550 int max = (maxElements < count)? maxElements : count;
551 while (n < max) {
552 c.add(items[i]);
553 items[i] = null;
554 i = inc(i);
555 ++n;
556 }
557 if (n > 0) {
558 count -= n;
559 takeIndex = i;
560 notFull.signalAll();
561 }
562 return n;
563 } finally {
564 lock.unlock();
565 }
566 }
567
568
569 /**
570 * Returns an iterator over the elements in this queue in proper sequence.
571 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
572 * will never throw {@link java.util.ConcurrentModificationException},
573 * and guarantees to traverse elements as they existed upon
574 * construction of the iterator, and may (but is not guaranteed to)
575 * reflect any modifications subsequent to construction.
576 *
577 * @return an iterator over the elements in this queue in proper sequence.
578 */
579 public Iterator<E> iterator() {
580 final ReentrantLock lock = this.lock;
581 lock.lock();
582 try {
583 return new Itr();
584 } finally {
585 lock.unlock();
586 }
587 }
588
589 /**
590 * Iterator for ArrayBlockingQueue
591 */
592 private class Itr implements Iterator<E> {
593 /**
594 * Index of element to be returned by next,
595 * or a negative number if no such.
596 */
597 private int nextIndex;
598
599 /**
600 * nextItem holds on to item fields because once we claim
601 * that an element exists in hasNext(), we must return it in
602 * the following next() call even if it was in the process of
603 * being removed when hasNext() was called.
604 **/
605 private E nextItem;
606
607 /**
608 * Index of element returned by most recent call to next.
609 * Reset to -1 if this element is deleted by a call to remove.
610 */
611 private int lastRet;
612
613 Itr() {
614 lastRet = -1;
615 if (count == 0)
616 nextIndex = -1;
617 else {
618 nextIndex = takeIndex;
619 nextItem = items[takeIndex];
620 }
621 }
622
623 public boolean hasNext() {
624 /*
625 * No sync. We can return true by mistake here
626 * only if this iterator passed across threads,
627 * which we don't support anyway.
628 */
629 return nextIndex >= 0;
630 }
631
632 /**
633 * Check whether nextIndex is valid; if so setting nextItem.
634 * Stops iterator when either hits putIndex or sees null item.
635 */
636 private void checkNext() {
637 if (nextIndex == putIndex) {
638 nextIndex = -1;
639 nextItem = null;
640 } else {
641 nextItem = items[nextIndex];
642 if (nextItem == null)
643 nextIndex = -1;
644 }
645 }
646
647 public E next() {
648 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
649 lock.lock();
650 try {
651 if (nextIndex < 0)
652 throw new NoSuchElementException();
653 lastRet = nextIndex;
654 E x = nextItem;
655 nextIndex = inc(nextIndex);
656 checkNext();
657 return x;
658 } finally {
659 lock.unlock();
660 }
661 }
662
663 public void remove() {
664 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
665 lock.lock();
666 try {
667 int i = lastRet;
668 if (i == -1)
669 throw new IllegalStateException();
670 lastRet = -1;
671
672 int ti = takeIndex;
673 removeAt(i);
674 // back up cursor (reset to front if was first element)
675 nextIndex = (i == ti) ? takeIndex : i;
676 checkNext();
677 } finally {
678 lock.unlock();
679 }
680 }
681 }
682 }