ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.7
Committed: Tue Jul 8 00:46:34 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_2
Changes since 1.6: +1 -0 lines
Log Message:
Locks in subpackage; fairness params added

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An optionally-bounded blocking queue based on linked nodes. Linked
 * queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent unlimited queue expansion. Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * @param <E> the type of elements held in this queue
 * @since 1.5
 * @author Doug Lea
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * A variant of the "two lock queue" algorithm. The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts. Similarly for the takeLock. The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     */

    /**
     * Linked list node class.
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /**
     * Current number of elements. Transient, and therefore not final:
     * field initializers do not run during deserialization, so
     * readObject has to install a fresh counter.
     */
    private transient AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; the first element is head.next) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll/drainTo.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Creates a node and links it at the end of the queue.
     * The caller must hold putLock.
     *
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Removes the first node by making it the new dummy head.
     * The caller must hold takeLock and know the queue is non-empty.
     *
     * @return the item that was held by the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlinks interior node p, whose predecessor is trail, and adjusts
     * the element count. The caller must hold both locks.
     *
     * @param p the node to remove
     * @param trail the node immediately preceding p
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        trail.next = p.next;
        // If the tail was removed, later inserts must attach to its
        // predecessor; otherwise they would hang off a detached node.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Locks to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

    /**
     * Creates a LinkedBlockingQueue with no intrinsic capacity constraint.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a LinkedBlockingQueue with the given capacity constraint.
     *
     * @param capacity the maximum number of elements to hold without blocking.
     * @throws IllegalArgumentException if capacity is not positive
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a LinkedBlockingQueue without an intrinsic capacity
     * constraint, initially holding the given elements, added in
     * traversal order of the collection's iterator.
     *
     * @param initialElements the elements to initially contain
     * @throws NullPointerException if the collection or any of its
     *         elements is null
     */
    public LinkedBlockingQueue(Collection<? extends E> initialElements) {
        this(Integer.MAX_VALUE);
        for (E e : initialElements)
            add(e);
    }

    /**
     * Returns the number of elements currently in this queue.
     *
     * @return the current element count
     */
    public int size() {
        return count.get();
    }

    /**
     * Returns the number of additional elements this queue can accept
     * before reaching its capacity bound.
     *
     * @return capacity minus the current element count
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the element at the tail, waiting while the queue is full.
     *
     * @param x the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if x is null
     */
    public void put(E x) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(x);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the element at the tail, waiting up to the given time
     * for space to become available.
     *
     * @param x the element to add
     * @param timeout how long to wait, in units of unit
     * @param unit the time unit of timeout
     * @return true if added, false if the wait timed out first
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if x or unit is null
     */
    public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        // Convert before locking: a null unit must not leave putLock held.
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(x);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the element at the tail if the queue is not full.
     *
     * @param x the element to add
     * @return true if added, false if the queue was full
     * @throws NullPointerException if x is null
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();
        if (count.get() == capacity)
            return false;
        int c = -1;
        // Must really own the lock: an unchecked tryLock() could fail,
        // leading to an unprotected insert and an illegal unlock().
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(x);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    /**
     * Removes and returns the head, waiting while the queue is empty.
     *
     * @return the head element
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Removes and returns the head, waiting up to the given time for
     * an element to become available.
     *
     * @param timeout how long to wait, in units of unit
     * @param unit the time unit of timeout
     * @return the head element, or null if the wait timed out first
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if unit is null
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        // Convert before locking: a null unit must not leave takeLock held.
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Removes and returns the head, or returns null if the queue is empty.
     *
     * @return the head element, or null if none
     */
    public E poll() {
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Returns, without removing, the head of the queue.
     *
     * @return the head element, or null if the queue is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            return (first == null) ? null : first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes the first element equal to x, if there is one.
     *
     * @param x the element to remove; null is never present
     * @return true if an element was removed
     */
    public boolean remove(Object x) {
        if (x == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = head.next; p != null; trail = p, p = p.next) {
                if (x.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Removes all available elements and adds them to the given
     * collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes at most maxElements available elements and adds them to
     * the given collection. If adding to c fails, the element that
     * could not be added stays in this queue.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        if (maxElements <= 0)
            return 0;
        int moved = 0;
        int before = -1;
        takeLock.lock();
        try {
            // count can only grow while takeLock is held, so this many
            // nodes are guaranteed to be linked.
            int limit = Math.min(maxElements, count.get());
            try {
                while (moved < limit) {
                    c.add(head.next.item); // may throw; node not yet unlinked
                    extract();
                    moved++;
                }
            } finally {
                // Account for whatever was actually unlinked, even if add threw.
                if (moved > 0)
                    before = count.getAndAdd(-moved);
            }
        } finally {
            takeLock.unlock();
        }
        if (before == capacity)
            signalNotFull();
        return moved;
    }

    /**
     * Returns an array holding the elements in queue order.
     *
     * @return a newly allocated array of the elements
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array holding the elements in queue order, using the
     * supplied array if it is large enough and otherwise allocating a
     * new one of the same runtime component type. If the array has
     * room to spare, the slot right after the last element is set to
     * null.
     *
     * @param a the array to fill if it is big enough
     * @return the filled array
     * @throws NullPointerException if a is null
     * @throws ArrayStoreException if an element cannot be stored in a
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[]) java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T) p.item;
            if (a.length > k)
                a[k] = null; // end marker, per Collection.toArray(T[])
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns the inherited string form, computed while holding both
     * locks.
     *
     * @return a string representation of this queue
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in queue order.
     *
     * @return a new iterator
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator. At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = current.next;
                if (current != null)
                    currentElement = current.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                // p is null when the node has already left the queue.
                if (p == node)
                    unlink(p, trail);
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute the Queue instance from a stream (that is,
     * deserialize it).
     *
     * @param s the stream
     */
    @SuppressWarnings("unchecked")
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // Transient state is not restored by the stream and field
        // initializers/constructors are skipped, so rebuild it here
        // before any element is added.
        count = new AtomicInteger(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E) s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}
561