ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.16
Committed: Wed Aug 6 18:22:09 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
CVS Tags: JSR166_CR1
Changes since 1.15: +3 -2 lines
Log Message:
Fixes to minor errors found by DocCheck

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dholmes 1.14 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 dholmes 1.8 * linked nodes.
15     * This queue orders elements FIFO (first-in-first-out).
16 tim 1.12 * The <em>head</em> of the queue is that element that has been on the
17 dholmes 1.8 * queue the longest time.
18     * The <em>tail</em> of the queue is that element that has been on the
19     * queue the shortest time.
20     * Linked queues typically have higher throughput than array-based queues but
21     * less predictable performance in most concurrent applications.
22 tim 1.12 *
23 dl 1.3 * <p> The optional capacity bound constructor argument serves as a
24 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
25     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
26 dl 1.3 * dynamically created upon each insertion unless this would bring the
27     * queue above capacity.
28 dholmes 1.8 *
29 dl 1.6 * @since 1.5
30     * @author Doug Lea
31 tim 1.12 *
32 tim 1.1 **/
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases.  Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used.  When a put notices that it has enabled at least one take,
     * it signals a taker.  That taker in turn signals others if more
     * items have been entered since the signal.  And symmetrically for
     * takes signalling puts.  Operations that touch interior nodes
     * (remove(Object), toArray, drainTo, iterators, serialization)
     * acquire both locks.
     *
     * Structural invariants (while holding the relevant lock):
     *   - head is a dummy node whose item is null; head.next is the
     *     first live node, or null when the queue is empty.
     *   - last is the final node of the chain (last.next == null);
     *     last == head when the queue is empty.
     *   - a node's next pointer is never cleared when the node leaves
     *     the queue, so an iterator parked on it can still move forward.
     */

    /** Explicit serial version so the stream format does not depend on the compiler-computed default. */
    private static final long serialVersionUID = -6903933977591709194L;

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read. */
        volatile E item;
        /** Successor node, or null if this is the last node. */
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none. */
    private final int capacity;

    /**
     * Current number of elements.  Deliberately NOT transient: a
     * transient final field would be left null by deserialization.
     * The serialized value itself is ignored; readObject resets it.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; see invariants above). */
    private transient Node<E> head;

    /** Tail of linked list. */
    private transient Node<E> last;

    /** Lock held by take, poll, etc. */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes. */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc. */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts. */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take.  Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock).
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put.  Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Creates a node and links it at the end of the queue.
     * Caller must hold putLock.
     *
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Removes the first live node from the queue; that node becomes the
     * new dummy head.  Caller must hold takeLock and must have checked
     * that the queue is non-empty.
     *
     * @return the item that was stored in the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlinks interior node {@code p}, whose predecessor is {@code trail}.
     * Caller must hold both locks.
     *
     * @param p the node to remove
     * @param trail the node immediately before {@code p}
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail was removed, the predecessor becomes the tail;
        // otherwise the next insert would link onto the detached node.
        if (last == p) {
            last = trail;
        }
        // putLock is held by the caller, so signalling here is legal.
        if (count.getAndDecrement() == capacity) {
            notFull.signalAll();
        }
    }

    /**
     * Locks to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException();
        }
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection, added in traversal order of the collection's
     * iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if {@code c} or any element within it
     *         is {@code null}
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        // Link nodes directly instead of calling the overridable add():
        // a subclass override must not run on a half-built queue.
        // putLock is taken as the per-element offer() would have done.
        putLock.lock();
        try {
            int n = 0;
            for (E e : c) {
                if (e == null) {
                    throw new NullPointerException();
                }
                if (n == capacity) {
                    throw new IllegalStateException("Queue full");
                }
                insert(e);
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

    // Have to override just to update the javadoc

    /**
     * Adds the specified element to the tail of this queue.
     *
     * @param o the element to add
     * @return {@code true} (as per the general contract of
     *         {@code Collection.add})
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Adds all of the elements in the specified collection to this queue.
     * The behavior of this operation is undefined if
     * the specified collection is modified while the operation is in
     * progress.  (This implies that the behavior of this call is undefined if
     * the specified collection is this queue, and this queue is nonempty.)
     *
     * <p>This implementation iterates over the specified collection, and adds
     * each object returned by the iterator to this queue's tail, in turn.
     *
     * @param c the elements to add
     * @return {@code true} if this queue changed as a result of the call
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this collection.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking.  This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to {@code add} an element will succeed by
     * inspecting {@code remainingCapacity} because it may be the
     * case that a waiting consumer is ready to {@code take} an
     * element out of an otherwise full queue.
     *
     * @return the remaining capacity
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if the specified element is {@code null}
     */
    public void put(E o) throws InterruptedException {
        if (o == null) {
            throw new NullPointerException();
        }
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock.  This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity.  Similarly
             * for all other uses of count in other wait guards.
             */
            try {
                while (count.get() == capacity) {
                    notFull.await();
                }
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity) {
                notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if the specified element is {@code null}
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (o == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity) {
                        notFull.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return true;
    }

    /**
     * Adds the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add
     * @return {@code true} if the element was added, {@code false} if the
     *         queue was full
     * @throws NullPointerException if the specified element is {@code null}
     */
    public boolean offer(E o) {
        if (o == null) {
            throw new NullPointerException();
        }
        // Cheap unlocked pre-check; re-validated under the lock below.
        if (count.get() == capacity) {
            return false;
        }
        int c = -1;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity) {
                    notFull.signal();
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0) {
            signalNotEmpty();
        }
        return c >= 0;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1) {
                notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting up to the
     * specified wait time if necessary for an element to become available.
     *
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit the time unit of the {@code timeout} argument
     * @return the head of this queue, or {@code null} if the
     *         specified waiting time elapses before an element is available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1) {
                        notEmpty.signal();
                    }
                    break;
                }
                if (nanos <= 0) {
                    return null;
                }
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * {@code null} if this queue is empty.  Never waits for an element,
     * but may briefly block to acquire the internal take lock.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    public E poll() {
        if (count.get() == 0) {
            return null;
        }
        E x = null;
        int c = -1;
        // Must be lock(), not tryLock(): an ignored failed tryLock()
        // would let us touch the list unlocked, and the unlock() in the
        // finally block would throw IllegalMonitorStateException.
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1) {
                    notEmpty.signal();
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) {
            signalNotFull();
        }
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * {@code null} if this queue is empty.
     *
     * @return the head of this queue, or {@code null} if it is empty
     */
    public E peek() {
        if (count.get() == 0) {
            return null;
        }
        takeLock.lock();
        try {
            Node<E> first = head.next;
            return (first == null) ? null : first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.  More formally, removes the first element
     * {@code e} such that {@code o.equals(e)}, if the queue contains one
     * or more such elements.  Returns {@code true} if the queue contained
     * the specified element (or equivalently, if the queue changed as a
     * result of the call).  Since this queue never holds {@code null},
     * a {@code null} argument always yields {@code false}.
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) {
            return false;
        }
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.  The snapshot is taken while holding both locks.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            Object[] result = new Object[count.get()];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                result[k++] = p.item;
            }
            return result;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array it
     * is returned therein, and if there is room to spare the slot
     * immediately following the last element is set to {@code null}, as
     * required by {@link Collection#toArray(Object[])}.  Otherwise a new
     * array of the same component type and this queue's size is allocated.
     *
     * @param a the array into which the elements are to be stored, if it
     *        is big enough
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if an element cannot be stored in an
     *         array of the given runtime type
     * @throws NullPointerException if the specified array is {@code null}
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size) {
                a = (T[]) java.lang.reflect.Array.newInstance(
                        a.getClass().getComponentType(), size);
            }

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next) {
                a[k++] = (T) p.item;
            }
            // Null-terminate when the caller's array is larger than the queue.
            if (a.length > k) {
                a[k] = null;
            }
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, built while holding
     * both locks so that it reflects a single consistent state.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them to
     * the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from this
     * queue and adds them to the given collection.  If adding to
     * {@code c} fails with an exception, the element that could not be
     * added (and every element after it) remains in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if {@code c} is {@code null}
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        if (maxElements <= 0) {
            return 0;
        }
        int transferred = 0;
        fullyLock();
        try {
            Node<E> first;
            while (transferred < maxElements && (first = head.next) != null) {
                // Add before unlinking so a throwing add() loses nothing.
                c.add(first.item);
                first.item = null;
                head = first;
                ++transferred;
                if (count.getAndDecrement() == capacity) {
                    notFull.signalAll();
                }
            }
        } finally {
            fullyUnlock();
        }
        return transferred;
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned {@code Iterator} is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException}
     * and never returns {@code null}; it may (but is not guaranteed to)
     * reflect modifications made after its construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */

        /** Node holding the element that the next call to next() returns. */
        private Node<E> current;
        /** Node most recently returned by next(), or null if none/removed. */
        private Node<E> lastRet;
        /** Element captured from {@code current} when we moved onto it. */
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null) {
                    currentElement = current.item;
                }
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null) {
                    throw new NoSuchElementException();
                }
                E x = currentElement;
                lastRet = current;
                // Skip nodes whose item has been nulled: those were taken
                // or removed since we last looked, and handing out their
                // (null) item would violate the no-null-elements contract.
                Node<E> p = current.next;
                while (p != null && p.item == null) {
                    p = p.next;
                }
                current = p;
                currentElement = (p == null) ? null : p.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null) {
                throw new IllegalStateException();
            }
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                // p == null means the node already left the queue.
                if (p == node) {
                    unlink(p, trail);
                }
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an {@code Object}) in the proper order,
     * followed by a null
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     */
    private void writeObject(java.io.ObjectOutputStream s)
            throws java.io.IOException {
        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next) {
                s.writeObject(p.item);
            }

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitutes this queue instance from a stream (that is,
     * deserializes it).
     *
     * @param s the stream
     * @throws java.io.IOException if an I/O error occurs
     * @throws ClassNotFoundException if the class of a serialized
     *         element cannot be found
     */
    private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // Field initializers and constructors do not run during
        // deserialization, so rebuild the transient list skeleton and
        // discard the stale serialized count before re-adding elements.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E) s.readObject();
            if (item == null) {
                break;
            }
            add(item);
        }
    }
}
665 dholmes 1.8
666    
667    
668    
669 tim 1.1