ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.8
Committed: Tue Jun 24 14:34:47 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.7: +35 -68 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9
10 /**
11 * A bounded blocking queue backed by an array. The implementation is
12 * a classic "bounded buffer", in which a fixed-sized array holds
13 * elements inserted by producers and extracted by
14 * consumers. Once created, the capacity can not be increased.
15 * Array-based queues typically have more predictable
16 * performance than linked queues but lower throughput in most
17 * concurrent applications.
18 *
19 * Attempts to offer an element to a full queue will result
20 * in the offer operation blocking; attempts to retrieve an element from
21 * an empty queue will similarly block. Threads blocked on an insertion or
22 * removal will be services in FIFO order.
23 * @since 1.5
24 * @author Doug Lea
25 */
26 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
27 implements BlockingQueue<E>, java.io.Serializable {
28
29 /** The queued items */
30 private transient final E[] items;
31 /** items index for next take, poll or remove */
32 private transient int takeIndex;
33 /** items index for next put, offer, or add. */
34 private transient int putIndex;
35 /** Number of items in the queue */
36 private int count;
37
38 /**
39 * An array used only during deserialization, to hold
40 * items read back in from the stream, and then used
41 * as "items" by readResolve via the private constructor.
42 */
43 private transient E[] deserializedItems;
44
45 /*
46 * Concurrency control via the classic two-condition algorithm
47 * found in any textbook.
48 */
49
50 /** Main lock gurding all access */
51 private final FairReentrantLock lock = new FairReentrantLock();
52 /** Condition wor waiting takes */
53 private final Condition notEmpty = lock.newCondition();
54 /** Condition for wiating puts */
55 private final Condition notFull = lock.newCondition();
56
57 // Internal helper methods
58
59 /**
60 * Circularly increment i.
61 */
62 int inc(int i) {
63 return (++i == items.length)? 0 : i;
64 }
65
66 /**
67 * Insert element at current put position and advance.
68 */
69 private void insert(E x) {
70 items[putIndex] = x;
71 putIndex = inc(putIndex);
72 ++count;
73 }
74
75 /**
76 * Extract element at current take position and advance.
77 */
78 private E extract() {
79 E x = items[takeIndex];
80 items[takeIndex] = null;
81 takeIndex = inc(takeIndex);
82 --count;
83 return x;
84 }
85
86 /**
87 * Utility for remove and iterator.remove: Delete item at position
88 * i by sliding over all others up through putIndex.
89 */
90 void removeAt(int i) {
91 for (;;) {
92 int nexti = inc(i);
93 items[i] = items[nexti];
94 if (nexti != putIndex)
95 i = nexti;
96 else {
97 items[nexti] = null;
98 putIndex = i;
99 --count;
100 return;
101 }
102 }
103 }
104
105 /**
106 * Internal constructor also used by readResolve.
107 * Sets all final fields, plus count.
108 * @param cap the maximumSize
109 * @param array the array to use or null if should create new one
110 * @param count the number of items in the array, where indices 0
111 * to count-1 hold items.
112 */
113 private ArrayBlockingQueue(int cap, E[] array, int count) {
114 if (cap <= 0)
115 throw new IllegalArgumentException();
116 if (array == null)
117 this.items = new E[cap];
118 else
119 this.items = array;
120 this.putIndex = count;
121 this.count = count;
122 }
123
124 /**
125 * Creates a new ArrayBlockingQueue with the given (fixed) capacity.
126 * @param maximumSize the capacity
127 */
128 public ArrayBlockingQueue(int maximumSize) {
129 this(maximumSize, null, 0);
130 }
131
132 /**
133 * Creates a new ArrayBlockingQueue with the given (fixed)
134 * capacity, and initially contianing the given elements, added in
135 * iterator traversal order.
136 * @param maximumSize the capacity
137 * @param initialElements the items to hold initially.
138 * @throws IllegalArgumentException if the given capacity is
139 * less than the number of initialElements.
140 */
141 public ArrayBlockingQueue(int maximumSize, Collection<E> initialElements) {
142 this(maximumSize, null, 0);
143 int n = 0;
144 for (Iterator<E> it = initialElements.iterator(); it.hasNext();) {
145 if (++n >= items.length)
146 throw new IllegalArgumentException();
147 items[n] = it.next();
148 }
149 putIndex = count = n;
150 }
151
152 /** Return the number of elements currently in the queue */
153 public int size() {
154 lock.lock();
155 try {
156 return count;
157 }
158 finally {
159 lock.unlock();
160 }
161 }
162
163 /** Return the remaining capacity of the queue, which is the
164 * number of elements that can be inserted before the queue is
165 * full. */
166 public int remainingCapacity() {
167 lock.lock();
168 try {
169 return items.length - count;
170 }
171 finally {
172 lock.unlock();
173 }
174 }
175
176
177 /** Insert a new element into the queue, blocking if the queue is full. */
178 public void put(E x) throws InterruptedException {
179 if (x == null) throw new NullPointerException();
180 lock.lockInterruptibly();
181 try {
182 try {
183 while (count == items.length)
184 notFull.await();
185 }
186 catch (InterruptedException ie) {
187 notFull.signal(); // propagate to non-interrupted thread
188 throw ie;
189 }
190 insert(x);
191 notEmpty.signal();
192 }
193 finally {
194 lock.unlock();
195 }
196 }
197
198 /** Remove and return the first element from the queue, blocking
199 * if the queue is empty.
200 */
201 public E take() throws InterruptedException {
202 lock.lockInterruptibly();
203 try {
204 try {
205 while (count == 0)
206 notEmpty.await();
207 }
208 catch (InterruptedException ie) {
209 notEmpty.signal(); // propagate to non-interrupted thread
210 throw ie;
211 }
212 E x = extract();
213 notFull.signal();
214 return x;
215 }
216 finally {
217 lock.unlock();
218 }
219 }
220
221 /** Attempt to insert a new element into the queue, but return
222 * immediately without inserting the element if the queue is full.
223 * @return <tt>true</tt> if the element was inserted successfully,
224 * <tt>false</tt> otherwise
225 */
226 public boolean offer(E x) {
227 if (x == null) throw new NullPointerException();
228 lock.lock();
229 try {
230 if (count == items.length)
231 return false;
232 else {
233 insert(x);
234 notEmpty.signal();
235 return true;
236 }
237 }
238 finally {
239 lock.unlock();
240 }
241 }
242
243 /** Attempt to retrieve the first insert element from the queue,
244 * but return immediately if the queue is empty.
245 * @return The first element of the queue if the queue is not
246 * empty, or <tt>null</tt> otherwise.
247 */
248 public E poll() {
249 lock.lock();
250 try {
251 if (count == 0)
252 return null;
253 E x = extract();
254 notFull.signal();
255 return x;
256 }
257 finally {
258 lock.unlock();
259 }
260 }
261
262 /** Attempt to insert a new element into the queue. If the queue
263 * is full, wait up to the specified amount of time before giving
264 * up.
265 * @param x the element to be inserted
266 * @param timeout how long to wait before giving up, in units of
267 * <tt>unit</tt>
268 * @param unit a TimeUnit determining how to interpret the timeout
269 * parameter
270 * @return <tt>true</tt> if the element was inserted successfully,
271 * <tt>false</tt> otherwise
272 * @throws InterruptedException if interrupted while waiting
273 */
274 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
275 if (x == null) throw new NullPointerException();
276 lock.lockInterruptibly();
277 long nanos = unit.toNanos(timeout);
278 try {
279 for (;;) {
280 if (count != items.length) {
281 insert(x);
282 notEmpty.signal();
283 return true;
284 }
285 if (nanos <= 0)
286 return false;
287 try {
288 nanos = notFull.awaitNanos(nanos);
289 }
290 catch (InterruptedException ie) {
291 notFull.signal(); // propagate to non-interrupted thread
292 throw ie;
293 }
294 }
295 }
296 finally {
297 lock.unlock();
298 }
299 }
300
301 /**
302 * Attempt to retrieve the first insert element from the queue.
303 * If the queue is empty, wait up to the specified amount of time
304 * before giving up.
305 * @param timeout how long to wait before giving up, in units of
306 * <tt>unit</tt>
307 * @param unit a TimeUnit determining how to interpret the timeout
308 * parameter
309 * @return The first element of the queue if an item was
310 * successfully retrieved, or <tt>null</tt> otherwise.
311 * @throws InterruptedException if interrupted while waiting
312 *
313 */
314 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
315 lock.lockInterruptibly();
316 long nanos = unit.toNanos(timeout);
317 try {
318 for (;;) {
319 if (count != 0) {
320 E x = extract();
321 notFull.signal();
322 return x;
323 }
324 if (nanos <= 0)
325 return null;
326 try {
327 nanos = notEmpty.awaitNanos(nanos);
328 }
329 catch (InterruptedException ie) {
330 notEmpty.signal(); // propagate to non-interrupted thread
331 throw ie;
332 }
333
334 }
335 }
336 finally {
337 lock.unlock();
338 }
339 }
340
341 /** Return, but do not remove, the first element from the queue,
342 * if the queue is not empty. This will return the same result as
343 * <tt>poll</tt>, but will not remove it from the queue.
344 * @return The first element of the queue if the queue is not
345 * empty, or <tt>null</tt> otherwise.
346 */
347 public E peek() {
348 lock.lock();
349 try {
350 return (count == 0) ? null : items[takeIndex];
351 }
352 finally {
353 lock.unlock();
354 }
355 }
356
357
358 public boolean remove(Object x) {
359 if (x == null) return false;
360 lock.lock();
361 try {
362 int i = takeIndex;
363 int k = 0;
364 for (;;) {
365 if (k++ >= count)
366 return false;
367 if (x.equals(items[i])) {
368 removeAt(i);
369 return true;
370 }
371 i = inc(i);
372 }
373
374 }
375 finally {
376 lock.unlock();
377 }
378 }
379
380 public boolean contains(Object x) {
381 if (x == null) return false;
382 lock.lock();
383 try {
384 int i = takeIndex;
385 int k = 0;
386 while (k++ < count) {
387 if (x.equals(items[i]))
388 return true;
389 i = inc(i);
390 }
391 return false;
392 }
393 finally {
394 lock.unlock();
395 }
396 }
397
398 public Object[] toArray() {
399 lock.lock();
400 try {
401 E[] a = new E[count];
402 int k = 0;
403 int i = takeIndex;
404 while (k < count) {
405 a[k++] = items[i];
406 i = inc(i);
407 }
408 return a;
409 }
410 finally {
411 lock.unlock();
412 }
413 }
414
415 public <T> T[] toArray(T[] a) {
416 lock.lock();
417 try {
418 if (a.length < count)
419 a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
420
421 int k = 0;
422 int i = takeIndex;
423 while (k < count) {
424 a[k++] = (T)items[i];
425 i = inc(i);
426 }
427 if (a.length > count)
428 a[count] = null;
429 return a;
430 }
431 finally {
432 lock.unlock();
433 }
434 }
435
436 public String toString() {
437 lock.lock();
438 try {
439 return super.toString();
440 }
441 finally {
442 lock.unlock();
443 }
444 }
445
446 /**
447 * Returns an iterator over the elements in this queue in proper sequence.
448 *
449 * @return an iterator over the elements in this queue in proper sequence.
450 */
451 public Iterator<E> iterator() {
452 lock.lock();
453 try {
454 return new Itr();
455 }
456 finally {
457 lock.unlock();
458 }
459 }
460
461 /**
462 * Iterator for ArrayBlockingQueue
463 */
464 private class Itr implements Iterator<E> {
465 /**
466 * Index of element to be returned by next,
467 * or a negative number if no such.
468 */
469 private int nextIndex;
470
471 /**
472 * nextItem holds on to item fields because once we claim
473 * that an element exists in hasNext(), we must return it in
474 * the following next() call even if it was in the process of
475 * being removed when hasNext() was called.
476 **/
477 private E nextItem;
478
479 /**
480 * Index of element returned by most recent call to next.
481 * Reset to -1 if this element is deleted by a call to remove.
482 */
483 private int lastRet;
484
485 Itr() {
486 lastRet = -1;
487 if (count == 0)
488 nextIndex = -1;
489 else {
490 nextIndex = takeIndex;
491 nextItem = items[takeIndex];
492 }
493 }
494
495 public boolean hasNext() {
496 /*
497 * No sync. We can return true by mistake here
498 * only if this iterator passed across threads,
499 * which we don't support anyway.
500 */
501 return nextIndex >= 0;
502 }
503
504 /**
505 * Check whether nextIndex is valied; if so setting nextItem.
506 * Stops iterator when either hits putIndex or sees null item.
507 */
508 private void checkNext() {
509 if (nextIndex == putIndex) {
510 nextIndex = -1;
511 nextItem = null;
512 }
513 else {
514 nextItem = items[nextIndex];
515 if (nextItem == null)
516 nextIndex = -1;
517 }
518 }
519
520 public E next() {
521 lock.lock();
522 try {
523 if (nextIndex < 0)
524 throw new NoSuchElementException();
525 lastRet = nextIndex;
526 E x = nextItem;
527 nextIndex = inc(nextIndex);
528 checkNext();
529 return x;
530 }
531 finally {
532 lock.unlock();
533 }
534 }
535
536 public void remove() {
537 lock.lock();
538 try {
539 int i = lastRet;
540 if (i == -1)
541 throw new IllegalStateException();
542 lastRet = -1;
543
544 nextIndex = i; // back up cursor
545 removeAt(i);
546 checkNext();
547 }
548 finally {
549 lock.unlock();
550 }
551 }
552 }
553
554 /**
555 * Save the state to a stream (that is, serialize it).
556 *
557 * @serialData The maximumSize is emitted (int), followed by all of
558 * its elements (each an <tt>E</tt>) in the proper order.
559 * @param s the stream
560 */
561 private void writeObject(java.io.ObjectOutputStream s)
562 throws java.io.IOException {
563
564 // Write out element count, and any hidden stuff
565 s.defaultWriteObject();
566 // Write out maximumSize == items length
567 s.writeInt(items.length);
568
569 // Write out all elements in the proper order.
570 int i = takeIndex;
571 int k = 0;
572 while (k++ < count) {
573 s.writeObject(items[i]);
574 i = inc(i);
575 }
576 }
577
578 /**
579 * Reconstitute the Queue instance from a stream (that is,
580 * deserialize it).
581 * @param s the stream
582 */
583 private void readObject(java.io.ObjectInputStream s)
584 throws java.io.IOException, ClassNotFoundException {
585 // Read in size, and any hidden stuff
586 s.defaultReadObject();
587 int size = count;
588
589 // Read in array length and allocate array
590 int arrayLength = s.readInt();
591
592 // We use deserializedItems here because "items" is final
593 deserializedItems = new E[arrayLength];
594
595 // Read in all elements in the proper order into deserializedItems
596 for (int i = 0; i < size; i++)
597 deserializedItems[i] = (E)s.readObject();
598 }
599
600 /**
601 * Throw away the object created with readObject, and replace it
602 * with a usable ArrayBlockingQueue.
603 * @return the ArrayBlockingQueue
604 */
605 private Object readResolve() throws java.io.ObjectStreamException {
606 E[] array = deserializedItems;
607 deserializedItems = null;
608 return new ArrayBlockingQueue(array.length, array, count);
609 }
610 }