ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.25
Committed: Fri Sep 12 15:40:10 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.24: +31 -59 lines
Log Message:
Adapt AbstractQueue changes; Conditionalize CancellableTask.reset; new TimeUnit methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * A bounded {@linkplain BlockingQueue blocking queue} backed by an
 * array.  This queue orders elements FIFO (first-in-first-out).  The
 * <em>head</em> of the queue is that element that has been on the
 * queue the longest time.  The <em>tail</em> of the queue is that
 * element that has been on the queue the shortest time.  New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 *
 * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized
 * array holds elements inserted by producers and extracted by
 * consumers.  Once created, the capacity can not be increased.
 * Attempts to <tt>put</tt> an element into a full queue will result
 * in the operation blocking; attempts to <tt>take</tt> an element
 * from an empty queue will similarly block.  The untimed
 * <tt>offer</tt> and <tt>poll</tt> methods never block: they report
 * failure immediately instead.
 *
 * <p>This class supports an optional fairness policy for ordering
 * threads blocked on an insertion or removal.  By default, this
 * ordering is not guaranteed.  However, an <tt>ArrayBlockingQueue</tt>
 * constructed with fairness set to <tt>true</tt> grants blocked
 * threads access in FIFO order.  Fairness generally decreases
 * throughput but reduces variability and avoids starvation.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -817911632652898425L;

    /** The queued items */
    private transient final E[] items;
    /** items index for next take, poll or remove */
    private transient int takeIndex;
    /** items index for next put, offer, or add. */
    private transient int putIndex;
    /** Number of items in the queue */
    private int count;

    /**
     * An array used only during deserialization, to hold
     * items read back in from the stream, and then used
     * as "items" by readResolve via the private constructor.
     */
    private transient E[] deserializedItems;

    /*
     * Concurrency control via the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

    // Internal helper methods

    /**
     * Circularly increment i.
     *
     * @param i an index into <tt>items</tt>
     * @return the index following <tt>i</tt>, wrapping to 0 at the end
     * of the array
     */
    int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    /**
     * Insert element at current put position, advance, and signal.
     * Call only when holding lock.
     *
     * @param x the (non-null) element to store
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

    /**
     * Extract element at current take position, advance, and signal.
     * Call only when holding lock.
     *
     * @return the element that was at the head of the queue
     */
    private E extract() {
        E x = items[takeIndex];
        items[takeIndex] = null;   // do not retain a reference to a removed element
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();
        return x;
    }

    /**
     * Utility for remove and iterator.remove: Delete item at position i.
     * Call only when holding lock, and only for a slot that currently
     * holds an element.
     *
     * @param i index of the slot to vacate
     */
    void removeAt(int i) {
        if (i == takeIndex) {
            // removing front item: just advance
            items[takeIndex] = null;
            takeIndex = inc(takeIndex);
        } else {
            // slide over all others up through putIndex.
            for (;;) {
                int nexti = inc(i);
                if (nexti != putIndex) {
                    items[i] = items[nexti];
                    i = nexti;
                } else {
                    items[i] = null;
                    putIndex = i;
                    break;
                }
            }
        }
        --count;
        notFull.signal();
    }

    /**
     * Internal constructor also used by readResolve.
     * Sets all final fields, plus count.
     *
     * @param cap the capacity
     * @param array the array to use or null if should create new one
     * @param count the number of items in the array, where indices 0
     * to count-1 hold items.
     * @param lk the lock to use with this queue
     * @throws IllegalArgumentException if <tt>cap</tt> is less than 1, or
     * <tt>count</tt> is negative or exceeds the length of the backing array
     */
    private ArrayBlockingQueue(int cap, E[] array, int count,
                               ReentrantLock lk) {
        if (cap <= 0)
            throw new IllegalArgumentException();
        E[] backing = (array != null) ? array : (E[]) new Object[cap];
        if (count < 0 || count > backing.length)
            throw new IllegalArgumentException();
        this.items = backing;
        // A full array means the next put position wraps around to slot 0;
        // leaving it at items.length would index past the end of the array.
        this.putIndex = (count == backing.length) ? 0 : count;
        this.count = count;
        lock = lk;
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and default access policy.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, null, 0, new ReentrantLock());
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        this(capacity, null, 0, new ReentrantLock(fair));
    }

    /**
     * Creates an <tt>ArrayBlockingQueue</tt> with the given (fixed)
     * capacity, the specified access policy and initially containing the
     * elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param capacity the capacity of this queue
     * @param fair if <tt>true</tt> then queue accesses for threads blocked
     * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
     * the access order is unspecified.
     * @param c the collection of elements to initially contain
     * @throws IllegalArgumentException if <tt>capacity</tt> is less than
     * <tt>c.size()</tt>, or less than 1.
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     * @throws IllegalStateException if <tt>c</tt> yields more elements
     * than <tt>capacity</tt> while being traversed
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, null, 0, new ReentrantLock(fair));

        if (capacity < c.size())
            throw new IllegalArgumentException();

        // Insert directly instead of calling the overridable add()/offer():
        // a subclass override must not run against a half-built object.
        // The lock is needed because insert() signals a condition.
        lock.lock();
        try {
            for (Iterator<? extends E> it = c.iterator(); it.hasNext();) {
                E e = it.next();
                if (e == null)
                    throw new NullPointerException();
                if (count == items.length)
                    throw new IllegalStateException("Queue full");
                insert(e);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts the specified element to this queue, if possible;
     * failing if the queue is full.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     * this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        lock.lock();
        try {
            if (count == items.length)
                return false;
            insert(o);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (o == null) throw new NullPointerException();

        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != items.length) {
                    insert(o);
                    return true;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, without waiting.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    public E poll() {
        lock.lock();
        try {
            return (count == 0) ? null : extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary up to the specified wait time for an element to
     * become available.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            long nanos = unit.toNanos(timeout);
            for (;;) {
                if (count != 0)
                    return extract();
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes a single element equal to the specified object from this
     * queue, if one is present.  The first match in head-to-tail order
     * is the one removed.
     *
     * @param o the object to look for; <tt>null</tt> never matches
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or <tt>null</tt> if this queue
     * is empty
     */
    public E peek() {
        lock.lock();
        try {
            return (count == 0) ? null : items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            return extract();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {

        if (o == null) throw new NullPointerException();

        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            insert(o);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them
     * to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from
     * this queue and adds them to the given collection, in FIFO order.
     * If adding to <tt>c</tt> fails part-way, elements not yet
     * transferred remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        lock.lock();
        try {
            int limit = Math.min(maxElements, count);
            int moved = 0;
            try {
                while (moved < limit) {
                    // add before clearing the slot so that a failing
                    // c.add() leaves the element in this queue
                    c.add(items[takeIndex]);
                    items[takeIndex] = null;
                    takeIndex = inc(takeIndex);
                    --count;
                    ++moved;
                }
            } finally {
                if (moved > 0)
                    notFull.signalAll(); // several slots may have been freed
            }
            return moved;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue.
     */
    public int size() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     *
     * @return the number of free slots at the time of the call
     */
    public int remainingCapacity() {
        lock.lock();
        try {
            return items.length - count;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns <tt>true</tt> if this queue contains an element equal to
     * the specified object.
     *
     * @param o the object to look for; <tt>null</tt> never matches
     * @return <tt>true</tt> if a matching element is present
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        lock.lock();
        try {
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                if (o.equals(items[i]))
                    return true;
                i = inc(i);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns a newly allocated array holding the elements of this
     * queue in head-to-tail order.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        lock.lock();
        try {
            Object[] a = new Object[count];
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                a[k] = items[i];
                i = inc(i);
            }
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Copies the elements of this queue, in head-to-tail order, into
     * the given array if it is large enough, otherwise into a new array
     * of the same component type.  If the array has room to spare, the
     * slot immediately after the last element is set to <tt>null</tt>.
     *
     * @param a the array to fill if it is big enough
     * @return the array holding the elements
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     * @throws ArrayStoreException if an element cannot be stored in an
     * array of the given component type
     */
    public <T> T[] toArray(T[] a) {
        lock.lock();
        try {
            if (a.length < count)
                a = (T[]) java.lang.reflect.Array.newInstance(
                    a.getClass().getComponentType(), count);

            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                a[k] = (T) items[i];
                i = inc(i);
            }
            if (a.length > count)
                a[count] = null;
            return a;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns the inherited string form of this collection, computed
     * while holding the queue lock.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        lock.lock();
        try {
            return super.toString();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        lock.lock();
        try {
            return new Itr();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Iterator for ArrayBlockingQueue
     */
    private class Itr implements Iterator<E> {
        /**
         * Index of element to be returned by next,
         * or a negative number if no such.
         */
        private int nextIndex;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Index of element returned by most recent call to next.
         * Reset to -1 if this element is deleted by a call to remove.
         */
        private int lastRet;

        /**
         * Element returned by most recent call to next, kept so that
         * remove() can verify the slot still holds that element.
         */
        private E lastItem;

        /** Positions the iterator at the head; call only when holding lock. */
        Itr() {
            lastRet = -1;
            if (count == 0)
                nextIndex = -1;
            else {
                nextIndex = takeIndex;
                nextItem = items[takeIndex];
            }
        }

        public boolean hasNext() {
            /*
             * No sync. We can return true by mistake here
             * only if this iterator passed across threads,
             * which we don't support anyway.
             */
            return nextIndex >= 0;
        }

        /**
         * Check whether nextIndex is valid; if so setting nextItem.
         * Stops iterator when either hits putIndex or sees null item.
         */
        private void checkNext() {
            if (nextIndex == putIndex) {
                nextIndex = -1;
                nextItem = null;
            } else {
                nextItem = items[nextIndex];
                if (nextItem == null)
                    nextIndex = -1;
            }
        }

        public E next() {
            lock.lock();
            try {
                if (nextIndex < 0)
                    throw new NoSuchElementException();
                lastRet = nextIndex;
                E x = nextItem;
                lastItem = x;
                nextIndex = inc(nextIndex);
                checkNext();
                return x;
            } finally {
                lock.unlock();
            }
        }

        public void remove() {
            lock.lock();
            try {
                int i = lastRet;
                if (i == -1)
                    throw new IllegalStateException();
                lastRet = -1;
                E x = lastItem;
                lastItem = null;

                // The queue may have changed since next(): only delete if the
                // slot still holds the very element we handed out.  Calling
                // removeAt on a vacated or reused slot would drop some other
                // element and corrupt count.
                if (x == null || items[i] != x)
                    return;

                int ti = takeIndex;
                removeAt(i);
                // back up cursor (reset to front if was first element)
                nextIndex = (i == ti) ? takeIndex : i;
                checkNext();
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The maximumSize is emitted (int), followed by all of
     * its elements (each an <tt>E</tt>) in the proper order.
     * @param s the stream
     * @throws java.io.IOException if writing to the stream fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        // Hold the lock so that count and the elements written form one
        // consistent snapshot even if other threads are using the queue.
        lock.lock();
        try {
            // Write out element count, and any hidden stuff
            s.defaultWriteObject();
            // Write out maximumSize == items length
            s.writeInt(items.length);

            // Write out all elements in the proper order.
            int i = takeIndex;
            for (int k = 0; k < count; k++) {
                s.writeObject(items[i]);
                i = inc(i);
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     *
     * @param s the stream
     * @throws java.io.InvalidObjectException if the stream describes an
     * impossible queue (bad capacity, bad element count, null element)
     * @throws java.io.IOException if reading from the stream fails
     * @throws ClassNotFoundException if an element's class cannot be found
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in size, and any hidden stuff
        s.defaultReadObject();
        int size = count;

        // Read in array length and allocate array
        int arrayLength = s.readInt();
        if (arrayLength <= 0 || size < 0 || size > arrayLength)
            throw new java.io.InvalidObjectException(
                "Invalid queue state: capacity=" + arrayLength
                + ", size=" + size);

        // We use deserializedItems here because "items" is final
        E[] array = (E[]) new Object[arrayLength];

        // Read in all elements in the proper order
        for (int i = 0; i < size; i++) {
            E e = (E) s.readObject();
            if (e == null)
                throw new java.io.InvalidObjectException("Null element");
            array[i] = e;
        }
        deserializedItems = array;
    }

    /**
     * Throw away the object created with readObject, and replace it
     * with a usable ArrayBlockingQueue.
     *
     * @return the ArrayBlockingQueue
     */
    private Object readResolve() throws java.io.ObjectStreamException {
        E[] array = deserializedItems;
        deserializedItems = null;
        return new ArrayBlockingQueue<E>(array.length, array, count, lock);
    }
}