ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.40
Committed: Tue Apr 26 01:17:18 2005 UTC (19 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.39: +9 -9 lines
Log Message:
doc fixes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 * linked nodes.
15 * This queue orders elements FIFO (first-in-first-out).
16 * The <em>head</em> of the queue is that element that has been on the
17 * queue the longest time.
18 * The <em>tail</em> of the queue is that element that has been on the
19 * queue the shortest time. New elements
20 * are inserted at the tail of the queue, and the queue retrieval
21 * operations obtain elements at the head of the queue.
22 * Linked queues typically have higher throughput than array-based queues but
23 * less predictable performance in most concurrent applications.
24 *
25 * <p> The optional capacity bound constructor argument serves as a
26 * way to prevent excessive queue expansion. The capacity, if unspecified,
27 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 * dynamically created upon each insertion unless this would bring the
29 * queue above capacity.
30 *
31 * <p>This class and its iterator implement all of the
32 * <em>optional</em> methods of the {@link Collection} and {@link
33 * Iterator} interfaces.
34 *
35 * <p>This class is a member of the
36 * <a href="{@docRoot}/../guide/collections/index.html">
37 * Java Collections Framework</a>.
38 *
39 * @since 1.5
40 * @author Doug Lea
41 * @param <E> the type of elements held in this collection
42 *
43 */
44 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
45 implements BlockingQueue<E>, java.io.Serializable {
46 private static final long serialVersionUID = -6903933977591709194L;
47
48 /*
49 * A variant of the "two lock queue" algorithm. The putLock gates
50 * entry to put (and offer), and has an associated condition for
51 * waiting puts. Similarly for the takeLock. The "count" field
52 * that they both rely on is maintained as an atomic to avoid
53 * needing to get both locks in most cases. Also, to minimize need
54 * for puts to get takeLock and vice-versa, cascading notifies are
55 * used. When a put notices that it has enabled at least one take,
56 * it signals taker. That taker in turn signals others if more
57 * items have been entered since the signal. And symmetrically for
58 * takes signalling puts. Operations such as remove(Object) and
59 * iterators acquire both locks.
60 */
61
62 /**
63 * Linked list node class
64 */
65 static class Node<E> {
66 /** The item, volatile to ensure barrier separating write and read */
67 volatile E item;
68 Node<E> next;
69 Node(E x) { item = x; }
70 }
71
72 /** The capacity bound, or Integer.MAX_VALUE if none */
73 private final int capacity;
74
75 /** Current number of elements */
76 private final AtomicInteger count = new AtomicInteger(0);
77
78 /** Head of linked list */
79 private transient Node<E> head;
80
81 /** Tail of linked list */
82 private transient Node<E> last;
83
84 /** Lock held by take, poll, etc */
85 private final ReentrantLock takeLock = new ReentrantLock();
86
87 /** Wait queue for waiting takes */
88 private final Condition notEmpty = takeLock.newCondition();
89
90 /** Lock held by put, offer, etc */
91 private final ReentrantLock putLock = new ReentrantLock();
92
93 /** Wait queue for waiting puts */
94 private final Condition notFull = putLock.newCondition();
95
96 /**
97 * Signals a waiting take. Called only from put/offer (which do not
98 * otherwise ordinarily lock takeLock.)
99 */
100 private void signalNotEmpty() {
101 final ReentrantLock takeLock = this.takeLock;
102 takeLock.lock();
103 try {
104 notEmpty.signal();
105 } finally {
106 takeLock.unlock();
107 }
108 }
109
110 /**
111 * Signals a waiting put. Called only from take/poll.
112 */
113 private void signalNotFull() {
114 final ReentrantLock putLock = this.putLock;
115 putLock.lock();
116 try {
117 notFull.signal();
118 } finally {
119 putLock.unlock();
120 }
121 }
122
123 /**
124 * Creates a node and links it at end of queue.
125 * @param x the item
126 */
127 private void insert(E x) {
128 last = last.next = new Node<E>(x);
129 }
130
131 /**
132 * Removes a node from head of queue,
133 * @return the node
134 */
135 private E extract() {
136 Node<E> first = head.next;
137 head = first;
138 E x = first.item;
139 first.item = null;
140 return x;
141 }
142
143 /**
144 * Lock to prevent both puts and takes.
145 */
146 private void fullyLock() {
147 putLock.lock();
148 takeLock.lock();
149 }
150
151 /**
152 * Unlock to allow both puts and takes.
153 */
154 private void fullyUnlock() {
155 takeLock.unlock();
156 putLock.unlock();
157 }
158
159
160 /**
161 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
162 * {@link Integer#MAX_VALUE}.
163 */
164 public LinkedBlockingQueue() {
165 this(Integer.MAX_VALUE);
166 }
167
168 /**
169 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
170 *
171 * @param capacity the capacity of this queue.
172 * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
173 * than zero.
174 */
175 public LinkedBlockingQueue(int capacity) {
176 if (capacity <= 0) throw new IllegalArgumentException();
177 this.capacity = capacity;
178 last = head = new Node<E>(null);
179 }
180
181 /**
182 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
183 * {@link Integer#MAX_VALUE}, initially containing the elements of the
184 * given collection,
185 * added in traversal order of the collection's iterator.
186 * @param c the collection of elements to initially contain
187 * @throws NullPointerException if <tt>c</tt> or any element within it
188 * is <tt>null</tt>.
189 */
190 public LinkedBlockingQueue(Collection<? extends E> c) {
191 this(Integer.MAX_VALUE);
192 for (E e : c)
193 add(e);
194 }
195
196
197 // this doc comment is overridden to remove the reference to collections
198 // greater in size than Integer.MAX_VALUE
199 /**
200 * Returns the number of elements in this queue.
201 *
202 * @return the number of elements in this queue.
203 */
204 public int size() {
205 return count.get();
206 }
207
208 // this doc comment is a modified copy of the inherited doc comment,
209 // without the reference to unlimited queues.
210 /**
211 * Returns the number of elements that this queue can ideally (in
212 * the absence of memory or resource constraints) accept without
213 * blocking. This is always equal to the initial capacity of this queue
214 * less the current <tt>size</tt> of this queue.
215 * <p>Note that you <em>cannot</em> always tell if
216 * an attempt to <tt>add</tt> an element will succeed by
217 * inspecting <tt>remainingCapacity</tt> because it may be the
218 * case that a waiting consumer is ready to <tt>take</tt> an
219 * element out of an otherwise full queue.
220 */
221 public int remainingCapacity() {
222 return capacity - count.get();
223 }
224
225 /**
226 * Adds the specified element to the tail of this queue, waiting if
227 * necessary for space to become available.
228 * @param o the element to add
229 * @throws InterruptedException if interrupted while waiting.
230 * @throws NullPointerException if the specified element is <tt>null</tt>.
231 */
232 public void put(E o) throws InterruptedException {
233 if (o == null) throw new NullPointerException();
234 // Note: convention in all put/take/etc is to preset
235 // local var holding count negative to indicate failure unless set.
236 int c = -1;
237 final ReentrantLock putLock = this.putLock;
238 final AtomicInteger count = this.count;
239 putLock.lockInterruptibly();
240 try {
241 /*
242 * Note that count is used in wait guard even though it is
243 * not protected by lock. This works because count can
244 * only decrease at this point (all other puts are shut
245 * out by lock), and we (or some other waiting put) are
246 * signalled if it ever changes from
247 * capacity. Similarly for all other uses of count in
248 * other wait guards.
249 */
250 try {
251 while (count.get() == capacity)
252 notFull.await();
253 } catch (InterruptedException ie) {
254 notFull.signal(); // propagate to a non-interrupted thread
255 throw ie;
256 }
257 insert(o);
258 c = count.getAndIncrement();
259 if (c + 1 < capacity)
260 notFull.signal();
261 } finally {
262 putLock.unlock();
263 }
264 if (c == 0)
265 signalNotEmpty();
266 }
267
268 /**
269 * Inserts the specified element at the tail of this queue, waiting if
270 * necessary up to the specified wait time for space to become available.
271 * @param o the element to add
272 * @param timeout how long to wait before giving up, in units of
273 * <tt>unit</tt>
274 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
275 * <tt>timeout</tt> parameter
276 * @return <tt>true</tt> if successful, or <tt>false</tt> if
277 * the specified waiting time elapses before space is available.
278 * @throws InterruptedException if interrupted while waiting.
279 * @throws NullPointerException if the specified element is <tt>null</tt>.
280 */
281 public boolean offer(E o, long timeout, TimeUnit unit)
282 throws InterruptedException {
283
284 if (o == null) throw new NullPointerException();
285 long nanos = unit.toNanos(timeout);
286 int c = -1;
287 final ReentrantLock putLock = this.putLock;
288 final AtomicInteger count = this.count;
289 putLock.lockInterruptibly();
290 try {
291 for (;;) {
292 if (count.get() < capacity) {
293 insert(o);
294 c = count.getAndIncrement();
295 if (c + 1 < capacity)
296 notFull.signal();
297 break;
298 }
299 if (nanos <= 0)
300 return false;
301 try {
302 nanos = notFull.awaitNanos(nanos);
303 } catch (InterruptedException ie) {
304 notFull.signal(); // propagate to a non-interrupted thread
305 throw ie;
306 }
307 }
308 } finally {
309 putLock.unlock();
310 }
311 if (c == 0)
312 signalNotEmpty();
313 return true;
314 }
315
316 /**
317 * Inserts the specified element at the tail of this queue if possible,
318 * returning immediately if this queue is full.
319 *
320 * @param o the element to add.
321 * @return <tt>true</tt> if it was possible to add the element to
322 * this queue, else <tt>false</tt>
323 * @throws NullPointerException if the specified element is <tt>null</tt>.
324 */
325 public boolean offer(E o) {
326 if (o == null) throw new NullPointerException();
327 final AtomicInteger count = this.count;
328 if (count.get() == capacity)
329 return false;
330 int c = -1;
331 final ReentrantLock putLock = this.putLock;
332 putLock.lock();
333 try {
334 if (count.get() < capacity) {
335 insert(o);
336 c = count.getAndIncrement();
337 if (c + 1 < capacity)
338 notFull.signal();
339 }
340 } finally {
341 putLock.unlock();
342 }
343 if (c == 0)
344 signalNotEmpty();
345 return c >= 0;
346 }
347
348
349 public E take() throws InterruptedException {
350 E x;
351 int c = -1;
352 final AtomicInteger count = this.count;
353 final ReentrantLock takeLock = this.takeLock;
354 takeLock.lockInterruptibly();
355 try {
356 try {
357 while (count.get() == 0)
358 notEmpty.await();
359 } catch (InterruptedException ie) {
360 notEmpty.signal(); // propagate to a non-interrupted thread
361 throw ie;
362 }
363
364 x = extract();
365 c = count.getAndDecrement();
366 if (c > 1)
367 notEmpty.signal();
368 } finally {
369 takeLock.unlock();
370 }
371 if (c == capacity)
372 signalNotFull();
373 return x;
374 }
375
376 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
377 E x = null;
378 int c = -1;
379 long nanos = unit.toNanos(timeout);
380 final AtomicInteger count = this.count;
381 final ReentrantLock takeLock = this.takeLock;
382 takeLock.lockInterruptibly();
383 try {
384 for (;;) {
385 if (count.get() > 0) {
386 x = extract();
387 c = count.getAndDecrement();
388 if (c > 1)
389 notEmpty.signal();
390 break;
391 }
392 if (nanos <= 0)
393 return null;
394 try {
395 nanos = notEmpty.awaitNanos(nanos);
396 } catch (InterruptedException ie) {
397 notEmpty.signal(); // propagate to a non-interrupted thread
398 throw ie;
399 }
400 }
401 } finally {
402 takeLock.unlock();
403 }
404 if (c == capacity)
405 signalNotFull();
406 return x;
407 }
408
409 public E poll() {
410 final AtomicInteger count = this.count;
411 if (count.get() == 0)
412 return null;
413 E x = null;
414 int c = -1;
415 final ReentrantLock takeLock = this.takeLock;
416 takeLock.lock();
417 try {
418 if (count.get() > 0) {
419 x = extract();
420 c = count.getAndDecrement();
421 if (c > 1)
422 notEmpty.signal();
423 }
424 } finally {
425 takeLock.unlock();
426 }
427 if (c == capacity)
428 signalNotFull();
429 return x;
430 }
431
432
433 public E peek() {
434 if (count.get() == 0)
435 return null;
436 final ReentrantLock takeLock = this.takeLock;
437 takeLock.lock();
438 try {
439 Node<E> first = head.next;
440 if (first == null)
441 return null;
442 else
443 return first.item;
444 } finally {
445 takeLock.unlock();
446 }
447 }
448
449 /**
450 * Removes a single instance of the specified element from this
451 * queue, if it is present.
452 */
453 public boolean remove(Object o) {
454 if (o == null) return false;
455 boolean removed = false;
456 fullyLock();
457 try {
458 Node<E> trail = head;
459 Node<E> p = head.next;
460 while (p != null) {
461 if (o.equals(p.item)) {
462 removed = true;
463 break;
464 }
465 trail = p;
466 p = p.next;
467 }
468 if (removed) {
469 p.item = null;
470 trail.next = p.next;
471 if (last == p)
472 last = trail;
473 if (count.getAndDecrement() == capacity)
474 notFull.signalAll();
475 }
476 } finally {
477 fullyUnlock();
478 }
479 return removed;
480 }
481
482 public Object[] toArray() {
483 fullyLock();
484 try {
485 int size = count.get();
486 Object[] a = new Object[size];
487 int k = 0;
488 for (Node<E> p = head.next; p != null; p = p.next)
489 a[k++] = p.item;
490 return a;
491 } finally {
492 fullyUnlock();
493 }
494 }
495
496 public <T> T[] toArray(T[] a) {
497 fullyLock();
498 try {
499 int size = count.get();
500 if (a.length < size)
501 a = (T[])java.lang.reflect.Array.newInstance
502 (a.getClass().getComponentType(), size);
503
504 int k = 0;
505 for (Node p = head.next; p != null; p = p.next)
506 a[k++] = (T)p.item;
507 return a;
508 } finally {
509 fullyUnlock();
510 }
511 }
512
513 public String toString() {
514 fullyLock();
515 try {
516 return super.toString();
517 } finally {
518 fullyUnlock();
519 }
520 }
521
522 /**
523 * Atomically removes all of the elements from this queue.
524 * The queue will be empty after this call returns.
525 */
526 public void clear() {
527 fullyLock();
528 try {
529 head.next = null;
530 assert head.item == null;
531 last = head;
532 if (count.getAndSet(0) == capacity)
533 notFull.signalAll();
534 } finally {
535 fullyUnlock();
536 }
537 }
538
539 public int drainTo(Collection<? super E> c) {
540 if (c == null)
541 throw new NullPointerException();
542 if (c == this)
543 throw new IllegalArgumentException();
544 Node first;
545 fullyLock();
546 try {
547 first = head.next;
548 head.next = null;
549 assert head.item == null;
550 last = head;
551 if (count.getAndSet(0) == capacity)
552 notFull.signalAll();
553 } finally {
554 fullyUnlock();
555 }
556 // Transfer the elements outside of locks
557 int n = 0;
558 for (Node<E> p = first; p != null; p = p.next) {
559 c.add(p.item);
560 p.item = null;
561 ++n;
562 }
563 return n;
564 }
565
566 public int drainTo(Collection<? super E> c, int maxElements) {
567 if (c == null)
568 throw new NullPointerException();
569 if (c == this)
570 throw new IllegalArgumentException();
571 fullyLock();
572 try {
573 int n = 0;
574 Node<E> p = head.next;
575 while (p != null && n < maxElements) {
576 c.add(p.item);
577 p.item = null;
578 p = p.next;
579 ++n;
580 }
581 if (n != 0) {
582 head.next = p;
583 assert head.item == null;
584 if (p == null)
585 last = head;
586 if (count.getAndAdd(-n) == capacity)
587 notFull.signalAll();
588 }
589 return n;
590 } finally {
591 fullyUnlock();
592 }
593 }
594
595 /**
596 * Returns an iterator over the elements in this queue in proper sequence.
597 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
598 * will never throw {@link ConcurrentModificationException},
599 * and guarantees to traverse elements as they existed upon
600 * construction of the iterator, and may (but is not guaranteed to)
601 * reflect any modifications subsequent to construction.
602 *
603 * @return an iterator over the elements in this queue in proper sequence.
604 */
605 public Iterator<E> iterator() {
606 return new Itr();
607 }
608
609 private class Itr implements Iterator<E> {
610 /*
611 * Basic weak-consistent iterator. At all times hold the next
612 * item to hand out so that if hasNext() reports true, we will
613 * still have it to return even if lost race with a take etc.
614 */
615 private Node<E> current;
616 private Node<E> lastRet;
617 private E currentElement;
618
619 Itr() {
620 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
621 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
622 putLock.lock();
623 takeLock.lock();
624 try {
625 current = head.next;
626 if (current != null)
627 currentElement = current.item;
628 } finally {
629 takeLock.unlock();
630 putLock.unlock();
631 }
632 }
633
634 public boolean hasNext() {
635 return current != null;
636 }
637
638 public E next() {
639 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
640 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
641 putLock.lock();
642 takeLock.lock();
643 try {
644 if (current == null)
645 throw new NoSuchElementException();
646 E x = currentElement;
647 lastRet = current;
648 current = current.next;
649 if (current != null)
650 currentElement = current.item;
651 return x;
652 } finally {
653 takeLock.unlock();
654 putLock.unlock();
655 }
656 }
657
658 public void remove() {
659 if (lastRet == null)
660 throw new IllegalStateException();
661 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
662 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
663 putLock.lock();
664 takeLock.lock();
665 try {
666 Node<E> node = lastRet;
667 lastRet = null;
668 Node<E> trail = head;
669 Node<E> p = head.next;
670 while (p != null && p != node) {
671 trail = p;
672 p = p.next;
673 }
674 if (p == node) {
675 p.item = null;
676 trail.next = p.next;
677 if (last == p)
678 last = trail;
679 int c = count.getAndDecrement();
680 if (c == capacity)
681 notFull.signalAll();
682 }
683 } finally {
684 takeLock.unlock();
685 putLock.unlock();
686 }
687 }
688 }
689
690 /**
691 * Save the state to a stream (that is, serialize it).
692 *
693 * @serialData The capacity is emitted (int), followed by all of
694 * its elements (each an <tt>Object</tt>) in the proper order,
695 * followed by a null
696 * @param s the stream
697 */
698 private void writeObject(java.io.ObjectOutputStream s)
699 throws java.io.IOException {
700
701 fullyLock();
702 try {
703 // Write out any hidden stuff, plus capacity
704 s.defaultWriteObject();
705
706 // Write out all elements in the proper order.
707 for (Node<E> p = head.next; p != null; p = p.next)
708 s.writeObject(p.item);
709
710 // Use trailing null as sentinel
711 s.writeObject(null);
712 } finally {
713 fullyUnlock();
714 }
715 }
716
717 /**
718 * Reconstitute this queue instance from a stream (that is,
719 * deserialize it).
720 * @param s the stream
721 */
722 private void readObject(java.io.ObjectInputStream s)
723 throws java.io.IOException, ClassNotFoundException {
724 // Read in capacity, and any hidden stuff
725 s.defaultReadObject();
726
727 count.set(0);
728 last = head = new Node<E>(null);
729
730 // Read in all elements and place in queue
731 for (;;) {
732 E item = (E)s.readObject();
733 if (item == null)
734 break;
735 add(item);
736 }
737 }
738 }