ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.32
Committed: Sat Oct 18 12:29:33 2003 UTC (20 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.31: +1 -0 lines
Log Message:
Added docs for type params

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
 * array.  This queue orders elements FIFO (first-in-first-out).  The
 * <em>head</em> of the queue is that element that has been on the
 * queue the longest time.  The <em>tail</em> of the queue is that
 * element that has been on the queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 *
 * <p>This is a classic &quot;bounded buffer&quot;, in which a
 * fixed-sized array holds elements inserted by producers and
 * extracted by consumers.  Once created, the capacity can not be
 * increased.  Attempts to <tt>put</tt> an element into a full queue
 * will result in the operation blocking; attempts to <tt>take</tt> an
 * element from an empty queue will similarly block.  The untimed
 * <tt>offer</tt> and <tt>poll</tt> methods never block: they return
 * <tt>false</tt> and <tt>null</tt> respectively instead.
 *
 * <p> This class supports an optional fairness policy for ordering
 * threads blocked on an insertion or removal.  By default, this
 * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
 * constructed with fairness set to <tt>true</tt> grants blocked
 * threads access in FIFO order. Fairness generally decreases
 * throughput but reduces variability and avoids starvation.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the base class of all elements held in this collection
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -817911632652898425L;

    /** The queued items */
    private transient final E[] items;
    /** items index for next take, poll or remove */
    private transient int takeIndex;
    /** items index for next put, offer, or add. */
    private transient int putIndex;
    /** Number of items in the queue */
    private int count;

    /**
     * An array used only during deserialization, to hold
     * items read back in from the stream, and then used
     * as "items" by readResolve via the private constructor.
     */
    private transient E[] deserializedItems;

    /*
     * Concurrency control via the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increment i: returns i+1, wrapping to 0 at the end
     * of the items array.
     */
    int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Insert element at current put position, advance, and signal.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extract element at current take position, advance, and signal.
     * Call only when holding lock.
     */
    private E extract() {
        E x = items[takeIndex];
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Utility for remove and iterator.remove: Delete item at position i.
     * Call only when holding lock.
     */
    void removeAt(int i) {
        if (i == takeIndex) {
            // removing front item: just advance
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Internal constructor also used by readResolve.
     * Sets all final fields, plus count.
     * @param cap the capacity
     * @param array the array to use or null if should create new one
     * @param count the number of items in the array, where indices 0
     * to count-1 hold items.
     * @param lk the lock to use with this queue
     */
    private ArrayBlockingQueue(int cap, E[] array, int count,
                               ReentrantLock lk) {
        if (cap <= 0)
            throw new IllegalArgumentException();
        if (array == null)
            this.items = (E[]) new Object[cap];
        else
            this.items = array;
        // When the supplied array is completely full, the next put slot
        // wraps around to index 0. Leaving putIndex == items.length would
        // make the next insert index out of bounds and would keep the
        // iterator and removeAt from ever reaching their stop condition.
        this.putIndex = (count == this.items.length) ? 0 : count;
        this.count = count;
        lock = lk;
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and default access policy.
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, null, 0, new ReentrantLock());
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        this(capacity, null, 0, new ReentrantLock(fair));
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
     * <tt>c.size()</tt>, or less than 1.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, null, 0, new ReentrantLock(fair));

        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     *         this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        lock.lock();
        try {
            if (count == items.length)
                return false;
            insert(o);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (o == null) throw new NullPointerException();

        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != items.length) {
                    insert(o);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, returning
     * immediately if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty.
     */
    public E poll() {
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary up to the specified wait time for an element to
     * become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != 0)
                    return extract();
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the first element, in queue order, that is
     * <tt>equals</tt> to the given object, if there is one.
     *
     * @param o the element to remove; <tt>null</tt> never matches
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     *         is empty.
     */
    public E peek() {
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * no elements are present.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {

        if (o == null) throw new NullPointerException();

        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(o);
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return  the number of elements in this queue.
     */
    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     *
     * @return the capacity of this queue less its current size
     */
    public int remainingCapacity() {
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if some element of this queue is
     * <tt>equals</tt> to the given object.
     *
     * @param o the object to look for; <tt>null</tt> never matches
     * @return <tt>true</tt> if this queue contains a matching element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a newly allocated array holding the elements of this
     * queue in queue order (head first).
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            Object[] a = new Object[count];
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                a[k] = items[i];
                i = inc(i);
            }
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Copies the elements of this queue, in queue order, into the
     * given array if it is large enough, otherwise into a new array
     * of the same component type.  If the array used has room to
     * spare, the slot immediately after the last element is set to
     * <tt>null</tt>.
     *
     * @param a the array to fill, if it is big enough
     * @return the array holding the elements of this queue
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            if (a.length < count)
                a = (T[]) java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(),
                    count
                    );

            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                a[k] = (T) items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the string form produced by the superclass, computed
     * while holding this queue's lock.
     */
    public String toString() {
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all of the elements from this queue and signals all
     * threads waiting for space.
     */
    public void clear() {
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = count; k > 0; k--) {
                items[i] = null;
                i = inc(i);
            }
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue, in queue order,
     * and adds them to the given collection.
     *
     * <p>If <tt>c.add</tt> throws, the elements already transferred
     * stay in <tt>c</tt> and are removed from this queue; the element
     * that failed and all later ones remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from
     * this queue, in queue order, and adds them to the given collection.
     *
     * <p>If <tt>c.add</tt> throws, the elements already transferred
     * stay in <tt>c</tt> and are removed from this queue; the element
     * that failed and all later ones remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer;
     * values less than 1 transfer nothing
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int max = (maxElements < count) ? maxElements : count;
            int i = takeIndex;
            int n = 0;
            try {
                while (n < max) {
                    c.add(items[i]);
                    items[i] = null;
                    i = inc(i);
                    ++n;
                }
                return n;
            } finally {
                // Commit whatever was transferred even if c.add threw, so
                // that count and takeIndex never refer to cleared slots.
                if (n > 0) {
                    count -= n;
                    takeIndex = i;
                    notFull.signalAll();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue
     */
    private class Itr implements Iterator<E> {
        /**
         * Index of element to be returned by next,
         * or a negative number if no such.
         */
        private int nextIndex;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         **/
        private E nextItem;

        /**
         * Index of element returned by most recent call to next.
         * Reset to -1 if this element is deleted by a call to remove.
         */
        private int lastRet;

        /** Starts at the current head; constructed while holding lock. */
        Itr() {
            lastRet = -1;
            if (count == 0)
                nextIndex = -1;
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        public boolean hasNext() {
            /*
             * No sync. We can return true by mistake here
             * only if this iterator passed across threads,
             * which we don't support anyway.
             */
            return nextIndex >= 0;
        }

        /**
         * Check whether nextIndex is valid; if so setting nextItem.
         * Stops iterator when either hits putIndex or sees null item.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            } else {
                nextItem = items[nextIndex];
                if (nextItem == null)
                    nextIndex = -1;
            }
        }

        public E next() {
            lock.lock();
            try {
                if (nextIndex < 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = nextItem;
                nextIndex = inc(nextIndex);
                checkNext();
                return x;
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;

                int ti = takeIndex;
                removeAt(i);
                // back up cursor (reset to front if was first element)
                nextIndex = (i == ti) ? takeIndex : i;
                checkNext();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     * The lock is held throughout so that the written count always
     * matches the number of elements written.
     *
     * @serialData The maximumSize is emitted (int), followed by all of
     * its elements (each an <tt>E</tt>) in the proper order.
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        lock.lock();
        try {
            // Write out element count, and any hidden stuff
            s.defaultWriteObject();
            // Write out maximumSize == items length
            s.writeInt(items.length);

            // Write out all elements in the proper order.
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                s.writeObject(items[i]);
                i = inc(i);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     * @throws java.io.InvalidObjectException if the stream carries a
     * capacity less than 1 or an element count outside 0..capacity
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in size, and any hidden stuff
        s.defaultReadObject();
        int size = count;

        // Read in array length and allocate array
        int arrayLength = s.readInt();
        if (arrayLength < 1 || size < 0 || size > arrayLength)
            throw new java.io.InvalidObjectException(
                "capacity " + arrayLength + ", count " + size);

        // We use deserializedItems here because "items" is final
        deserializedItems = (E[]) new Object[arrayLength];

        // Read in all elements in the proper order into deserializedItems
        for (int i = 0; i < size; i++)
            deserializedItems[i] = (E) s.readObject();
    }

    /**
     * Throw away the object created with readObject, and replace it
     * with a usable ArrayBlockingQueue.
     * @return the ArrayBlockingQueue
     */
    private Object readResolve() throws java.io.ObjectStreamException {
        E[] array = deserializedItems;
        deserializedItems = null;
        return new ArrayBlockingQueue<E>(array.length, array, count, lock);
    }
}