ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.18
Committed: Mon Aug 4 01:58:13 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.17: +2 -1 lines
Log Message:
Fixed wildcards

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A bounded {@link BlockingQueue blocking queue} backed by an array.
13 * This queue orders elements FIFO (first-in-first-out).
14 * The <em>head</em> of the queue is that element that has been on the
15 * queue the longest time.
16 * The <em>tail</em> of the queue is that element that has been on the
17 * queue the shortest time.
18 *
19 * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized
20 * array holds
21 * elements inserted by producers and extracted by consumers. Once
22 * created, the capacity cannot be increased. Attempts to <tt>put</tt> an
23 * element into a full queue will result in the operation
24 * blocking; attempts to <tt>take</tt> an element from an empty queue will
25 * similarly block.
26 *
27 * <p> This class supports an optional fairness policy for ordering
28 * threads blocked on an insertion or removal. By default, this
29 * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
30 * constructed with fairness set to <tt>true</tt> grants blocked
31 * threads access in FIFO order. Fairness generally substantially
32 * decreases throughput but reduces variability and avoids
33 * starvation.
34 *
35 * @since 1.5
36 * @author Doug Lea
37 */
38 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40
41 /** The queued items, held in a fixed-length array that is indexed circularly */
42 private transient final E[] items;
43 /** items index for next take, poll or remove */
44 private transient int takeIndex;
45 /** items index for next put, offer, or add. */
46 private transient int putIndex;
47 /** Number of items in the queue */
48 private int count;
49
50 /**
51 * An array used only during deserialization, to hold
52 * items read back in from the stream, and then used
53 * as "items" by readResolve via the private constructor.
54 */
55 private transient E[] deserializedItems;
56
57 /*
58 * Concurrency control via the classic two-condition algorithm
59 * found in any textbook.
60 */
61
62 /** Main lock guarding all access */
63 private final ReentrantLock lock;
64 /** Condition for waiting takes; signalled by insert */
65 private final Condition notEmpty;
66 /** Condition for waiting puts; signalled by extract and removeAt */
67 private final Condition notFull;
68
69 // Internal helper methods
70
71 /**
72 * Circularly increment i.
73 */
74 int inc(int i) {
75 return (++i == items.length)? 0 : i;
76 }
77
78 /**
79 * Insert element at current put position, advance, and signal.
80 * Call only when holding lock.
81 */
82 private void insert(E x) {
83 items[putIndex] = x;
84 putIndex = inc(putIndex);
85 ++count;
86 notEmpty.signal();
87 }
88
89 /**
90 * Extract element at current take position, advance, and signal.
91 * Call only when holding lock.
92 */
93 private E extract() {
94 E x = items[takeIndex];
95 items[takeIndex] = null;
96 takeIndex = inc(takeIndex);
97 --count;
98 notFull.signal();
99 return x;
100 }
101
102 /**
103 * Utility for remove and iterator.remove: Delete item at position i.
104 * Call only when holding lock.
105 */
106 void removeAt(int i) {
107 // if removing front item, just advance
108 if (i == takeIndex) {
109 items[takeIndex] = null;
110 takeIndex = inc(takeIndex);
111 }
112 else {
113 // slide over all others up through putIndex.
114 for (;;) {
115 int nexti = inc(i);
116 if (nexti != putIndex) {
117 items[i] = items[nexti];
118 i = nexti;
119 }
120 else {
121 items[i] = null;
122 putIndex = i;
123 break;
124 }
125 }
126 }
127 --count;
128 notFull.signal();
129 }
130
/**
 * Internal constructor also used by readResolve.
 * Sets all final fields, plus count and putIndex.
 * @param cap the capacity
 * @param array the array to use or null if should create new one
 * @param count the number of items in the array, where indices 0
 * to count-1 hold items.
 * @param lk the lock to use with this queue
 * @throws IllegalArgumentException if <tt>cap</tt> is less than 1
 */
