ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.2
Committed: Tue May 27 18:14:40 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRERELEASE_0_1
Changes since 1.1: +509 -34 lines
Log Message:
re-check-in initial implementations

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12     * An unbounded queue based on linked nodes.
13     **/
14 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
15 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
16    
17 dl 1.2 /*
18     * A variant of the "two lock queue" algorithm. The putLock gates
19     * entry to put (and offer), and has an associated condition for
20     * waiting puts. Similarly for the takeLock. The "count" field
21     * that they both rely on is maintained as an atomic to avoid
22     * needing to get both locks in most cases. Also, to minimize need
23     * for puts to get takeLock and vice-versa, cascading notifies are
24     * used. When a put notices that it has enabled at least one take,
25     * it signals taker. That taker in turn signals others if more
26     * items have been entered since the signal. And symmetrically for
27     * takes signalling puts. Operations such as remove(Object) and
28     * iterators acquire both locks.
29     */
30    
31     static class Node<E> {
32     volatile E item;
33     Node<E> next;
34     Node(E x) { item = x; }
35     }
36    
37     private final int capacity;
38     private transient final AtomicInteger count = new AtomicInteger(0);
39    
40     private transient Node<E> head = new Node<E>(null);
41     private transient Node<E> last = head;
42    
43     private transient final ReentrantLock takeLock = new ReentrantLock();
44     private transient final Condition notEmpty = takeLock.newCondition();
45    
46     private transient final ReentrantLock putLock = new ReentrantLock();
47     private transient final Condition notFull = putLock.newCondition();
48    
49    
50     /**
51     * Signal a waiting take. Called only from put/offer (which do not
52     * otherwise ordinarily have takeLock.)
53     */
54     private void signalNotEmpty() {
55     takeLock.lock();
56     try {
57     notEmpty.signal();
58     }
59     finally {
60     takeLock.unlock();
61     }
62     }
63    
64     /**
65     * Signal a waiting put. Called only from take/poll.
66     */
67     private void signalNotFull() {
68     putLock.lock();
69     try {
70     notFull.signal();
71     }
72     finally {
73     putLock.unlock();
74     }
75     }
76    
77     /**
78     * Create a node and link it and end of queue
79     */
80     private void insert(E x) {
81     last = last.next = new Node<E>(x);
82     }
83    
84     /**
85     * Remove a node from head of queue,
86     */
87     private E extract() {
88     Node<E> first = head.next;
89     head = first;
90     E x = (E)first.item;
91     first.item = null;
92     return x;
93     }
94    
95     /**
96     * Lock to prevent both puts and takes.
97     */
98     private void fullyLock() {
99     putLock.lock();
100     takeLock.lock();
101 tim 1.1 }
102 dl 1.2
103     /**
104     * Unlock to allow both puts and takes.
105     */
106     private void fullyUnlock() {
107     takeLock.unlock();
108     putLock.unlock();
109     }
110    
111    
112     /**
113     * Creates a LinkedBlockingQueue with no intrinsic capacity constraint.
114     */
115     public LinkedBlockingQueue() {
116     this(Integer.MAX_VALUE);
117     }
118    
119     /**
120     * Creates a LinkedBlockingQueue with the given capacity constraint.
121     */
122     public LinkedBlockingQueue(int capacity) {
123     if (capacity <= 0) throw new IllegalArgumentException();
124     this.capacity = capacity;
125     }
126    
127     /**
128     * Creates a LinkedBlockingQueue without an intrinsic capacity
129     * constraint, initially holding the given elements.
130     */
131     public LinkedBlockingQueue(Collection<E> initialElements) {
132     this(Integer.MAX_VALUE);
133     for (Iterator<E> it = initialElements.iterator(); it.hasNext();)
134     add(it.next());
135     }
136    
137     public int size() {
138     return count.get();
139 tim 1.1 }
140 dl 1.2
141     public int remainingCapacity() {
142     return capacity - count.get();
143     }
144    
145     public void put(E x) throws InterruptedException {
146     if (x == null) throw new IllegalArgumentException();
147     // Note: convention in all put/take/etc is to preset
148     // local var holding count negative to indicate failure unless set.
149     int c = -1;
150     putLock.lockInterruptibly();
151     try {
152     /*
153     * Note that count is used in wait guard even though it is
154     * not protected by lock. This works because count can
155     * only decrease at this point (all other puts are shut
156     * out by lock), and we (or some other waiting put) are
157     * signalled if it ever changes from
158     * capacity. Similarly for all other uses of count in
159     * other wait guards.
160     */
161     try {
162     while (count.get() == capacity)
163     notFull.await();
164     }
165     catch (InterruptedException ie) {
166     notFull.signal(); // propagate to a non-interrupted thread
167     throw ie;
168     }
169     insert(x);
170     c = count.getAndIncrement();
171     if (c+1 < capacity)
172     notFull.signal();
173     }
174     finally {
175     putLock.unlock();
176     }
177     if (c == 0)
178     signalNotEmpty();
179 tim 1.1 }
180 dl 1.2
181     public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
182     if (x == null) throw new IllegalArgumentException();
183     putLock.lockInterruptibly();
184     long nanos = unit.toNanos(timeout);
185     int c = -1;
186     try {
187     for (;;) {
188     if (count.get() < capacity) {
189     insert(x);
190     c = count.getAndIncrement();
191     if (c+1 < capacity)
192     notFull.signal();
193     break;
194     }
195     if (nanos <= 0)
196     return false;
197     try {
198     nanos = notFull.awaitNanos(nanos);
199     }
200     catch (InterruptedException ie) {
201     notFull.signal(); // propagate to a non-interrupted thread
202     throw ie;
203     }
204     }
205     }
206     finally {
207     putLock.unlock();
208     }
209     if (c == 0)
210     signalNotEmpty();
211     return true;
212 tim 1.1 }
213 dl 1.2
214 tim 1.1 public boolean offer(E x) {
215 dl 1.2 if (x == null) throw new IllegalArgumentException();
216     if (count.get() == capacity)
217     return false;
218     putLock.tryLock();
219     int c = -1;
220     try {
221     if (count.get() < capacity) {
222     insert(x);
223     c = count.getAndIncrement();
224     if (c+1 < capacity)
225     notFull.signal();
226     }
227     }
228     finally {
229     putLock.unlock();
230     }
231     if (c == 0)
232     signalNotEmpty();
233     return c >= 0;
234 tim 1.1 }
235 dl 1.2
236    
237     public E take() throws InterruptedException {
238     E x;
239     int c = -1;
240     takeLock.lockInterruptibly();
241     try {
242     try {
243     while (count.get() == 0)
244     notEmpty.await();
245     }
246     catch (InterruptedException ie) {
247     notEmpty.signal(); // propagate to a non-interrupted thread
248     throw ie;
249     }
250    
251     x = extract();
252     c = count.getAndDecrement();
253     if (c > 1)
254     notEmpty.signal();
255     }
256     finally {
257     takeLock.unlock();
258     }
259     if (c == capacity)
260     signalNotFull();
261     return x;
262     }
263    
264     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
265    
266     E x = null;
267     int c = -1;
268     takeLock.lockInterruptibly();
269     long nanos = unit.toNanos(timeout);
270     try {
271     for (;;) {
272     if (count.get() > 0) {
273     x = extract();
274     c = count.getAndDecrement();
275     if (c > 1)
276     notEmpty.signal();
277     break;
278     }
279     if (nanos <= 0)
280     return null;
281     try {
282     nanos = notEmpty.awaitNanos(nanos);
283     }
284     catch (InterruptedException ie) {
285     notEmpty.signal(); // propagate to a non-interrupted thread
286     throw ie;
287     }
288     }
289     }
290     finally {
291     takeLock.unlock();
292     }
293     if (c == capacity)
294     signalNotFull();
295     return x;
296     }
297    
298     public E poll() {
299     if (count.get() == 0)
300     return null;
301     E x = null;
302     int c = -1;
303     takeLock.tryLock();
304     try {
305     if (count.get() > 0) {
306     x = extract();
307     c = count.getAndDecrement();
308     if (c > 1)
309     notEmpty.signal();
310     }
311     }
312     finally {
313     takeLock.unlock();
314     }
315     if (c == capacity)
316     signalNotFull();
317     return x;
318 tim 1.1 }
319 dl 1.2
320    
321     public E peek() {
322     if (count.get() == 0)
323     return null;
324     takeLock.tryLock();
325     try {
326     Node<E> first = head.next;
327     if (first == null)
328     return null;
329     else
330     return first.item;
331     }
332     finally {
333     takeLock.unlock();
334     }
335 tim 1.1 }
336    
337     public boolean remove(Object x) {
338 dl 1.2 if (x == null) return false;
339     boolean removed = false;
340     fullyLock();
341     try {
342     Node<E> trail = head;
343     Node<E> p = head.next;
344     while (p != null) {
345     if (x.equals(p.item)) {
346     removed = true;
347     break;
348     }
349     trail = p;
350     p = p.next;
351     }
352     if (removed) {
353     p.item = null;
354     trail.next = p.next;
355     if (count.getAndDecrement() == capacity)
356     notFull.signalAll();
357     }
358     }
359     finally {
360     fullyUnlock();
361     }
362     return removed;
363 tim 1.1 }
364 dl 1.2
365     public Object[] toArray() {
366     fullyLock();
367     try {
368     int size = count.get();
369     Object[] a = new Object[size];
370     int k = 0;
371     for (Node<E> p = head.next; p != null; p = p.next)
372     a[k++] = p.item;
373     return a;
374     }
375     finally {
376     fullyUnlock();
377     }
378 tim 1.1 }
379 dl 1.2
380     public <T> T[] toArray(T[] a) {
381     fullyLock();
382     try {
383     int size = count.get();
384     if (a.length < size)
385     a = (T[])java.lang.reflect.Array.newInstance(
386     a.getClass().getComponentType(), size);
387    
388     int k = 0;
389     for (Node p = head.next; p != null; p = p.next)
390     a[k++] = (T)p.item;
391     return a;
392     }
393     finally {
394     fullyUnlock();
395     }
396 tim 1.1 }
397 dl 1.2
398     public String toString() {
399     fullyLock();
400     try {
401     return super.toString();
402     }
403     finally {
404     fullyUnlock();
405     }
406 tim 1.1 }
407 dl 1.2
408     public Iterator<E> iterator() {
409     return new Itr();
410 tim 1.1 }
411 dl 1.2
412     private class Itr implements Iterator<E> {
413     Node<E> current;
414     Node<E> lastRet;
415    
416     // for comodification checks
417     Node<E> expectedHead;
418     Node<E> expectedLast;
419     int expectedCount;
420    
421     Itr() {
422     fullyLock();
423     try {
424     expectedHead = head;
425     current = head.next;
426     expectedLast = last;
427     expectedCount = count.get();
428     }
429     finally {
430     fullyUnlock();
431     }
432     }
433    
434     public boolean hasNext() {
435     return current != null;
436     }
437    
438     private void checkForModification() {
439     if (expectedHead != head ||
440     expectedLast != last ||
441     expectedCount != count.get())
442     throw new ConcurrentModificationException();
443     }
444    
445     public E next() {
446     fullyLock();
447     try {
448     if (current == null)
449     throw new NoSuchElementException();
450     checkForModification();
451     E x = current.item;
452     lastRet = current;
453     current = current.next;
454     return x;
455     }
456     finally {
457     fullyUnlock();
458     }
459    
460     }
461    
462     public void remove() {
463     if (lastRet == null)
464     throw new IllegalStateException();
465     fullyLock();
466     try {
467     checkForModification();
468     Node<E> node = lastRet;
469     lastRet = null;
470     Node<E> trail = head;
471     Node<E> p = head.next;
472     while (p != null && p != node) {
473     trail = p;
474     p = p.next;
475     }
476     if (p == node) {
477     p.item = null;
478     trail.next = p.next;
479     int c = count.getAndDecrement();
480     expectedHead = head;
481     expectedLast = last;
482     expectedCount = c;
483     if (c == capacity)
484     notFull.signalAll();
485     }
486     }
487     finally {
488     fullyUnlock();
489     }
490     }
491 tim 1.1 }
492 dl 1.2
493     /**
494     * Save the state to a stream (that is, serialize it).
495     *
496     * @serialData The capacity is emitted (int), followed by all of
497     * its elements (each an <tt>Object</tt>) in the proper order,
498     * followed by a null
499     */
500     private void writeObject(java.io.ObjectOutputStream s)
501     throws java.io.IOException {
502    
503     fullyLock();
504     try {
505     // Write out any hidden stuff, plus capacity
506     s.defaultWriteObject();
507    
508     // Write out all elements in the proper order.
509     for (Node<E> p = head.next; p != null; p = p.next)
510     s.writeObject(p.item);
511    
512     // Use trailing null as sentinel
513     s.writeObject(null);
514     }
515     finally {
516     fullyUnlock();
517     }
518 tim 1.1 }
519    
520 dl 1.2 /**
521     * Reconstitute the Queue instance from a stream (that is,
522     * deserialize it).
523     */
524     private void readObject(java.io.ObjectInputStream s)
525     throws java.io.IOException, ClassNotFoundException {
526     // Read in capacity, and any hidden stuff
527     s.defaultReadObject();
528    
529     // Read in all elements and place in queue
530     for (;;) {
531     E item = (E)s.readObject();
532     if (item == null)
533     break;
534     add(item);
535     }
536 tim 1.1 }
537     }
538