ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.15
Committed: Mon Jul 28 16:00:19 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.14: +21 -21 lines
Log Message:
Added addAll() back in.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A bounded {@link BlockingQueue blocking queue} backed by an array.
13 * This queue orders elements FIFO (first-in-first-out).
14 * The <em>head</em> of the queue is that element that has been on the
15 * queue the longest time.
16 * The <em>tail</em> of the queue is that element that has been on the
17 * queue the shortest time.
18 *
19 * <p>This is a classic &quot;bounded buffer&quot;, in which a fixed-sized
20 * array holds
21 * elements inserted by producers and extracted by consumers. Once
22 * created, the capacity can not be increased. Attempts to offer an
23 * element to a full queue will result in the offer operation
24 * blocking; attempts to retrieve an element from an empty queue will
25 * similarly block.
26 *
27 * <p> This class supports an optional fairness policy for ordering
28 * threads blocked on an insertion or removal. By default, this
29 * ordering is not guaranteed. However, an <tt>ArrayBlockingQueue</tt>
30 * constructed with fairness set to <tt>true</tt> grants blocked
31 * threads access in FIFO order. Fairness generally substantially
32 * decreases throughput but reduces variablility and avoids
33 * starvation.
34 *
35 * @since 1.5
36 * @author Doug Lea
37 */
38 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40
41 /** The queued items */
42 private transient final E[] items;
43 /** items index for next take, poll or remove */
44 private transient int takeIndex;
45 /** items index for next put, offer, or add. */
46 private transient int putIndex;
47 /** Number of items in the queue */
48 private int count;
49
50 /**
51 * An array used only during deserialization, to hold
52 * items read back in from the stream, and then used
53 * as "items" by readResolve via the private constructor.
54 */
55 private transient E[] deserializedItems;
56
57 /*
58 * Concurrency control via the classic two-condition algorithm
59 * found in any textbook.
60 */
61
62 /** Main lock guarding all access */
63 private final ReentrantLock lock;
64 /** Condition for waiting takes */
65 private final Condition notEmpty;
66 /** Condition for wiating puts */
67 private final Condition notFull;
68
69 // Internal helper methods
70
71 /**
72 * Circularly increment i.
73 */
74 int inc(int i) {
75 return (++i == items.length)? 0 : i;
76 }
77
78 /**
79 * Insert element at current put position, advance, and signal.
80 * Call only when holding lock.
81 */
82 private void insert(E x) {
83 items[putIndex] = x;
84 putIndex = inc(putIndex);
85 ++count;
86 notEmpty.signal();
87 }
88
89 /**
90 * Extract element at current take position, advance, and signal.
91 * Call only when holding lock.
92 */
93 private E extract() {
94 E x = items[takeIndex];
95 items[takeIndex] = null;
96 takeIndex = inc(takeIndex);
97 --count;
98 notFull.signal();
99 return x;
100 }
101
102 /**
103 * Utility for remove and iterator.remove: Delete item at position i.
104 * Call only when holding lock.
105 */
106 void removeAt(int i) {
107 // if removing front item, just advance
108 if (i == takeIndex) {
109 items[takeIndex] = null;
110 takeIndex = inc(takeIndex);
111 }
112 else {
113 // slide over all others up through putIndex.
114 for (;;) {
115 int nexti = inc(i);
116 if (nexti != putIndex) {
117 items[i] = items[nexti];
118 i = nexti;
119 }
120 else {
121 items[i] = null;
122 putIndex = i;
123 break;
124 }
125 }
126 }
127 --count;
128 notFull.signal();
129 }
130
131 /**
132 * Internal constructor also used by readResolve.
133 * Sets all final fields, plus count.
134 * @param cap the capacity
135 * @param array the array to use or null if should create new one
136 * @param count the number of items in the array, where indices 0
137 * to count-1 hold items.
138 */
139 private ArrayBlockingQueue(int cap, E[] array, int count,
140 ReentrantLock lk) {
141 if (cap <= 0)
142 throw new IllegalArgumentException();
143 if (array == null)
144 this.items = (E[]) new Object[cap];
145 else
146 this.items = array;
147 this.putIndex = count;
148 this.count = count;
149 lock = lk;
150 notEmpty = lock.newCondition();
151 notFull = lock.newCondition();
152 }
153
154 /**
155 * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
156 * capacity and default access policy.
157 * @param capacity the capacity of this queue
158 */
159 public ArrayBlockingQueue(int capacity) {
160 this(capacity, null, 0, new ReentrantLock());
161 }
162
163 /**
164 * Create an <tt>ArrayBlockingQueue</tt> with the given (fixed)
165 * capacity and the specified access policy.
166 * @param capacity the capacity of this queue
167 * @param fair if <tt>true</tt> then queue accesses for threads blocked
168 * on insertion or removal, are processed in FIFO order; if <tt>false</tt>
169 * the access order is unspecified.
170 */
171 public ArrayBlockingQueue(int capacity, boolean fair) {
172 this(capacity, null, 0, new ReentrantLock(fair));
173 }
174
175
176 // Have to override just to update the javadoc for @throws
177
178 /**
179 * @throws IllegalStateException {@inheritDoc}
180 */
181 public boolean add(E x) {
182 return super.add(x);
183 }
184
185 /**
186 * @throws IllegalStateException {@inheritDoc}
187 */
188 public boolean addAll(Collection<? extends E> c) {
189 return super.addAll(c);
190 }
191
192 /**
193 * Add the specified element to the tail of this queue if possible,
194 * returning immediately if this queue is full.
195 *
196 * @throws NullPointerException {@inheritDoc}
197 */
198 public boolean offer(E x) {
199 if (x == null) throw new NullPointerException();
200 lock.lock();
201 try {
202 if (count == items.length)
203 return false;
204 else {
205 insert(x);
206 return true;
207 }
208 }
209 finally {
210 lock.unlock();
211 }
212 }
213
214 /**
215 * Add the specified element to the tail of this queue, waiting if
216 * necessary up to the specified wait time for space to become available.
217 * @throws NullPointerException {@inheritDoc}
218 */
219 public boolean offer(E x, long timeout, TimeUnit unit)
220 throws InterruptedException {
221
222 if (x == null) throw new NullPointerException();
223
224 lock.lockInterruptibly();
225 try {
226 long nanos = unit.toNanos(timeout);
227 for (;;) {
228 if (count != items.length) {
229 insert(x);
230 return true;
231 }
232 if (nanos <= 0)
233 return false;
234 try {
235 nanos = notFull.awaitNanos(nanos);
236 }
237 catch (InterruptedException ie) {
238 notFull.signal(); // propagate to non-interrupted thread
239 throw ie;
240 }
241 }
242 }
243 finally {
244 lock.unlock();
245 }
246 }
247
248
249 public E poll() {
250 lock.lock();
251 try {
252 if (count == 0)
253 return null;
254 E x = extract();
255 return x;
256 }
257 finally {
258 lock.unlock();
259 }
260 }
261
262 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
263 lock.lockInterruptibly();
264 try {
265 long nanos = unit.toNanos(timeout);
266 for (;;) {
267 if (count != 0) {
268 E x = extract();
269 return x;
270 }
271 if (nanos <= 0)
272 return null;
273 try {
274 nanos = notEmpty.awaitNanos(nanos);
275 }
276 catch (InterruptedException ie) {
277 notEmpty.signal(); // propagate to non-interrupted thread
278 throw ie;
279 }
280
281 }
282 }
283 finally {
284 lock.unlock();
285 }
286 }
287
288
289 public boolean remove(Object x) {
290 if (x == null) return false;
291 lock.lock();
292 try {
293 int i = takeIndex;
294 int k = 0;
295 for (;;) {
296 if (k++ >= count)
297 return false;
298 if (x.equals(items[i])) {
299 removeAt(i);
300 return true;
301 }
302 i = inc(i);
303 }
304
305 }
306 finally {
307 lock.unlock();
308 }
309 }
310
311 public E peek() {
312 lock.lock();
313 try {
314 return (count == 0) ? null : items[takeIndex];
315 }
316 finally {
317 lock.unlock();
318 }
319 }
320
321 public E take() throws InterruptedException {
322 lock.lockInterruptibly();
323 try {
324 try {
325 while (count == 0)
326 notEmpty.await();
327 }
328 catch (InterruptedException ie) {
329 notEmpty.signal(); // propagate to non-interrupted thread
330 throw ie;
331 }
332 E x = extract();
333 return x;
334 }
335 finally {
336 lock.unlock();
337 }
338 }
339
340 /**
341 * Add the specified element to the tail of this queue, waiting if
342 * necessary for space to become available.
343 * @throws NullPointerException {@inheritDoc}
344 */
345 public void put(E x) throws InterruptedException {
346
347 if (x == null) throw new NullPointerException();
348
349 lock.lockInterruptibly();
350 try {
351 try {
352 while (count == items.length)
353 notFull.await();
354 }
355 catch (InterruptedException ie) {
356 notFull.signal(); // propagate to non-interrupted thread
357 throw ie;
358 }
359 insert(x);
360 }
361 finally {
362 lock.unlock();
363 }
364 }
365
366 // this doc comment is overridden to remove the reference to collections
367 // greater in size than Integer.MAX_VALUE
368 /**
369 * Return the number of elements in this collection.
370 */
371 public int size() {
372 lock.lock();
373 try {
374 return count;
375 }
376 finally {
377 lock.unlock();
378 }
379 }
380
381 // this doc comment is a modified copy of the inherited doc comment,
382 // without the reference to unlimited queues.
383 /**
384 * Return the number of elements that this queue can ideally (in
385 * the absence of memory or resource constraints) accept without
386 * blocking. This is always equal to the initial capacity of this queue
387 * less the current <tt>size</tt> of this queue.
388 * <p>Note that you <em>cannot</em> always tell if
389 * an attempt to <tt>add</tt> an element will succeed by
390 * inspecting <tt>remainingCapacity</tt> because it may be the
391 * case that a waiting consumer is ready to <tt>take</tt> an
392 * element out of an otherwise full queue.
393 */
394 public int remainingCapacity() {
395 lock.lock();
396 try {
397 return items.length - count;
398 }
399 finally {
400 lock.unlock();
401 }
402 }
403
404
405 public boolean contains(Object x) {
406 if (x == null) return false;
407 lock.lock();
408 try {
409 int i = takeIndex;
410 int k = 0;
411 while (k++ < count) {
412 if (x.equals(items[i]))
413 return true;
414 i = inc(i);
415 }
416 return false;
417 }
418 finally {
419 lock.unlock();
420 }
421 }
422
423 public Object[] toArray() {
424 lock.lock();
425 try {
426 E[] a = (E[]) new Object[count];
427 int k = 0;
428 int i = takeIndex;
429 while (k < count) {
430 a[k++] = items[i];
431 i = inc(i);
432 }
433 return a;
434 }
435 finally {
436 lock.unlock();
437 }
438 }
439
440 public <T> T[] toArray(T[] a) {
441 lock.lock();
442 try {
443 if (a.length < count)
444 a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
445
446 int k = 0;
447 int i = takeIndex;
448 while (k < count) {
449 a[k++] = (T)items[i];
450 i = inc(i);
451 }
452 if (a.length > count)
453 a[count] = null;
454 return a;
455 }
456 finally {
457 lock.unlock();
458 }
459 }
460
461 public String toString() {
462 lock.lock();
463 try {
464 return super.toString();
465 }
466 finally {
467 lock.unlock();
468 }
469 }
470
471 /**
472 * Returns an iterator over the elements in this queue in proper sequence.
473 *
474 * @return an iterator over the elements in this queue in proper sequence.
475 */
476 public Iterator<E> iterator() {
477 lock.lock();
478 try {
479 return new Itr();
480 }
481 finally {
482 lock.unlock();
483 }
484 }
485
486 /**
487 * Iterator for ArrayBlockingQueue
488 */
489 private class Itr implements Iterator<E> {
490 /**
491 * Index of element to be returned by next,
492 * or a negative number if no such.
493 */
494 private int nextIndex;
495
496 /**
497 * nextItem holds on to item fields because once we claim
498 * that an element exists in hasNext(), we must return it in
499 * the following next() call even if it was in the process of
500 * being removed when hasNext() was called.
501 **/
502 private E nextItem;
503
504 /**
505 * Index of element returned by most recent call to next.
506 * Reset to -1 if this element is deleted by a call to remove.
507 */
508 private int lastRet;
509
510 Itr() {
511 lastRet = -1;
512 if (count == 0)
513 nextIndex = -1;
514 else {
515 nextIndex = takeIndex;
516 nextItem = items[takeIndex];
517 }
518 }
519
520 public boolean hasNext() {
521 /*
522 * No sync. We can return true by mistake here
523 * only if this iterator passed across threads,
524 * which we don't support anyway.
525 */
526 return nextIndex >= 0;
527 }
528
529 /**
530 * Check whether nextIndex is valid; if so setting nextItem.
531 * Stops iterator when either hits putIndex or sees null item.
532 */
533 private void checkNext() {
534 if (nextIndex == putIndex) {
535 nextIndex = -1;
536 nextItem = null;
537 }
538 else {
539 nextItem = items[nextIndex];
540 if (nextItem == null)
541 nextIndex = -1;
542 }
543 }
544
545 public E next() {
546 lock.lock();
547 try {
548 if (nextIndex < 0)
549 throw new NoSuchElementException();
550 lastRet = nextIndex;
551 E x = nextItem;
552 nextIndex = inc(nextIndex);
553 checkNext();
554 return x;
555 }
556 finally {
557 lock.unlock();
558 }
559 }
560
561 public void remove() {
562 lock.lock();
563 try {
564 int i = lastRet;
565 if (i == -1)
566 throw new IllegalStateException();
567 lastRet = -1;
568
569 int ti = takeIndex;
570 removeAt(i);
571 // back up cursor (reset to front if was first element)
572 nextIndex = (i == ti) ? takeIndex : i;
573 checkNext();
574 }
575 finally {
576 lock.unlock();
577 }
578 }
579 }
580
581 /**
582 * Save the state to a stream (that is, serialize it).
583 *
584 * @serialData The maximumSize is emitted (int), followed by all of
585 * its elements (each an <tt>E</tt>) in the proper order.
586 * @param s the stream
587 */
588 private void writeObject(java.io.ObjectOutputStream s)
589 throws java.io.IOException {
590
591 // Write out element count, and any hidden stuff
592 s.defaultWriteObject();
593 // Write out maximumSize == items length
594 s.writeInt(items.length);
595
596 // Write out all elements in the proper order.
597 int i = takeIndex;
598 int k = 0;
599 while (k++ < count) {
600 s.writeObject(items[i]);
601 i = inc(i);
602 }
603 }
604
605 /**
606 * Reconstitute this queue instance from a stream (that is,
607 * deserialize it).
608 * @param s the stream
609 */
610 private void readObject(java.io.ObjectInputStream s)
611 throws java.io.IOException, ClassNotFoundException {
612 // Read in size, and any hidden stuff
613 s.defaultReadObject();
614 int size = count;
615
616 // Read in array length and allocate array
617 int arrayLength = s.readInt();
618
619 // We use deserializedItems here because "items" is final
620 deserializedItems = (E[]) new Object[arrayLength];
621
622 // Read in all elements in the proper order into deserializedItems
623 for (int i = 0; i < size; i++)
624 deserializedItems[i] = (E)s.readObject();
625 }
626
627 /**
628 * Throw away the object created with readObject, and replace it
629 * with a usable ArrayBlockingQueue.
630 * @return the ArrayBlockingQueue
631 */
632 private Object readResolve() throws java.io.ObjectStreamException {
633 E[] array = deserializedItems;
634 deserializedItems = null;
635 return new ArrayBlockingQueue(array.length, array, count, lock);
636 }
637 }