private ArrayBlockingQueue(int cap, E[] array, int count,
                           ReentrantLock lk) {
    if (cap <= 0)
        throw new IllegalArgumentException();
    if (array == null)
        this.items = (E[]) new Object[cap];
    else
        this.items = array;
    // When the supplied array is completely full the next insertion
    // slot is 0, not items.length: leaving putIndex at items.length
    // would make the first insert after a removal index past the
    // end of the array.
    this.putIndex = (count == this.items.length) ? 0 : count;
    this.count = count;
    lock = lk;
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
154
/**
 * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
 * capacity and default access policy. Delegates to the
 * two-argument constructor with <tt>fair</tt> set to <tt>false</tt>,
 * which is the policy a no-argument <tt>ReentrantLock</tt> uses.
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
164
/**
 * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
 * capacity and the specified access policy. The queue starts out
 * empty and is guarded by a new <tt>ReentrantLock</tt> built with
 * the given fairness setting.
 * @param capacity the capacity of this queue
 * @param fair if <tt>true</tt> then queue accesses for threads blocked
 * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
 * the access order is unspecified.
 * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    this(capacity, null, 0, new ReentrantLock(fair));
}
177
/**
 * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
 * capacity, the specified access policy and initially holding the
 * elements of the given collection,
 * added in traversal order of the collection's iterator.
 * @param capacity the capacity of this queue
 * @param fair if <tt>true</tt> then queue accesses for threads blocked
 * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
 * the access order is unspecified.
 * @param c the collection of elements to initially contain
 * @throws IllegalArgumentException if <tt>capacity</tt> is less than
 * <tt>c.size()</tt>, or less than 1.
 * @throws NullPointerException if <tt>c</tt> or any element within it
 * is <tt>null</tt>
 */
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, null, 0, new ReentrantLock(fair));

    if (capacity < c.size())
        throw new IllegalArgumentException();

    // The iterator has to carry the same wildcard as the collection:
    // an Iterator<E> cannot be assigned from the iterator of a
    // Collection<? extends E>.
    for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
        add(it.next());
}
203
204 // Have to override just to update the javadoc for @throws
205
206 /**
207 * @throws IllegalStateException {@inheritDoc}
208 * @throws NullPointerException {@inheritDoc}
209 */
210 public boolean add(E o) {
211 return super.add(o);
212 }
213
214 /**
215 * @throws IllegalStateException {@inheritDoc}
216 * @throws NullPointerException {@inheritDoc}
217 */
218 public boolean addAll(Collection<? extends E> c) {
219 return super.addAll(c);
220 }
221
222 /**
223 * Add the specified element to the tail of this queue if possible,
224 * returning immediately if this queue is full.
225 *
226 * @throws NullPointerException {@inheritDoc}
227 */
228 public boolean offer(E x) {
229 if (x == null) throw new NullPointerException();
230 lock.lock();
231 try {
232 if (count == items.length)
233 return false;
234 else {
235 insert(x);
236 return true;
237 }
238 }
239 finally {
240 lock.unlock();
241 }
242 }
243
244 /**
245 * Add the specified element to the tail of this queue, waiting if
246 * necessary up to the specified wait time for space to become available.
247 * @throws NullPointerException {@inheritDoc}
248 */
249 public boolean offer(E x, long timeout, TimeUnit unit)
250 throws InterruptedException {
251
252 if (x == null) throw new NullPointerException();
253
254 lock.lockInterruptibly();
255 try {
256 long nanos = unit.toNanos(timeout);
257 for (;;) {
258 if (count != items.length) {
259 insert(x);
260 return true;
261 }
262 if (nanos <= 0)
263 return false;
264 try {
265 nanos = notFull.awaitNanos(nanos);
266 }
267 catch (InterruptedException ie) {
268 notFull.signal(); // propagate to non-interrupted thread
269 throw ie;
270 }
271 }
272 }
273 finally {
274 lock.unlock();
275 }
276 }
277
278
279 public E poll() {
280 lock.lock();
281 try {
282 if (count == 0)
283 return null;
284 E x = extract();
285 return x;
286 }
287 finally {
288 lock.unlock();
289 }
290 }
291
292 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
293 lock.lockInterruptibly();
294 try {
295 long nanos = unit.toNanos(timeout);
296 for (;;) {
297 if (count != 0) {
298 E x = extract();
299 return x;
300 }
301 if (nanos <= 0)
302 return null;
303 try {
304 nanos = notEmpty.awaitNanos(nanos);
305 }
306 catch (InterruptedException ie) {
307 notEmpty.signal(); // propagate to non-interrupted thread
308 throw ie;
309 }
310
311 }
312 }
313 finally {
314 lock.unlock();
315 }
316 }
317
318
319 public boolean remove(Object x) {
320 if (x == null) return false;
321 lock.lock();
322 try {
323 int i = takeIndex;
324 int k = 0;
325 for (;;) {
326 if (k++ >= count)
327 return false;
328 if (x.equals(items[i])) {
329 removeAt(i);
330 return true;
331 }
332 i = inc(i);
333 }
334
335 }
336 finally {
337 lock.unlock();
338 }
339 }
340
341 public E peek() {
342 lock.lock();
343 try {
344 return (count == 0) ? null : items[takeIndex];
345 }
346 finally {
347 lock.unlock();
348 }
349 }
350
351 public E take() throws InterruptedException {
352 lock.lockInterruptibly();
353 try {
354 try {
355 while (count == 0)
356 notEmpty.await();
357 }
358 catch (InterruptedException ie) {
359 notEmpty.signal(); // propagate to non-interrupted thread
360 throw ie;
361 }
362 E x = extract();
363 return x;
364 }
365 finally {
366 lock.unlock();
367 }
368 }
369
370 /**
371 * Add the specified element to the tail of this queue, waiting if
372 * necessary for space to become available.
373 * @throws NullPointerException {@inheritDoc}
374 */
375 public void put(E x) throws InterruptedException {
376
377 if (x == null) throw new NullPointerException();
378
379 lock.lockInterruptibly();
380 try {
381 try {
382 while (count == items.length)
383 notFull.await();
384 }
385 catch (InterruptedException ie) {
386 notFull.signal(); // propagate to non-interrupted thread
387 throw ie;
388 }
389 insert(x);
390 }
391 finally {
392 lock.unlock();
393 }
394 }
395
396 // this doc comment is overridden to remove the reference to collections
397 // greater in size than Integer.MAX_VALUE
398 /**
399 * Return the number of elements in this collection.
400 */
401 public int size() {
402 lock.lock();
403 try {
404 return count;
405 }
406 finally {
407 lock.unlock();
408 }
409 }
410
411 // this doc comment is a modified copy of the inherited doc comment,
412 // without the reference to unlimited queues.
413 /**
414 * Return the number of elements that this queue can ideally (in
415 * the absence of memory or resource constraints) accept without
416 * blocking. This is always equal to the initial capacity of this queue
417 * less the current <tt>size</tt> of this queue.
418 * <p>Note that you <em>cannot</em> always tell if
419 * an attempt to <tt>add</tt> an element will succeed by
420 * inspecting <tt>remainingCapacity</tt> because it may be the
421 * case that a waiting consumer is ready to <tt>take</tt> an
422 * element out of an otherwise full queue.
423 */
424 public int remainingCapacity() {
425 lock.lock();
426 try {
427 return items.length - count;
428 }
429 finally {
430 lock.unlock();
431 }
432 }
433
434
435 public boolean contains(Object x) {
436 if (x == null) return false;
437 lock.lock();
438 try {
439 int i = takeIndex;
440 int k = 0;
441 while (k++ < count) {
442 if (x.equals(items[i]))
443 return true;
444 i = inc(i);
445 }
446 return false;
447 }
448 finally {
449 lock.unlock();
450 }
451 }
452
453 public Object[] toArray() {
454 lock.lock();
455 try {
456 E[] a = (E[]) new Object[count];
457 int k = 0;
458 int i = takeIndex;
459 while (k < count) {
460 a[k++] = items[i];
461 i = inc(i);
462 }
463 return a;
464 }
465 finally {
466 lock.unlock();
467 }
468 }
469
470 public <T> T[] toArray(T[] a) {
471 lock.lock();
472 try {
473 if (a.length < count)
474 a = (T[])java.lang.reflect.Array.newInstance(
475 a.getClass().getComponentType(),
476 count
477 );
478
479 int k = 0;
480 int i = takeIndex;
481 while (k < count) {
482 a[k++] = (T)items[i];
483 i = inc(i);
484 }
485 if (a.length > count)
486 a[count] = null;
487 return a;
488 }
489 finally {
490 lock.unlock();
491 }
492 }
493
494 public String toString() {
495 lock.lock();
496 try {
497 return super.toString();
498 }
499 finally {
500 lock.unlock();
501 }
502 }
503
504 /**
505 * Returns an iterator over the elements in this queue in proper sequence.
506 *
507 * @return an iterator over the elements in this queue in proper sequence.
508 */
509 public Iterator<E> iterator() {
510 lock.lock();
511 try {
512 return new Itr();
513 }
514 finally {
515 lock.unlock();
516 }
517 }
518
519 /**
520 * Iterator for ArrayBlockingQueue
521 */
522 private class Itr implements Iterator<E> {
523 /**
524 * Index of element to be returned by next,
525 * or a negative number if no such.
526 */
527 private int nextIndex;
528
529 /**
530 * nextItem holds on to item fields because once we claim
531 * that an element exists in hasNext(), we must return it in
532 * the following next() call even if it was in the process of
533 * being removed when hasNext() was called.
534 **/
535 private E nextItem;
536
537 /**
538 * Index of element returned by most recent call to next.
539 * Reset to -1 if this element is deleted by a call to remove.
540 */
541 private int lastRet;
542
543 Itr() {
544 lastRet = -1;
545 if (count == 0)
546 nextIndex = -1;
547 else {
548 nextIndex = takeIndex;
549 nextItem = items[takeIndex];
550 }
551 }
552
553 public boolean hasNext() {
554 /*
555 * No sync. We can return true by mistake here
556 * only if this iterator passed across threads,
557 * which we don't support anyway.
558 */
559 return nextIndex >= 0;
560 }
561
562 /**
563 * Check whether nextIndex is valid; if so setting nextItem.
564 * Stops iterator when either hits putIndex or sees null item.
565 */
566 private void checkNext() {
567 if (nextIndex == putIndex) {
568 nextIndex = -1;
569 nextItem = null;
570 }
571 else {
572 nextItem = items[nextIndex];
573 if (nextItem == null)
574 nextIndex = -1;
575 }
576 }
577
578 public E next() {
579 lock.lock();
580 try {
581 if (nextIndex < 0)
582 throw new NoSuchElementException();
583 lastRet = nextIndex;
584 E x = nextItem;
585 nextIndex = inc(nextIndex);
586 checkNext();
587 return x;
588 }
589 finally {
590 lock.unlock();
591 }
592 }
593
594 public void remove() {
595 lock.lock();
596 try {
597 int i = lastRet;
598 if (i == -1)
599 throw new IllegalStateException();
600 lastRet = -1;
601
602 int ti = takeIndex;
603 removeAt(i);
604 // back up cursor (reset to front if was first element)
605 nextIndex = (i == ti) ? takeIndex : i;
606 checkNext();
607 }
608 finally {
609 lock.unlock();
610 }
611 }
612 }
613
614 /**
615 * Save the state to a stream (that is, serialize it).
616 *
617 * @serialData The maximumSize is emitted (int), followed by all of
618 * its elements (each an <tt>E</tt>) in the proper order.
619 * @param s the stream
620 */
621 private void writeObject(java.io.ObjectOutputStream s)
622 throws java.io.IOException {
623
624 // Write out element count, and any hidden stuff
625 s.defaultWriteObject();
626 // Write out maximumSize == items length
627 s.writeInt(items.length);
628
629 // Write out all elements in the proper order.
630 int i = takeIndex;
631 int k = 0;
632 while (k++ < count) {
633 s.writeObject(items[i]);
634 i = inc(i);
635 }
636 }
637
638 /**
639 * Reconstitute this queue instance from a stream (that is,
640 * deserialize it).
641 * @param s the stream
642 */
643 private void readObject(java.io.ObjectInputStream s)
644 throws java.io.IOException, ClassNotFoundException {
645 // Read in size, and any hidden stuff
646 s.defaultReadObject();
647 int size = count;
648
649 // Read in array length and allocate array
650 int arrayLength = s.readInt();
651
652 // We use deserializedItems here because "items" is final
653 deserializedItems = (E[]) new Object[arrayLength];
654
655 // Read in all elements in the proper order into deserializedItems
656 for (int i = 0; i < size; i++)
657 deserializedItems[i] = (E)s.readObject();
658 }
659
660 /**
661 * Throw away the object created with readObject, and replace it
662 * with a usable ArrayBlockingQueue.
663 * @return the ArrayBlockingQueue
664 */
665 private Object readResolve() throws java.io.ObjectStreamException {
666 E[] array = deserializedItems;
667 deserializedItems = null;
668 return new ArrayBlockingQueue<E>(array.length, array, count, lock);
669 }
670 }