ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.3
Committed: Fri Jun 6 16:53:05 2003 UTC (21 years ago) by dl
Branch: MAIN
Changes since 1.2: +9 -1 lines
Log Message:
Minor doc updates; FairReentrantLock serialize now

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.3 * An optionally-bounded blocking queue based on linked nodes. Linked
13     * queues typically have higher throughput than array-based queues but
14     * less predictable performance in most concurrent applications.
15     *
16     * <p> The optional capacity bound constructor argument serves as a
17     * way to prevent unlimited queue expansion. Linked nodes are
18     * dynamically created upon each insertion unless this would bring the
19     * queue above capacity.
20     *
21 tim 1.1 **/
22 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
23 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
24    
25 dl 1.2 /*
26     * A variant of the "two lock queue" algorithm. The putLock gates
27     * entry to put (and offer), and has an associated condition for
28     * waiting puts. Similarly for the takeLock. The "count" field
29     * that they both rely on is maintained as an atomic to avoid
30     * needing to get both locks in most cases. Also, to minimize need
31     * for puts to get takeLock and vice-versa, cascading notifies are
32     * used. When a put notices that it has enabled at least one take,
33     * it signals taker. That taker in turn signals others if more
34     * items have been entered since the signal. And symmetrically for
35     * takes signalling puts. Operations such as remove(Object) and
36     * iterators acquire both locks.
37     */
38    
39     static class Node<E> {
40     volatile E item;
41     Node<E> next;
42     Node(E x) { item = x; }
43     }
44    
45     private final int capacity;
46     private transient final AtomicInteger count = new AtomicInteger(0);
47    
48     private transient Node<E> head = new Node<E>(null);
49     private transient Node<E> last = head;
50    
51     private transient final ReentrantLock takeLock = new ReentrantLock();
52     private transient final Condition notEmpty = takeLock.newCondition();
53    
54     private transient final ReentrantLock putLock = new ReentrantLock();
55     private transient final Condition notFull = putLock.newCondition();
56    
57    
58     /**
59     * Signal a waiting take. Called only from put/offer (which do not
60     * otherwise ordinarily have takeLock.)
61     */
62     private void signalNotEmpty() {
63     takeLock.lock();
64     try {
65     notEmpty.signal();
66     }
67     finally {
68     takeLock.unlock();
69     }
70     }
71    
72     /**
73     * Signal a waiting put. Called only from take/poll.
74     */
75     private void signalNotFull() {
76     putLock.lock();
77     try {
78     notFull.signal();
79     }
80     finally {
81     putLock.unlock();
82     }
83     }
84    
85     /**
86     * Create a node and link it and end of queue
87     */
88     private void insert(E x) {
89     last = last.next = new Node<E>(x);
90     }
91    
92     /**
93     * Remove a node from head of queue,
94     */
95     private E extract() {
96     Node<E> first = head.next;
97     head = first;
98     E x = (E)first.item;
99     first.item = null;
100     return x;
101     }
102    
103     /**
104     * Lock to prevent both puts and takes.
105     */
106     private void fullyLock() {
107     putLock.lock();
108     takeLock.lock();
109 tim 1.1 }
110 dl 1.2
111     /**
112     * Unlock to allow both puts and takes.
113     */
114     private void fullyUnlock() {
115     takeLock.unlock();
116     putLock.unlock();
117     }
118    
119    
120     /**
121     * Creates a LinkedBlockingQueue with no intrinsic capacity constraint.
122     */
123     public LinkedBlockingQueue() {
124     this(Integer.MAX_VALUE);
125     }
126    
127     /**
128     * Creates a LinkedBlockingQueue with the given capacity constraint.
129     */
130     public LinkedBlockingQueue(int capacity) {
131     if (capacity <= 0) throw new IllegalArgumentException();
132     this.capacity = capacity;
133     }
134    
135     /**
136     * Creates a LinkedBlockingQueue without an intrinsic capacity
137     * constraint, initially holding the given elements.
138     */
139     public LinkedBlockingQueue(Collection<E> initialElements) {
140     this(Integer.MAX_VALUE);
141     for (Iterator<E> it = initialElements.iterator(); it.hasNext();)
142     add(it.next());
143     }
144    
145     public int size() {
146     return count.get();
147 tim 1.1 }
148 dl 1.2
149     public int remainingCapacity() {
150     return capacity - count.get();
151     }
152    
153     public void put(E x) throws InterruptedException {
154     if (x == null) throw new IllegalArgumentException();
155     // Note: convention in all put/take/etc is to preset
156     // local var holding count negative to indicate failure unless set.
157     int c = -1;
158     putLock.lockInterruptibly();
159     try {
160     /*
161     * Note that count is used in wait guard even though it is
162     * not protected by lock. This works because count can
163     * only decrease at this point (all other puts are shut
164     * out by lock), and we (or some other waiting put) are
165     * signalled if it ever changes from
166     * capacity. Similarly for all other uses of count in
167     * other wait guards.
168     */
169     try {
170     while (count.get() == capacity)
171     notFull.await();
172     }
173     catch (InterruptedException ie) {
174     notFull.signal(); // propagate to a non-interrupted thread
175     throw ie;
176     }
177     insert(x);
178     c = count.getAndIncrement();
179     if (c+1 < capacity)
180     notFull.signal();
181     }
182     finally {
183     putLock.unlock();
184     }
185     if (c == 0)
186     signalNotEmpty();
187 tim 1.1 }
188 dl 1.2
189     public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
190     if (x == null) throw new IllegalArgumentException();
191     putLock.lockInterruptibly();
192     long nanos = unit.toNanos(timeout);
193     int c = -1;
194     try {
195     for (;;) {
196     if (count.get() < capacity) {
197     insert(x);
198     c = count.getAndIncrement();
199     if (c+1 < capacity)
200     notFull.signal();
201     break;
202     }
203     if (nanos <= 0)
204     return false;
205     try {
206     nanos = notFull.awaitNanos(nanos);
207     }
208     catch (InterruptedException ie) {
209     notFull.signal(); // propagate to a non-interrupted thread
210     throw ie;
211     }
212     }
213     }
214     finally {
215     putLock.unlock();
216     }
217     if (c == 0)
218     signalNotEmpty();
219     return true;
220 tim 1.1 }
221 dl 1.2
222 tim 1.1 public boolean offer(E x) {
223 dl 1.2 if (x == null) throw new IllegalArgumentException();
224     if (count.get() == capacity)
225     return false;
226     putLock.tryLock();
227     int c = -1;
228     try {
229     if (count.get() < capacity) {
230     insert(x);
231     c = count.getAndIncrement();
232     if (c+1 < capacity)
233     notFull.signal();
234     }
235     }
236     finally {
237     putLock.unlock();
238     }
239     if (c == 0)
240     signalNotEmpty();
241     return c >= 0;
242 tim 1.1 }
243 dl 1.2
244    
245     public E take() throws InterruptedException {
246     E x;
247     int c = -1;
248     takeLock.lockInterruptibly();
249     try {
250     try {
251     while (count.get() == 0)
252     notEmpty.await();
253     }
254     catch (InterruptedException ie) {
255     notEmpty.signal(); // propagate to a non-interrupted thread
256     throw ie;
257     }
258    
259     x = extract();
260     c = count.getAndDecrement();
261     if (c > 1)
262     notEmpty.signal();
263     }
264     finally {
265     takeLock.unlock();
266     }
267     if (c == capacity)
268     signalNotFull();
269     return x;
270     }
271    
272     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
273    
274     E x = null;
275     int c = -1;
276     takeLock.lockInterruptibly();
277     long nanos = unit.toNanos(timeout);
278     try {
279     for (;;) {
280     if (count.get() > 0) {
281     x = extract();
282     c = count.getAndDecrement();
283     if (c > 1)
284     notEmpty.signal();
285     break;
286     }
287     if (nanos <= 0)
288     return null;
289     try {
290     nanos = notEmpty.awaitNanos(nanos);
291     }
292     catch (InterruptedException ie) {
293     notEmpty.signal(); // propagate to a non-interrupted thread
294     throw ie;
295     }
296     }
297     }
298     finally {
299     takeLock.unlock();
300     }
301     if (c == capacity)
302     signalNotFull();
303     return x;
304     }
305    
306     public E poll() {
307     if (count.get() == 0)
308     return null;
309     E x = null;
310     int c = -1;
311     takeLock.tryLock();
312     try {
313     if (count.get() > 0) {
314     x = extract();
315     c = count.getAndDecrement();
316     if (c > 1)
317     notEmpty.signal();
318     }
319     }
320     finally {
321     takeLock.unlock();
322     }
323     if (c == capacity)
324     signalNotFull();
325     return x;
326 tim 1.1 }
327 dl 1.2
328    
329     public E peek() {
330     if (count.get() == 0)
331     return null;
332     takeLock.tryLock();
333     try {
334     Node<E> first = head.next;
335     if (first == null)
336     return null;
337     else
338     return first.item;
339     }
340     finally {
341     takeLock.unlock();
342     }
343 tim 1.1 }
344    
345     public boolean remove(Object x) {
346 dl 1.2 if (x == null) return false;
347     boolean removed = false;
348     fullyLock();
349     try {
350     Node<E> trail = head;
351     Node<E> p = head.next;
352     while (p != null) {
353     if (x.equals(p.item)) {
354     removed = true;
355     break;
356     }
357     trail = p;
358     p = p.next;
359     }
360     if (removed) {
361     p.item = null;
362     trail.next = p.next;
363     if (count.getAndDecrement() == capacity)
364     notFull.signalAll();
365     }
366     }
367     finally {
368     fullyUnlock();
369     }
370     return removed;
371 tim 1.1 }
372 dl 1.2
373     public Object[] toArray() {
374     fullyLock();
375     try {
376     int size = count.get();
377     Object[] a = new Object[size];
378     int k = 0;
379     for (Node<E> p = head.next; p != null; p = p.next)
380     a[k++] = p.item;
381     return a;
382     }
383     finally {
384     fullyUnlock();
385     }
386 tim 1.1 }
387 dl 1.2
388     public <T> T[] toArray(T[] a) {
389     fullyLock();
390     try {
391     int size = count.get();
392     if (a.length < size)
393     a = (T[])java.lang.reflect.Array.newInstance(
394     a.getClass().getComponentType(), size);
395    
396     int k = 0;
397     for (Node p = head.next; p != null; p = p.next)
398     a[k++] = (T)p.item;
399     return a;
400     }
401     finally {
402     fullyUnlock();
403     }
404 tim 1.1 }
405 dl 1.2
406     public String toString() {
407     fullyLock();
408     try {
409     return super.toString();
410     }
411     finally {
412     fullyUnlock();
413     }
414 tim 1.1 }
415 dl 1.2
416     public Iterator<E> iterator() {
417     return new Itr();
418 tim 1.1 }
419 dl 1.2
420     private class Itr implements Iterator<E> {
421     Node<E> current;
422     Node<E> lastRet;
423    
424     // for comodification checks
425     Node<E> expectedHead;
426     Node<E> expectedLast;
427     int expectedCount;
428    
429     Itr() {
430     fullyLock();
431     try {
432     expectedHead = head;
433     current = head.next;
434     expectedLast = last;
435     expectedCount = count.get();
436     }
437     finally {
438     fullyUnlock();
439     }
440     }
441    
442     public boolean hasNext() {
443     return current != null;
444     }
445    
446     private void checkForModification() {
447     if (expectedHead != head ||
448     expectedLast != last ||
449     expectedCount != count.get())
450     throw new ConcurrentModificationException();
451     }
452    
453     public E next() {
454     fullyLock();
455     try {
456     if (current == null)
457     throw new NoSuchElementException();
458     checkForModification();
459     E x = current.item;
460     lastRet = current;
461     current = current.next;
462     return x;
463     }
464     finally {
465     fullyUnlock();
466     }
467    
468     }
469    
470     public void remove() {
471     if (lastRet == null)
472     throw new IllegalStateException();
473     fullyLock();
474     try {
475     checkForModification();
476     Node<E> node = lastRet;
477     lastRet = null;
478     Node<E> trail = head;
479     Node<E> p = head.next;
480     while (p != null && p != node) {
481     trail = p;
482     p = p.next;
483     }
484     if (p == node) {
485     p.item = null;
486     trail.next = p.next;
487     int c = count.getAndDecrement();
488     expectedHead = head;
489     expectedLast = last;
490     expectedCount = c;
491     if (c == capacity)
492     notFull.signalAll();
493     }
494     }
495     finally {
496     fullyUnlock();
497     }
498     }
499 tim 1.1 }
500 dl 1.2
501     /**
502     * Save the state to a stream (that is, serialize it).
503     *
504     * @serialData The capacity is emitted (int), followed by all of
505     * its elements (each an <tt>Object</tt>) in the proper order,
506     * followed by a null
507     */
508     private void writeObject(java.io.ObjectOutputStream s)
509     throws java.io.IOException {
510    
511     fullyLock();
512     try {
513     // Write out any hidden stuff, plus capacity
514     s.defaultWriteObject();
515    
516     // Write out all elements in the proper order.
517     for (Node<E> p = head.next; p != null; p = p.next)
518     s.writeObject(p.item);
519    
520     // Use trailing null as sentinel
521     s.writeObject(null);
522     }
523     finally {
524     fullyUnlock();
525     }
526 tim 1.1 }
527    
528 dl 1.2 /**
529     * Reconstitute the Queue instance from a stream (that is,
530     * deserialize it).
531     */
532     private void readObject(java.io.ObjectInputStream s)
533     throws java.io.IOException, ClassNotFoundException {
534     // Read in capacity, and any hidden stuff
535     s.defaultReadObject();
536    
537     // Read in all elements and place in queue
538     for (;;) {
539     E item = (E)s.readObject();
540     if (item == null)
541     break;
542     add(item);
543     }
544 tim 1.1 }
545     }
546