ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.8
Committed: Mon Jul 28 04:11:54 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.7: +71 -19 lines
Log Message:
Significant doc updates:
 - inherit comments where appropriate
 - ensure runtime exception comments inherited (overriding as needed)
 - consistent descriptions
 - introduce head and tail terminology

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dholmes 1.8 * An optionally-bounded {@link BlockingQueue blocking queue} based on
14     * linked nodes.
15     * This queue orders elements FIFO (first-in-first-out).
16     * The <em>head</em> of the queue is that element that has been on the
17     * queue the longest time.
18     * The <em>tail</em> of the queue is that element that has been on the
19     * queue the shortest time.
20     * Linked queues typically have higher throughput than array-based queues but
21     * less predictable performance in most concurrent applications.
22 dl 1.3 *
23     * <p> The optional capacity bound constructor argument serves as a
24 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
25     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
26 dl 1.3 * dynamically created upon each insertion unless this would bring the
27     * queue above capacity.
28 dholmes 1.8 *
29 dl 1.6 * @since 1.5
30     * @author Doug Lea
31 dl 1.3 *
32 tim 1.1 **/
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases.  Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used.  When a put notices that it has enabled at least one take,
     * it signals taker.  That taker in turn signals others if more
     * items have been entered since the signal.  And symmetrically for
     * takes signalling puts.  Operations such as remove(Object),
     * drainTo and iterators acquire both locks.
     *
     * The list always contains a dummy node at "head" whose item is
     * null; the first real element is head.next.  "last" must always
     * reference the final node of the list (the dummy node when the
     * queue is empty), because insert() appends to last.next.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * Current number of elements.  Deliberately NOT transient: field
     * initializers are not run during deserialization, so a transient
     * final field would be null in a deserialized queue.  The restored
     * value is reset to zero in readObject before elements are re-added.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; first element is head.next) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        }
        finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        }
        finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove a node from head of queue.  The removed node becomes the
     * new dummy head.  Caller must hold takeLock and must have
     * established that the queue is non-empty.
     * @return the item that was at the head of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlink interior node <tt>p</tt> whose predecessor is <tt>trail</tt>,
     * decrement the count, and wake waiting puts if the queue was full.
     * Caller must hold both locks.
     * @param p the node to remove
     * @param trail the node immediately preceding <tt>p</tt>
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail node is removed, the tail pointer must move back;
        // otherwise the next insert would link onto the detached node
        // and the inserted element would be unreachable from head.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Create a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially holding the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param initialElements the elements to initially contain
     * @throws NullPointerException if <tt>initialElements</tt> or any
     *         of its elements is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> initialElements) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = initialElements.iterator(); it.hasNext();)
            add(it.next());
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Return the number of elements in this collection.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Return the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Add the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E x) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            }
            catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(x);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
        finally {
            putLock.unlock();
        }
        // Queue went from empty to non-empty: wake one waiting take.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Add the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if
     *         the wait time elapsed before space became available
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E x, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (x == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(x);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Add the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @return <tt>true</tt> if the element was added, else <tt>false</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();
        // Cheap unlocked pre-check; re-checked under the lock below.
        if (count.get() == capacity)
            return false;
        int c = -1;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(x);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    /**
     * Retrieve and remove the head of this queue, waiting if
     * necessary until an element becomes available.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            }
            catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
        finally {
            takeLock.unlock();
        }
        // Queue went from full to not-full: wake one waiting put.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieve and remove the head of this queue, waiting if
     * necessary up to the specified wait time for an element to
     * become available.
     * @return the head of this queue, or <tt>null</tt> if the wait
     *         time elapsed before an element became available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieve and remove the head of this queue, returning
     * immediately if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        // Cheap unlocked pre-check; re-checked under the lock below.
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // Must be an unconditional lock(): with tryLock() a failed
        // attempt would proceed to extract without holding the lock,
        // and the unlock() in the finally clause would then throw
        // IllegalMonitorStateException.
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieve, but do not remove, the head of this queue.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        }
        finally {
            takeLock.unlock();
        }
    }

    /**
     * Remove the first element <tt>e</tt> for which
     * <tt>x.equals(e)</tt>, if there is one.
     * @param x the element to remove
     * @return <tt>true</tt> if an element was removed; <tt>false</tt>
     *         if <tt>x</tt> is <tt>null</tt> or no element matched
     */
    public boolean remove(Object x) {
        if (x == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (x.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Return an array containing all of the elements in this queue,
     * in head-to-tail order.
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Return an array containing all of the elements in this queue,
     * in head-to-tail order, using the supplied array if it is large
     * enough and otherwise a newly allocated array of the same
     * component type.  If the supplied array has room to spare, the
     * slot immediately following the last element is set to
     * <tt>null</tt>.
     * @param a the array into which the elements are to be stored
     * @throws NullPointerException if <tt>a</tt> is <tt>null</tt>
     * @throws ArrayStoreException if an element cannot be stored
     *         in an array of <tt>a</tt>'s component type
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            // Mark the end of the queue's elements in an oversized array.
            if (a.length > k)
                a[k] = null;
            return a;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Return a string representation of this queue, computed while
     * both locks are held.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Remove all available elements from this queue and add them to
     * the given collection.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Remove at most the given number of available elements from this
     * queue and add them to the given collection.  If adding to
     * <tt>c</tt> fails part-way, the elements already transferred are
     * removed from this queue and the rest remain.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        fullyLock();
        try {
            int n = 0;
            Node<E> p = head.next;
            try {
                while (p != null && n < maxElements) {
                    c.add(p.item);
                    p.item = null;
                    p = p.next;
                    ++n;
                }
                return n;
            }
            finally {
                // Runs even if c.add threw: unlink exactly the nodes
                // whose items were handed over.
                if (n != 0) {
                    head.next = p;
                    if (p == null)
                        last = head;
                    if (count.getAndAdd(-n) == capacity)
                        notFull.signalAll();
                }
            }
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Return an iterator over the elements in this queue, in
     * head-to-tail order.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        Node<E> current;
        Node<E> lastRet;
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            }
            finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            }
            finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                // p is null if the node has already left the list
                // (e.g. it was taken); in that case there is nothing to do.
                if (p == node && p != null)
                    unlink(p, trail);
            }
            finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    @SuppressWarnings("unchecked")
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // Constructors and field initializers are not run on
        // deserialization, so rebuild the transient list structure and
        // restart the count before re-adding the elements.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}
609 dholmes 1.8
610    
611    
612    
613 tim 1.1