ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.9
Committed: Thu Jul 31 07:18:02 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.8: +29 -7 lines
Log Message:
Continued updates to explicit and inherited doc comments.
Consistency over remove(null)
Some inherited doc is still not right.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dholmes 1.8 * An optionally-bounded {@link BlockingQueue blocking queue} based on
14     * linked nodes.
15     * This queue orders elements FIFO (first-in-first-out).
16     * The <em>head</em> of the queue is that element that has been on the
17     * queue the longest time.
18     * The <em>tail</em> of the queue is that element that has been on the
19     * queue the shortest time.
20     * Linked queues typically have higher throughput than array-based queues but
21     * less predictable performance in most concurrent applications.
22 dl 1.3 *
23     * <p> The optional capacity bound constructor argument serves as a
24 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
25     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
26 dl 1.3 * dynamically created upon each insertion unless this would bring the
27     * queue above capacity.
28 dholmes 1.8 *
29 dl 1.6 * @since 1.5
30     * @author Doug Lea
31 dl 1.3 *
32 tim 1.1 **/
33 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
34 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
35    
36 dl 1.2 /*
37     * A variant of the "two lock queue" algorithm. The putLock gates
38     * entry to put (and offer), and has an associated condition for
39     * waiting puts. Similarly for the takeLock. The "count" field
40     * that they both rely on is maintained as an atomic to avoid
41     * needing to get both locks in most cases. Also, to minimize need
42     * for puts to get takeLock and vice-versa, cascading notifies are
43     * used. When a put notices that it has enabled at least one take,
44     * it signals taker. That taker in turn signals others if more
45     * items have been entered since the signal. And symmetrically for
46     * takes signalling puts. Operations such as remove(Object) and
47     * iterators acquire both locks.
48     */
49    
50 dl 1.6 /**
51     * Linked list node class
52     */
53 dl 1.2 static class Node<E> {
54 dl 1.6 /** The item, volatile to ensure barrier separating write and read */
55 dl 1.2 volatile E item;
56     Node<E> next;
57     Node(E x) { item = x; }
58     }
59    
60 dl 1.6 /** The capacity bound, or Integer.MAX_VALUE if none */
61 dl 1.2 private final int capacity;
62 dl 1.6
63     /** Current number of elements */
64 dl 1.2 private transient final AtomicInteger count = new AtomicInteger(0);
65    
66 dl 1.6 /** Head of linked list */
67     private transient Node<E> head;
68    
69 dholmes 1.8 /** Tail of linked list */
70 dl 1.6 private transient Node<E> last;
71 dl 1.2
72 dl 1.6 /** Lock held by take, poll, etc */
73 dl 1.5 private final ReentrantLock takeLock = new ReentrantLock();
74 dl 1.6
75     /** Wait queue for waiting takes */
76 dl 1.5 private final Condition notEmpty = takeLock.newCondition();
77 dl 1.2
78 dl 1.6 /** Lock held by put, offer, etc */
79 dl 1.5 private final ReentrantLock putLock = new ReentrantLock();
80 dl 1.6
81     /** Wait queue for waiting puts */
82 dl 1.5 private final Condition notFull = putLock.newCondition();
83 dl 1.2
84     /**
85     * Signal a waiting take. Called only from put/offer (which do not
86 dl 1.4 * otherwise ordinarily lock takeLock.)
87 dl 1.2 */
88     private void signalNotEmpty() {
89     takeLock.lock();
90     try {
91     notEmpty.signal();
92     }
93     finally {
94     takeLock.unlock();
95     }
96     }
97    
98     /**
99     * Signal a waiting put. Called only from take/poll.
100     */
101     private void signalNotFull() {
102     putLock.lock();
103     try {
104     notFull.signal();
105     }
106     finally {
107     putLock.unlock();
108     }
109     }
110    
111     /**
112 dholmes 1.8 * Create a node and link it at end of queue
113 dl 1.6 * @param x the item
114 dl 1.2 */
115     private void insert(E x) {
116     last = last.next = new Node<E>(x);
117     }
118    
119     /**
120     * Remove a node from head of queue,
121 dl 1.6 * @return the node
122 dl 1.2 */
123     private E extract() {
124     Node<E> first = head.next;
125     head = first;
126     E x = (E)first.item;
127     first.item = null;
128     return x;
129     }
130    
131     /**
132     * Lock to prevent both puts and takes.
133     */
134     private void fullyLock() {
135     putLock.lock();
136     takeLock.lock();
137 tim 1.1 }
138 dl 1.2
139     /**
140     * Unlock to allow both puts and takes.
141     */
142     private void fullyUnlock() {
143     takeLock.unlock();
144     putLock.unlock();
145     }
146    
147    
148     /**
149 dholmes 1.8 * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
150     * {@link Integer#MAX_VALUE}.
151 dl 1.2 */
152     public LinkedBlockingQueue() {
153     this(Integer.MAX_VALUE);
154     }
155    
156     /**
157 dholmes 1.8 * Create a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity
158     * @param capacity the capacity of this queue.
159     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
160     * than zero.
161 dl 1.2 */
162     public LinkedBlockingQueue(int capacity) {
163 dholmes 1.8 if (capacity <= 0) throw new IllegalArgumentException();
164 dl 1.2 this.capacity = capacity;
165 dl 1.6 last = head = new Node<E>(null);
166 dl 1.2 }
167    
168     /**
169 dholmes 1.9 * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
170 dholmes 1.8 * {@link Integer#MAX_VALUE}, initially holding the elements of the
171     * given collection,
172     * added in traversal order of the collection's iterator.
173 dholmes 1.9 * @param c the collection of elements to initially contain
174     * @throws NullPointerException if <tt>c</tt> or any element within it
175     * is <tt>null</tt>
176 dl 1.2 */
177 dholmes 1.9 public LinkedBlockingQueue(Collection<E> c) {
178 dl 1.2 this(Integer.MAX_VALUE);
179 dholmes 1.9 for (Iterator<E> it = c.iterator(); it.hasNext();)
180 dl 1.2 add(it.next());
181     }
182    
183 dholmes 1.9
184     // Have to override just to update the javadoc for @throws
185    
186     /**
187     * @throws IllegalStateException {@inheritDoc}
188     * @throws NullPointerException {@inheritDoc}
189     */
190     public boolean add(E o) {
191     return super.add(o);
192     }
193    
194     /**
195     * @throws IllegalStateException {@inheritDoc}
196     * @throws NullPointerException {@inheritDoc}
197     */
198     public boolean addAll(Collection<? extends E> c) {
199     return super.addAll(c);
200     }
201    
202    
203 dholmes 1.8 // this doc comment is overridden to remove the reference to collections
204     // greater in size than Integer.MAX_VALUE
205     /**
206     * Return the number of elements in this collection.
207     */
208 dl 1.2 public int size() {
209     return count.get();
210 tim 1.1 }
211 dl 1.2
212 dholmes 1.8 // this doc comment is a modified copy of the inherited doc comment,
213     // without the reference to unlimited queues.
214     /**
215     * Return the number of elements that this queue can ideally (in
216     * the absence of memory or resource constraints) accept without
217     * blocking. This is always equal to the initial capacity of this queue
218     * less the current <tt>size</tt> of this queue.
219     * <p>Note that you <em>cannot</em> always tell if
220     * an attempt to <tt>add</tt> an element will succeed by
221     * inspecting <tt>remainingCapacity</tt> because it may be the
222     * case that a waiting consumer is ready to <tt>take</tt> an
223     * element out of an otherwise full queue.
224     */
225 dl 1.2 public int remainingCapacity() {
226     return capacity - count.get();
227     }
228    
229 dholmes 1.8 /**
230     * Add the specified element to the tail of this queue, waiting if
231     * necessary for space to become available.
232     * @throws NullPointerException {@inheritDoc}
233     */
234 dl 1.2 public void put(E x) throws InterruptedException {
235 dl 1.6 if (x == null) throw new NullPointerException();
236 dl 1.2 // Note: convention in all put/take/etc is to preset
237     // local var holding count negative to indicate failure unless set.
238     int c = -1;
239     putLock.lockInterruptibly();
240     try {
241     /*
242     * Note that count is used in wait guard even though it is
243     * not protected by lock. This works because count can
244     * only decrease at this point (all other puts are shut
245     * out by lock), and we (or some other waiting put) are
246     * signalled if it ever changes from
247     * capacity. Similarly for all other uses of count in
248     * other wait guards.
249     */
250     try {
251     while (count.get() == capacity)
252     notFull.await();
253     }
254     catch (InterruptedException ie) {
255     notFull.signal(); // propagate to a non-interrupted thread
256     throw ie;
257     }
258     insert(x);
259     c = count.getAndIncrement();
260 dl 1.6 if (c + 1 < capacity)
261 dl 1.2 notFull.signal();
262     }
263     finally {
264     putLock.unlock();
265     }
266     if (c == 0)
267     signalNotEmpty();
268 tim 1.1 }
269 dl 1.2
270 dholmes 1.8 /**
271     * Add the specified element to the tail of this queue, waiting if
272     * necessary up to the specified wait time for space to become available.
273     * @throws NullPointerException {@inheritDoc}
274     */
275     public boolean offer(E x, long timeout, TimeUnit unit)
276     throws InterruptedException {
277    
278 dl 1.6 if (x == null) throw new NullPointerException();
279 dl 1.2 long nanos = unit.toNanos(timeout);
280     int c = -1;
281 dholmes 1.8 putLock.lockInterruptibly();
282 dl 1.2 try {
283     for (;;) {
284     if (count.get() < capacity) {
285     insert(x);
286     c = count.getAndIncrement();
287 dl 1.6 if (c + 1 < capacity)
288 dl 1.2 notFull.signal();
289     break;
290     }
291     if (nanos <= 0)
292     return false;
293     try {
294     nanos = notFull.awaitNanos(nanos);
295     }
296     catch (InterruptedException ie) {
297     notFull.signal(); // propagate to a non-interrupted thread
298     throw ie;
299     }
300     }
301     }
302     finally {
303     putLock.unlock();
304     }
305     if (c == 0)
306     signalNotEmpty();
307     return true;
308 tim 1.1 }
309 dl 1.2
310 dholmes 1.8 /**
311     * Add the specified element to the tail of this queue if possible,
312     * returning immediately if this queue is full.
313     *
314     * @throws NullPointerException {@inheritDoc}
315     */
316 tim 1.1 public boolean offer(E x) {
317 dl 1.6 if (x == null) throw new NullPointerException();
318 dl 1.2 if (count.get() == capacity)
319     return false;
320     int c = -1;
321 dholmes 1.8 putLock.lock();
322 dl 1.2 try {
323     if (count.get() < capacity) {
324     insert(x);
325     c = count.getAndIncrement();
326 dl 1.6 if (c + 1 < capacity)
327 dl 1.2 notFull.signal();
328     }
329     }
330     finally {
331     putLock.unlock();
332     }
333     if (c == 0)
334     signalNotEmpty();
335     return c >= 0;
336 tim 1.1 }
337 dl 1.2
338    
339     public E take() throws InterruptedException {
340     E x;
341     int c = -1;
342     takeLock.lockInterruptibly();
343     try {
344     try {
345     while (count.get() == 0)
346     notEmpty.await();
347     }
348     catch (InterruptedException ie) {
349     notEmpty.signal(); // propagate to a non-interrupted thread
350     throw ie;
351     }
352    
353     x = extract();
354     c = count.getAndDecrement();
355     if (c > 1)
356     notEmpty.signal();
357     }
358     finally {
359     takeLock.unlock();
360     }
361     if (c == capacity)
362     signalNotFull();
363     return x;
364     }
365    
366     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
367     E x = null;
368     int c = -1;
369 dholmes 1.8 long nanos = unit.toNanos(timeout);
370 dl 1.2 takeLock.lockInterruptibly();
371     try {
372     for (;;) {
373     if (count.get() > 0) {
374     x = extract();
375     c = count.getAndDecrement();
376     if (c > 1)
377     notEmpty.signal();
378     break;
379     }
380     if (nanos <= 0)
381     return null;
382     try {
383     nanos = notEmpty.awaitNanos(nanos);
384     }
385     catch (InterruptedException ie) {
386     notEmpty.signal(); // propagate to a non-interrupted thread
387     throw ie;
388     }
389     }
390     }
391     finally {
392     takeLock.unlock();
393     }
394     if (c == capacity)
395     signalNotFull();
396     return x;
397     }
398    
399     public E poll() {
400     if (count.get() == 0)
401     return null;
402     E x = null;
403     int c = -1;
404     takeLock.tryLock();
405     try {
406     if (count.get() > 0) {
407     x = extract();
408     c = count.getAndDecrement();
409     if (c > 1)
410     notEmpty.signal();
411     }
412     }
413     finally {
414     takeLock.unlock();
415     }
416     if (c == capacity)
417     signalNotFull();
418     return x;
419 tim 1.1 }
420 dl 1.2
421    
422     public E peek() {
423     if (count.get() == 0)
424     return null;
425 dholmes 1.8 takeLock.lock();
426 dl 1.2 try {
427     Node<E> first = head.next;
428     if (first == null)
429     return null;
430     else
431     return first.item;
432     }
433     finally {
434     takeLock.unlock();
435     }
436 tim 1.1 }
437    
438 dholmes 1.9 public boolean remove(Object o) {
439     if (o == null) return false;
440 dl 1.2 boolean removed = false;
441     fullyLock();
442     try {
443     Node<E> trail = head;
444     Node<E> p = head.next;
445     while (p != null) {
446 dholmes 1.9 if (o.equals(p.item)) {
447 dl 1.2 removed = true;
448     break;
449     }
450     trail = p;
451     p = p.next;
452     }
453     if (removed) {
454     p.item = null;
455     trail.next = p.next;
456     if (count.getAndDecrement() == capacity)
457     notFull.signalAll();
458     }
459     }
460     finally {
461     fullyUnlock();
462     }
463     return removed;
464 tim 1.1 }
465 dl 1.2
466     public Object[] toArray() {
467     fullyLock();
468     try {
469     int size = count.get();
470     Object[] a = new Object[size];
471     int k = 0;
472     for (Node<E> p = head.next; p != null; p = p.next)
473     a[k++] = p.item;
474     return a;
475     }
476     finally {
477     fullyUnlock();
478     }
479 tim 1.1 }
480 dl 1.2
481     public <T> T[] toArray(T[] a) {
482     fullyLock();
483     try {
484     int size = count.get();
485     if (a.length < size)
486 dl 1.4 a = (T[])java.lang.reflect.Array.newInstance
487     (a.getClass().getComponentType(), size);
488    
489 dl 1.2 int k = 0;
490     for (Node p = head.next; p != null; p = p.next)
491     a[k++] = (T)p.item;
492     return a;
493     }
494     finally {
495     fullyUnlock();
496     }
497 tim 1.1 }
498 dl 1.2
499     public String toString() {
500     fullyLock();
501     try {
502     return super.toString();
503     }
504     finally {
505     fullyUnlock();
506     }
507 tim 1.1 }
508 dl 1.2
509     public Iterator<E> iterator() {
510     return new Itr();
511 tim 1.1 }
512 dl 1.2
513     private class Itr implements Iterator<E> {
514 dl 1.4 /*
515     * Basic weak-consistent iterator. At all times hold the next
516     * item to hand out so that if hasNext() reports true, we will
517     * still have it to return even if lost race with a take etc.
518     */
519 dl 1.2 Node<E> current;
520     Node<E> lastRet;
521 dl 1.4 E currentElement;
522 dl 1.2
523     Itr() {
524     fullyLock();
525     try {
526     current = head.next;
527 dl 1.4 if (current != null)
528     currentElement = current.item;
529 dl 1.2 }
530     finally {
531     fullyUnlock();
532     }
533     }
534    
535     public boolean hasNext() {
536     return current != null;
537     }
538    
539     public E next() {
540     fullyLock();
541     try {
542     if (current == null)
543     throw new NoSuchElementException();
544 dl 1.4 E x = currentElement;
545 dl 1.2 lastRet = current;
546     current = current.next;
547 dl 1.4 if (current != null)
548     currentElement = current.item;
549 dl 1.2 return x;
550     }
551     finally {
552     fullyUnlock();
553     }
554    
555     }
556    
557     public void remove() {
558     if (lastRet == null)
559     throw new IllegalStateException();
560     fullyLock();
561     try {
562     Node<E> node = lastRet;
563     lastRet = null;
564     Node<E> trail = head;
565     Node<E> p = head.next;
566     while (p != null && p != node) {
567     trail = p;
568     p = p.next;
569     }
570     if (p == node) {
571     p.item = null;
572     trail.next = p.next;
573     int c = count.getAndDecrement();
574     if (c == capacity)
575     notFull.signalAll();
576     }
577     }
578     finally {
579     fullyUnlock();
580     }
581     }
582 tim 1.1 }
583 dl 1.2
584     /**
585     * Save the state to a stream (that is, serialize it).
586     *
587     * @serialData The capacity is emitted (int), followed by all of
588     * its elements (each an <tt>Object</tt>) in the proper order,
589     * followed by a null
590 dl 1.6 * @param s the stream
591 dl 1.2 */
592     private void writeObject(java.io.ObjectOutputStream s)
593     throws java.io.IOException {
594    
595     fullyLock();
596     try {
597     // Write out any hidden stuff, plus capacity
598     s.defaultWriteObject();
599    
600     // Write out all elements in the proper order.
601     for (Node<E> p = head.next; p != null; p = p.next)
602     s.writeObject(p.item);
603    
604     // Use trailing null as sentinel
605     s.writeObject(null);
606     }
607     finally {
608     fullyUnlock();
609     }
610 tim 1.1 }
611    
612 dl 1.2 /**
613 dholmes 1.8 * Reconstitute this queue instance from a stream (that is,
614 dl 1.2 * deserialize it).
615 dl 1.6 * @param s the stream
616 dl 1.2 */
617     private void readObject(java.io.ObjectInputStream s)
618     throws java.io.IOException, ClassNotFoundException {
619     // Read in capacity, and any hidden stuff
620     s.defaultReadObject();
621    
622 dl 1.6 // Read in all elements and place in queue
623 dl 1.2 for (;;) {
624     E item = (E)s.readObject();
625     if (item == null)
626     break;
627     add(item);
628     }
629 tim 1.1 }
630     }
631 dholmes 1.8
632    
633    
634    
635 tim 1.1