ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.24
Committed: Sun Oct 5 23:00:18 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.23: +66 -0 lines
Log Message:
added drainTo; clarified various exception specs

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dholmes 1.14 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 dholmes 1.8 * linked nodes.
15     * This queue orders elements FIFO (first-in-first-out).
16 tim 1.12 * The <em>head</em> of the queue is that element that has been on the
17 dholmes 1.8 * queue the longest time.
18     * The <em>tail</em> of the queue is that element that has been on the
19 dl 1.20 * queue the shortest time. New elements
20     * are inserted at the tail of the queue, and the queue retrieval
21     * operations obtain elements at the head of the queue.
22 dholmes 1.8 * Linked queues typically have higher throughput than array-based queues but
23     * less predictable performance in most concurrent applications.
24 tim 1.12 *
25 dl 1.3 * <p> The optional capacity bound constructor argument serves as a
26 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
27     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 dl 1.3 * dynamically created upon each insertion unless this would bring the
29     * queue above capacity.
30 dholmes 1.8 *
31 dl 1.21 * <p>This class implements all of the <em>optional</em> methods
32     * of the {@link Collection} and {@link Iterator} interfaces.
33     *
34 dl 1.6 * @since 1.5
35     * @author Doug Lea
36 tim 1.12 *
37 tim 1.1 **/
38 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
39 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
40 dl 1.18 private static final long serialVersionUID = -6903933977591709194L;
41 tim 1.1
42 dl 1.2 /*
43     * A variant of the "two lock queue" algorithm. The putLock gates
44     * entry to put (and offer), and has an associated condition for
45     * waiting puts. Similarly for the takeLock. The "count" field
46     * that they both rely on is maintained as an atomic to avoid
47     * needing to get both locks in most cases. Also, to minimize need
48     * for puts to get takeLock and vice-versa, cascading notifies are
49     * used. When a put notices that it has enabled at least one take,
50     * it signals taker. That taker in turn signals others if more
51     * items have been entered since the signal. And symmetrically for
52 tim 1.12 * takes signalling puts. Operations such as remove(Object) and
53 dl 1.2 * iterators acquire both locks.
54     */
55    
56 dl 1.6 /**
57     * Linked list node class
58     */
59 dl 1.2 static class Node<E> {
60 dl 1.6 /** The item, volatile to ensure barrier separating write and read */
61 dl 1.2 volatile E item;
62     Node<E> next;
63     Node(E x) { item = x; }
64     }
65    
66 dl 1.6 /** The capacity bound, or Integer.MAX_VALUE if none */
67 dl 1.2 private final int capacity;
68 dl 1.6
69     /** Current number of elements */
70 dl 1.19 private final AtomicInteger count = new AtomicInteger(0);
71 dl 1.2
72 dl 1.6 /** Head of linked list */
73     private transient Node<E> head;
74    
75 dholmes 1.8 /** Tail of linked list */
76 dl 1.6 private transient Node<E> last;
77 dl 1.2
78 dl 1.6 /** Lock held by take, poll, etc */
79 dl 1.5 private final ReentrantLock takeLock = new ReentrantLock();
80 dl 1.6
81     /** Wait queue for waiting takes */
82 dl 1.5 private final Condition notEmpty = takeLock.newCondition();
83 dl 1.2
84 dl 1.6 /** Lock held by put, offer, etc */
85 dl 1.5 private final ReentrantLock putLock = new ReentrantLock();
86 dl 1.6
87     /** Wait queue for waiting puts */
88 dl 1.5 private final Condition notFull = putLock.newCondition();
89 dl 1.2
90     /**
91     * Signal a waiting take. Called only from put/offer (which do not
92 dl 1.4 * otherwise ordinarily lock takeLock.)
93 dl 1.2 */
94     private void signalNotEmpty() {
95     takeLock.lock();
96     try {
97     notEmpty.signal();
98 tim 1.17 } finally {
99 dl 1.2 takeLock.unlock();
100     }
101     }
102    
103     /**
104     * Signal a waiting put. Called only from take/poll.
105     */
106     private void signalNotFull() {
107     putLock.lock();
108     try {
109     notFull.signal();
110 tim 1.17 } finally {
111 dl 1.2 putLock.unlock();
112     }
113     }
114    
115     /**
116 dholmes 1.8 * Create a node and link it at end of queue
117 dl 1.6 * @param x the item
118 dl 1.2 */
119     private void insert(E x) {
120     last = last.next = new Node<E>(x);
121     }
122    
123     /**
124     * Remove a node from head of queue,
125 dl 1.6 * @return the node
126 dl 1.2 */
127     private E extract() {
128     Node<E> first = head.next;
129     head = first;
130     E x = (E)first.item;
131     first.item = null;
132     return x;
133     }
134    
135     /**
136 tim 1.12 * Lock to prevent both puts and takes.
137 dl 1.2 */
138     private void fullyLock() {
139     putLock.lock();
140     takeLock.lock();
141 tim 1.1 }
142 dl 1.2
143     /**
144 tim 1.12 * Unlock to allow both puts and takes.
145 dl 1.2 */
146     private void fullyUnlock() {
147     takeLock.unlock();
148     putLock.unlock();
149     }
150    
151    
152     /**
153 dholmes 1.13 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
154 dholmes 1.8 * {@link Integer#MAX_VALUE}.
155 dl 1.2 */
156     public LinkedBlockingQueue() {
157     this(Integer.MAX_VALUE);
158     }
159    
160     /**
161 tim 1.16 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
162     *
163 dholmes 1.8 * @param capacity the capacity of this queue.
164     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
165 tim 1.16 * than zero.
166 dl 1.2 */
167     public LinkedBlockingQueue(int capacity) {
168 dholmes 1.8 if (capacity <= 0) throw new IllegalArgumentException();
169 dl 1.2 this.capacity = capacity;
170 dl 1.6 last = head = new Node<E>(null);
171 dl 1.2 }
172    
173     /**
174 dholmes 1.13 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
175 dholmes 1.14 * {@link Integer#MAX_VALUE}, initially containing the elements of the
176 tim 1.12 * given collection,
177 dholmes 1.8 * added in traversal order of the collection's iterator.
178 dholmes 1.9 * @param c the collection of elements to initially contain
179     * @throws NullPointerException if <tt>c</tt> or any element within it
180     * is <tt>null</tt>
181 dl 1.2 */
182 dholmes 1.10 public LinkedBlockingQueue(Collection<? extends E> c) {
183 dl 1.2 this(Integer.MAX_VALUE);
184 tim 1.12 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
185     add(it.next());
186 dl 1.2 }
187    
188 dholmes 1.9
189 dholmes 1.8 // this doc comment is overridden to remove the reference to collections
190     // greater in size than Integer.MAX_VALUE
191 tim 1.12 /**
192 dl 1.20 * Returns the number of elements in this queue.
193     *
194     * @return the number of elements in this queue.
195 dholmes 1.8 */
196 dl 1.2 public int size() {
197     return count.get();
198 tim 1.1 }
199 dl 1.2
200 dholmes 1.8 // this doc comment is a modified copy of the inherited doc comment,
201     // without the reference to unlimited queues.
202 tim 1.12 /**
203 dholmes 1.13 * Returns the number of elements that this queue can ideally (in
204 dholmes 1.8 * the absence of memory or resource constraints) accept without
205     * blocking. This is always equal to the initial capacity of this queue
206     * less the current <tt>size</tt> of this queue.
207     * <p>Note that you <em>cannot</em> always tell if
208     * an attempt to <tt>add</tt> an element will succeed by
209     * inspecting <tt>remainingCapacity</tt> because it may be the
210     * case that a waiting consumer is ready to <tt>take</tt> an
211     * element out of an otherwise full queue.
212     */
213 dl 1.2 public int remainingCapacity() {
214     return capacity - count.get();
215     }
216    
217 dholmes 1.22 /**
218     * Adds the specified element to the tail of this queue, waiting if
219     * necessary for space to become available.
220 dl 1.23 * @param o the element to add
221     * @throws InterruptedException if interrupted while waiting.
222 dholmes 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
223     */
224 dholmes 1.14 public void put(E o) throws InterruptedException {
225     if (o == null) throw new NullPointerException();
226 dl 1.2 // Note: convention in all put/take/etc is to preset
227     // local var holding count negative to indicate failure unless set.
228 tim 1.12 int c = -1;
229 dl 1.2 putLock.lockInterruptibly();
230     try {
231     /*
232     * Note that count is used in wait guard even though it is
233     * not protected by lock. This works because count can
234     * only decrease at this point (all other puts are shut
235     * out by lock), and we (or some other waiting put) are
236     * signalled if it ever changes from
237     * capacity. Similarly for all other uses of count in
238     * other wait guards.
239     */
240     try {
241 tim 1.12 while (count.get() == capacity)
242 dl 1.2 notFull.await();
243 tim 1.17 } catch (InterruptedException ie) {
244 dl 1.2 notFull.signal(); // propagate to a non-interrupted thread
245     throw ie;
246     }
247 dholmes 1.14 insert(o);
248 dl 1.2 c = count.getAndIncrement();
249 dl 1.6 if (c + 1 < capacity)
250 dl 1.2 notFull.signal();
251 tim 1.17 } finally {
252 dl 1.2 putLock.unlock();
253     }
254 tim 1.12 if (c == 0)
255 dl 1.2 signalNotEmpty();
256 tim 1.1 }
257 dl 1.2
258 dholmes 1.22 /**
259     * Inserts the specified element at the tail of this queue, waiting if
260     * necessary up to the specified wait time for space to become available.
261 dl 1.23 * @param o the element to add
262     * @param timeout how long to wait before giving up, in units of
263     * <tt>unit</tt>
264     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
265     * <tt>timeout</tt> parameter
266     * @return <tt>true</tt> if successful, or <tt>false</tt> if
267     * the specified waiting time elapses before space is available.
268     * @throws InterruptedException if interrupted while waiting.
269 dholmes 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
270     */
271 dholmes 1.14 public boolean offer(E o, long timeout, TimeUnit unit)
272 dholmes 1.8 throws InterruptedException {
273 tim 1.12
274 dholmes 1.14 if (o == null) throw new NullPointerException();
275 dl 1.2 long nanos = unit.toNanos(timeout);
276     int c = -1;
277 dholmes 1.8 putLock.lockInterruptibly();
278 dl 1.2 try {
279     for (;;) {
280     if (count.get() < capacity) {
281 dholmes 1.14 insert(o);
282 dl 1.2 c = count.getAndIncrement();
283 dl 1.6 if (c + 1 < capacity)
284 dl 1.2 notFull.signal();
285     break;
286     }
287     if (nanos <= 0)
288     return false;
289     try {
290     nanos = notFull.awaitNanos(nanos);
291 tim 1.17 } catch (InterruptedException ie) {
292 dl 1.2 notFull.signal(); // propagate to a non-interrupted thread
293     throw ie;
294     }
295     }
296 tim 1.17 } finally {
297 dl 1.2 putLock.unlock();
298     }
299 tim 1.12 if (c == 0)
300 dl 1.2 signalNotEmpty();
301     return true;
302 tim 1.1 }
303 dl 1.2
304 dl 1.23 /**
305     * Inserts the specified element at the tail of this queue if possible,
306     * returning immediately if this queue is full.
307     *
308     * @param o the element to add.
309     * @return <tt>true</tt> if it was possible to add the element to
310     * this queue, else <tt>false</tt>
311     * @throws NullPointerException if the specified element is <tt>null</tt>
312     */
313 dholmes 1.14 public boolean offer(E o) {
314     if (o == null) throw new NullPointerException();
315 dl 1.2 if (count.get() == capacity)
316     return false;
317 tim 1.12 int c = -1;
318 dholmes 1.8 putLock.lock();
319 dl 1.2 try {
320     if (count.get() < capacity) {
321 dholmes 1.14 insert(o);
322 dl 1.2 c = count.getAndIncrement();
323 dl 1.6 if (c + 1 < capacity)
324 dl 1.2 notFull.signal();
325     }
326 tim 1.17 } finally {
327 dl 1.2 putLock.unlock();
328     }
329 tim 1.12 if (c == 0)
330 dl 1.2 signalNotEmpty();
331     return c >= 0;
332 tim 1.1 }
333 dl 1.2
334    
335     public E take() throws InterruptedException {
336     E x;
337     int c = -1;
338     takeLock.lockInterruptibly();
339     try {
340     try {
341 tim 1.12 while (count.get() == 0)
342 dl 1.2 notEmpty.await();
343 tim 1.17 } catch (InterruptedException ie) {
344 dl 1.2 notEmpty.signal(); // propagate to a non-interrupted thread
345     throw ie;
346     }
347    
348     x = extract();
349     c = count.getAndDecrement();
350     if (c > 1)
351     notEmpty.signal();
352 tim 1.17 } finally {
353 dl 1.2 takeLock.unlock();
354     }
355 tim 1.12 if (c == capacity)
356 dl 1.2 signalNotFull();
357     return x;
358     }
359    
360     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
361     E x = null;
362     int c = -1;
363 dholmes 1.8 long nanos = unit.toNanos(timeout);
364 dl 1.2 takeLock.lockInterruptibly();
365     try {
366     for (;;) {
367     if (count.get() > 0) {
368     x = extract();
369     c = count.getAndDecrement();
370     if (c > 1)
371     notEmpty.signal();
372     break;
373     }
374     if (nanos <= 0)
375     return null;
376     try {
377     nanos = notEmpty.awaitNanos(nanos);
378 tim 1.17 } catch (InterruptedException ie) {
379 dl 1.2 notEmpty.signal(); // propagate to a non-interrupted thread
380     throw ie;
381     }
382     }
383 tim 1.17 } finally {
384 dl 1.2 takeLock.unlock();
385     }
386 tim 1.12 if (c == capacity)
387 dl 1.2 signalNotFull();
388     return x;
389     }
390    
391     public E poll() {
392     if (count.get() == 0)
393     return null;
394     E x = null;
395 tim 1.12 int c = -1;
396 dl 1.2 takeLock.tryLock();
397     try {
398     if (count.get() > 0) {
399     x = extract();
400     c = count.getAndDecrement();
401     if (c > 1)
402     notEmpty.signal();
403     }
404 tim 1.17 } finally {
405 dl 1.2 takeLock.unlock();
406     }
407 tim 1.12 if (c == capacity)
408 dl 1.2 signalNotFull();
409     return x;
410 tim 1.1 }
411 dl 1.2
412    
413     public E peek() {
414     if (count.get() == 0)
415     return null;
416 dholmes 1.8 takeLock.lock();
417 dl 1.2 try {
418     Node<E> first = head.next;
419     if (first == null)
420     return null;
421     else
422     return first.item;
423 tim 1.17 } finally {
424 dl 1.2 takeLock.unlock();
425     }
426 tim 1.1 }
427    
428 dholmes 1.9 public boolean remove(Object o) {
429     if (o == null) return false;
430 dl 1.2 boolean removed = false;
431     fullyLock();
432     try {
433     Node<E> trail = head;
434     Node<E> p = head.next;
435     while (p != null) {
436 dholmes 1.9 if (o.equals(p.item)) {
437 dl 1.2 removed = true;
438     break;
439     }
440     trail = p;
441     p = p.next;
442     }
443     if (removed) {
444     p.item = null;
445     trail.next = p.next;
446     if (count.getAndDecrement() == capacity)
447     notFull.signalAll();
448     }
449 tim 1.17 } finally {
450 dl 1.2 fullyUnlock();
451     }
452     return removed;
453 tim 1.1 }
454 dl 1.2
455     public Object[] toArray() {
456     fullyLock();
457     try {
458     int size = count.get();
459 tim 1.12 Object[] a = new Object[size];
460 dl 1.2 int k = 0;
461 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
462 dl 1.2 a[k++] = p.item;
463     return a;
464 tim 1.17 } finally {
465 dl 1.2 fullyUnlock();
466     }
467 tim 1.1 }
468 dl 1.2
469     public <T> T[] toArray(T[] a) {
470     fullyLock();
471     try {
472     int size = count.get();
473     if (a.length < size)
474 dl 1.4 a = (T[])java.lang.reflect.Array.newInstance
475     (a.getClass().getComponentType(), size);
476 tim 1.12
477 dl 1.2 int k = 0;
478 tim 1.12 for (Node p = head.next; p != null; p = p.next)
479 dl 1.2 a[k++] = (T)p.item;
480     return a;
481 tim 1.17 } finally {
482 dl 1.2 fullyUnlock();
483     }
484 tim 1.1 }
485 dl 1.2
486     public String toString() {
487     fullyLock();
488     try {
489     return super.toString();
490 tim 1.17 } finally {
491 dl 1.2 fullyUnlock();
492     }
493 tim 1.1 }
494 dl 1.2
495 dl 1.24 public void clear() {
496     fullyLock();
497     try {
498     head.next = null;
499     if (count.getAndSet(0) == capacity)
500     notFull.signalAll();
501     } finally {
502     fullyUnlock();
503     }
504     }
505    
506     public int drainTo(Collection<? super E> c) {
507     if (c == null)
508     throw new NullPointerException();
509     if (c == this)
510     throw new IllegalArgumentException();
511     Node first;
512     fullyLock();
513     try {
514     first = head.next;
515     head.next = null;
516     if (count.getAndSet(0) == capacity)
517     notFull.signalAll();
518     } finally {
519     fullyUnlock();
520     }
521     // Transfer the elements outside of locks
522     int n = 0;
523     for (Node p = first; p != null; p = p.next) {
524     c.add((E)p.item);
525     p.item = null;
526     ++n;
527     }
528     return n;
529     }
530    
531     public int drainTo(Collection<? super E> c, int maxElements) {
532     if (c == null)
533     throw new NullPointerException();
534     if (c == this)
535     throw new IllegalArgumentException();
536     if (maxElements <= 0)
537     return 0;
538     fullyLock();
539     try {
540     int n = 0;
541     Node p = head.next;
542     while (p != null && n < maxElements) {
543     c.add((E)p.item);
544     p.item = null;
545     p = p.next;
546     ++n;
547     }
548     if (n != 0) {
549     head.next = p;
550     if (count.getAndAdd(-n) == capacity)
551     notFull.signalAll();
552     }
553     return n;
554     } finally {
555     fullyUnlock();
556     }
557     }
558    
559    
560    
561 dholmes 1.14 /**
562     * Returns an iterator over the elements in this queue in proper sequence.
563 dl 1.15 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
564     * will never throw {@link java.util.ConcurrentModificationException},
565     * and guarantees to traverse elements as they existed upon
566     * construction of the iterator, and may (but is not guaranteed to)
567     * reflect any modifications subsequent to construction.
568 dholmes 1.14 *
569     * @return an iterator over the elements in this queue in proper sequence.
570     */
571 dl 1.2 public Iterator<E> iterator() {
572     return new Itr();
573 tim 1.1 }
574 dl 1.2
575     private class Itr implements Iterator<E> {
576 tim 1.12 /*
577 dl 1.4 * Basic weak-consistent iterator. At all times hold the next
578     * item to hand out so that if hasNext() reports true, we will
579     * still have it to return even if lost race with a take etc.
580     */
581 dl 1.2 Node<E> current;
582     Node<E> lastRet;
583 dl 1.4 E currentElement;
584 tim 1.12
585 dl 1.2 Itr() {
586     fullyLock();
587     try {
588     current = head.next;
589 dl 1.4 if (current != null)
590     currentElement = current.item;
591 tim 1.17 } finally {
592 dl 1.2 fullyUnlock();
593     }
594     }
595 tim 1.12
596     public boolean hasNext() {
597 dl 1.2 return current != null;
598     }
599    
600 tim 1.12 public E next() {
601 dl 1.2 fullyLock();
602     try {
603     if (current == null)
604     throw new NoSuchElementException();
605 dl 1.4 E x = currentElement;
606 dl 1.2 lastRet = current;
607     current = current.next;
608 dl 1.4 if (current != null)
609     currentElement = current.item;
610 dl 1.2 return x;
611 tim 1.17 } finally {
612 dl 1.2 fullyUnlock();
613     }
614 tim 1.12
615 dl 1.2 }
616    
617 tim 1.12 public void remove() {
618 dl 1.2 if (lastRet == null)
619 tim 1.12 throw new IllegalStateException();
620 dl 1.2 fullyLock();
621     try {
622     Node<E> node = lastRet;
623     lastRet = null;
624     Node<E> trail = head;
625     Node<E> p = head.next;
626     while (p != null && p != node) {
627     trail = p;
628     p = p.next;
629     }
630     if (p == node) {
631     p.item = null;
632     trail.next = p.next;
633     int c = count.getAndDecrement();
634     if (c == capacity)
635     notFull.signalAll();
636     }
637 tim 1.17 } finally {
638 dl 1.2 fullyUnlock();
639     }
640     }
641 tim 1.1 }
642 dl 1.2
643     /**
644     * Save the state to a stream (that is, serialize it).
645     *
646     * @serialData The capacity is emitted (int), followed by all of
647     * its elements (each an <tt>Object</tt>) in the proper order,
648     * followed by a null
649 dl 1.6 * @param s the stream
650 dl 1.2 */
651     private void writeObject(java.io.ObjectOutputStream s)
652     throws java.io.IOException {
653    
654 tim 1.12 fullyLock();
655 dl 1.2 try {
656     // Write out any hidden stuff, plus capacity
657     s.defaultWriteObject();
658    
659     // Write out all elements in the proper order.
660 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
661 dl 1.2 s.writeObject(p.item);
662    
663     // Use trailing null as sentinel
664     s.writeObject(null);
665 tim 1.17 } finally {
666 dl 1.2 fullyUnlock();
667     }
668 tim 1.1 }
669    
670 dl 1.2 /**
671 dholmes 1.8 * Reconstitute this queue instance from a stream (that is,
672 dl 1.2 * deserialize it).
673 dl 1.6 * @param s the stream
674 dl 1.2 */
675     private void readObject(java.io.ObjectInputStream s)
676     throws java.io.IOException, ClassNotFoundException {
677 tim 1.12 // Read in capacity, and any hidden stuff
678     s.defaultReadObject();
679 dl 1.2
680 dl 1.19 count.set(0);
681     last = head = new Node<E>(null);
682    
683 dl 1.6 // Read in all elements and place in queue
684 dl 1.2 for (;;) {
685     E item = (E)s.readObject();
686     if (item == null)
687     break;
688     add(item);
689     }
690 tim 1.1 }
691     }