ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ArrayBlockingQueue.java
Revision: 1.6
Committed: Sun Jun 22 23:51:37 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.5: +10 -0 lines
Log Message:
Added synched toString()

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9
10 /**
11 * A bounded blocking queue based on an array. The implementation is
12 * a classic "bounded buffer", in which a fixed-sized array holds
13 * elements inserted by propducers and extracted by
14 * consumers. Array-based queues typically have more predictable
15 * performance than linked queues but lower throughput in most
16 * concurrent applications.
17 **/
18 public class ArrayBlockingQueue<E> extends AbstractQueue<E>
19 implements BlockingQueue<E>, java.io.Serializable {
20
21 private transient final E[] items;
22 private transient int takeIndex;
23 private transient int putIndex;
24 private int count;
25
26 /**
27 * An array used only during deserialization, to hold
28 * items read back in from the stream, and then used
29 * as "items" by readResolve via the private constructor.
30 */
31 private transient E[] deserializedItems;
32
33 /*
34 * Concurrency control via the classic two-condition algorithm
35 * found in any textbook.
36 */
37
38 private final FairReentrantLock lock = new FairReentrantLock();
39 private final Condition notEmpty = lock.newCondition();
40 private final Condition notFull = lock.newCondition();
41
42 // Internal helper methods
43
44 /**
45 * Circularly increment i.
46 */
47 int inc(int i) {
48 return (++i == items.length)? 0 : i;
49 }
50
51 /**
52 * Insert element at current put position and advance.
53 */
54 private void insert(E x) {
55 items[putIndex] = x;
56 putIndex = inc(putIndex);
57 ++count;
58 }
59
60 /**
61 * Extract element at current take position and advance.
62 */
63 private E extract() {
64 E x = items[takeIndex];
65 items[takeIndex] = null;
66 takeIndex = inc(takeIndex);
67 --count;
68 return x;
69 }
70
71 /**
72 * Utility for remove and iterator.remove: Delete item at position
73 * i by sliding over all others up through putIndex.
74 */
75 void removeAt(int i) {
76 for (;;) {
77 int nexti = inc(i);
78 items[i] = items[nexti];
79 if (nexti != putIndex)
80 i = nexti;
81 else {
82 items[nexti] = null;
83 putIndex = i;
84 --count;
85 return;
86 }
87 }
88 }
89
90 /**
91 * Internal constructor also used by readResolve.
92 * Sets all final fields, plus count.
93 * @param cap the maximumSize
94 * @param array the array to use or null if should create new one
95 * @param count the number of items in the array, where indices 0
96 * to count-1 hold items.
97 */
98 private ArrayBlockingQueue(int cap, E[] array, int count) {
99 if (cap <= 0)
100 throw new IllegalArgumentException();
101 if (array == null)
102 this.items = new E[cap];
103 else
104 this.items = array;
105 this.putIndex = count;
106 this.count = count;
107 }
108
109 /**
110 * Creates a new ArrayBlockingQueue with the given (fixed) capacity.
111 * @param maximumSize the capacity
112 */
113 public ArrayBlockingQueue(int maximumSize) {
114 this(maximumSize, null, 0);
115 }
116
117 /**
118 * Creates a new ArrayBlockingQueue with the given (fixed)
119 * capacity, and initially contianing the given elements, added in
120 * iterator traversal order.
121 * @param maximumSize the capacity
122 * @param initialElements the items to hold initially.
123 * @throws IllegalArgumentException if the given capacity is
124 * less than the number of initialElements.
125 */
126 public ArrayBlockingQueue(int maximumSize, Collection<E> initialElements) {
127 this(maximumSize, null, 0);
128 int n = 0;
129 for (Iterator<E> it = initialElements.iterator(); it.hasNext();) {
130 if (++n >= items.length)
131 throw new IllegalArgumentException();
132 items[n] = it.next();
133 }
134 putIndex = count = n;
135 }
136
137 public int size() {
138 lock.lock();
139 try {
140 return count;
141 }
142 finally {
143 lock.unlock();
144 }
145 }
146
147 public int remainingCapacity() {
148 lock.lock();
149 try {
150 return items.length - count;
151 }
152 finally {
153 lock.unlock();
154 }
155 }
156
157
158 public void put(E x) throws InterruptedException {
159 if (x == null) throw new IllegalArgumentException();
160 lock.lockInterruptibly();
161 try {
162 try {
163 while (count == items.length)
164 notFull.await();
165 }
166 catch (InterruptedException ie) {
167 notFull.signal(); // propagate to non-interrupted thread
168 throw ie;
169 }
170 insert(x);
171 notEmpty.signal();
172 }
173 finally {
174 lock.unlock();
175 }
176 }
177
178 public E take() throws InterruptedException {
179 lock.lockInterruptibly();
180 try {
181 try {
182 while (count == 0)
183 notEmpty.await();
184 }
185 catch (InterruptedException ie) {
186 notEmpty.signal(); // propagate to non-interrupted thread
187 throw ie;
188 }
189 E x = extract();
190 notFull.signal();
191 return x;
192 }
193 finally {
194 lock.unlock();
195 }
196 }
197
198 public boolean offer(E x) {
199 if (x == null) throw new IllegalArgumentException();
200 lock.lock();
201 try {
202 if (count == items.length)
203 return false;
204 else {
205 insert(x);
206 notEmpty.signal();
207 return true;
208 }
209 }
210 finally {
211 lock.unlock();
212 }
213 }
214
215 public E poll() {
216 lock.lock();
217 try {
218 if (count == 0)
219 return null;
220 E x = extract();
221 notFull.signal();
222 return x;
223 }
224 finally {
225 lock.unlock();
226 }
227 }
228
229 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
230 if (x == null) throw new IllegalArgumentException();
231 lock.lockInterruptibly();
232 long nanos = unit.toNanos(timeout);
233 try {
234 for (;;) {
235 if (count != items.length) {
236 insert(x);
237 notEmpty.signal();
238 return true;
239 }
240 if (nanos <= 0)
241 return false;
242 try {
243 nanos = notFull.awaitNanos(nanos);
244 }
245 catch (InterruptedException ie) {
246 notFull.signal(); // propagate to non-interrupted thread
247 throw ie;
248 }
249 }
250 }
251 finally {
252 lock.unlock();
253 }
254 }
255
256 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
257 lock.lockInterruptibly();
258 long nanos = unit.toNanos(timeout);
259 try {
260 for (;;) {
261 if (count != 0) {
262 E x = extract();
263 notFull.signal();
264 return x;
265 }
266 if (nanos <= 0)
267 return null;
268 try {
269 nanos = notEmpty.awaitNanos(nanos);
270 }
271 catch (InterruptedException ie) {
272 notEmpty.signal(); // propagate to non-interrupted thread
273 throw ie;
274 }
275
276 }
277 }
278 finally {
279 lock.unlock();
280 }
281 }
282
283
284 public E peek() {
285 lock.lock();
286 try {
287 return (count == 0)? null : items[takeIndex];
288 }
289 finally {
290 lock.unlock();
291 }
292 }
293
294
295 public boolean remove(Object x) {
296 lock.lock();
297 try {
298 int i = takeIndex;
299 int k = 0;
300 for (;;) {
301 if (k++ >= count)
302 return false;
303 if (x.equals(items[i])) {
304 removeAt(i);
305 return true;
306 }
307 i = inc(i);
308 }
309
310 }
311 finally {
312 lock.unlock();
313 }
314 }
315
316
317 public boolean contains(Object x) {
318 lock.lock();
319 try {
320 int i = takeIndex;
321 int k = 0;
322 while (k++ < count) {
323 if (x.equals(items[i]))
324 return true;
325 i = inc(i);
326 }
327 return false;
328 }
329 finally {
330 lock.unlock();
331 }
332 }
333
334 public Object[] toArray() {
335 lock.lock();
336 try {
337 E[] a = new E[count];
338 int k = 0;
339 int i = takeIndex;
340 while (k < count) {
341 a[k++] = items[i];
342 i = inc(i);
343 }
344 return a;
345 }
346 finally {
347 lock.unlock();
348 }
349 }
350
351 public <T> T[] toArray(T[] a) {
352 lock.lock();
353 try {
354 if (a.length < count)
355 a = (T[])java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), count);
356
357 int k = 0;
358 int i = takeIndex;
359 while (k < count) {
360 a[k++] = (T)items[i];
361 i = inc(i);
362 }
363 if (a.length > count)
364 a[count] = null;
365 return a;
366 }
367 finally {
368 lock.unlock();
369 }
370 }
371
372 public String toString() {
373 lock.lock();
374 try {
375 return super.toString();
376 }
377 finally {
378 lock.unlock();
379 }
380 }
381
382 public Iterator<E> iterator() {
383 lock.lock();
384 try {
385 return new Itr();
386 }
387 finally {
388 lock.unlock();
389 }
390 }
391
392 private class Itr implements Iterator<E> {
393 /**
394 * Index of element to be returned by next,
395 * or a negative number if no such.
396 */
397 int nextIndex;
398
399 /**
400 * nextItem holds on to item fields because once we claim
401 * that an element exists in hasNext(), we must return it in
402 * the following next() call even if it was in the process of
403 * being removed when hasNext() was called.
404 **/
405 E nextItem;
406
407 /**
408 * Index of element returned by most recent call to next.
409 * Reset to -1 if this element is deleted by a call to remove.
410 */
411 int lastRet;
412
413 Itr() {
414 lastRet = -1;
415 if (count == 0)
416 nextIndex = -1;
417 else {
418 nextIndex = takeIndex;
419 nextItem = items[takeIndex];
420 }
421 }
422
423 public boolean hasNext() {
424 /*
425 * No sync. We can return true by mistake here
426 * only if this iterator passed across threads,
427 * which we don't support anyway.
428 */
429 return nextIndex >= 0;
430 }
431
432 /**
433 * Check whether nextIndex is valied; if so setting nextItem.
434 * Stops iterator when either hits putIndex or sees null item.
435 */
436 private void checkNext() {
437 if (nextIndex == putIndex) {
438 nextIndex = -1;
439 nextItem = null;
440 }
441 else {
442 nextItem = items[nextIndex];
443 if (nextItem == null)
444 nextIndex = -1;
445 }
446 }
447
448 public E next() {
449 lock.lock();
450 try {
451 if (nextIndex < 0)
452 throw new NoSuchElementException();
453 lastRet = nextIndex;
454 E x = nextItem;
455 nextIndex = inc(nextIndex);
456 checkNext();
457 return x;
458 }
459 finally {
460 lock.unlock();
461 }
462 }
463
464 public void remove() {
465 lock.lock();
466 try {
467 int i = lastRet;
468 if (i == -1)
469 throw new IllegalStateException();
470 lastRet = -1;
471
472 nextIndex = i; // back up cursor
473 removeAt(i);
474 checkNext();
475 }
476 finally {
477 lock.unlock();
478 }
479 }
480 }
481
482 /**
483 * Save the state to a stream (that is, serialize it).
484 *
485 * @serialData The maximumSize is emitted (int), followed by all of
486 * its elements (each an <tt>E</tt>) in the proper order.
487 */
488 private void writeObject(java.io.ObjectOutputStream s)
489 throws java.io.IOException {
490
491 // Write out element count, and any hidden stuff
492 s.defaultWriteObject();
493 // Write out maximumSize == items length
494 s.writeInt(items.length);
495
496 // Write out all elements in the proper order.
497 int i = takeIndex;
498 int k = 0;
499 while (k++ < count) {
500 s.writeObject(items[i]);
501 i = inc(i);
502 }
503 }
504
505 /**
506 * Reconstitute the Queue instance from a stream (that is,
507 * deserialize it).
508 */
509 private void readObject(java.io.ObjectInputStream s)
510 throws java.io.IOException, ClassNotFoundException {
511 // Read in size, and any hidden stuff
512 s.defaultReadObject();
513 int size = count;
514
515 // Read in array length and allocate array
516 int arrayLength = s.readInt();
517
518 // We use deserializedItems here because "items" is final
519 deserializedItems = new E[arrayLength];
520
521 // Read in all elements in the proper order into deserializedItems
522 for (int i = 0; i < size; i++)
523 deserializedItems[i] = (E)s.readObject();
524 }
525
526 /**
527 * Throw away the object created with readObject, and replace it
528 * with a usable ArrayBlockingQueue.
529 */
530 private Object readResolve() throws java.io.ObjectStreamException {
531 E[] array = deserializedItems;
532 deserializedItems = null;
533 return new ArrayBlockingQueue(array.length, array, count);
534 }
535 }