ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.35
Committed: Thu May 27 11:06:11 2004 UTC (20 years ago) by dl
Branch: MAIN
Changes since 1.34: +8 -0 lines
Log Message:
Override javadoc specs when overriding AbstractQueue implementations
Clarify atomicity in BlockingQueue

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 * linked nodes.
15 * This queue orders elements FIFO (first-in-first-out).
16 * The <em>head</em> of the queue is that element that has been on the
17 * queue the longest time.
18 * The <em>tail</em> of the queue is that element that has been on the
19 * queue the shortest time. New elements
20 * are inserted at the tail of the queue, and the queue retrieval
21 * operations obtain elements at the head of the queue.
22 * Linked queues typically have higher throughput than array-based queues but
23 * less predictable performance in most concurrent applications.
24 *
25 * <p> The optional capacity bound constructor argument serves as a
26 * way to prevent excessive queue expansion. The capacity, if unspecified,
27 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 * dynamically created upon each insertion unless this would bring the
29 * queue above capacity.
30 *
31 * <p>This class implements all of the <em>optional</em> methods
32 * of the {@link Collection} and {@link Iterator} interfaces.
33 *
34 * <p>This class is a member of the
35 * <a href="{@docRoot}/../guide/collections/index.html">
36 * Java Collections Framework</a>.
37 *
38 * @since 1.5
39 * @author Doug Lea
40 * @param <E> the type of elements held in this collection
41 *
42 **/
43 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
44 implements BlockingQueue<E>, java.io.Serializable {
45 private static final long serialVersionUID = -6903933977591709194L;
46
47 /*
48 * A variant of the "two lock queue" algorithm. The putLock gates
49 * entry to put (and offer), and has an associated condition for
50 * waiting puts. Similarly for the takeLock. The "count" field
51 * that they both rely on is maintained as an atomic to avoid
52 * needing to get both locks in most cases. Also, to minimize need
53 * for puts to get takeLock and vice-versa, cascading notifies are
54 * used. When a put notices that it has enabled at least one take,
55 * it signals taker. That taker in turn signals others if more
56 * items have been entered since the signal. And symmetrically for
57 * takes signalling puts. Operations such as remove(Object) and
58 * iterators acquire both locks.
59 */
60
61 /**
62 * Linked list node class
63 */
64 static class Node<E> {
65 /** The item, volatile to ensure barrier separating write and read */
66 volatile E item;
67 Node<E> next;
68 Node(E x) { item = x; }
69 }
70
71 /** The capacity bound, or Integer.MAX_VALUE if none */
72 private final int capacity;
73
74 /** Current number of elements */
75 private final AtomicInteger count = new AtomicInteger(0);
76
77 /** Head of linked list */
78 private transient Node<E> head;
79
80 /** Tail of linked list */
81 private transient Node<E> last;
82
83 /** Lock held by take, poll, etc */
84 private final ReentrantLock takeLock = new ReentrantLock();
85
86 /** Wait queue for waiting takes */
87 private final Condition notEmpty = takeLock.newCondition();
88
89 /** Lock held by put, offer, etc */
90 private final ReentrantLock putLock = new ReentrantLock();
91
92 /** Wait queue for waiting puts */
93 private final Condition notFull = putLock.newCondition();
94
95 /**
96 * Signal a waiting take. Called only from put/offer (which do not
97 * otherwise ordinarily lock takeLock.)
98 */
99 private void signalNotEmpty() {
100 final ReentrantLock takeLock = this.takeLock;
101 takeLock.lock();
102 try {
103 notEmpty.signal();
104 } finally {
105 takeLock.unlock();
106 }
107 }
108
109 /**
110 * Signal a waiting put. Called only from take/poll.
111 */
112 private void signalNotFull() {
113 final ReentrantLock putLock = this.putLock;
114 putLock.lock();
115 try {
116 notFull.signal();
117 } finally {
118 putLock.unlock();
119 }
120 }
121
122 /**
123 * Create a node and link it at end of queue
124 * @param x the item
125 */
126 private void insert(E x) {
127 last = last.next = new Node<E>(x);
128 }
129
130 /**
131 * Remove a node from head of queue,
132 * @return the node
133 */
134 private E extract() {
135 Node<E> first = head.next;
136 head = first;
137 E x = first.item;
138 first.item = null;
139 return x;
140 }
141
142 /**
143 * Lock to prevent both puts and takes.
144 */
145 private void fullyLock() {
146 putLock.lock();
147 takeLock.lock();
148 }
149
150 /**
151 * Unlock to allow both puts and takes.
152 */
153 private void fullyUnlock() {
154 takeLock.unlock();
155 putLock.unlock();
156 }
157
158
159 /**
160 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
161 * {@link Integer#MAX_VALUE}.
162 */
163 public LinkedBlockingQueue() {
164 this(Integer.MAX_VALUE);
165 }
166
167 /**
168 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
169 *
170 * @param capacity the capacity of this queue.
171 * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
172 * than zero.
173 */
174 public LinkedBlockingQueue(int capacity) {
175 if (capacity <= 0) throw new IllegalArgumentException();
176 this.capacity = capacity;
177 last = head = new Node<E>(null);
178 }
179
180 /**
181 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
182 * {@link Integer#MAX_VALUE}, initially containing the elements of the
183 * given collection,
184 * added in traversal order of the collection's iterator.
185 * @param c the collection of elements to initially contain
186 * @throws NullPointerException if <tt>c</tt> or any element within it
187 * is <tt>null</tt>
188 */
189 public LinkedBlockingQueue(Collection<? extends E> c) {
190 this(Integer.MAX_VALUE);
191 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
192 add(it.next());
193 }
194
195
196 // this doc comment is overridden to remove the reference to collections
197 // greater in size than Integer.MAX_VALUE
198 /**
199 * Returns the number of elements in this queue.
200 *
201 * @return the number of elements in this queue.
202 */
203 public int size() {
204 return count.get();
205 }
206
207 // this doc comment is a modified copy of the inherited doc comment,
208 // without the reference to unlimited queues.
209 /**
210 * Returns the number of elements that this queue can ideally (in
211 * the absence of memory or resource constraints) accept without
212 * blocking. This is always equal to the initial capacity of this queue
213 * less the current <tt>size</tt> of this queue.
214 * <p>Note that you <em>cannot</em> always tell if
215 * an attempt to <tt>add</tt> an element will succeed by
216 * inspecting <tt>remainingCapacity</tt> because it may be the
217 * case that a waiting consumer is ready to <tt>take</tt> an
218 * element out of an otherwise full queue.
219 */
220 public int remainingCapacity() {
221 return capacity - count.get();
222 }
223
224 /**
225 * Adds the specified element to the tail of this queue, waiting if
226 * necessary for space to become available.
227 * @param o the element to add
228 * @throws InterruptedException if interrupted while waiting.
229 * @throws NullPointerException if the specified element is <tt>null</tt>.
230 */
231 public void put(E o) throws InterruptedException {
232 if (o == null) throw new NullPointerException();
233 // Note: convention in all put/take/etc is to preset
234 // local var holding count negative to indicate failure unless set.
235 int c = -1;
236 final ReentrantLock putLock = this.putLock;
237 final AtomicInteger count = this.count;
238 putLock.lockInterruptibly();
239 try {
240 /*
241 * Note that count is used in wait guard even though it is
242 * not protected by lock. This works because count can
243 * only decrease at this point (all other puts are shut
244 * out by lock), and we (or some other waiting put) are
245 * signalled if it ever changes from
246 * capacity. Similarly for all other uses of count in
247 * other wait guards.
248 */
249 try {
250 while (count.get() == capacity)
251 notFull.await();
252 } catch (InterruptedException ie) {
253 notFull.signal(); // propagate to a non-interrupted thread
254 throw ie;
255 }
256 insert(o);
257 c = count.getAndIncrement();
258 if (c + 1 < capacity)
259 notFull.signal();
260 } finally {
261 putLock.unlock();
262 }
263 if (c == 0)
264 signalNotEmpty();
265 }
266
267 /**
268 * Inserts the specified element at the tail of this queue, waiting if
269 * necessary up to the specified wait time for space to become available.
270 * @param o the element to add
271 * @param timeout how long to wait before giving up, in units of
272 * <tt>unit</tt>
273 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
274 * <tt>timeout</tt> parameter
275 * @return <tt>true</tt> if successful, or <tt>false</tt> if
276 * the specified waiting time elapses before space is available.
277 * @throws InterruptedException if interrupted while waiting.
278 * @throws NullPointerException if the specified element is <tt>null</tt>.
279 */
280 public boolean offer(E o, long timeout, TimeUnit unit)
281 throws InterruptedException {
282
283 if (o == null) throw new NullPointerException();
284 long nanos = unit.toNanos(timeout);
285 int c = -1;
286 final ReentrantLock putLock = this.putLock;
287 final AtomicInteger count = this.count;
288 putLock.lockInterruptibly();
289 try {
290 for (;;) {
291 if (count.get() < capacity) {
292 insert(o);
293 c = count.getAndIncrement();
294 if (c + 1 < capacity)
295 notFull.signal();
296 break;
297 }
298 if (nanos <= 0)
299 return false;
300 try {
301 nanos = notFull.awaitNanos(nanos);
302 } catch (InterruptedException ie) {
303 notFull.signal(); // propagate to a non-interrupted thread
304 throw ie;
305 }
306 }
307 } finally {
308 putLock.unlock();
309 }
310 if (c == 0)
311 signalNotEmpty();
312 return true;
313 }
314
315 /**
316 * Inserts the specified element at the tail of this queue if possible,
317 * returning immediately if this queue is full.
318 *
319 * @param o the element to add.
320 * @return <tt>true</tt> if it was possible to add the element to
321 * this queue, else <tt>false</tt>
322 * @throws NullPointerException if the specified element is <tt>null</tt>
323 */
324 public boolean offer(E o) {
325 if (o == null) throw new NullPointerException();
326 final AtomicInteger count = this.count;
327 if (count.get() == capacity)
328 return false;
329 int c = -1;
330 final ReentrantLock putLock = this.putLock;
331 putLock.lock();
332 try {
333 if (count.get() < capacity) {
334 insert(o);
335 c = count.getAndIncrement();
336 if (c + 1 < capacity)
337 notFull.signal();
338 }
339 } finally {
340 putLock.unlock();
341 }
342 if (c == 0)
343 signalNotEmpty();
344 return c >= 0;
345 }
346
347
348 public E take() throws InterruptedException {
349 E x;
350 int c = -1;
351 final AtomicInteger count = this.count;
352 final ReentrantLock takeLock = this.takeLock;
353 takeLock.lockInterruptibly();
354 try {
355 try {
356 while (count.get() == 0)
357 notEmpty.await();
358 } catch (InterruptedException ie) {
359 notEmpty.signal(); // propagate to a non-interrupted thread
360 throw ie;
361 }
362
363 x = extract();
364 c = count.getAndDecrement();
365 if (c > 1)
366 notEmpty.signal();
367 } finally {
368 takeLock.unlock();
369 }
370 if (c == capacity)
371 signalNotFull();
372 return x;
373 }
374
375 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
376 E x = null;
377 int c = -1;
378 long nanos = unit.toNanos(timeout);
379 final AtomicInteger count = this.count;
380 final ReentrantLock takeLock = this.takeLock;
381 takeLock.lockInterruptibly();
382 try {
383 for (;;) {
384 if (count.get() > 0) {
385 x = extract();
386 c = count.getAndDecrement();
387 if (c > 1)
388 notEmpty.signal();
389 break;
390 }
391 if (nanos <= 0)
392 return null;
393 try {
394 nanos = notEmpty.awaitNanos(nanos);
395 } catch (InterruptedException ie) {
396 notEmpty.signal(); // propagate to a non-interrupted thread
397 throw ie;
398 }
399 }
400 } finally {
401 takeLock.unlock();
402 }
403 if (c == capacity)
404 signalNotFull();
405 return x;
406 }
407
408 public E poll() {
409 final AtomicInteger count = this.count;
410 if (count.get() == 0)
411 return null;
412 E x = null;
413 int c = -1;
414 final ReentrantLock takeLock = this.takeLock;
415 takeLock.lock();
416 try {
417 if (count.get() > 0) {
418 x = extract();
419 c = count.getAndDecrement();
420 if (c > 1)
421 notEmpty.signal();
422 }
423 } finally {
424 takeLock.unlock();
425 }
426 if (c == capacity)
427 signalNotFull();
428 return x;
429 }
430
431
432 public E peek() {
433 if (count.get() == 0)
434 return null;
435 final ReentrantLock takeLock = this.takeLock;
436 takeLock.lock();
437 try {
438 Node<E> first = head.next;
439 if (first == null)
440 return null;
441 else
442 return first.item;
443 } finally {
444 takeLock.unlock();
445 }
446 }
447
448 /**
449 * Removes a single instance of the specified element from this
450 * collection, if it is present.
451 */
452 public boolean remove(Object o) {
453 if (o == null) return false;
454 boolean removed = false;
455 fullyLock();
456 try {
457 Node<E> trail = head;
458 Node<E> p = head.next;
459 while (p != null) {
460 if (o.equals(p.item)) {
461 removed = true;
462 break;
463 }
464 trail = p;
465 p = p.next;
466 }
467 if (removed) {
468 p.item = null;
469 trail.next = p.next;
470 if (count.getAndDecrement() == capacity)
471 notFull.signalAll();
472 }
473 } finally {
474 fullyUnlock();
475 }
476 return removed;
477 }
478
479 public Object[] toArray() {
480 fullyLock();
481 try {
482 int size = count.get();
483 Object[] a = new Object[size];
484 int k = 0;
485 for (Node<E> p = head.next; p != null; p = p.next)
486 a[k++] = p.item;
487 return a;
488 } finally {
489 fullyUnlock();
490 }
491 }
492
493 public <T> T[] toArray(T[] a) {
494 fullyLock();
495 try {
496 int size = count.get();
497 if (a.length < size)
498 a = (T[])java.lang.reflect.Array.newInstance
499 (a.getClass().getComponentType(), size);
500
501 int k = 0;
502 for (Node p = head.next; p != null; p = p.next)
503 a[k++] = (T)p.item;
504 return a;
505 } finally {
506 fullyUnlock();
507 }
508 }
509
510 public String toString() {
511 fullyLock();
512 try {
513 return super.toString();
514 } finally {
515 fullyUnlock();
516 }
517 }
518
519 /**
520 * Atomically removes all of the elements from this queue.
521 * The queue will be empty after this call returns.
522 */
523 public void clear() {
524 fullyLock();
525 try {
526 head.next = null;
527 if (count.getAndSet(0) == capacity)
528 notFull.signalAll();
529 } finally {
530 fullyUnlock();
531 }
532 }
533
534 public int drainTo(Collection<? super E> c) {
535 if (c == null)
536 throw new NullPointerException();
537 if (c == this)
538 throw new IllegalArgumentException();
539 Node first;
540 fullyLock();
541 try {
542 first = head.next;
543 head.next = null;
544 if (count.getAndSet(0) == capacity)
545 notFull.signalAll();
546 } finally {
547 fullyUnlock();
548 }
549 // Transfer the elements outside of locks
550 int n = 0;
551 for (Node<E> p = first; p != null; p = p.next) {
552 c.add(p.item);
553 p.item = null;
554 ++n;
555 }
556 return n;
557 }
558
559 public int drainTo(Collection<? super E> c, int maxElements) {
560 if (c == null)
561 throw new NullPointerException();
562 if (c == this)
563 throw new IllegalArgumentException();
564 if (maxElements <= 0)
565 return 0;
566 fullyLock();
567 try {
568 int n = 0;
569 Node<E> p = head.next;
570 while (p != null && n < maxElements) {
571 c.add(p.item);
572 p.item = null;
573 p = p.next;
574 ++n;
575 }
576 if (n != 0) {
577 head.next = p;
578 if (count.getAndAdd(-n) == capacity)
579 notFull.signalAll();
580 }
581 return n;
582 } finally {
583 fullyUnlock();
584 }
585 }
586
587 /**
588 * Returns an iterator over the elements in this queue in proper sequence.
589 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
590 * will never throw {@link java.util.ConcurrentModificationException},
591 * and guarantees to traverse elements as they existed upon
592 * construction of the iterator, and may (but is not guaranteed to)
593 * reflect any modifications subsequent to construction.
594 *
595 * @return an iterator over the elements in this queue in proper sequence.
596 */
597 public Iterator<E> iterator() {
598 return new Itr();
599 }
600
601 private class Itr implements Iterator<E> {
602 /*
603 * Basic weak-consistent iterator. At all times hold the next
604 * item to hand out so that if hasNext() reports true, we will
605 * still have it to return even if lost race with a take etc.
606 */
607 private Node<E> current;
608 private Node<E> lastRet;
609 private E currentElement;
610
611 Itr() {
612 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
613 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
614 putLock.lock();
615 takeLock.lock();
616 try {
617 current = head.next;
618 if (current != null)
619 currentElement = current.item;
620 } finally {
621 takeLock.unlock();
622 putLock.unlock();
623 }
624 }
625
626 public boolean hasNext() {
627 return current != null;
628 }
629
630 public E next() {
631 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
632 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
633 putLock.lock();
634 takeLock.lock();
635 try {
636 if (current == null)
637 throw new NoSuchElementException();
638 E x = currentElement;
639 lastRet = current;
640 current = current.next;
641 if (current != null)
642 currentElement = current.item;
643 return x;
644 } finally {
645 takeLock.unlock();
646 putLock.unlock();
647 }
648 }
649
650 public void remove() {
651 if (lastRet == null)
652 throw new IllegalStateException();
653 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
654 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
655 putLock.lock();
656 takeLock.lock();
657 try {
658 Node<E> node = lastRet;
659 lastRet = null;
660 Node<E> trail = head;
661 Node<E> p = head.next;
662 while (p != null && p != node) {
663 trail = p;
664 p = p.next;
665 }
666 if (p == node) {
667 p.item = null;
668 trail.next = p.next;
669 int c = count.getAndDecrement();
670 if (c == capacity)
671 notFull.signalAll();
672 }
673 } finally {
674 takeLock.unlock();
675 putLock.unlock();
676 }
677 }
678 }
679
680 /**
681 * Save the state to a stream (that is, serialize it).
682 *
683 * @serialData The capacity is emitted (int), followed by all of
684 * its elements (each an <tt>Object</tt>) in the proper order,
685 * followed by a null
686 * @param s the stream
687 */
688 private void writeObject(java.io.ObjectOutputStream s)
689 throws java.io.IOException {
690
691 fullyLock();
692 try {
693 // Write out any hidden stuff, plus capacity
694 s.defaultWriteObject();
695
696 // Write out all elements in the proper order.
697 for (Node<E> p = head.next; p != null; p = p.next)
698 s.writeObject(p.item);
699
700 // Use trailing null as sentinel
701 s.writeObject(null);
702 } finally {
703 fullyUnlock();
704 }
705 }
706
707 /**
708 * Reconstitute this queue instance from a stream (that is,
709 * deserialize it).
710 * @param s the stream
711 */
712 private void readObject(java.io.ObjectInputStream s)
713 throws java.io.IOException, ClassNotFoundException {
714 // Read in capacity, and any hidden stuff
715 s.defaultReadObject();
716
717 count.set(0);
718 last = head = new Node<E>(null);
719
720 // Read in all elements and place in queue
721 for (;;) {
722 E item = (E)s.readObject();
723 if (item == null)
724 break;
725 add(item);
726 }
727 }
728 }