ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.21
Committed: Wed Aug 6 01:57:53 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.20: +51 -26 lines
Log Message:
Final major updates to Collection related classes.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an array.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time.
 *
 * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized
 * array holds elements inserted by producers and extracted by consumers.
 * Once created, the capacity can not be increased. Attempts to
 * <tt>put</tt> an element into a full queue will result in the operation
 * blocking; attempts to <tt>take</tt> an element from an empty queue will
 * similarly block. The untimed <tt>offer</tt> and <tt>poll</tt> methods
 * never block: they return <tt>false</tt> / <tt>null</tt> immediately.
 *
 * <p>This class supports an optional fairness policy for ordering
 * threads blocked on an insertion or removal. By default, this
 * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
 * constructed with fairness set to <tt>true</tt> grants blocked
 * threads access in FIFO order. Fairness generally substantially
 * decreases throughput but reduces variability and avoids
 * starvation.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /** The queued items */
    private transient final E[] items;
    /** items index for next take, poll or remove */
    private transient int takeIndex;
    /** items index for next put, offer, or add. */
    private transient int putIndex;
    /** Number of items in the queue */
    private int count;

    /**
     * An array used only during deserialization, to hold
     * items read back in from the stream, and then used
     * as "items" by readResolve via the private constructor.
     */
    private transient E[] deserializedItems;

    /*
     * Concurrency control via the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increment i.
     * @param i an index into <tt>items</tt>
     * @return the following index, wrapping to 0 past the end of the array
     */
    int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Insert element at current put position, advance, and signal.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extract element at current take position, advance, and signal.
     * Call only when holding lock.
     */
    private E extract() {
        E x = items[takeIndex];
        items[takeIndex] = null;    // drop the reference held by the array
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Utility for remove and iterator.remove: Delete item at position i.
     * Call only when holding lock.
     */
    void removeAt(int i) {
        if (i == takeIndex) {
            // removing front item: just advance
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        }
        else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                }
                else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Internal constructor also used by readResolve.
     * Sets all final fields, plus count and putIndex.
     * @param cap the capacity
     * @param array the array to use or null if should create new one
     * @param count the number of items in the array, where indices 0
     * to count-1 hold items.
     * @param lk the lock to use with this queue
     * @throws IllegalArgumentException if <tt>cap</tt> is less than 1 or
     * <tt>count</tt> does not fit in the array
     */
    private ArrayBlockingQueue(int cap, E[] array, int count,
                               ReentrantLock lk) {
        if (cap <= 0)
            throw new IllegalArgumentException();
        if (array == null)
            this.items = (E[]) new Object[cap];
        else
            this.items = array;
        if (count < 0 || count > this.items.length)
            throw new IllegalArgumentException();
        // The put position is circular: when the supplied array is already
        // full the next insertion slot is index 0, not items.length.
        this.putIndex = (count == this.items.length) ? 0 : count;
        this.count = count;
        lock = lk;
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and default access policy.
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, null, 0, new ReentrantLock());
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        this(capacity, null, 0, new ReentrantLock(fair));
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
     * <tt>c.size()</tt>, or less than 1.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, null, 0, new ReentrantLock(fair));

        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

    // Have to override just to update the javadoc

    /**
     * Adds the specified element to the tail of this queue.
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Adds all of the elements in the specified collection to this queue.
     * The behavior of this operation is undefined if
     * the specified collection is modified while the operation is in
     * progress. (This implies that the behavior of this call is undefined if
     * the specified collection is this queue, and this queue is nonempty.)
     * <p>
     * This implementation iterates over the specified collection, and adds
     * each object returned by the iterator to this queue's tail, in turn.
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    /**
     * Adds the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     * the queue was full
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                insert(o);
                return true;
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     * the wait time elapsed before space became available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new NullPointerException();

        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != items.length) {
                    insert(o);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, returning immediately
     * if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        lock.lock();
        try {
            if (count == 0)
                return null;
            return extract();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * up to the specified wait time for an element to become available.
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element became available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != 0)
                    return extract();
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present. More formally, removes the first element
     * <tt>e</tt> (in head-to-tail order) such that <tt>o.equals(e)</tt>.
     * Returns <tt>true</tt> if the queue contained the
     * specified element (or equivalently, if the queue changed as a
     * result of the call). Because this queue never holds <tt>null</tt>,
     * a <tt>null</tt> argument always yields <tt>false</tt>.
     *
     * <p>This implementation scans the backing array from the head while
     * holding the queue lock and closes the gap left by the removed
     * element.
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            for (;;) {
                if (k++ >= count)
                    return false;
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary for an element to become available.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            }
            catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            return extract();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E o) throws InterruptedException {

        if (o == null) throw new NullPointerException();

        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            }
            catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(o);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0) return 0;
        lock.lock();
        try {
            int n = 0;
            while (n < maxElements && count > 0) {
                // Add before extracting so that an element stays queued
                // if the target collection rejects it.
                c.add(items[takeIndex]);
                extract();
                ++n;
            }
            return n;
        }
        finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this collection.
     */
    public int size() {
        lock.lock();
        try {
            return count;
        }
        finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        lock.lock();
        try {
            return items.length - count;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains an element equal to
     * the specified object. A <tt>null</tt> argument always yields
     * <tt>false</tt>.
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            int i = takeIndex;
            int k = 0;
            while (k++ < count) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns a newly allocated array containing all of the elements in
     * this queue, in head-to-tail order.
     */
    public Object[] toArray() {
        lock.lock();
        try {
            Object[] a = new Object[count];
            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = items[i];
                i = inc(i);
            }
            return a;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * head-to-tail order. The given array is used if it is large enough;
     * otherwise a new array of the same component type is allocated.
     * If the array has room to spare, the slot following the last element
     * is set to <tt>null</tt>.
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            if (a.length < count)
                a = (T[])java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(),
                    count
                    );

            int k = 0;
            int i = takeIndex;
            while (k < count) {
                a[k++] = (T)items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns the inherited string representation, computed while
     * holding the queue lock.
     */
    public String toString() {
        lock.lock();
        try {
            return super.toString();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue
     */
    private class Itr implements Iterator<E> {
        /**
         * Index of element to be returned by next,
         * or a negative number if no such.
         */
        private int nextIndex;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Index of element returned by most recent call to next.
         * Reset to -1 if this element is deleted by a call to remove.
         */
        private int lastRet;

        /** Positions the iterator at the head; created while holding lock. */
        Itr() {
            lastRet = -1;
            if (count == 0)
                nextIndex = -1;
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        public boolean hasNext() {
            /*
             * No sync. We can return true by mistake here
             * only if this iterator passed across threads,
             * which we don't support anyway.
             */
            return nextIndex >= 0;
        }

        /**
         * Check whether nextIndex is valid; if so setting nextItem.
         * Stops iterator when either hits putIndex or sees null item.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            }
            else {
                nextItem = items[nextIndex];
                if (nextItem == null)
                    nextIndex = -1;
            }
        }

        public E next() {
            lock.lock();
            try {
                if (nextIndex < 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = nextItem;
                nextIndex = inc(nextIndex);
                checkNext();
                return x;
            }
            finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;

                int ti = takeIndex;
                removeAt(i);
                // back up cursor (reset to front if was first element)
                nextIndex = (i == ti) ? takeIndex : i;
                checkNext();
            }
            finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     * The queue lock is held for the duration so that the element count
     * and the elements written form one consistent snapshot.
     *
     * @serialData The maximumSize is emitted (int), followed by all of
     * its elements (each an <tt>E</tt>) in the proper order.
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        lock.lock();
        try {
            // Write out element count, and any hidden stuff
            s.defaultWriteObject();
            // Write out maximumSize == items length
            s.writeInt(items.length);

            // Write out all elements in the proper order.
            int i = takeIndex;
            int k = 0;
            while (k++ < count) {
                s.writeObject(items[i]);
                i = inc(i);
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     * @throws java.io.InvalidObjectException if the stream describes an
     * impossible queue (non-positive capacity, element count outside the
     * capacity, or a <tt>null</tt> element)
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in size, and any hidden stuff
        s.defaultReadObject();
        int size = count;

        // Read in array length and check it against the element count
        int arrayLength = s.readInt();
        if (arrayLength <= 0 || size < 0 || size > arrayLength)
            throw new java.io.InvalidObjectException(
                "Invalid capacity/size: " + arrayLength + "/" + size);

        // We use deserializedItems here because "items" is final
        E[] array = (E[]) new Object[arrayLength];

        // Read in all elements in the proper order
        for (int i = 0; i < size; i++) {
            E x = (E)s.readObject();
            if (x == null)
                throw new java.io.InvalidObjectException("Null element");
            array[i] = x;
        }
        deserializedItems = array;
    }

    /**
     * Throw away the object created with readObject, and replace it
     * with a usable ArrayBlockingQueue.
     * @return the ArrayBlockingQueue
     */
    private Object readResolve() throws java.io.ObjectStreamException {
        E[] array = deserializedItems;
        deserializedItems = null;
        return new ArrayBlockingQueue<E>(array.length, array, count, lock);
    }
}