ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.36
Committed: Wed Jun 2 23:49:07 2004 UTC (20 years ago) by dl
Branch: MAIN
Changes since 1.35: +4 -3 lines
Log Message:
CopyOnWriteArraySet and ConcurrentHashMap no longer implement Cloneable
Improve javadoc wording in other classes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.atomic.*;
9 import java.util.concurrent.locks.*;
10 import java.util.*;
11
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p>The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * List invariants (they hold whenever no lock is held):
     *  - head is a dummy node whose item is null; head.next is the
     *    first real element, or null if the queue is empty.
     *  - last is the final node reachable from head (last == head when
     *    the queue is empty) and last.next == null.
     * Every operation that unlinks nodes must therefore move "last"
     * back whenever the tail node is among those removed; otherwise the
     * next insert would link onto an unreachable node and the element
     * would be counted but lost.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; its item is always null) */
    private transient Node<E> head;

    /** Tail of linked list (equal to head when the queue is empty) */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove the first element from the queue. The node that held it
     * becomes the new dummy head. Caller must hold takeLock and must
     * have established that the queue is non-empty.
     * @return the item that was at the head of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Lock to prevent both puts and takes.
     * The acquisition order (putLock, then takeLock) is the same
     * everywhere both locks are taken.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if {@code c} or any element within it
     *         is {@code null}
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to {@code add} an element will succeed by
     * inspecting {@code remainingCapacity} because it may be the
     * case that a waiting consumer is ready to {@code take} an
     * element out of an otherwise full queue.
     *
     * @return the capacity of this queue minus its current size
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is {@code null}.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal(); // still room: cascade to another waiting put
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty(); // queue went from empty to non-empty
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is {@code null}.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {

        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return {@code true} if it was possible to add the element to
     *         this queue, else {@code false}
     * @throws NullPointerException if the specified element is {@code null}
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // Cheap unlocked pre-check; re-validated under the lock below.
        if (count.get() == capacity)
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal(); // more items remain: cascade to another take
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull(); // queue went from full to non-full
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if
     * necessary up to the specified wait time for an element to
     * become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * {@code null} immediately if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    public E poll() {
        final AtomicInteger count = this.count;
        // Cheap unlocked pre-check; re-validated under the lock below.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieves, but does not remove, the head of this queue.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o the element to remove; {@code null} is never present
     * @return {@code true} if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        boolean removed = false;
        fullyLock();
        try {
            Node<E> trail = head;
            Node<E> p = head.next;
            while (p != null) {
                if (o.equals(p.item)) {
                    removed = true;
                    break;
                }
                trail = p;
                p = p.next;
            }
            if (removed) {
                p.item = null;
                trail.next = p.next;
                // Keep the tail pointer on a reachable node; otherwise
                // the next insert would link after the unlinked node.
                if (last == p)
                    last = trail;
                if (count.getAndDecrement() == capacity)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return removed;
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in proper sequence (head first).
     *
     * @return a newly allocated array holding a snapshot of the elements
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in proper sequence; the runtime type of the returned array is
     * that of the specified array. If the queue fits in the specified
     * array it is returned therein, and when there is room to spare
     * the slot immediately following the last element is set to
     * {@code null}. Otherwise a new array of the same runtime type
     * and of this queue's size is allocated.
     *
     * @param a the array into which the elements are to be stored,
     *        if it is big enough
     * @return an array containing the elements of this queue
     * @throws ArrayStoreException if an element cannot be stored in
     *         an array of the given runtime type
     * @throws NullPointerException if {@code a} is {@code null}
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[]) java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T) p.item;
            // Collection.toArray(T[]) contract: null-terminate when the
            // supplied array is longer than the collection.
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, computed while
     * both locks are held so that it reflects a consistent snapshot.
     *
     * @return the string form produced by the superclass
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            head.next = null;
            // The old tail is no longer reachable; restart at the dummy.
            last = head;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection. The elements are detached from the queue
     * atomically and then transferred without holding any lock, so if
     * {@code c.add} throws, elements not yet transferred are in neither
     * collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        Node<E> first;
        fullyLock();
        try {
            first = head.next;
            head.next = null;
            // The detached chain owns the old tail; restart at the dummy.
            last = head;
            if (count.getAndSet(0) == capacity)
                notFull.signalAll();
        } finally {
            fullyUnlock();
        }
        // Transfer the elements outside of locks
        int n = 0;
        for (Node<E> p = first; p != null; p = p.next) {
            c.add(p.item);
            p.item = null;
            ++n;
        }
        return n;
    }

    /**
     * Removes at most the given number of available elements from
     * this queue and adds them to the given collection. If
     * {@code c.add} throws, the elements already transferred are
     * removed from this queue and the rest remain queued.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            try {
                while (p != null && n < maxElements) {
                    c.add(p.item);
                    p.item = null;
                    p = p.next;
                    ++n;
                }
            } finally {
                // Runs on the exceptional path too, so that nodes whose
                // items were already handed over never stay linked.
                if (n != 0) {
                    head.next = p;
                    if (p == null)
                        last = head; // drained through the tail
                    if (count.getAndAdd(-n) == capacity)
                        notFull.signalAll();
                }
            }
            return n;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned {@code Iterator} is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                // p == null means the node was already removed by
                // another operation; nothing to do in that case.
                if (p == node) {
                    p.item = null;
                    trail.next = p.next;
                    // Keep the tail pointer on a reachable node.
                    if (last == p)
                        last = trail;
                    int c = count.getAndDecrement();
                    if (c == capacity)
                        notFull.signalAll();
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     * @throws java.io.IOException if the stream cannot be written
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     * @throws java.io.IOException if the stream cannot be read
     * @throws ClassNotFoundException if the class of a serialized
     *         element cannot be found
     */
    @SuppressWarnings("unchecked")
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // The list itself is transient: rebuild it from the element stream.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E) s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}