ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.38
Committed: Thu Nov 4 19:48:19 2004 UTC (19 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.37: +11 -13 lines
Log Message:
Simplify previous changes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * The list always starts with a dummy node (head) whose item is
     * null; the first real element, if any, is head.next.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        /** The successor node, or null if this is the last node */
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node whose item is always null) */
    private transient Node<E> head;

    /** Tail of linked list (equal to head when the queue is empty) */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Creates a node and links it at end of queue.
     * Does not touch {@code count}; callers adjust it themselves.
     *
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Removes a node from head of queue. The removed node becomes the
     * new dummy head, so its item is cleared.
     * Does not touch {@code count}; callers adjust it themselves.
     *
     * @return the item that was held by the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlinks node {@code p}, whose predecessor is {@code trail}, decrements
     * the element count and wakes waiting producers if the queue was full.
     * Callers must hold both locks (see {@link #fullyLock}).
     *
     * @param p the node to remove
     * @param trail the node immediately preceding {@code p}
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail was removed, the predecessor becomes the tail;
        // otherwise the next insert() would link behind a detached node.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Locks to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes. Releases in the reverse of
     * the order used by {@link #fullyLock}.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if {@code c} or any element within it
     *         is {@code null}
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (E e : c)
            add(e);
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to {@code add} an element will succeed by
     * inspecting {@code remainingCapacity} because it may be the
     * case that a waiting consumer is ready to {@code take} an
     * element out of an otherwise full queue.
     *
     * @return the capacity minus the current element count
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is {@code null}.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            // Still room after this insert: cascade to another waiting put.
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // Queue went from empty to non-empty: wake a waiting take.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is {@code null}.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return {@code true} if it was possible to add the element to
     *         this queue, else {@code false}
     * @throws NullPointerException if the specified element is {@code null}
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Fast path: skip locking when the queue is observed to be full.
        if (count.get() == capacity)
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // Re-check under the lock; another put may have filled the queue.
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        // c stays -1 only when nothing was inserted.
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting if
     * no elements are present.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            // More elements remain: cascade to another waiting take.
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // Queue went from full to not-full: wake a waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if no elements are present.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, returning immediately
     * if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E poll() {
        final AtomicInteger count = this.count;
        // Fast path: skip locking when the queue is observed to be empty.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or {@code null} if this queue is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o the element to remove; {@code null} is never present
     * @return {@code true} if an element equal to {@code o} was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed)
                unlink(p, trail);
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in order from head to tail. Both locks are held while copying.
     *
     * @return a newly allocated array holding the queue's elements
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in order from head to tail, using the supplied array if it is
     * large enough and otherwise a new array of the same component type.
     * If the supplied array has room to spare, the slot immediately
     * after the last element is set to {@code null}.
     *
     * @param a the array into which the elements are to be stored, if
     *        it is big enough
     * @return an array containing the queue's elements
     * @throws NullPointerException if {@code a} is {@code null}
     * @throws ArrayStoreException if an element cannot be stored in an
     *         array of {@code a}'s component type
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Collection.toArray(T[]) contract: mark the end of the data
            // when the caller's array is longer than the queue.
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns the inherited string representation, computed while both
     * locks are held.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            head.next = null;
            assert head.item == null;
            last = head;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all elements from this queue and adds them to the given
     * collection. The chain of nodes is detached while both locks are
     * held, and the elements are then added to {@code c} after the locks
     * are released. If {@code c.add} throws, elements not yet transferred
     * are no longer in this queue.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        Node<E> first;
        fullyLock();
        try {
            first = head.next;
            head.next = null;
            assert head.item == null;
            last = head;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
        // Transfer the elements outside of locks
        int n = 0;
        for (Node<E> p = first; p != null; p = p.next) {
            c.add(p.item);
            p.item = null;
            ++n;
        }
        return n;
    }

    /**
     * Removes at most {@code maxElements} elements from the head of this
     * queue and adds them to the given collection, holding both locks for
     * the duration. If {@code c.add} throws, the elements transferred so
     * far are removed from this queue and the rest remain in it.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            try {
                while (p != null && n < maxElements) {
                    c.add(p.item);
                    p.item = null;
                    p = p.next;
                    ++n;
                }
            } finally {
                // Runs even if c.add threw: at this point exactly n nodes
                // have been handed over and p is the first one that has not,
                // so the list and count can be brought back in sync.
                if (n != 0) {
                    head.next = p;
                    assert head.item == null;
                    if (p == null)
                        last = head;
                    if (count.getAndAdd(-n) == capacity)
                        notFull.signalAll();
                }
            }
            return n;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned {@code Iterator} is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        /** Node holding the element that next() will return, or null */
        private Node<E> current;
        /** Node returned by the most recent next(), or null if removed/none */
        private Node<E> lastRet;
        /** Cached item of {@code current}, captured under lock */
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Search from the head: the node may already have been
                // removed by another operation, in which case do nothing.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node)
                    unlink(p, trail);
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     * @throws java.io.IOException if writing to the stream fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     *
     * @param s the stream
     * @throws java.io.IOException if reading from the stream fails
     * @throws ClassNotFoundException if the class of a serialized object
     *         cannot be found
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // The element count is rebuilt by the add() calls below.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}