ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.16
Committed: Thu Jul 31 07:18:02 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.15: +35 -3 lines
Log Message:
Continued updates to explicit and inherited doc comments.
Consistency over remove(null)
Some inherited doc is still not right.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A bounded {@link BlockingQueue blocking queue} backed by an array.
13 * This queue orders elements FIFO (first-in-first-out).
14 * The <em>head</em> of the queue is that element that has been on the
15 * queue the longest time.
16 * The <em>tail</em> of the queue is that element that has been on the
17 * queue the shortest time.
18 *
19 * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized
20 * array holds
21 * elements inserted by producers and extracted by consumers. Once
22 * created, the capacity can not be increased. Attempts to offer an
23 * element to a full queue will result in the offer operation
24 * blocking; attempts to retrieve an element from an empty queue will
25 * similarly block.
26 *
27 * <p> This class supports an optional fairness policy for ordering
28 * threads blocked on an insertion or removal. By default, this
29 * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
30 * constructed with fairness set to <tt>true</tt> grants blocked
31 * threads access in FIFO order. Fairness generally substantially
32 * decreases throughput but reduces variablility and avoids
33 * starvation.
34 *
35 * @since 1.5
36 * @author Doug Lea
37 */
38 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40
41 /** The queued items */
42 private transient final E[] items;
43 /** items index for next take, poll or remove */
44 private transient int takeIndex;
45 /** items index for next put, offer, or add. */
46 private transient int putIndex;
47 /** Number of items in the queue */
48 private int count;
49
50 /**
51 * An array used only during deserialization, to hold
52 * items read back in from the stream, and then used
53 * as "items" by readResolve via the private constructor.
54 */
55 private transient E[] deserializedItems;
56
57 /*
58 * Concurrency control via the classic two-condition algorithm
59 * found in any textbook.
60 */
61
62 /** Main lock guarding all access */
63 private final ReentrantLock lock;
64 /** Condition for waiting takes */
65 private final Condition notEmpty;
66 /** Condition for wiating puts */
67 private final Condition notFull;
68
69 // Internal helper methods
70
71 /**
72 * Circularly increment i.
73 */
74 int inc(int i) {
75 return (++i == items.length)? 0 : i;
76 }
77
78 /**
79 * Insert element at current put position, advance, and signal.
80 * Call only when holding lock.
81 */
82 private void insert(E x) {
83 items[putIndex] = x;
84 putIndex = inc(putIndex);
85 ++count;
86 notEmpty.signal();
87 }
88
89 /**
90 * Extract element at current take position, advance, and signal.
91 * Call only when holding lock.
92 */
93 private E extract() {
94 E x = items[takeIndex];
95 items[takeIndex] = null;
96 takeIndex = inc(takeIndex);
97 --count;
98 notFull.signal();
99 return x;
100 }
101
102 /**
103 * Utility for remove and iterator.remove: Delete item at position i.
104 * Call only when holding lock.
105 */
106 void removeAt(int i) {
107 // if removing front item, just advance
108 if (i == takeIndex) {
109 items[takeIndex] = null;
110 takeIndex = inc(takeIndex);
111 }
112 else {
113 // slide over all others up through putIndex.
114 for (;;) {
115 int nexti = inc(i);
116 if (nexti != putIndex) {
117 items[i] = items[nexti];
118 i = nexti;
119 }
120 else {
121 items[i] = null;
122 putIndex = i;
123 break;
124 }
125 }
126 }
127 --count;
128 notFull.signal();
129 }
130
131 /**
132 * Internal constructor also used by readResolve.
133 * Sets all final fields, plus count.
134 * @param cap the capacity
135 * @param array the array to use or null if should create new one
136 * @param count the number of items in the array, where indices 0
137 * to count-1 hold items.
138 * @param lk the lock to use with this queue
139 */
140 private ArrayBlockingQueue(int cap, E[] array, int count,
141 ReentrantLock lk) {
142 if (cap <= 0)
143 throw new IllegalArgumentException();
144 if (array == null)
145 this.items = (E[]) new Object[cap];
146 else
147 this.items = array;
148 this.putIndex = count;
149 this.count = count;
150 lock = lk;
151 notEmpty = lock.newCondition();
152 notFull = lock.newCondition();
153 }
154
155 /**
156 * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
157 * capacity and default access policy.
158 * @param capacity the capacity of this queue
159 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
160 */
161 public ArrayBlockingQueue(int capacity) {
162 this(capacity, null, 0, new ReentrantLock());
163 }
164
165 /**
166 * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
167 * capacity and the specified access policy.
168 * @param capacity the capacity of this queue
169 * @param fair if <tt>true</tt> then queue accesses for threads blocked
170 * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
171 * the access order is unspecified.
172 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
173 */
174 public ArrayBlockingQueue(int capacity, boolean fair) {
175 this(capacity, null, 0, new ReentrantLock(fair));
176 }
177
178 /**
179 * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
180 * capacity, the specified access policy and initially holding the
181 * elements of the given collection,
182 * added in traversal order of the collection's iterator.
183 * @param capacity the capacity of this queue
184 * @param fair if <tt>true</tt> then queue accesses for threads blocked
185 * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
186 * the access order is unspecified.
187 * @param c the collection of elements to initially contain
188 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
189 * <tt>c.size()</tt>, or less than 1.
190 * @throws NullPointerException if <tt>c</tt> or any element within it
191 * is <tt>null</tt>
192 */
193 public ArrayBlockingQueue(int capacity, boolean fair, Collection<E> c) {
194 this(capacity, null, 0, new ReentrantLock(fair));
195
196 if (capacity < c.size())
197 throw new IllegalArgumentException();
198
199 for (Iterator<E> it = c.iterator(); it.hasNext();)
200 add(it.next());
201 }
202
203 // Have to override just to update the javadoc for @throws
204
205 /**
206 * @throws IllegalStateException {@inheritDoc}
207 * @throws NullPointerException {@inheritDoc}
208 */
209 public boolean add(E o) {
210 return super.add(o);
211 }
212
213 /**
214 * @throws IllegalStateException {@inheritDoc}
215 * @throws NullPointerException {@inheritDoc}
216 */
217 public boolean addAll(Collection<? extends E> c) {
218 return super.addAll(c);
219 }
220
221 /**
222 * Add the specified element to the tail of this queue if possible,
223 * returning immediately if this queue is full.
224 *
225 * @throws NullPointerException {@inheritDoc}
226 */
227 public boolean offer(E x) {
228 if (x == null) throw new NullPointerException();
229 lock.lock();
230 try {
231 if (count == items.length)
232 return false;
233 else {
234 insert(x);
235 return true;
236 }
237 }
238 finally {
239 lock.unlock();
240 }
241 }
242
243 /**
244 * Add the specified element to the tail of this queue, waiting if
245 * necessary up to the specified wait time for space to become available.
246 * @throws NullPointerException {@inheritDoc}
247 */
248 public boolean offer(E x, long timeout, TimeUnit unit)
249 throws InterruptedException {
250
251 if (x == null) throw new NullPointerException();
252
253 lock.lockInterruptibly();
254 try {
255 long nanos = unit.toNanos(timeout);
256 for (;;) {
257 if (count != items.length) {
258 insert(x);
259 return true;
260 }
261 if (nanos <= 0)
262 return false;
263 try {
264 nanos = notFull.awaitNanos(nanos);
265 }
266 catch (InterruptedException ie) {
267 notFull.signal(); // propagate to non-interrupted thread
268 throw ie;
269 }
270 }
271 }
272 finally {
273 lock.unlock();
274 }
275 }
276
277
278 public E poll() {
279 lock.lock();
280 try {
281 if (count == 0)
282 return null;
283 E x = extract();
284 return x;
285 }
286 finally {
287 lock.unlock();
288 }
289 }
290
291 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
292 lock.lockInterruptibly();
293 try {
294 long nanos = unit.toNanos(timeout);
295 for (;;) {
296 if (count != 0) {
297 E x = extract();
298 return x;
299 }
300 if (nanos <= 0)
301 return null;
302 try {
303 nanos = notEmpty.awaitNanos(nanos);
304 }
305 catch (InterruptedException ie) {
306 notEmpty.signal(); // propagate to non-interrupted thread
307 throw ie;
308 }
309
310 }
311 }
312 finally {
313 lock.unlock();
314 }
315 }
316
317
318 public boolean remove(Object x) {
319 if (x == null) return false;
320 lock.lock();
321 try {
322 int i = takeIndex;
323 int k = 0;
324 for (;;) {
325 if (k++ >= count)
326 return false;
327 if (x.equals(items[i])) {
328 removeAt(i);
329 return true;
330 }
331 i = inc(i);
332 }
333
334 }
335 finally {
336 lock.unlock();
337 }
338 }
339
340 public E peek() {
341 lock.lock();
342 try {
343 return (count == 0) ? null : items[takeIndex];
344 }
345 finally {
346 lock.unlock();
347 }
348 }
349
350 public E take() throws InterruptedException {
351 lock.lockInterruptibly();
352 try {
353 try {
354 while (count == 0)
355 notEmpty.await();
356 }
357 catch (InterruptedException ie) {
358 notEmpty.signal(); // propagate to non-interrupted thread
359 throw ie;
360 }
361 E x = extract();
362 return x;
363 }
364 finally {
365 lock.unlock();
366 }
367 }
368
369 /**
370 * Add the specified element to the tail of this queue, waiting if
371 * necessary for space to become available.
372 * @throws NullPointerException {@inheritDoc}
373 */
374 public void put(E x) throws InterruptedException {
375
376 if (x == null) throw new NullPointerException();
377
378 lock.lockInterruptibly();
379 try {
380 try {
381 while (count == items.length)
382 notFull.await();
383 }
384 catch (InterruptedException ie) {
385 notFull.signal(); // propagate to non-interrupted thread
386 throw ie;
387 }
388 insert(x);
389 }
390 finally {
391 lock.unlock();
392 }
393 }
394
395 // this doc comment is overridden to remove the reference to collections
396 // greater in size than Integer.MAX_VALUE
397 /**
398 * Return the number of elements in this collection.
399 */
400 public int size() {
401 lock.lock();
402 try {
403 return count;
404 }
405 finally {
406 lock.unlock();
407 }
408 }
409
410 // this doc comment is a modified copy of the inherited doc comment,
411 // without the reference to unlimited queues.
412 /**
413 * Return the number of elements that this queue can ideally (in
414 * the absence of memory or resource constraints) accept without
415 * blocking. This is always equal to the initial capacity of this queue
416 * less the current <tt>size</tt> of this queue.
417 * <p>Note that you <em>cannot</em> always tell if
418 * an attempt to <tt>add</tt> an element will succeed by
419 * inspecting <tt>remainingCapacity</tt> because it may be the
420 * case that a waiting consumer is ready to <tt>take</tt> an
421 * element out of an otherwise full queue.
422 */
423 public int remainingCapacity() {
424 lock.lock();
425 try {
426 return items.length - count;
427 }
428 finally {
429 lock.unlock();
430 }
431 }
432
433
434 public boolean contains(Object x) {
435 if (x == null) return false;
436 lock.lock();
437 try {
438 int i = takeIndex;
439 int k = 0;
440 while (k++ < count) {
441 if (x.equals(items[i]))
442 return true;
443 i = inc(i);
444 }
445 return false;
446 }
447 finally {
448 lock.unlock();
449 }
450 }
451
452 public Object[] toArray() {
453 lock.lock();
454 try {
455 E[] a = (E[]) new Object[count];
456 int k = 0;
457 int i = takeIndex;
458 while (k < count) {
459 a[k++] = items[i];
460 i = inc(i);
461 }
462 return a;
463 }
464 finally {
465 lock.unlock();
466 }
467 }
468
469 public <T> T[] toArray(T[] a) {
470 lock.lock();
471 try {
472 if (a.length < count)
473 a = (T[])java.lang.reflect.Array.newInstance(
474 a.getClass().getComponentType(),
475 count
476 );
477
478 int k = 0;
479 int i = takeIndex;
480 while (k < count) {
481 a[k++] = (T)items[i];
482 i = inc(i);
483 }
484 if (a.length > count)
485 a[count] = null;
486 return a;
487 }
488 finally {
489 lock.unlock();
490 }
491 }
492
493 public String toString() {
494 lock.lock();
495 try {
496 return super.toString();
497 }
498 finally {
499 lock.unlock();
500 }
501 }
502
503 /**
504 * Returns an iterator over the elements in this queue in proper sequence.
505 *
506 * @return an iterator over the elements in this queue in proper sequence.
507 */
508 public Iterator<E> iterator() {
509 lock.lock();
510 try {
511 return new Itr();
512 }
513 finally {
514 lock.unlock();
515 }
516 }
517
518 /**
519 * Iterator for ArrayBlockingQueue
520 */
521 private class Itr implements Iterator<E> {
522 /**
523 * Index of element to be returned by next,
524 * or a negative number if no such.
525 */
526 private int nextIndex;
527
528 /**
529 * nextItem holds on to item fields because once we claim
530 * that an element exists in hasNext(), we must return it in
531 * the following next() call even if it was in the process of
532 * being removed when hasNext() was called.
533 **/
534 private E nextItem;
535
536 /**
537 * Index of element returned by most recent call to next.
538 * Reset to -1 if this element is deleted by a call to remove.
539 */
540 private int lastRet;
541
542 Itr() {
543 lastRet = -1;
544 if (count == 0)
545 nextIndex = -1;
546 else {
547 nextIndex = takeIndex;
548 nextItem = items[takeIndex];
549 }
550 }
551
552 public boolean hasNext() {
553 /*
554 * No sync. We can return true by mistake here
555 * only if this iterator passed across threads,
556 * which we don't support anyway.
557 */
558 return nextIndex >= 0;
559 }
560
561 /**
562 * Check whether nextIndex is valid; if so setting nextItem.
563 * Stops iterator when either hits putIndex or sees null item.
564 */
565 private void checkNext() {
566 if (nextIndex == putIndex) {
567 nextIndex = -1;
568 nextItem = null;
569 }
570 else {
571 nextItem = items[nextIndex];
572 if (nextItem == null)
573 nextIndex = -1;
574 }
575 }
576
577 public E next() {
578 lock.lock();
579 try {
580 if (nextIndex < 0)
581 throw new NoSuchElementException();
582 lastRet = nextIndex;
583 E x = nextItem;
584 nextIndex = inc(nextIndex);
585 checkNext();
586 return x;
587 }
588 finally {
589 lock.unlock();
590 }
591 }
592
593 public void remove() {
594 lock.lock();
595 try {
596 int i = lastRet;
597 if (i == -1)
598 throw new IllegalStateException();
599 lastRet = -1;
600
601 int ti = takeIndex;
602 removeAt(i);
603 // back up cursor (reset to front if was first element)
604 nextIndex = (i == ti) ? takeIndex : i;
605 checkNext();
606 }
607 finally {
608 lock.unlock();
609 }
610 }
611 }
612
613 /**
614 * Save the state to a stream (that is, serialize it).
615 *
616 * @serialData The maximumSize is emitted (int), followed by all of
617 * its elements (each an <tt>E</tt>) in the proper order.
618 * @param s the stream
619 */
620 private void writeObject(java.io.ObjectOutputStream s)
621 throws java.io.IOException {
622
623 // Write out element count, and any hidden stuff
624 s.defaultWriteObject();
625 // Write out maximumSize == items length
626 s.writeInt(items.length);
627
628 // Write out all elements in the proper order.
629 int i = takeIndex;
630 int k = 0;
631 while (k++ < count) {
632 s.writeObject(items[i]);
633 i = inc(i);
634 }
635 }
636
637 /**
638 * Reconstitute this queue instance from a stream (that is,
639 * deserialize it).
640 * @param s the stream
641 */
642 private void readObject(java.io.ObjectInputStream s)
643 throws java.io.IOException, ClassNotFoundException {
644 // Read in size, and any hidden stuff
645 s.defaultReadObject();
646 int size = count;
647
648 // Read in array length and allocate array
649 int arrayLength = s.readInt();
650
651 // We use deserializedItems here because "items" is final
652 deserializedItems = (E[]) new Object[arrayLength];
653
654 // Read in all elements in the proper order into deserializedItems
655 for (int i = 0; i < size; i++)
656 deserializedItems[i] = (E)s.readObject();
657 }
658
659 /**
660 * Throw away the object created with readObject, and replace it
661 * with a usable ArrayBlockingQueue.
662 * @return the ArrayBlockingQueue
663 */
664 private Object readResolve() throws java.io.ObjectStreamException {
665 E[] array = deserializedItems;
666 deserializedItems = null;
667 return new ArrayBlockingQueue(array.length, array, count, lock);
668 }
669 }