ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.44
Committed: Thu May 27 11:06:11 2004 UTC (20 years ago) by dl
Branch: MAIN
Changes since 1.43: +9 -2 lines
Log Message:
Override javadoc specs when overriding AbstractQueue implementations
Clarify atomicity in BlockingQueue

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
13 * array. This queue orders elements FIFO (first-in-first-out). The
14 * <em>head</em> of the queue is that element that has been on the
15 * queue the longest time. The <em>tail</em> of the queue is that
16 * element that has been on the queue the shortest time. New elements
17 * are inserted at the tail of the queue, and the queue retrieval
18 * operations obtain elements at the head of the queue.
19 *
20 * <p>This is a classic &quot;bounded buffer&quot;, in which a
21 * fixed-sized array holds elements inserted by producers and
22 * extracted by consumers. Once created, the capacity cannot be
23 * increased. Attempts to offer an element to a full queue will
24 * result in the offer operation blocking; attempts to retrieve an
25 * element from an empty queue will similarly block.
26 *
27 * <p> This class supports an optional fairness policy for ordering
28 * waiting producer and consumer threads. By default, this ordering
29 * is not guaranteed. However, a queue constructed with fairness set
30 * to <tt>true</tt> grants threads access in FIFO order. Fairness
31 * generally decreases throughput but reduces variability and avoids
32 * starvation.
33 *
34 * <p>This class and its iterator implement all of the
35 * <em>optional</em> methods of the {@link Collection} and {@link
36 * Iterator} interfaces.
37 *
38 * <p>This class is a member of the
39 * <a href="{@docRoot}/../guide/collections/index.html">
40 * Java Collections Framework</a>.
41 *
42 * @since 1.5
43 * @author Doug Lea
44 * @param <E> the type of elements held in this collection
45 */
46 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
47 implements BlockingQueue<E>, java.io.Serializable {
48
49 /**
50 * Serialization ID. This class relies on default serialization
51 * even for the items array, which is default-serialized, even if
52 * it is empty. Otherwise it could not be declared final, which is
53 * necessary here.
54 */
55 private static final long serialVersionUID = -817911632652898426L;
56
57 /** The queued items */
58 private final E[] items;
59 /** items index for next take, poll or remove */
60 private transient int takeIndex;
61 /** items index for next put, offer, or add. */
62 private transient int putIndex;
63 /** Number of items in the queue */
64 private int count;
65
66 /*
67 * Concurrency control uses the classic two-condition algorithm
68 * found in any textbook.
69 */
70
71 /** Main lock guarding all access */
72 private final ReentrantLock lock;
73 /** Condition for waiting takes */
74 private final Condition notEmpty;
75 /** Condition for waiting puts */
76 private final Condition notFull;
77
78 // Internal helper methods
79
80 /**
81 * Circularly increment i.
82 */
83 final int inc(int i) {
84 return (++i == items.length)? 0 : i;
85 }
86
87 /**
88 * Insert element at current put position, advance, and signal.
89 * Call only when holding lock.
90 */
91 private void insert(E x) {
92 items[putIndex] = x;
93 putIndex = inc(putIndex);
94 ++count;
95 notEmpty.signal();
96 }
97
98 /**
99 * Extract element at current take position, advance, and signal.
100 * Call only when holding lock.
101 */
102 private E extract() {
103 final E[] items = this.items;
104 E x = items[takeIndex];
105 items[takeIndex] = null;
106 takeIndex = inc(takeIndex);
107 --count;
108 notFull.signal();
109 return x;
110 }
111
112 /**
113 * Utility for remove and iterator.remove: Delete item at position i.
114 * Call only when holding lock.
115 */
116 void removeAt(int i) {
117 final E[] items = this.items;
118 // if removing front item, just advance
119 if (i == takeIndex) {
120 items[takeIndex] = null;
121 takeIndex = inc(takeIndex);
122 } else {
123 // slide over all others up through putIndex.
124 for (;;) {
125 int nexti = inc(i);
126 if (nexti != putIndex) {
127 items[i] = items[nexti];
128 i = nexti;
129 } else {
130 items[i] = null;
131 putIndex = i;
132 break;
133 }
134 }
135 }
136 --count;
137 notFull.signal();
138 }
139
140 /**
141 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
142 * capacity and default access policy.
143 * @param capacity the capacity of this queue
144 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
145 */
146 public ArrayBlockingQueue(int capacity) {
147 this(capacity, false);
148 }
149
150 /**
151 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
152 * capacity and the specified access policy.
153 * @param capacity the capacity of this queue
154 * @param fair if <tt>true</tt> then queue accesses for threads blocked
155 * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
156 * the access order is unspecified.
157 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
158 */
159 public ArrayBlockingQueue(int capacity, boolean fair) {
160 if (capacity <= 0)
161 throw new IllegalArgumentException();
162 this.items = (E[]) new Object[capacity];
163 lock = new ReentrantLock(fair);
164 notEmpty = lock.newCondition();
165 notFull = lock.newCondition();
166 }
167
168 /**
169 * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
170 * capacity, the specified access policy and initially containing the
171 * elements of the given collection,
172 * added in traversal order of the collection's iterator.
173 * @param capacity the capacity of this queue
174 * @param fair if <tt>true</tt> then queue accesses for threads blocked
175 * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
176 * the access order is unspecified.
177 * @param c the collection of elements to initially contain
178 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
179 * <tt>c.size()</tt>, or less than 1.
180 * @throws NullPointerException if <tt>c</tt> or any element within it
181 * is <tt>null</tt>
182 */
183 public ArrayBlockingQueue(int capacity, boolean fair,
184 Collection<? extends E> c) {
185 this(capacity, fair);
186 if (capacity < c.size())
187 throw new IllegalArgumentException();
188
189 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
190 add(it.next());
191 }
192
193 /**
194 * Inserts the specified element at the tail of this queue if possible,
195 * returning immediately if this queue is full.
196 *
197 * @param o the element to add.
198 * @return <tt>true</tt> if it was possible to add the element to
199 * this queue, else <tt>false</tt>
200 * @throws NullPointerException if the specified element is <tt>null</tt>
201 */
202 public boolean offer(E o) {
203 if (o == null) throw new NullPointerException();
204 final ReentrantLock lock = this.lock;
205 lock.lock();
206 try {
207 if (count == items.length)
208 return false;
209 else {
210 insert(o);
211 return true;
212 }
213 } finally {
214 lock.unlock();
215 }
216 }
217
218 /**
219 * Inserts the specified element at the tail of this queue, waiting if
220 * necessary up to the specified wait time for space to become available.
221 * @param o the element to add
222 * @param timeout how long to wait before giving up, in units of
223 * <tt>unit</tt>
224 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
225 * <tt>timeout</tt> parameter
226 * @return <tt>true</tt> if successful, or <tt>false</tt> if
227 * the specified waiting time elapses before space is available.
228 * @throws InterruptedException if interrupted while waiting.
229 * @throws NullPointerException if the specified element is <tt>null</tt>.
230 */
231 public boolean offer(E o, long timeout, TimeUnit unit)
232 throws InterruptedException {
233
234 if (o == null) throw new NullPointerException();
235 final ReentrantLock lock = this.lock;
236 lock.lockInterruptibly();
237 try {
238 long nanos = unit.toNanos(timeout);
239 for (;;) {
240 if (count != items.length) {
241 insert(o);
242 return true;
243 }
244 if (nanos <= 0)
245 return false;
246 try {
247 nanos = notFull.awaitNanos(nanos);
248 } catch (InterruptedException ie) {
249 notFull.signal(); // propagate to non-interrupted thread
250 throw ie;
251 }
252 }
253 } finally {
254 lock.unlock();
255 }
256 }
257
258
259 public E poll() {
260 final ReentrantLock lock = this.lock;
261 lock.lock();
262 try {
263 if (count == 0)
264 return null;
265 E x = extract();
266 return x;
267 } finally {
268 lock.unlock();
269 }
270 }
271
272 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
273 final ReentrantLock lock = this.lock;
274 lock.lockInterruptibly();
275 try {
276 long nanos = unit.toNanos(timeout);
277 for (;;) {
278 if (count != 0) {
279 E x = extract();
280 return x;
281 }
282 if (nanos <= 0)
283 return null;
284 try {
285 nanos = notEmpty.awaitNanos(nanos);
286 } catch (InterruptedException ie) {
287 notEmpty.signal(); // propagate to non-interrupted thread
288 throw ie;
289 }
290
291 }
292 } finally {
293 lock.unlock();
294 }
295 }
296
297 /**
298 * Removes a single instance of the specified element from this
299 * collection, if it is present.
300 */
301 public boolean remove(Object o) {
302 if (o == null) return false;
303 final E[] items = this.items;
304 final ReentrantLock lock = this.lock;
305 lock.lock();
306 try {
307 int i = takeIndex;
308 int k = 0;
309 for (;;) {
310 if (k++ >= count)
311 return false;
312 if (o.equals(items[i])) {
313 removeAt(i);
314 return true;
315 }
316 i = inc(i);
317 }
318
319 } finally {
320 lock.unlock();
321 }
322 }
323
324 public E peek() {
325 final ReentrantLock lock = this.lock;
326 lock.lock();
327 try {
328 return (count == 0) ? null : items[takeIndex];
329 } finally {
330 lock.unlock();
331 }
332 }
333
334 public E take() throws InterruptedException {
335 final ReentrantLock lock = this.lock;
336 lock.lockInterruptibly();
337 try {
338 try {
339 while (count == 0)
340 notEmpty.await();
341 } catch (InterruptedException ie) {
342 notEmpty.signal(); // propagate to non-interrupted thread
343 throw ie;
344 }
345 E x = extract();
346 return x;
347 } finally {
348 lock.unlock();
349 }
350 }
351
352 /**
353 * Adds the specified element to the tail of this queue, waiting if
354 * necessary for space to become available.
355 * @param o the element to add
356 * @throws InterruptedException if interrupted while waiting.
357 * @throws NullPointerException if the specified element is <tt>null</tt>.
358 */
359 public void put(E o) throws InterruptedException {
360 if (o == null) throw new NullPointerException();
361 final E[] items = this.items;
362 final ReentrantLock lock = this.lock;
363 lock.lockInterruptibly();
364 try {
365 try {
366 while (count == items.length)
367 notFull.await();
368 } catch (InterruptedException ie) {
369 notFull.signal(); // propagate to non-interrupted thread
370 throw ie;
371 }
372 insert(o);
373 } finally {
374 lock.unlock();
375 }
376 }
377
378 // this doc comment is overridden to remove the reference to collections
379 // greater in size than Integer.MAX_VALUE
380 /**
381 * Returns the number of elements in this queue.
382 *
383 * @return the number of elements in this queue.
384 */
385 public int size() {
386 final ReentrantLock lock = this.lock;
387 lock.lock();
388 try {
389 return count;
390 } finally {
391 lock.unlock();
392 }
393 }
394
395 // this doc comment is a modified copy of the inherited doc comment,
396 // without the reference to unlimited queues.
397 /**
398 * Returns the number of elements that this queue can ideally (in
399 * the absence of memory or resource constraints) accept without
400 * blocking. This is always equal to the initial capacity of this queue
401 * less the current <tt>size</tt> of this queue.
402 * <p>Note that you <em>cannot</em> always tell if
403 * an attempt to <tt>add</tt> an element will succeed by
404 * inspecting <tt>remainingCapacity</tt> because it may be the
405 * case that a waiting consumer is ready to <tt>take</tt> an
406 * element out of an otherwise full queue.
407 */
408 public int remainingCapacity() {
409 final ReentrantLock lock = this.lock;
410 lock.lock();
411 try {
412 return items.length - count;
413 } finally {
414 lock.unlock();
415 }
416 }
417
418
419 public boolean contains(Object o) {
420 if (o == null) return false;
421 final E[] items = this.items;
422 final ReentrantLock lock = this.lock;
423 lock.lock();
424 try {
425 int i = takeIndex;
426 int k = 0;
427 while (k++ < count) {
428 if (o.equals(items[i]))
429 return true;
430 i = inc(i);
431 }
432 return false;
433 } finally {
434 lock.unlock();
435 }
436 }
437
438 public Object[] toArray() {
439 final E[] items = this.items;
440 final ReentrantLock lock = this.lock;
441 lock.lock();
442 try {
443 Object[] a = new Object[count];
444 int k = 0;
445 int i = takeIndex;
446 while (k < count) {
447 a[k++] = items[i];
448 i = inc(i);
449 }
450 return a;
451 } finally {
452 lock.unlock();
453 }
454 }
455
456 public <T> T[] toArray(T[] a) {
457 final E[] items = this.items;
458 final ReentrantLock lock = this.lock;
459 lock.lock();
460 try {
461 if (a.length < count)
462 a = (T[])java.lang.reflect.Array.newInstance(
463 a.getClass().getComponentType(),
464 count
465 );
466
467 int k = 0;
468 int i = takeIndex;
469 while (k < count) {
470 a[k++] = (T)items[i];
471 i = inc(i);
472 }
473 if (a.length > count)
474 a[count] = null;
475 return a;
476 } finally {
477 lock.unlock();
478 }
479 }
480
481 public String toString() {
482 final ReentrantLock lock = this.lock;
483 lock.lock();
484 try {
485 return super.toString();
486 } finally {
487 lock.unlock();
488 }
489 }
490
491
492 /**
493 * Atomically removes all of the elements from this queue.
494 * The queue will be empty after this call returns.
495 */
496 public void clear() {
497 final E[] items = this.items;
498 final ReentrantLock lock = this.lock;
499 lock.lock();
500 try {
501 int i = takeIndex;
502 int k = count;
503 while (k-- > 0) {
504 items[i] = null;
505 i = inc(i);
506 }
507 count = 0;
508 putIndex = 0;
509 takeIndex = 0;
510 notFull.signalAll();
511 } finally {
512 lock.unlock();
513 }
514 }
515
516 public int drainTo(Collection<? super E> c) {
517 if (c == null)
518 throw new NullPointerException();
519 if (c == this)
520 throw new IllegalArgumentException();
521 final E[] items = this.items;
522 final ReentrantLock lock = this.lock;
523 lock.lock();
524 try {
525 int i = takeIndex;
526 int n = 0;
527 int max = count;
528 while (n < max) {
529 c.add(items[i]);
530 items[i] = null;
531 i = inc(i);
532 ++n;
533 }
534 if (n > 0) {
535 count = 0;
536 putIndex = 0;
537 takeIndex = 0;
538 notFull.signalAll();
539 }
540 return n;
541 } finally {
542 lock.unlock();
543 }
544 }
545
546
547 public int drainTo(Collection<? super E> c, int maxElements) {
548 if (c == null)
549 throw new NullPointerException();
550 if (c == this)
551 throw new IllegalArgumentException();
552 if (maxElements <= 0)
553 return 0;
554 final E[] items = this.items;
555 final ReentrantLock lock = this.lock;
556 lock.lock();
557 try {
558 int i = takeIndex;
559 int n = 0;
560 int sz = count;
561 int max = (maxElements < count)? maxElements : count;
562 while (n < max) {
563 c.add(items[i]);
564 items[i] = null;
565 i = inc(i);
566 ++n;
567 }
568 if (n > 0) {
569 count -= n;
570 takeIndex = i;
571 notFull.signalAll();
572 }
573 return n;
574 } finally {
575 lock.unlock();
576 }
577 }
578
579
580 /**
581 * Returns an iterator over the elements in this queue in proper sequence.
582 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
583 * will never throw {@link java.util.ConcurrentModificationException},
584 * and guarantees to traverse elements as they existed upon
585 * construction of the iterator, and may (but is not guaranteed to)
586 * reflect any modifications subsequent to construction.
587 *
588 * @return an iterator over the elements in this queue in proper sequence.
589 */
590 public Iterator<E> iterator() {
591 final ReentrantLock lock = this.lock;
592 lock.lock();
593 try {
594 return new Itr();
595 } finally {
596 lock.unlock();
597 }
598 }
599
600 /**
601 * Iterator for ArrayBlockingQueue
602 */
603 private class Itr implements Iterator<E> {
604 /**
605 * Index of element to be returned by next,
606 * or a negative number if no such.
607 */
608 private int nextIndex;
609
610 /**
611 * nextItem holds on to item fields because once we claim
612 * that an element exists in hasNext(), we must return it in
613 * the following next() call even if it was in the process of
614 * being removed when hasNext() was called.
615 **/
616 private E nextItem;
617
618 /**
619 * Index of element returned by most recent call to next.
620 * Reset to -1 if this element is deleted by a call to remove.
621 */
622 private int lastRet;
623
624 Itr() {
625 lastRet = -1;
626 if (count == 0)
627 nextIndex = -1;
628 else {
629 nextIndex = takeIndex;
630 nextItem = items[takeIndex];
631 }
632 }
633
634 public boolean hasNext() {
635 /*
636 * No sync. We can return true by mistake here
637 * only if this iterator passed across threads,
638 * which we don't support anyway.
639 */
640 return nextIndex >= 0;
641 }
642
643 /**
644 * Check whether nextIndex is valid; if so setting nextItem.
645 * Stops iterator when either hits putIndex or sees null item.
646 */
647 private void checkNext() {
648 if (nextIndex == putIndex) {
649 nextIndex = -1;
650 nextItem = null;
651 } else {
652 nextItem = items[nextIndex];
653 if (nextItem == null)
654 nextIndex = -1;
655 }
656 }
657
658 public E next() {
659 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
660 lock.lock();
661 try {
662 if (nextIndex < 0)
663 throw new NoSuchElementException();
664 lastRet = nextIndex;
665 E x = nextItem;
666 nextIndex = inc(nextIndex);
667 checkNext();
668 return x;
669 } finally {
670 lock.unlock();
671 }
672 }
673
674 public void remove() {
675 final ReentrantLock lock = ArrayBlockingQueue.this.lock;
676 lock.lock();
677 try {
678 int i = lastRet;
679 if (i == -1)
680 throw new IllegalStateException();
681 lastRet = -1;
682
683 int ti = takeIndex;
684 removeAt(i);
685 // back up cursor (reset to front if was first element)
686 nextIndex = (i == ti) ? takeIndex : i;
687 checkNext();
688 } finally {
689 lock.unlock();
690 }
691 }
692 }
693 }