ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.12
Committed: Mon Aug 4 16:14:48 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.11: +47 -47 lines
Log Message:
Make atomics emulation classes match the main atomics.
Fix docs for atomics (both in main and emulation).
Restored more specific iterator types in both blocking queue impls.
Fix unchecked cast warning in PQ.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An optionally-bounded {@link BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p> <tt>null</tt> elements are rejected by all insertion methods.
 * Iterators are weakly consistent: they never fail because of a
 * concurrent modification, but may or may not reflect it.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object),
     * drainTo and iterators acquire both locks.
     *
     * Invariants (hold whenever both locks are free):
     *   - head is a dummy node whose item is null; head.next is the
     *     first real element, or null if the queue is empty.
     *   - last is the final node reachable from head (last == head
     *     when empty) and last.next == null.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * Current number of elements.  Deliberately not transient: field
     * initializers of a serializable class are not run on
     * deserialization, so a transient final field would come back as
     * null.  The restored value is reset in readObject.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; see invariants above) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove the first element from the queue.  The node that held it
     * becomes the new dummy head.  Caller must hold takeLock and must
     * have established that the queue is non-empty.
     * @return the item that was at the head of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Create a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially holding the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }


    // Have to override just to update the javadoc for @throws

    /**
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Return the number of elements in this collection.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Return the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Add the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E x) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(x);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal(); // still room: cascade to the next waiting put
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty(); // queue went from empty to non-empty
    }

    /**
     * Add the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     *         the wait time elapsed before space became available
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E x, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (x == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(x);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Add the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @return <tt>true</tt> if the element was added, else <tt>false</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();
        if (count.get() == capacity)
            return false; // fast path: full, no need to lock
        int c = -1;
        putLock.lock();
        try {
            // Re-check under the lock; another put may have filled the queue.
            if (count.get() < capacity) {
                insert(x);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieve and remove the head of this queue, waiting if
     * necessary until an element becomes available.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal(); // more items remain: cascade to next taker
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull(); // queue went from full to not full
        return x;
    }

    /**
     * Retrieve and remove the head of this queue, waiting if necessary
     * up to the specified wait time for an element to become available.
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapsed before an element became available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieve and remove the head of this queue, or return
     * <tt>null</tt> immediately if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if empty
     */
    public E poll() {
        if (count.get() == 0)
            return null; // fast path: empty, no need to lock
        E x = null;
        int c = -1;
        // Must be an unconditional lock(): a failed tryLock() would let
        // the body run unprotected and make unlock() below throw
        // IllegalMonitorStateException.
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieve, but do not remove, the head of this queue.
     * @return the head of this queue, or <tt>null</tt> if empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Remove a single instance of the specified element from this
     * queue, if it is present.
     * @param o the element to remove; <tt>null</tt> is never present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                // Removing the tail: pull 'last' back, otherwise the next
                // insert would link onto the detached node and be lost.
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Return an array containing all of the elements in this queue,
     * in head-to-tail order.
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Return an array containing all of the elements in this queue,
     * in head-to-tail order, using the supplied array if it is large
     * enough and otherwise a new array of the same runtime type.  If
     * the supplied array has room to spare, the element immediately
     * following the end of the queue contents is set to <tt>null</tt>.
     * @param a the array into which the elements are to be stored
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     */
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Collection.toArray(T[]) contract: mark the end of the data.
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Return a string representation of this queue, computed while
     * holding both locks so that it reflects a single consistent state.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Remove all available elements from this queue and add them to
     * the given collection.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Remove at most the given number of available elements from this
     * queue and add them to the given collection.  If adding to
     * <tt>c</tt> fails part-way, the elements already transferred stay
     * in <tt>c</tt> and the remaining ones stay in this queue.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0) return 0;
        fullyLock();
        try {
            int moved = 0;
            try {
                while (moved < maxElements && head.next != null) {
                    // Add first, unlink second: if add throws, the
                    // element is still in the queue.
                    c.add(head.next.item);
                    extract();
                    ++moved;
                }
                return moved;
            } finally {
                // Publish the new size even if c.add threw part-way.
                if (moved > 0 && count.getAndAdd(-moved) == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Return an iterator over the elements in this queue, from head
     * to tail.  The iterator is weakly consistent.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        Node<E> current;
        Node<E> lastRet;
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Re-locate the node from the current head; it may already
                // have been taken, in which case there is nothing to do.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node) {
                    p.item = null;
                    trail.next = p.next;
                    // Keep the tail pointer valid when removing the tail.
                    if (last == p)
                        last = trail;
                    int c = count.getAndDecrement();
                    if (c == capacity)
                        notFull.signalAll();
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The default serializable fields (including the
     * capacity) are emitted, followed by all of the queue's elements
     * (each an <tt>Object</tt>) in the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // Constructors and field initializers do not run during
        // deserialization, so rebuild the empty-list state by hand
        // before re-adding the elements (add() maintains count itself).
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}
631
632
633
634
635