ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.6
Committed: Tue Jun 24 14:34:48 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.5: +37 -13 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.3 * An optionally-bounded blocking queue based on linked nodes. Linked
13     * queues typically have higher throughput than array-based queues but
14     * less predictable performance in most concurrent applications.
15     *
16     * <p> The optional capacity bound constructor argument serves as a
17     * way to prevent unlimited queue expansion. Linked nodes are
18     * dynamically created upon each insertion unless this would bring the
19     * queue above capacity.
20 dl 1.6 * @since 1.5
21     * @author Doug Lea
22 dl 1.3 *
23 tim 1.1 **/
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases.  Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used.  When a put notices that it has enabled at least one take,
     * it signals a taker.  That taker in turn signals others if more
     * items have been entered since the signal.  And symmetrically for
     * takes signalling puts.  Operations such as remove(Object),
     * drainTo and iterators acquire both locks.
     *
     * The list always starts with a dummy node ("head") whose item is
     * not part of the queue; the first real element is head.next and
     * "last" is the final node (equal to head when the queue is empty).
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        /** The successor node, or null if this is the final node */
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * Current number of elements.  Deliberately not transient: a
     * transient final field would be left null by deserialization,
     * since field initializers are not re-run.  readObject resets the
     * restored value to zero before re-adding the elements.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; first element is head.next) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        }
        finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        }
        finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at the end of the queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove the first element by making its node the new dummy head.
     * Caller must hold takeLock and have checked the queue is non-empty.
     * @return the item that was at the front of the queue
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null; // the new head is a dummy; do not retain the item
        return x;
    }

    /**
     * Unlink interior node p, whose predecessor is trail, and account
     * for the removal.  Caller must hold both locks.
     * @param p the node to remove
     * @param trail the node immediately preceding p
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail was removed, the tail pointer must move back;
        // otherwise later inserts would be linked to an unreachable node.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a LinkedBlockingQueue with no intrinsic capacity constraint.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a LinkedBlockingQueue with the given capacity constraint.
     * @param capacity the maximum number of elements to hold without blocking.
     * @throws IllegalArgumentException if capacity is not positive
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a LinkedBlockingQueue without an intrinsic capacity
     * constraint, initially holding the given elements, added in
     * traversal order of the collection's iterator.
     * @param initialElements the elements to initially contain
     * @throws NullPointerException if the collection or any of its
     * elements is null
     */
    public LinkedBlockingQueue(Collection<? extends E> initialElements) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = initialElements.iterator(); it.hasNext();)
            add(it.next());
    }

    /**
     * Returns the current number of elements.
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    /**
     * Returns the number of elements that can currently be added
     * before the capacity bound is reached.
     * @return capacity minus the current size
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the given element at the tail, waiting while the queue is full.
     * @param x the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if x is null
     */
    public void put(E x) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            }
            catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(x);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal(); // still room: cascade to another waiting put
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty(); // queue went from empty to non-empty
    }

    /**
     * Adds the given element at the tail, waiting up to the given
     * time for space to become available.
     * @param x the element to add
     * @param timeout how long to wait, in units of <tt>unit</tt>
     * @param unit the time unit of the timeout argument
     * @return true if added, false if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if x or unit is null
     */
    public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        // Convert before locking: a null unit must not throw while the
        // lock is held outside the try/finally.
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(x);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Adds the given element at the tail if the queue is not full.
     * @param x the element to add
     * @return true if added, false if the queue was at capacity
     * @throws NullPointerException if x is null
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();
        if (count.get() == capacity)
            return false;
        int c = -1;
        // lock(), not tryLock(): an ignored failed tryLock would run the
        // critical section unlocked and then fail in unlock().
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(x);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        }
        finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    /**
     * Removes and returns the head element, waiting while the queue
     * is empty.
     * @return the element removed from the head
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            }
            catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal(); // more items remain: cascade to another take
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull(); // queue went from full to non-full
        return x;
    }

    /**
     * Removes and returns the head element, waiting up to the given
     * time for one to become available.
     * @param timeout how long to wait, in units of <tt>unit</tt>
     * @param unit the time unit of the timeout argument
     * @return the head element, or null if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if unit is null
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        // Convert before locking so a null unit cannot leak the lock.
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                }
                catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Removes and returns the head element if one is present.
     * @return the head element, or null if the queue is empty
     */
    public E poll() {
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        takeLock.lock(); // see offer(E) for why this is not tryLock()
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        }
        finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }


    /**
     * Returns, without removing, the head element if one is present.
     * @return the head element, or null if the queue is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock(); // see offer(E) for why this is not tryLock()
        try {
            Node<E> first = head.next;
            return (first == null) ? null : first.item;
        }
        finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes the first element equal to the given object, if any.
     * @param x the object to remove
     * @return true if an element was removed
     */
    public boolean remove(Object x) {
        if (x == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (x.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements and adds them to the given
     * collection, in queue order.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most the given number of elements and adds them to
     * the given collection, in queue order.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        fullyLock();
        try {
            int drained = 0;
            try {
                Node<E> first;
                while (drained < maxElements && (first = head.next) != null) {
                    // Add before unlinking so that an element is not
                    // lost if c.add throws.
                    c.add(first.item);
                    head = first;
                    first.item = null;
                    ++drained;
                }
            }
            finally {
                // Account for whatever was transferred, even on failure.
                if (drained > 0 && count.getAndAdd(-drained) == capacity)
                    notFull.signalAll();
            }
            return drained;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array holding the elements in queue order.
     * @return a newly allocated array of the elements
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array holding the elements in queue order, using the
     * given array if it is large enough and otherwise a new array of
     * the same component type.  If the array has room to spare, the
     * slot immediately after the last element is set to null.
     * @param a the array to fill if it is big enough
     * @return an array holding the elements
     */
    @SuppressWarnings("unchecked") // element/array casts are checked at store time
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null; // mark the end of the elements for the caller
            return a;
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Returns the inherited string form, computed while holding both
     * locks.
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in queue order.
     * @return an iterator over the elements
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        /** Node holding the element that the next call to next() returns */
        Node<E> current;
        /** Node of the element most recently returned, or null */
        Node<E> lastRet;
        /** Element captured from current when it was reached */
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            }
            finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            }
            finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Re-locate the node from the head; it may already have
                // been removed by another thread, in which case do nothing.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p != null)
                    unlink(p, trail);
            }
            finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The default serializable fields (including the
     * capacity) are emitted, followed by all of the elements (each an
     * <tt>Object</tt>) in the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        }
        finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute the Queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    @SuppressWarnings("unchecked") // elements were written from a queue of E
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // No constructor runs during deserialization, so rebuild the
        // transient list and reset the element count before re-adding.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}
560