ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.27
Committed: Sun Oct 19 13:38:34 2003 UTC (20 years, 7 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_NOV3_FREEZE
Changes since 1.26: +1 -1 lines
Log Message:
Changed doc strings for generic params

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dholmes 1.14 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 dholmes 1.8 * linked nodes.
15     * This queue orders elements FIFO (first-in-first-out).
16 tim 1.12 * The <em>head</em> of the queue is that element that has been on the
17 dholmes 1.8 * queue the longest time.
18     * The <em>tail</em> of the queue is that element that has been on the
19 dl 1.20 * queue the shortest time. New elements
20     * are inserted at the tail of the queue, and the queue retrieval
21     * operations obtain elements at the head of the queue.
22 dholmes 1.8 * Linked queues typically have higher throughput than array-based queues but
23     * less predictable performance in most concurrent applications.
24 tim 1.12 *
25 dl 1.3 * <p> The optional capacity bound constructor argument serves as a
26 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
27     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
28 dl 1.3 * dynamically created upon each insertion unless this would bring the
29     * queue above capacity.
30 dholmes 1.8 *
31 dl 1.21 * <p>This class implements all of the <em>optional</em> methods
32     * of the {@link Collection} and {@link Iterator} interfaces.
33     *
34 dl 1.6 * @since 1.5
35     * @author Doug Lea
36 dl 1.27 * @param <E> the type of elements held in this collection
37 tim 1.12 *
38 tim 1.1 **/
39 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
40 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
41 dl 1.18 private static final long serialVersionUID = -6903933977591709194L;
42 tim 1.1
43 dl 1.2 /*
44     * A variant of the "two lock queue" algorithm. The putLock gates
45     * entry to put (and offer), and has an associated condition for
46     * waiting puts. Similarly for the takeLock. The "count" field
47     * that they both rely on is maintained as an atomic to avoid
48     * needing to get both locks in most cases. Also, to minimize need
49     * for puts to get takeLock and vice-versa, cascading notifies are
50     * used. When a put notices that it has enabled at least one take,
51     * it signals taker. That taker in turn signals others if more
52     * items have been entered since the signal. And symmetrically for
53 tim 1.12 * takes signalling puts. Operations such as remove(Object) and
54 dl 1.2 * iterators acquire both locks.
55     */
56    
57 dl 1.6 /**
58     * Linked list node class
59     */
60 dl 1.2 static class Node<E> {
61 dl 1.6 /** The item, volatile to ensure barrier separating write and read */
62 dl 1.2 volatile E item;
63     Node<E> next;
64     Node(E x) { item = x; }
65     }
66    
67 dl 1.6 /** The capacity bound, or Integer.MAX_VALUE if none */
68 dl 1.2 private final int capacity;
69 dl 1.6
70     /** Current number of elements */
71 dl 1.19 private final AtomicInteger count = new AtomicInteger(0);
72 dl 1.2
73 dl 1.6 /** Head of linked list */
74     private transient Node<E> head;
75    
76 dholmes 1.8 /** Tail of linked list */
77 dl 1.6 private transient Node<E> last;
78 dl 1.2
79 dl 1.6 /** Lock held by take, poll, etc */
80 dl 1.5 private final ReentrantLock takeLock = new ReentrantLock();
81 dl 1.6
82     /** Wait queue for waiting takes */
83 dl 1.25 private final ReentrantLock.ConditionObject notEmpty = takeLock.newCondition();
84 dl 1.2
85 dl 1.6 /** Lock held by put, offer, etc */
86 dl 1.5 private final ReentrantLock putLock = new ReentrantLock();
87 dl 1.6
88     /** Wait queue for waiting puts */
89 dl 1.25 private final ReentrantLock.ConditionObject notFull = putLock.newCondition();
90 dl 1.2
91     /**
92     * Signal a waiting take. Called only from put/offer (which do not
93 dl 1.4 * otherwise ordinarily lock takeLock.)
94 dl 1.2 */
95     private void signalNotEmpty() {
96     takeLock.lock();
97     try {
98     notEmpty.signal();
99 tim 1.17 } finally {
100 dl 1.2 takeLock.unlock();
101     }
102     }
103    
104     /**
105     * Signal a waiting put. Called only from take/poll.
106     */
107     private void signalNotFull() {
108     putLock.lock();
109     try {
110     notFull.signal();
111 tim 1.17 } finally {
112 dl 1.2 putLock.unlock();
113     }
114     }
115    
116     /**
117 dholmes 1.8 * Create a node and link it at end of queue
118 dl 1.6 * @param x the item
119 dl 1.2 */
120     private void insert(E x) {
121     last = last.next = new Node<E>(x);
122     }
123    
124     /**
125     * Remove a node from head of queue,
126 dl 1.6 * @return the node
127 dl 1.2 */
128     private E extract() {
129     Node<E> first = head.next;
130     head = first;
131     E x = (E)first.item;
132     first.item = null;
133     return x;
134     }
135    
136     /**
137 tim 1.12 * Lock to prevent both puts and takes.
138 dl 1.2 */
139     private void fullyLock() {
140     putLock.lock();
141     takeLock.lock();
142 tim 1.1 }
143 dl 1.2
144     /**
145 tim 1.12 * Unlock to allow both puts and takes.
146 dl 1.2 */
147     private void fullyUnlock() {
148     takeLock.unlock();
149     putLock.unlock();
150     }
151    
152    
153     /**
154 dholmes 1.13 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
155 dholmes 1.8 * {@link Integer#MAX_VALUE}.
156 dl 1.2 */
157     public LinkedBlockingQueue() {
158     this(Integer.MAX_VALUE);
159     }
160    
161     /**
162 tim 1.16 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
163     *
164 dholmes 1.8 * @param capacity the capacity of this queue.
165     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
166 tim 1.16 * than zero.
167 dl 1.2 */
168     public LinkedBlockingQueue(int capacity) {
169 dholmes 1.8 if (capacity <= 0) throw new IllegalArgumentException();
170 dl 1.2 this.capacity = capacity;
171 dl 1.6 last = head = new Node<E>(null);
172 dl 1.2 }
173    
174     /**
175 dholmes 1.13 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
176 dholmes 1.14 * {@link Integer#MAX_VALUE}, initially containing the elements of the
177 tim 1.12 * given collection,
178 dholmes 1.8 * added in traversal order of the collection's iterator.
179 dholmes 1.9 * @param c the collection of elements to initially contain
180     * @throws NullPointerException if <tt>c</tt> or any element within it
181     * is <tt>null</tt>
182 dl 1.2 */
183 dholmes 1.10 public LinkedBlockingQueue(Collection<? extends E> c) {
184 dl 1.2 this(Integer.MAX_VALUE);
185 tim 1.12 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
186     add(it.next());
187 dl 1.2 }
188    
189 dholmes 1.9
190 dholmes 1.8 // this doc comment is overridden to remove the reference to collections
191     // greater in size than Integer.MAX_VALUE
192 tim 1.12 /**
193 dl 1.20 * Returns the number of elements in this queue.
194     *
195     * @return the number of elements in this queue.
196 dholmes 1.8 */
197 dl 1.2 public int size() {
198     return count.get();
199 tim 1.1 }
200 dl 1.2
201 dholmes 1.8 // this doc comment is a modified copy of the inherited doc comment,
202     // without the reference to unlimited queues.
203 tim 1.12 /**
204 dholmes 1.13 * Returns the number of elements that this queue can ideally (in
205 dholmes 1.8 * the absence of memory or resource constraints) accept without
206     * blocking. This is always equal to the initial capacity of this queue
207     * less the current <tt>size</tt> of this queue.
208     * <p>Note that you <em>cannot</em> always tell if
209     * an attempt to <tt>add</tt> an element will succeed by
210     * inspecting <tt>remainingCapacity</tt> because it may be the
211     * case that a waiting consumer is ready to <tt>take</tt> an
212     * element out of an otherwise full queue.
213     */
214 dl 1.2 public int remainingCapacity() {
215     return capacity - count.get();
216     }
217    
218 dholmes 1.22 /**
219     * Adds the specified element to the tail of this queue, waiting if
220     * necessary for space to become available.
221 dl 1.23 * @param o the element to add
222     * @throws InterruptedException if interrupted while waiting.
223 dholmes 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
224     */
225 dholmes 1.14 public void put(E o) throws InterruptedException {
226     if (o == null) throw new NullPointerException();
227 dl 1.2 // Note: convention in all put/take/etc is to preset
228     // local var holding count negative to indicate failure unless set.
229 tim 1.12 int c = -1;
230 dl 1.2 putLock.lockInterruptibly();
231     try {
232     /*
233     * Note that count is used in wait guard even though it is
234     * not protected by lock. This works because count can
235     * only decrease at this point (all other puts are shut
236     * out by lock), and we (or some other waiting put) are
237     * signalled if it ever changes from
238     * capacity. Similarly for all other uses of count in
239     * other wait guards.
240     */
241     try {
242 tim 1.12 while (count.get() == capacity)
243 dl 1.2 notFull.await();
244 tim 1.17 } catch (InterruptedException ie) {
245 dl 1.2 notFull.signal(); // propagate to a non-interrupted thread
246     throw ie;
247     }
248 dholmes 1.14 insert(o);
249 dl 1.2 c = count.getAndIncrement();
250 dl 1.6 if (c + 1 < capacity)
251 dl 1.2 notFull.signal();
252 tim 1.17 } finally {
253 dl 1.2 putLock.unlock();
254     }
255 tim 1.12 if (c == 0)
256 dl 1.2 signalNotEmpty();
257 tim 1.1 }
258 dl 1.2
259 dholmes 1.22 /**
260     * Inserts the specified element at the tail of this queue, waiting if
261     * necessary up to the specified wait time for space to become available.
262 dl 1.23 * @param o the element to add
263     * @param timeout how long to wait before giving up, in units of
264     * <tt>unit</tt>
265     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
266     * <tt>timeout</tt> parameter
267     * @return <tt>true</tt> if successful, or <tt>false</tt> if
268     * the specified waiting time elapses before space is available.
269     * @throws InterruptedException if interrupted while waiting.
270 dholmes 1.22 * @throws NullPointerException if the specified element is <tt>null</tt>.
271     */
272 dholmes 1.14 public boolean offer(E o, long timeout, TimeUnit unit)
273 dholmes 1.8 throws InterruptedException {
274 tim 1.12
275 dholmes 1.14 if (o == null) throw new NullPointerException();
276 dl 1.2 long nanos = unit.toNanos(timeout);
277     int c = -1;
278 dholmes 1.8 putLock.lockInterruptibly();
279 dl 1.2 try {
280     for (;;) {
281     if (count.get() < capacity) {
282 dholmes 1.14 insert(o);
283 dl 1.2 c = count.getAndIncrement();
284 dl 1.6 if (c + 1 < capacity)
285 dl 1.2 notFull.signal();
286     break;
287     }
288     if (nanos <= 0)
289     return false;
290     try {
291     nanos = notFull.awaitNanos(nanos);
292 tim 1.17 } catch (InterruptedException ie) {
293 dl 1.2 notFull.signal(); // propagate to a non-interrupted thread
294     throw ie;
295     }
296     }
297 tim 1.17 } finally {
298 dl 1.2 putLock.unlock();
299     }
300 tim 1.12 if (c == 0)
301 dl 1.2 signalNotEmpty();
302     return true;
303 tim 1.1 }
304 dl 1.2
305 dl 1.23 /**
306     * Inserts the specified element at the tail of this queue if possible,
307     * returning immediately if this queue is full.
308     *
309     * @param o the element to add.
310     * @return <tt>true</tt> if it was possible to add the element to
311     * this queue, else <tt>false</tt>
312     * @throws NullPointerException if the specified element is <tt>null</tt>
313     */
314 dholmes 1.14 public boolean offer(E o) {
315     if (o == null) throw new NullPointerException();
316 dl 1.2 if (count.get() == capacity)
317     return false;
318 tim 1.12 int c = -1;
319 dholmes 1.8 putLock.lock();
320 dl 1.2 try {
321     if (count.get() < capacity) {
322 dholmes 1.14 insert(o);
323 dl 1.2 c = count.getAndIncrement();
324 dl 1.6 if (c + 1 < capacity)
325 dl 1.2 notFull.signal();
326     }
327 tim 1.17 } finally {
328 dl 1.2 putLock.unlock();
329     }
330 tim 1.12 if (c == 0)
331 dl 1.2 signalNotEmpty();
332     return c >= 0;
333 tim 1.1 }
334 dl 1.2
335    
336     public E take() throws InterruptedException {
337     E x;
338     int c = -1;
339     takeLock.lockInterruptibly();
340     try {
341     try {
342 tim 1.12 while (count.get() == 0)
343 dl 1.2 notEmpty.await();
344 tim 1.17 } catch (InterruptedException ie) {
345 dl 1.2 notEmpty.signal(); // propagate to a non-interrupted thread
346     throw ie;
347     }
348    
349     x = extract();
350     c = count.getAndDecrement();
351     if (c > 1)
352     notEmpty.signal();
353 tim 1.17 } finally {
354 dl 1.2 takeLock.unlock();
355     }
356 tim 1.12 if (c == capacity)
357 dl 1.2 signalNotFull();
358     return x;
359     }
360    
361     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
362     E x = null;
363     int c = -1;
364 dholmes 1.8 long nanos = unit.toNanos(timeout);
365 dl 1.2 takeLock.lockInterruptibly();
366     try {
367     for (;;) {
368     if (count.get() > 0) {
369     x = extract();
370     c = count.getAndDecrement();
371     if (c > 1)
372     notEmpty.signal();
373     break;
374     }
375     if (nanos <= 0)
376     return null;
377     try {
378     nanos = notEmpty.awaitNanos(nanos);
379 tim 1.17 } catch (InterruptedException ie) {
380 dl 1.2 notEmpty.signal(); // propagate to a non-interrupted thread
381     throw ie;
382     }
383     }
384 tim 1.17 } finally {
385 dl 1.2 takeLock.unlock();
386     }
387 tim 1.12 if (c == capacity)
388 dl 1.2 signalNotFull();
389     return x;
390     }
391    
392     public E poll() {
393     if (count.get() == 0)
394     return null;
395     E x = null;
396 tim 1.12 int c = -1;
397 dl 1.2 takeLock.tryLock();
398     try {
399     if (count.get() > 0) {
400     x = extract();
401     c = count.getAndDecrement();
402     if (c > 1)
403     notEmpty.signal();
404     }
405 tim 1.17 } finally {
406 dl 1.2 takeLock.unlock();
407     }
408 tim 1.12 if (c == capacity)
409 dl 1.2 signalNotFull();
410     return x;
411 tim 1.1 }
412 dl 1.2
413    
414     public E peek() {
415     if (count.get() == 0)
416     return null;
417 dholmes 1.8 takeLock.lock();
418 dl 1.2 try {
419     Node<E> first = head.next;
420     if (first == null)
421     return null;
422     else
423     return first.item;
424 tim 1.17 } finally {
425 dl 1.2 takeLock.unlock();
426     }
427 tim 1.1 }
428    
429 dholmes 1.9 public boolean remove(Object o) {
430     if (o == null) return false;
431 dl 1.2 boolean removed = false;
432     fullyLock();
433     try {
434     Node<E> trail = head;
435     Node<E> p = head.next;
436     while (p != null) {
437 dholmes 1.9 if (o.equals(p.item)) {
438 dl 1.2 removed = true;
439     break;
440     }
441     trail = p;
442     p = p.next;
443     }
444     if (removed) {
445     p.item = null;
446     trail.next = p.next;
447     if (count.getAndDecrement() == capacity)
448     notFull.signalAll();
449     }
450 tim 1.17 } finally {
451 dl 1.2 fullyUnlock();
452     }
453     return removed;
454 tim 1.1 }
455 dl 1.2
456     public Object[] toArray() {
457     fullyLock();
458     try {
459     int size = count.get();
460 tim 1.12 Object[] a = new Object[size];
461 dl 1.2 int k = 0;
462 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
463 dl 1.2 a[k++] = p.item;
464     return a;
465 tim 1.17 } finally {
466 dl 1.2 fullyUnlock();
467     }
468 tim 1.1 }
469 dl 1.2
470     public <T> T[] toArray(T[] a) {
471     fullyLock();
472     try {
473     int size = count.get();
474     if (a.length < size)
475 dl 1.4 a = (T[])java.lang.reflect.Array.newInstance
476     (a.getClass().getComponentType(), size);
477 tim 1.12
478 dl 1.2 int k = 0;
479 tim 1.12 for (Node p = head.next; p != null; p = p.next)
480 dl 1.2 a[k++] = (T)p.item;
481     return a;
482 tim 1.17 } finally {
483 dl 1.2 fullyUnlock();
484     }
485 tim 1.1 }
486 dl 1.2
487     public String toString() {
488     fullyLock();
489     try {
490     return super.toString();
491 tim 1.17 } finally {
492 dl 1.2 fullyUnlock();
493     }
494 tim 1.1 }
495 dl 1.2
496 dl 1.24 public void clear() {
497     fullyLock();
498     try {
499     head.next = null;
500     if (count.getAndSet(0) == capacity)
501     notFull.signalAll();
502     } finally {
503     fullyUnlock();
504     }
505     }
506    
507     public int drainTo(Collection<? super E> c) {
508     if (c == null)
509     throw new NullPointerException();
510     if (c == this)
511     throw new IllegalArgumentException();
512     Node first;
513     fullyLock();
514     try {
515     first = head.next;
516     head.next = null;
517     if (count.getAndSet(0) == capacity)
518     notFull.signalAll();
519     } finally {
520     fullyUnlock();
521     }
522     // Transfer the elements outside of locks
523     int n = 0;
524     for (Node p = first; p != null; p = p.next) {
525     c.add((E)p.item);
526     p.item = null;
527     ++n;
528     }
529     return n;
530     }
531    
532     public int drainTo(Collection<? super E> c, int maxElements) {
533     if (c == null)
534     throw new NullPointerException();
535     if (c == this)
536     throw new IllegalArgumentException();
537     if (maxElements <= 0)
538     return 0;
539     fullyLock();
540     try {
541     int n = 0;
542     Node p = head.next;
543     while (p != null && n < maxElements) {
544     c.add((E)p.item);
545     p.item = null;
546     p = p.next;
547     ++n;
548     }
549     if (n != 0) {
550     head.next = p;
551     if (count.getAndAdd(-n) == capacity)
552     notFull.signalAll();
553     }
554     return n;
555     } finally {
556     fullyUnlock();
557     }
558     }
559    
560    
561    
562 dholmes 1.14 /**
563     * Returns an iterator over the elements in this queue in proper sequence.
564 dl 1.15 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
565     * will never throw {@link java.util.ConcurrentModificationException},
566     * and guarantees to traverse elements as they existed upon
567     * construction of the iterator, and may (but is not guaranteed to)
568     * reflect any modifications subsequent to construction.
569 dholmes 1.14 *
570     * @return an iterator over the elements in this queue in proper sequence.
571     */
572 dl 1.2 public Iterator<E> iterator() {
573     return new Itr();
574 tim 1.1 }
575 dl 1.2
576     private class Itr implements Iterator<E> {
577 tim 1.12 /*
578 dl 1.4 * Basic weak-consistent iterator. At all times hold the next
579     * item to hand out so that if hasNext() reports true, we will
580     * still have it to return even if lost race with a take etc.
581     */
582 dl 1.2 Node<E> current;
583     Node<E> lastRet;
584 dl 1.4 E currentElement;
585 tim 1.12
586 dl 1.2 Itr() {
587     fullyLock();
588     try {
589     current = head.next;
590 dl 1.4 if (current != null)
591     currentElement = current.item;
592 tim 1.17 } finally {
593 dl 1.2 fullyUnlock();
594     }
595     }
596 tim 1.12
597     public boolean hasNext() {
598 dl 1.2 return current != null;
599     }
600    
601 tim 1.12 public E next() {
602 dl 1.2 fullyLock();
603     try {
604     if (current == null)
605     throw new NoSuchElementException();
606 dl 1.4 E x = currentElement;
607 dl 1.2 lastRet = current;
608     current = current.next;
609 dl 1.4 if (current != null)
610     currentElement = current.item;
611 dl 1.2 return x;
612 tim 1.17 } finally {
613 dl 1.2 fullyUnlock();
614     }
615 tim 1.12
616 dl 1.2 }
617    
618 tim 1.12 public void remove() {
619 dl 1.2 if (lastRet == null)
620 tim 1.12 throw new IllegalStateException();
621 dl 1.2 fullyLock();
622     try {
623     Node<E> node = lastRet;
624     lastRet = null;
625     Node<E> trail = head;
626     Node<E> p = head.next;
627     while (p != null && p != node) {
628     trail = p;
629     p = p.next;
630     }
631     if (p == node) {
632     p.item = null;
633     trail.next = p.next;
634     int c = count.getAndDecrement();
635     if (c == capacity)
636     notFull.signalAll();
637     }
638 tim 1.17 } finally {
639 dl 1.2 fullyUnlock();
640     }
641     }
642 tim 1.1 }
643 dl 1.2
644     /**
645     * Save the state to a stream (that is, serialize it).
646     *
647     * @serialData The capacity is emitted (int), followed by all of
648     * its elements (each an <tt>Object</tt>) in the proper order,
649     * followed by a null
650 dl 1.6 * @param s the stream
651 dl 1.2 */
652     private void writeObject(java.io.ObjectOutputStream s)
653     throws java.io.IOException {
654    
655 tim 1.12 fullyLock();
656 dl 1.2 try {
657     // Write out any hidden stuff, plus capacity
658     s.defaultWriteObject();
659    
660     // Write out all elements in the proper order.
661 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
662 dl 1.2 s.writeObject(p.item);
663    
664     // Use trailing null as sentinel
665     s.writeObject(null);
666 tim 1.17 } finally {
667 dl 1.2 fullyUnlock();
668     }
669 tim 1.1 }
670    
671 dl 1.2 /**
672 dholmes 1.8 * Reconstitute this queue instance from a stream (that is,
673 dl 1.2 * deserialize it).
674 dl 1.6 * @param s the stream
675 dl 1.2 */
676     private void readObject(java.io.ObjectInputStream s)
677     throws java.io.IOException, ClassNotFoundException {
678 tim 1.12 // Read in capacity, and any hidden stuff
679     s.defaultReadObject();
680 dl 1.2
681 dl 1.19 count.set(0);
682     last = head = new Node<E>(null);
683    
684 dl 1.6 // Read in all elements and place in queue
685 dl 1.2 for (;;) {
686     E item = (E)s.readObject();
687     if (item == null)
688     break;
689     add(item);
690     }
691 tim 1.1 }
692     }