ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.20
Committed: Mon Aug 4 16:14:48 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.19: +3 -3 lines
Log Message:
Make atomics emulation classes match the main atomics.
Fix docs for atomics (both in main and emulation).
Restored more specific iterator types in both blocking queue impls.
Fix unchecked cast warning in PQ.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * A bounded {@link BlockingQueue blocking queue} backed by an array.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time.
 *
 * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized
 * array holds elements inserted by producers and extracted by consumers.
 * Once created, the capacity can not be increased. Attempts to
 * <tt>put</tt> an element into a full queue will result in the put
 * operation blocking; attempts to <tt>take</tt> an element from an empty
 * queue will similarly block. The untimed <tt>offer</tt> and
 * <tt>poll</tt> methods never block: they return <tt>false</tt> or
 * <tt>null</tt> respectively when the operation cannot proceed.
 *
 * <p> This class supports an optional fairness policy for ordering
 * threads blocked on an insertion or removal.  By default, this
 * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
 * constructed with fairness set to <tt>true</tt> grants blocked
 * threads access in FIFO order. Fairness generally substantially
 * decreases throughput but reduces variability and avoids
 * starvation.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /**
     * The queued items. Slots that do not currently hold a queued
     * element are always <tt>null</tt>.
     */
    private transient final E[] items;
    /** items index for next take, poll or remove */
    private transient int takeIndex;
    /** items index for next put, offer, or add. */
    private transient int putIndex;
    /** Number of items in the queue */
    private int count;

    /**
     * An array used only during deserialization, to hold
     * items read back in from the stream, and then used
     * as "items" by readResolve via the private constructor.
     */
    private transient E[] deserializedItems;

    /*
     * Concurrency control via the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increment i.
     *
     * @param i an index into <tt>items</tt>
     * @return <tt>i + 1</tt>, wrapped to 0 at the end of the array
     */
    int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Insert element at current put position, advance, and signal.
     * Call only when holding lock.
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extract element at current take position, advance, and signal.
     * Call only when holding lock.
     */
    private E extract() {
        E x = items[takeIndex];
        items[takeIndex] = null;   // keep unused slots null
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Utility for remove and iterator.remove: Delete item at position i.
     * Call only when holding lock.
     */
    void removeAt(int i) {
        if (i == takeIndex) {
            // removing front item: just advance
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide all later elements down by one, up through putIndex
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Internal constructor also used by readResolve.
     * Sets all final fields, plus count.
     * @param cap the capacity
     * @param array the array to use or null if should create new one
     * @param count the number of items in the array, where indices 0
     * to count-1 hold items.
     * @param lk the lock to use with this queue
     */
    @SuppressWarnings("unchecked") // a fresh Object[] only ever holds E's put there by this class
    private ArrayBlockingQueue(int cap, E[] array, int count,
                               ReentrantLock lk) {
        if (cap <= 0)
            throw new IllegalArgumentException();
        this.items = (array != null) ? array : (E[]) new Object[cap];
        // When the supplied array is completely full the next put slot
        // wraps around to 0; using count directly would leave putIndex
        // one past the end of the array.
        this.putIndex = (count == this.items.length) ? 0 : count;
        this.count = count;
        lock = lk;
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and default access policy.
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, null, 0, new ReentrantLock());
    }

    /**
     * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        this(capacity, null, 0, new ReentrantLock(fair));
    }

    /**
     * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity, the specified access policy and initially holding the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
     * <tt>c.size()</tt>, or less than 1.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, null, 0, new ReentrantLock(fair));

        if (capacity < c.size())
            throw new IllegalArgumentException();

        for (E e : c)
            add(e);
    }

    // Have to override just to update the javadoc for @throws

    /**
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    /**
     * Add the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     * the queue was full
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();
        lock.lock();
        try {
            if (count == items.length)
                return false;
            insert(x);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Add the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     * the wait time elapsed before space became available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E x, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (x == null) throw new NullPointerException();

        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != items.length) {
                    insert(x);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieve and remove the head of this queue, returning immediately.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieve and remove the head of this queue, waiting if necessary
     * up to the specified wait time for an element to become available.
     *
     * @return the head of this queue, or <tt>null</tt> if the wait time
     * elapsed before an element became available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != 0)
                    return extract();
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Remove the first element, in queue order, that is equal to the
     * given object.
     *
     * @param x the object to remove; <tt>null</tt> is never present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object x) {
        if (x == null) return false;
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (x.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieve, but do not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieve and remove the head of this queue, waiting if
     * necessary for an element to become available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Add the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E x) throws InterruptedException {

        if (x == null) throw new NullPointerException();

        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(x);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Remove all available elements from this queue and add them to
     * the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Remove at most the given number of available elements from this
     * queue and add them to the given collection. If adding to
     * <tt>c</tt> fails part-way, the elements already transferred stay
     * in <tt>c</tt> and the remaining ones stay in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0) return 0;
        lock.lock();
        try {
            int n = 0;
            try {
                while (n < maxElements && count > 0) {
                    // add before unlinking, so a failing add leaves
                    // the element in the queue
                    c.add(items[takeIndex]);
                    items[takeIndex] = null;
                    takeIndex = inc(takeIndex);
                    --count;
                    ++n;
                }
                return n;
            } finally {
                // several slots may have been freed at once
                if (n > 0)
                    notFull.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Return the number of elements in this collection.
     */
    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Return the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Report whether some element of this queue is equal to the
     * given object.
     *
     * @param x the object to look for; <tt>null</tt> is never present
     * @return <tt>true</tt> if an equal element is queued
     */
    public boolean contains(Object x) {
        if (x == null) return false;
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (x.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Return a newly allocated array holding the elements of this
     * queue, head first.
     */
    public Object[] toArray() {
        lock.lock();
        try {
            Object[] a = new Object[count];
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                a[k] = items[i];
                i = inc(i);
            }
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Copy the elements of this queue, head first, into <tt>a</tt> if
     * it is large enough, otherwise into a new array of the same
     * component type. If <tt>a</tt> has room to spare, the slot just
     * past the last element is set to <tt>null</tt>.
     *
     * @param a the array to fill, or whose component type to reuse
     * @return the array holding the elements
     */
    @SuppressWarnings("unchecked") // reflective array creation and E -> T store, as in Collection.toArray
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            if (a.length < count)
                a = (T[]) java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(), count);

            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                a[k] = (T) items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Return the inherited string form, computed while holding the
     * queue lock.
     */
    public String toString() {
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue
     */
    private class Itr implements Iterator<E> {
        /**
         * Index of element to be returned by next,
         * or a negative number if no such.
         */
        private int nextIndex;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         **/
        private E nextItem;

        /**
         * Index of element returned by most recent call to next.
         * Reset to -1 if this element is deleted by a call to remove.
         */
        private int lastRet;

        /**
         * Element returned by most recent call to next; lets remove()
         * detect that the slot at lastRet no longer holds it.
         */
        private E lastItem;

        /** Created by iterator() while the queue lock is held. */
        Itr() {
            lastRet = -1;
            if (count == 0)
                nextIndex = -1;
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        public boolean hasNext() {
            /*
             * No sync. We can return true by mistake here
             * only if this iterator passed across threads,
             * which we don't support anyway.
             */
            return nextIndex >= 0;
        }

        /**
         * Check whether nextIndex is valid; if so setting nextItem.
         * Stops iterator when either hits putIndex or sees null item.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            } else {
                nextItem = items[nextIndex];
                if (nextItem == null)
                    nextIndex = -1;
            }
        }

        public E next() {
            lock.lock();
            try {
                if (nextIndex < 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = nextItem;
                lastItem = x;
                nextIndex = inc(nextIndex);
                checkNext();
                return x;
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;
                E x = lastItem;
                lastItem = null;

                // The queue may have changed since next(): only remove if
                // the very element we returned still sits in its slot.
                // Unused slots are always null, so a match implies the
                // slot is live. Otherwise the element is already gone and
                // removing by index would delete or lose another element.
                if (x == null || items[i] != x)
                    return;

                int ti = takeIndex;
                removeAt(i);
                // back up cursor (reset to front if was first element)
                nextIndex = (i == ti) ? takeIndex : i;
                checkNext();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The maximumSize is emitted (int), followed by all of
     * its elements (each an <tt>E</tt>) in the proper order.
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {

        // Hold the lock so count, takeIndex and the element slots are
        // written as one consistent snapshot.
        lock.lock();
        try {
            // Write out element count, and any hidden stuff
            s.defaultWriteObject();
            // Write out maximumSize == items length
            s.writeInt(items.length);

            // Write out all elements in the proper order.
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                s.writeObject(items[i]);
                i = inc(i);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     * @throws java.io.InvalidObjectException if the stream's capacity
     * is not positive or its element count does not fit the capacity
     */
    @SuppressWarnings("unchecked") // elements were written as E's by writeObject
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in size, and any hidden stuff
        s.defaultReadObject();
        int size = count;

        // Read in array length and allocate array
        int arrayLength = s.readInt();
        if (arrayLength <= 0)
            throw new java.io.InvalidObjectException(
                "Non-positive capacity: " + arrayLength);
        if (size < 0 || size > arrayLength)
            throw new java.io.InvalidObjectException(
                "Element count " + size + " outside capacity " + arrayLength);

        // We use deserializedItems here because "items" is final
        deserializedItems = (E[]) new Object[arrayLength];

        // Read in all elements in the proper order into deserializedItems
        for (int i = 0; i < size; i++)
            deserializedItems[i] = (E) s.readObject();
    }

    /**
     * Throw away the object created with readObject, and replace it
     * with a usable ArrayBlockingQueue.
     * @return the ArrayBlockingQueue
     */
    private Object readResolve() throws java.io.ObjectStreamException {
        E[] array = deserializedItems;
        deserializedItems = null;
        return new ArrayBlockingQueue<E>(array.length, array, count, lock);
    }
}