ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.34
Committed: Tue Jan 27 11:36:31 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PFD
Changes since 1.33: +4 -0 lines
Log Message:
Add Collection framework membership doc

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
12 /**
13 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 * linked nodes.
15 * This queue orders elements FIFO (first-in-first-out).
16 * The <em>head</em> of the queue is that element that has been on the
17 * queue the longest time.
18 * The <em>tail</em> of the queue is that element that has been on the
19 * queue the shortest time. New elements
20 * are inserted at the tail of the queue, and the queue retrieval
21 * operations obtain elements at the head of the queue.
22 * Linked queues typically have higher throughput than array-based queues but
23 * less predictable performance in most concurrent applications.
24 *
25 * <p> The optional capacity bound constructor argument serves as a
26 * way to prevent excessive queue expansion. The capacity, if unspecified,
27 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 * dynamically created upon each insertion unless this would bring the
29 * queue above capacity.
30 *
31 * <p>This class implements all of the <em>optional</em> methods
32 * of the {@link Collection} and {@link Iterator} interfaces.
33 *
34 * <p>This class is a member of the
35 * <a href="{@docRoot}/../guide/collections/index.html">
36 * Java Collections Framework</a>.
37 *
38 * @since 1.5
39 * @author Doug Lea
40 * @param <E> the type of elements held in this collection
41 *
42 **/
43 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
44 implements BlockingQueue<E>, java.io.Serializable {
45 private static final long serialVersionUID = -6903933977591709194L;
46
47 /*
48 * A variant of the "two lock queue" algorithm. The putLock gates
49 * entry to put (and offer), and has an associated condition for
50 * waiting puts. Similarly for the takeLock. The "count" field
51 * that they both rely on is maintained as an atomic to avoid
52 * needing to get both locks in most cases. Also, to minimize need
53 * for puts to get takeLock and vice-versa, cascading notifies are
54 * used. When a put notices that it has enabled at least one take,
55 * it signals taker. That taker in turn signals others if more
56 * items have been entered since the signal. And symmetrically for
57 * takes signalling puts. Operations such as remove(Object) and
58 * iterators acquire both locks.
59 */
60
61 /**
62 * Linked list node class
63 */
64 static class Node<E> {
65 /** The item, volatile to ensure barrier separating write and read */
66 volatile E item;
67 Node<E> next;
68 Node(E x) { item = x; }
69 }
70
71 /** The capacity bound, or Integer.MAX_VALUE if none */
72 private final int capacity;
73
74 /** Current number of elements */
75 private final AtomicInteger count = new AtomicInteger(0);
76
77 /** Head of linked list */
78 private transient Node<E> head;
79
80 /** Tail of linked list */
81 private transient Node<E> last;
82
83 /** Lock held by take, poll, etc */
84 private final ReentrantLock takeLock = new ReentrantLock();
85
86 /** Wait queue for waiting takes */
87 private final Condition notEmpty = takeLock.newCondition();
88
89 /** Lock held by put, offer, etc */
90 private final ReentrantLock putLock = new ReentrantLock();
91
92 /** Wait queue for waiting puts */
93 private final Condition notFull = putLock.newCondition();
94
95 /**
96 * Signal a waiting take. Called only from put/offer (which do not
97 * otherwise ordinarily lock takeLock.)
98 */
99 private void signalNotEmpty() {
100 final ReentrantLock takeLock = this.takeLock;
101 takeLock.lock();
102 try {
103 notEmpty.signal();
104 } finally {
105 takeLock.unlock();
106 }
107 }
108
109 /**
110 * Signal a waiting put. Called only from take/poll.
111 */
112 private void signalNotFull() {
113 final ReentrantLock putLock = this.putLock;
114 putLock.lock();
115 try {
116 notFull.signal();
117 } finally {
118 putLock.unlock();
119 }
120 }
121
122 /**
123 * Create a node and link it at end of queue
124 * @param x the item
125 */
126 private void insert(E x) {
127 last = last.next = new Node<E>(x);
128 }
129
130 /**
131 * Remove a node from head of queue,
132 * @return the node
133 */
134 private E extract() {
135 Node<E> first = head.next;
136 head = first;
137 E x = first.item;
138 first.item = null;
139 return x;
140 }
141
142 /**
143 * Lock to prevent both puts and takes.
144 */
145 private void fullyLock() {
146 putLock.lock();
147 takeLock.lock();
148 }
149
150 /**
151 * Unlock to allow both puts and takes.
152 */
153 private void fullyUnlock() {
154 takeLock.unlock();
155 putLock.unlock();
156 }
157
158
159 /**
160 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
161 * {@link Integer#MAX_VALUE}.
162 */
163 public LinkedBlockingQueue() {
164 this(Integer.MAX_VALUE);
165 }
166
167 /**
168 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
169 *
170 * @param capacity the capacity of this queue.
171 * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
172 * than zero.
173 */
174 public LinkedBlockingQueue(int capacity) {
175 if (capacity <= 0) throw new IllegalArgumentException();
176 this.capacity = capacity;
177 last = head = new Node<E>(null);
178 }
179
180 /**
181 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
182 * {@link Integer#MAX_VALUE}, initially containing the elements of the
183 * given collection,
184 * added in traversal order of the collection's iterator.
185 * @param c the collection of elements to initially contain
186 * @throws NullPointerException if <tt>c</tt> or any element within it
187 * is <tt>null</tt>
188 */
189 public LinkedBlockingQueue(Collection<? extends E> c) {
190 this(Integer.MAX_VALUE);
191 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
192 add(it.next());
193 }
194
195
196 // this doc comment is overridden to remove the reference to collections
197 // greater in size than Integer.MAX_VALUE
198 /**
199 * Returns the number of elements in this queue.
200 *
201 * @return the number of elements in this queue.
202 */
203 public int size() {
204 return count.get();
205 }
206
207 // this doc comment is a modified copy of the inherited doc comment,
208 // without the reference to unlimited queues.
209 /**
210 * Returns the number of elements that this queue can ideally (in
211 * the absence of memory or resource constraints) accept without
212 * blocking. This is always equal to the initial capacity of this queue
213 * less the current <tt>size</tt> of this queue.
214 * <p>Note that you <em>cannot</em> always tell if
215 * an attempt to <tt>add</tt> an element will succeed by
216 * inspecting <tt>remainingCapacity</tt> because it may be the
217 * case that a waiting consumer is ready to <tt>take</tt> an
218 * element out of an otherwise full queue.
219 */
220 public int remainingCapacity() {
221 return capacity - count.get();
222 }
223
224 /**
225 * Adds the specified element to the tail of this queue, waiting if
226 * necessary for space to become available.
227 * @param o the element to add
228 * @throws InterruptedException if interrupted while waiting.
229 * @throws NullPointerException if the specified element is <tt>null</tt>.
230 */
231 public void put(E o) throws InterruptedException {
232 if (o == null) throw new NullPointerException();
233 // Note: convention in all put/take/etc is to preset
234 // local var holding count negative to indicate failure unless set.
235 int c = -1;
236 final ReentrantLock putLock = this.putLock;
237 final AtomicInteger count = this.count;
238 putLock.lockInterruptibly();
239 try {
240 /*
241 * Note that count is used in wait guard even though it is
242 * not protected by lock. This works because count can
243 * only decrease at this point (all other puts are shut
244 * out by lock), and we (or some other waiting put) are
245 * signalled if it ever changes from
246 * capacity. Similarly for all other uses of count in
247 * other wait guards.
248 */
249 try {
250 while (count.get() == capacity)
251 notFull.await();
252 } catch (InterruptedException ie) {
253 notFull.signal(); // propagate to a non-interrupted thread
254 throw ie;
255 }
256 insert(o);
257 c = count.getAndIncrement();
258 if (c + 1 < capacity)
259 notFull.signal();
260 } finally {
261 putLock.unlock();
262 }
263 if (c == 0)
264 signalNotEmpty();
265 }
266
267 /**
268 * Inserts the specified element at the tail of this queue, waiting if
269 * necessary up to the specified wait time for space to become available.
270 * @param o the element to add
271 * @param timeout how long to wait before giving up, in units of
272 * <tt>unit</tt>
273 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
274 * <tt>timeout</tt> parameter
275 * @return <tt>true</tt> if successful, or <tt>false</tt> if
276 * the specified waiting time elapses before space is available.
277 * @throws InterruptedException if interrupted while waiting.
278 * @throws NullPointerException if the specified element is <tt>null</tt>.
279 */
280 public boolean offer(E o, long timeout, TimeUnit unit)
281 throws InterruptedException {
282
283 if (o == null) throw new NullPointerException();
284 long nanos = unit.toNanos(timeout);
285 int c = -1;
286 final ReentrantLock putLock = this.putLock;
287 final AtomicInteger count = this.count;
288 putLock.lockInterruptibly();
289 try {
290 for (;;) {
291 if (count.get() < capacity) {
292 insert(o);
293 c = count.getAndIncrement();
294 if (c + 1 < capacity)
295 notFull.signal();
296 break;
297 }
298 if (nanos <= 0)
299 return false;
300 try {
301 nanos = notFull.awaitNanos(nanos);
302 } catch (InterruptedException ie) {
303 notFull.signal(); // propagate to a non-interrupted thread
304 throw ie;
305 }
306 }
307 } finally {
308 putLock.unlock();
309 }
310 if (c == 0)
311 signalNotEmpty();
312 return true;
313 }
314
315 /**
316 * Inserts the specified element at the tail of this queue if possible,
317 * returning immediately if this queue is full.
318 *
319 * @param o the element to add.
320 * @return <tt>true</tt> if it was possible to add the element to
321 * this queue, else <tt>false</tt>
322 * @throws NullPointerException if the specified element is <tt>null</tt>
323 */
324 public boolean offer(E o) {
325 if (o == null) throw new NullPointerException();
326 final AtomicInteger count = this.count;
327 if (count.get() == capacity)
328 return false;
329 int c = -1;
330 final ReentrantLock putLock = this.putLock;
331 putLock.lock();
332 try {
333 if (count.get() < capacity) {
334 insert(o);
335 c = count.getAndIncrement();
336 if (c + 1 < capacity)
337 notFull.signal();
338 }
339 } finally {
340 putLock.unlock();
341 }
342 if (c == 0)
343 signalNotEmpty();
344 return c >= 0;
345 }
346
347
348 public E take() throws InterruptedException {
349 E x;
350 int c = -1;
351 final AtomicInteger count = this.count;
352 final ReentrantLock takeLock = this.takeLock;
353 takeLock.lockInterruptibly();
354 try {
355 try {
356 while (count.get() == 0)
357 notEmpty.await();
358 } catch (InterruptedException ie) {
359 notEmpty.signal(); // propagate to a non-interrupted thread
360 throw ie;
361 }
362
363 x = extract();
364 c = count.getAndDecrement();
365 if (c > 1)
366 notEmpty.signal();
367 } finally {
368 takeLock.unlock();
369 }
370 if (c == capacity)
371 signalNotFull();
372 return x;
373 }
374
375 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
376 E x = null;
377 int c = -1;
378 long nanos = unit.toNanos(timeout);
379 final AtomicInteger count = this.count;
380 final ReentrantLock takeLock = this.takeLock;
381 takeLock.lockInterruptibly();
382 try {
383 for (;;) {
384 if (count.get() > 0) {
385 x = extract();
386 c = count.getAndDecrement();
387 if (c > 1)
388 notEmpty.signal();
389 break;
390 }
391 if (nanos <= 0)
392 return null;
393 try {
394 nanos = notEmpty.awaitNanos(nanos);
395 } catch (InterruptedException ie) {
396 notEmpty.signal(); // propagate to a non-interrupted thread
397 throw ie;
398 }
399 }
400 } finally {
401 takeLock.unlock();
402 }
403 if (c == capacity)
404 signalNotFull();
405 return x;
406 }
407
408 public E poll() {
409 final AtomicInteger count = this.count;
410 if (count.get() == 0)
411 return null;
412 E x = null;
413 int c = -1;
414 final ReentrantLock takeLock = this.takeLock;
415 takeLock.lock();
416 try {
417 if (count.get() > 0) {
418 x = extract();
419 c = count.getAndDecrement();
420 if (c > 1)
421 notEmpty.signal();
422 }
423 } finally {
424 takeLock.unlock();
425 }
426 if (c == capacity)
427 signalNotFull();
428 return x;
429 }
430
431
432 public E peek() {
433 if (count.get() == 0)
434 return null;
435 final ReentrantLock takeLock = this.takeLock;
436 takeLock.lock();
437 try {
438 Node<E> first = head.next;
439 if (first == null)
440 return null;
441 else
442 return first.item;
443 } finally {
444 takeLock.unlock();
445 }
446 }
447
448 public boolean remove(Object o) {
449 if (o == null) return false;
450 boolean removed = false;
451 fullyLock();
452 try {
453 Node<E> trail = head;
454 Node<E> p = head.next;
455 while (p != null) {
456 if (o.equals(p.item)) {
457 removed = true;
458 break;
459 }
460 trail = p;
461 p = p.next;
462 }
463 if (removed) {
464 p.item = null;
465 trail.next = p.next;
466 if (count.getAndDecrement() == capacity)
467 notFull.signalAll();
468 }
469 } finally {
470 fullyUnlock();
471 }
472 return removed;
473 }
474
475 public Object[] toArray() {
476 fullyLock();
477 try {
478 int size = count.get();
479 Object[] a = new Object[size];
480 int k = 0;
481 for (Node<E> p = head.next; p != null; p = p.next)
482 a[k++] = p.item;
483 return a;
484 } finally {
485 fullyUnlock();
486 }
487 }
488
489 public <T> T[] toArray(T[] a) {
490 fullyLock();
491 try {
492 int size = count.get();
493 if (a.length < size)
494 a = (T[])java.lang.reflect.Array.newInstance
495 (a.getClass().getComponentType(), size);
496
497 int k = 0;
498 for (Node p = head.next; p != null; p = p.next)
499 a[k++] = (T)p.item;
500 return a;
501 } finally {
502 fullyUnlock();
503 }
504 }
505
506 public String toString() {
507 fullyLock();
508 try {
509 return super.toString();
510 } finally {
511 fullyUnlock();
512 }
513 }
514
515 public void clear() {
516 fullyLock();
517 try {
518 head.next = null;
519 if (count.getAndSet(0) == capacity)
520 notFull.signalAll();
521 } finally {
522 fullyUnlock();
523 }
524 }
525
526 public int drainTo(Collection<? super E> c) {
527 if (c == null)
528 throw new NullPointerException();
529 if (c == this)
530 throw new IllegalArgumentException();
531 Node first;
532 fullyLock();
533 try {
534 first = head.next;
535 head.next = null;
536 if (count.getAndSet(0) == capacity)
537 notFull.signalAll();
538 } finally {
539 fullyUnlock();
540 }
541 // Transfer the elements outside of locks
542 int n = 0;
543 for (Node<E> p = first; p != null; p = p.next) {
544 c.add(p.item);
545 p.item = null;
546 ++n;
547 }
548 return n;
549 }
550
551 public int drainTo(Collection<? super E> c, int maxElements) {
552 if (c == null)
553 throw new NullPointerException();
554 if (c == this)
555 throw new IllegalArgumentException();
556 if (maxElements <= 0)
557 return 0;
558 fullyLock();
559 try {
560 int n = 0;
561 Node<E> p = head.next;
562 while (p != null && n < maxElements) {
563 c.add(p.item);
564 p.item = null;
565 p = p.next;
566 ++n;
567 }
568 if (n != 0) {
569 head.next = p;
570 if (count.getAndAdd(-n) == capacity)
571 notFull.signalAll();
572 }
573 return n;
574 } finally {
575 fullyUnlock();
576 }
577 }
578
579 /**
580 * Returns an iterator over the elements in this queue in proper sequence.
581 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
582 * will never throw {@link java.util.ConcurrentModificationException},
583 * and guarantees to traverse elements as they existed upon
584 * construction of the iterator, and may (but is not guaranteed to)
585 * reflect any modifications subsequent to construction.
586 *
587 * @return an iterator over the elements in this queue in proper sequence.
588 */
589 public Iterator<E> iterator() {
590 return new Itr();
591 }
592
593 private class Itr implements Iterator<E> {
594 /*
595 * Basic weak-consistent iterator. At all times hold the next
596 * item to hand out so that if hasNext() reports true, we will
597 * still have it to return even if lost race with a take etc.
598 */
599 private Node<E> current;
600 private Node<E> lastRet;
601 private E currentElement;
602
603 Itr() {
604 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
605 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
606 putLock.lock();
607 takeLock.lock();
608 try {
609 current = head.next;
610 if (current != null)
611 currentElement = current.item;
612 } finally {
613 takeLock.unlock();
614 putLock.unlock();
615 }
616 }
617
618 public boolean hasNext() {
619 return current != null;
620 }
621
622 public E next() {
623 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
624 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
625 putLock.lock();
626 takeLock.lock();
627 try {
628 if (current == null)
629 throw new NoSuchElementException();
630 E x = currentElement;
631 lastRet = current;
632 current = current.next;
633 if (current != null)
634 currentElement = current.item;
635 return x;
636 } finally {
637 takeLock.unlock();
638 putLock.unlock();
639 }
640 }
641
642 public void remove() {
643 if (lastRet == null)
644 throw new IllegalStateException();
645 final ReentrantLock putLock = LinkedBlockingQueue.this.putLock;
646 final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock;
647 putLock.lock();
648 takeLock.lock();
649 try {
650 Node<E> node = lastRet;
651 lastRet = null;
652 Node<E> trail = head;
653 Node<E> p = head.next;
654 while (p != null && p != node) {
655 trail = p;
656 p = p.next;
657 }
658 if (p == node) {
659 p.item = null;
660 trail.next = p.next;
661 int c = count.getAndDecrement();
662 if (c == capacity)
663 notFull.signalAll();
664 }
665 } finally {
666 takeLock.unlock();
667 putLock.unlock();
668 }
669 }
670 }
671
672 /**
673 * Save the state to a stream (that is, serialize it).
674 *
675 * @serialData The capacity is emitted (int), followed by all of
676 * its elements (each an <tt>Object</tt>) in the proper order,
677 * followed by a null
678 * @param s the stream
679 */
680 private void writeObject(java.io.ObjectOutputStream s)
681 throws java.io.IOException {
682
683 fullyLock();
684 try {
685 // Write out any hidden stuff, plus capacity
686 s.defaultWriteObject();
687
688 // Write out all elements in the proper order.
689 for (Node<E> p = head.next; p != null; p = p.next)
690 s.writeObject(p.item);
691
692 // Use trailing null as sentinel
693 s.writeObject(null);
694 } finally {
695 fullyUnlock();
696 }
697 }
698
699 /**
700 * Reconstitute this queue instance from a stream (that is,
701 * deserialize it).
702 * @param s the stream
703 */
704 private void readObject(java.io.ObjectInputStream s)
705 throws java.io.IOException, ClassNotFoundException {
706 // Read in capacity, and any hidden stuff
707 s.defaultReadObject();
708
709 count.set(0);
710 last = head = new Node<E>(null);
711
712 // Read in all elements and place in queue
713 for (;;) {
714 E item = (E)s.readObject();
715 if (item == null)
716 break;
717 add(item);
718 }
719 }
720 }