ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.20
Committed: Fri Sep 12 15:40:10 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.19: +10 -51 lines
Log Message:
Adapt AbstractQueue changes; Conditionalize CancellableTask.reset; new TimeUnit methods

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object),
     * drainTo and iterators acquire both locks.
     *
     * The list always starts with a dummy node ("head") whose item is
     * null; the first real element is head.next.  "last" is the final
     * node of the list (equal to head when the queue is empty).
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; its item is always null) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove a node from head of queue.
     * Caller must hold takeLock and must have checked that the queue
     * is not empty.  The removed node becomes the new dummy head.
     * @return the item that was held by the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlinks interior node <tt>p</tt> whose predecessor is
     * <tt>trail</tt>, adjusts the element count and wakes waiting puts
     * if the queue was full.  Caller must hold both locks.
     *
     * @param p the node to unlink
     * @param trail the node immediately preceding <tt>p</tt>
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        // p.next is deliberately left intact so that an iterator
        // currently positioned at p can still advance.
        trail.next = p.next;
        // If the tail is removed, "last" must move back; otherwise the
        // next insert would link onto the detached node and be lost.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return  the number of elements in this queue.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     *
     * @return the remaining capacity
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count  negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // The queue was empty before this insertion: wake one taker.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before space is available.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Adds the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     *         this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        // Cheap unlocked pre-check; re-checked under the lock below.
        if (count.get() == capacity)
            return false;
        int c = -1;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Retrieves and removes the head of this queue, waiting if
     * no elements are present.
     *
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // The queue was full before this removal: wake one putter.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements are
     * present.
     *
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if empty
     */
    public E poll() {
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // Must really acquire the lock: an unchecked tryLock() would run
        // the body unlocked under contention and then fail in unlock().
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Retrieves, but does not remove, the head of this queue, or
     * returns <tt>null</tt> if this queue is empty.
     *
     * @return the head of this queue, or <tt>null</tt> if empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present.
     *
     * @param o the element to remove
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in proper sequence.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue,
     * in proper sequence; the runtime type of the returned array is
     * that of the specified array.  If the queue fits in the specified
     * array with room to spare, the element immediately following the
     * end of the queue is set to <tt>null</tt>.
     *
     * @param a the array into which the elements are to be stored, if
     *          it is big enough; otherwise a new array of the same
     *          runtime type is allocated
     * @return an array containing all of the elements in this queue
     * @throws NullPointerException if the specified array is <tt>null</tt>
     */
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Mark the end of the queue's elements, as required by the
            // Collection.toArray(T[]) contract.
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, computed while
     * both locks are held.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements from this queue and adds them
     * to the given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of available elements from
     * this queue and adds them to the given collection.  If adding to
     * <tt>c</tt> fails, the element that could not be added (and all
     * later ones) remain in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        int n = 0;
        fullyLock();
        try {
            boolean wasFull = (count.get() == capacity);
            try {
                while (n < maxElements) {
                    Node<E> first = head.next;
                    if (first == null)
                        break;
                    // Add before unlinking so a failing add leaves the
                    // element in this queue.
                    c.add(first.item);
                    head = first;
                    first.item = null;
                    count.getAndDecrement();
                    ++n;
                }
            } finally {
                if (wasFull && n > 0)
                    notFull.signalAll();
            }
        } finally {
            fullyUnlock();
        }
        return n;
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        Node<E> current;
        Node<E> lastRet;
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Re-locate the node: it may already have been removed
                // by another thread, in which case nothing is done.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node)
                    unlink(p, trail);
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     * @throws java.io.IOException if writing to the stream fails
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     * @throws java.io.IOException if reading from the stream fails
     * @throws ClassNotFoundException if the class of a serialized
     *         object cannot be found
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // Start from an empty list; elements are re-added below.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            @SuppressWarnings("unchecked")
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}