ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.17
Committed: Fri Aug 8 20:05:07 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.16: +21 -42 lines
Log Message:
Scrunched catch, finally, else clauses.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dholmes 1.14 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 dholmes 1.8 * linked nodes.
15     * This queue orders elements FIFO (first-in-first-out).
16 tim 1.12 * The <em>head</em> of the queue is that element that has been on the
17 dholmes 1.8 * queue the longest time.
18     * The <em>tail</em> of the queue is that element that has been on the
19     * queue the shortest time.
20     * Linked queues typically have higher throughput than array-based queues but
21     * less predictable performance in most concurrent applications.
22 tim 1.12 *
23 dl 1.3 * <p> The optional capacity bound constructor argument serves as a
24 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
25     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
26 dl 1.3 * dynamically created upon each insertion unless this would bring the
27     * queue above capacity.
28 dholmes 1.8 *
29 dl 1.6 * @since 1.5
30     * @author Doug Lea
31 tim 1.12 *
32 tim 1.1 **/
33 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
34 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
35    
36 dl 1.2 /*
37     * A variant of the "two lock queue" algorithm. The putLock gates
38     * entry to put (and offer), and has an associated condition for
39     * waiting puts. Similarly for the takeLock. The "count" field
40     * that they both rely on is maintained as an atomic to avoid
41     * needing to get both locks in most cases. Also, to minimize need
42     * for puts to get takeLock and vice-versa, cascading notifies are
43     * used. When a put notices that it has enabled at least one take,
44     * it signals taker. That taker in turn signals others if more
45     * items have been entered since the signal. And symmetrically for
46 tim 1.12 * takes signalling puts. Operations such as remove(Object) and
47 dl 1.2 * iterators acquire both locks.
48     */
49    
50 dl 1.6 /**
51     * Linked list node class
52     */
53 dl 1.2 static class Node<E> {
54 dl 1.6 /** The item, volatile to ensure barrier separating write and read */
55 dl 1.2 volatile E item;
56     Node<E> next;
57     Node(E x) { item = x; }
58     }
59    
60 dl 1.6 /** The capacity bound, or Integer.MAX_VALUE if none */
61 dl 1.2 private final int capacity;
62 dl 1.6
63     /** Current number of elements */
64 dl 1.2 private transient final AtomicInteger count = new AtomicInteger(0);
65    
66 dl 1.6 /** Head of linked list */
67     private transient Node<E> head;
68    
69 dholmes 1.8 /** Tail of linked list */
70 dl 1.6 private transient Node<E> last;
71 dl 1.2
72 dl 1.6 /** Lock held by take, poll, etc */
73 dl 1.5 private final ReentrantLock takeLock = new ReentrantLock();
74 dl 1.6
75     /** Wait queue for waiting takes */
76 dl 1.5 private final Condition notEmpty = takeLock.newCondition();
77 dl 1.2
78 dl 1.6 /** Lock held by put, offer, etc */
79 dl 1.5 private final ReentrantLock putLock = new ReentrantLock();
80 dl 1.6
81     /** Wait queue for waiting puts */
82 dl 1.5 private final Condition notFull = putLock.newCondition();
83 dl 1.2
84     /**
85     * Signal a waiting take. Called only from put/offer (which do not
86 dl 1.4 * otherwise ordinarily lock takeLock.)
87 dl 1.2 */
88     private void signalNotEmpty() {
89     takeLock.lock();
90     try {
91     notEmpty.signal();
92 tim 1.17 } finally {
93 dl 1.2 takeLock.unlock();
94     }
95     }
96    
97     /**
98     * Signal a waiting put. Called only from take/poll.
99     */
100     private void signalNotFull() {
101     putLock.lock();
102     try {
103     notFull.signal();
104 tim 1.17 } finally {
105 dl 1.2 putLock.unlock();
106     }
107     }
108    
109     /**
110 dholmes 1.8 * Create a node and link it at end of queue
111 dl 1.6 * @param x the item
112 dl 1.2 */
113     private void insert(E x) {
114     last = last.next = new Node<E>(x);
115     }
116    
117     /**
118     * Remove a node from head of queue,
119 dl 1.6 * @return the node
120 dl 1.2 */
121     private E extract() {
122     Node<E> first = head.next;
123     head = first;
124     E x = (E)first.item;
125     first.item = null;
126     return x;
127     }
128    
129     /**
130 tim 1.12 * Lock to prevent both puts and takes.
131 dl 1.2 */
132     private void fullyLock() {
133     putLock.lock();
134     takeLock.lock();
135 tim 1.1 }
136 dl 1.2
137     /**
138 tim 1.12 * Unlock to allow both puts and takes.
139 dl 1.2 */
140     private void fullyUnlock() {
141     takeLock.unlock();
142     putLock.unlock();
143     }
144    
145    
/**
 * Creates a <tt>LinkedBlockingQueue</tt> whose capacity is
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    // Delegate to the bounded constructor with the largest possible bound.
    this(Integer.MAX_VALUE);
}
153    
154     /**
155 tim 1.16 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
156     *
157 dholmes 1.8 * @param capacity the capacity of this queue.
158     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
159 tim 1.16 * than zero.
160 dl 1.2 */
161     public LinkedBlockingQueue(int capacity) {
162 dholmes 1.8 if (capacity <= 0) throw new IllegalArgumentException();
163 dl 1.2 this.capacity = capacity;
164 dl 1.6 last = head = new Node<E>(null);
165 dl 1.2 }
166    
167     /**
168 dholmes 1.13 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
169 dholmes 1.14 * {@link Integer#MAX_VALUE}, initially containing the elements of the
170 tim 1.12 * given collection,
171 dholmes 1.8 * added in traversal order of the collection's iterator.
172 dholmes 1.9 * @param c the collection of elements to initially contain
173     * @throws NullPointerException if <tt>c</tt> or any element within it
174     * is <tt>null</tt>
175 dl 1.2 */
176 dholmes 1.10 public LinkedBlockingQueue(Collection<? extends E> c) {
177 dl 1.2 this(Integer.MAX_VALUE);
178 tim 1.12 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
179     add(it.next());
180 dl 1.2 }
181    
182 dholmes 1.14 // Have to override just to update the javadoc
183 dholmes 1.9
184     /**
185 dholmes 1.14 * Adds the specified element to the tail of this queue.
186     * @return <tt>true</tt> (as per the general contract of
187     * <tt>Collection.add</tt>).
188 dholmes 1.9 * @throws IllegalStateException {@inheritDoc}
189     * @throws NullPointerException {@inheritDoc}
190     */
191     public boolean add(E o) {
192     return super.add(o);
193     }
194    
195     /**
196 dholmes 1.14 * Adds all of the elements in the specified collection to this queue.
197     * The behavior of this operation is undefined if
198     * the specified collection is modified while the operation is in
199     * progress. (This implies that the behavior of this call is undefined if
200     * the specified collection is this queue, and this queue is nonempty.)
201     * <p>
202     * This implementation iterates over the specified collection, and adds
203     * each object returned by the iterator to this queue's tail, in turn.
204 dholmes 1.9 * @throws IllegalStateException {@inheritDoc}
205     * @throws NullPointerException {@inheritDoc}
206     */
207     public boolean addAll(Collection<? extends E> c) {
208     return super.addAll(c);
209     }
210    
211 dholmes 1.8 // this doc comment is overridden to remove the reference to collections
212     // greater in size than Integer.MAX_VALUE
213 tim 1.12 /**
214 dholmes 1.13 * Returns the number of elements in this collection.
215 dholmes 1.8 */
216 dl 1.2 public int size() {
217     return count.get();
218 tim 1.1 }
219 dl 1.2
220 dholmes 1.8 // this doc comment is a modified copy of the inherited doc comment,
221     // without the reference to unlimited queues.
222 tim 1.12 /**
223 dholmes 1.13 * Returns the number of elements that this queue can ideally (in
224 dholmes 1.8 * the absence of memory or resource constraints) accept without
225     * blocking. This is always equal to the initial capacity of this queue
226     * less the current <tt>size</tt> of this queue.
227     * <p>Note that you <em>cannot</em> always tell if
228     * an attempt to <tt>add</tt> an element will succeed by
229     * inspecting <tt>remainingCapacity</tt> because it may be the
230     * case that a waiting consumer is ready to <tt>take</tt> an
231     * element out of an otherwise full queue.
232     */
233 dl 1.2 public int remainingCapacity() {
234     return capacity - count.get();
235     }
236    
237 dholmes 1.8 /**
238 dholmes 1.13 * Adds the specified element to the tail of this queue, waiting if
239 dholmes 1.8 * necessary for space to become available.
240     * @throws NullPointerException {@inheritDoc}
241     */
242 dholmes 1.14 public void put(E o) throws InterruptedException {
243     if (o == null) throw new NullPointerException();
244 dl 1.2 // Note: convention in all put/take/etc is to preset
245     // local var holding count negative to indicate failure unless set.
246 tim 1.12 int c = -1;
247 dl 1.2 putLock.lockInterruptibly();
248     try {
249     /*
250     * Note that count is used in wait guard even though it is
251     * not protected by lock. This works because count can
252     * only decrease at this point (all other puts are shut
253     * out by lock), and we (or some other waiting put) are
254     * signalled if it ever changes from
255     * capacity. Similarly for all other uses of count in
256     * other wait guards.
257     */
258     try {
259 tim 1.12 while (count.get() == capacity)
260 dl 1.2 notFull.await();
261 tim 1.17 } catch (InterruptedException ie) {
262 dl 1.2 notFull.signal(); // propagate to a non-interrupted thread
263     throw ie;
264     }
265 dholmes 1.14 insert(o);
266 dl 1.2 c = count.getAndIncrement();
267 dl 1.6 if (c + 1 < capacity)
268 dl 1.2 notFull.signal();
269 tim 1.17 } finally {
270 dl 1.2 putLock.unlock();
271     }
272 tim 1.12 if (c == 0)
273 dl 1.2 signalNotEmpty();
274 tim 1.1 }
275 dl 1.2
276 dholmes 1.8 /**
277 dholmes 1.13 * Adds the specified element to the tail of this queue, waiting if
278 dholmes 1.8 * necessary up to the specified wait time for space to become available.
279     * @throws NullPointerException {@inheritDoc}
280     */
281 dholmes 1.14 public boolean offer(E o, long timeout, TimeUnit unit)
282 dholmes 1.8 throws InterruptedException {
283 tim 1.12
284 dholmes 1.14 if (o == null) throw new NullPointerException();
285 dl 1.2 long nanos = unit.toNanos(timeout);
286     int c = -1;
287 dholmes 1.8 putLock.lockInterruptibly();
288 dl 1.2 try {
289     for (;;) {
290     if (count.get() < capacity) {
291 dholmes 1.14 insert(o);
292 dl 1.2 c = count.getAndIncrement();
293 dl 1.6 if (c + 1 < capacity)
294 dl 1.2 notFull.signal();
295     break;
296     }
297     if (nanos <= 0)
298     return false;
299     try {
300     nanos = notFull.awaitNanos(nanos);
301 tim 1.17 } catch (InterruptedException ie) {
302 dl 1.2 notFull.signal(); // propagate to a non-interrupted thread
303     throw ie;
304     }
305     }
306 tim 1.17 } finally {
307 dl 1.2 putLock.unlock();
308     }
309 tim 1.12 if (c == 0)
310 dl 1.2 signalNotEmpty();
311     return true;
312 tim 1.1 }
313 dl 1.2
314 tim 1.12 /**
315 dholmes 1.13 * Adds the specified element to the tail of this queue if possible,
316 dholmes 1.8 * returning immediately if this queue is full.
317     *
318     * @throws NullPointerException {@inheritDoc}
319     */
320 dholmes 1.14 public boolean offer(E o) {
321     if (o == null) throw new NullPointerException();
322 dl 1.2 if (count.get() == capacity)
323     return false;
324 tim 1.12 int c = -1;
325 dholmes 1.8 putLock.lock();
326 dl 1.2 try {
327     if (count.get() < capacity) {
328 dholmes 1.14 insert(o);
329 dl 1.2 c = count.getAndIncrement();
330 dl 1.6 if (c + 1 < capacity)
331 dl 1.2 notFull.signal();
332     }
333 tim 1.17 } finally {
334 dl 1.2 putLock.unlock();
335     }
336 tim 1.12 if (c == 0)
337 dl 1.2 signalNotEmpty();
338     return c >= 0;
339 tim 1.1 }
340 dl 1.2
341    
342     public E take() throws InterruptedException {
343     E x;
344     int c = -1;
345     takeLock.lockInterruptibly();
346     try {
347     try {
348 tim 1.12 while (count.get() == 0)
349 dl 1.2 notEmpty.await();
350 tim 1.17 } catch (InterruptedException ie) {
351 dl 1.2 notEmpty.signal(); // propagate to a non-interrupted thread
352     throw ie;
353     }
354    
355     x = extract();
356     c = count.getAndDecrement();
357     if (c > 1)
358     notEmpty.signal();
359 tim 1.17 } finally {
360 dl 1.2 takeLock.unlock();
361     }
362 tim 1.12 if (c == capacity)
363 dl 1.2 signalNotFull();
364     return x;
365     }
366    
367     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
368     E x = null;
369     int c = -1;
370 dholmes 1.8 long nanos = unit.toNanos(timeout);
371 dl 1.2 takeLock.lockInterruptibly();
372     try {
373     for (;;) {
374     if (count.get() > 0) {
375     x = extract();
376     c = count.getAndDecrement();
377     if (c > 1)
378     notEmpty.signal();
379     break;
380     }
381     if (nanos <= 0)
382     return null;
383     try {
384     nanos = notEmpty.awaitNanos(nanos);
385 tim 1.17 } catch (InterruptedException ie) {
386 dl 1.2 notEmpty.signal(); // propagate to a non-interrupted thread
387     throw ie;
388     }
389     }
390 tim 1.17 } finally {
391 dl 1.2 takeLock.unlock();
392     }
393 tim 1.12 if (c == capacity)
394 dl 1.2 signalNotFull();
395     return x;
396     }
397    
398     public E poll() {
399     if (count.get() == 0)
400     return null;
401     E x = null;
402 tim 1.12 int c = -1;
403 dl 1.2 takeLock.tryLock();
404     try {
405     if (count.get() > 0) {
406     x = extract();
407     c = count.getAndDecrement();
408     if (c > 1)
409     notEmpty.signal();
410     }
411 tim 1.17 } finally {
412 dl 1.2 takeLock.unlock();
413     }
414 tim 1.12 if (c == capacity)
415 dl 1.2 signalNotFull();
416     return x;
417 tim 1.1 }
418 dl 1.2
419    
420     public E peek() {
421     if (count.get() == 0)
422     return null;
423 dholmes 1.8 takeLock.lock();
424 dl 1.2 try {
425     Node<E> first = head.next;
426     if (first == null)
427     return null;
428     else
429     return first.item;
430 tim 1.17 } finally {
431 dl 1.2 takeLock.unlock();
432     }
433 tim 1.1 }
434    
435 dholmes 1.14 /**
436     * Removes a single instance of the specified element from this
437     * queue, if it is present. More formally,
438     * removes an element <tt>e</tt> such that <tt>(o==null ? e==null :
439     * o.equals(e))</tt>, if the queue contains one or more such
440     * elements. Returns <tt>true</tt> if the queue contained the
441     * specified element (or equivalently, if the queue changed as a
442     * result of the call).
443     *
444     * <p>This implementation iterates over the queue looking for the
445     * specified element. If it finds the element, it removes the element
446     * from the queue using the iterator's remove method.<p>
447     *
448     */
449 dholmes 1.9 public boolean remove(Object o) {
450     if (o == null) return false;
451 dl 1.2 boolean removed = false;
452     fullyLock();
453     try {
454     Node<E> trail = head;
455     Node<E> p = head.next;
456     while (p != null) {
457 dholmes 1.9 if (o.equals(p.item)) {
458 dl 1.2 removed = true;
459     break;
460     }
461     trail = p;
462     p = p.next;
463     }
464     if (removed) {
465     p.item = null;
466     trail.next = p.next;
467     if (count.getAndDecrement() == capacity)
468     notFull.signalAll();
469     }
470 tim 1.17 } finally {
471 dl 1.2 fullyUnlock();
472     }
473     return removed;
474 tim 1.1 }
475 dl 1.2
476     public Object[] toArray() {
477     fullyLock();
478     try {
479     int size = count.get();
480 tim 1.12 Object[] a = new Object[size];
481 dl 1.2 int k = 0;
482 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
483 dl 1.2 a[k++] = p.item;
484     return a;
485 tim 1.17 } finally {
486 dl 1.2 fullyUnlock();
487     }
488 tim 1.1 }
489 dl 1.2
490     public <T> T[] toArray(T[] a) {
491     fullyLock();
492     try {
493     int size = count.get();
494     if (a.length < size)
495 dl 1.4 a = (T[])java.lang.reflect.Array.newInstance
496     (a.getClass().getComponentType(), size);
497 tim 1.12
498 dl 1.2 int k = 0;
499 tim 1.12 for (Node p = head.next; p != null; p = p.next)
500 dl 1.2 a[k++] = (T)p.item;
501     return a;
502 tim 1.17 } finally {
503 dl 1.2 fullyUnlock();
504     }
505 tim 1.1 }
506 dl 1.2
507     public String toString() {
508     fullyLock();
509     try {
510     return super.toString();
511 tim 1.17 } finally {
512 dl 1.2 fullyUnlock();
513     }
514 tim 1.1 }
515 dl 1.2
516 dholmes 1.14 /**
517     * Returns an iterator over the elements in this queue in proper sequence.
518 dl 1.15 * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
519     * will never throw {@link java.util.ConcurrentModificationException},
520     * and guarantees to traverse elements as they existed upon
521     * construction of the iterator, and may (but is not guaranteed to)
522     * reflect any modifications subsequent to construction.
523 dholmes 1.14 *
524     * @return an iterator over the elements in this queue in proper sequence.
525     */
526 dl 1.2 public Iterator<E> iterator() {
527     return new Itr();
528 tim 1.1 }
529 dl 1.2
530     private class Itr implements Iterator<E> {
531 tim 1.12 /*
532 dl 1.4 * Basic weak-consistent iterator. At all times hold the next
533     * item to hand out so that if hasNext() reports true, we will
534     * still have it to return even if lost race with a take etc.
535     */
536 dl 1.2 Node<E> current;
537     Node<E> lastRet;
538 dl 1.4 E currentElement;
539 tim 1.12
540 dl 1.2 Itr() {
541     fullyLock();
542     try {
543     current = head.next;
544 dl 1.4 if (current != null)
545     currentElement = current.item;
546 tim 1.17 } finally {
547 dl 1.2 fullyUnlock();
548     }
549     }
550 tim 1.12
551     public boolean hasNext() {
552 dl 1.2 return current != null;
553     }
554    
555 tim 1.12 public E next() {
556 dl 1.2 fullyLock();
557     try {
558     if (current == null)
559     throw new NoSuchElementException();
560 dl 1.4 E x = currentElement;
561 dl 1.2 lastRet = current;
562     current = current.next;
563 dl 1.4 if (current != null)
564     currentElement = current.item;
565 dl 1.2 return x;
566 tim 1.17 } finally {
567 dl 1.2 fullyUnlock();
568     }
569 tim 1.12
570 dl 1.2 }
571    
572 tim 1.12 public void remove() {
573 dl 1.2 if (lastRet == null)
574 tim 1.12 throw new IllegalStateException();
575 dl 1.2 fullyLock();
576     try {
577     Node<E> node = lastRet;
578     lastRet = null;
579     Node<E> trail = head;
580     Node<E> p = head.next;
581     while (p != null && p != node) {
582     trail = p;
583     p = p.next;
584     }
585     if (p == node) {
586     p.item = null;
587     trail.next = p.next;
588     int c = count.getAndDecrement();
589     if (c == capacity)
590     notFull.signalAll();
591     }
592 tim 1.17 } finally {
593 dl 1.2 fullyUnlock();
594     }
595     }
596 tim 1.1 }
597 dl 1.2
598     /**
599     * Save the state to a stream (that is, serialize it).
600     *
601     * @serialData The capacity is emitted (int), followed by all of
602     * its elements (each an <tt>Object</tt>) in the proper order,
603     * followed by a null
604 dl 1.6 * @param s the stream
605 dl 1.2 */
606     private void writeObject(java.io.ObjectOutputStream s)
607     throws java.io.IOException {
608    
609 tim 1.12 fullyLock();
610 dl 1.2 try {
611     // Write out any hidden stuff, plus capacity
612     s.defaultWriteObject();
613    
614     // Write out all elements in the proper order.
615 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
616 dl 1.2 s.writeObject(p.item);
617    
618     // Use trailing null as sentinel
619     s.writeObject(null);
620 tim 1.17 } finally {
621 dl 1.2 fullyUnlock();
622     }
623 tim 1.1 }
624    
625 dl 1.2 /**
626 dholmes 1.8 * Reconstitute this queue instance from a stream (that is,
627 dl 1.2 * deserialize it).
628 dl 1.6 * @param s the stream
629 dl 1.2 */
630     private void readObject(java.io.ObjectInputStream s)
631     throws java.io.IOException, ClassNotFoundException {
632 tim 1.12 // Read in capacity, and any hidden stuff
633     s.defaultReadObject();
634 dl 1.2
635 dl 1.6 // Read in all elements and place in queue
636 dl 1.2 for (;;) {
637     E item = (E)s.readObject();
638     if (item == null)
639     break;
640     add(item);
641     }
642 tim 1.1 }
643     }
644 dholmes 1.8
645    
646    
647    
648 tim 1.1