ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.19
Committed: Sun Sep 7 15:06:24 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.18: +4 -6 lines
Log Message:
Serialization fixes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * The list always starts with a dummy node ("head") whose item is
     * null; the first real element is head.next. A node that has been
     * taken or removed has a null item but keeps its next pointer, so
     * iterators positioned on it can still move forward.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        /** The item, volatile to ensure barrier separating write and read */
        volatile E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /** Head of linked list (a dummy node; its item is always null) */
    private transient Node<E> head;

    /** Tail of linked list */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signal a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signal a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Create a node and link it at end of queue.
     * Caller must hold putLock.
     * @param x the item
     */
    private void insert(E x) {
        last = last.next = new Node<E>(x);
    }

    /**
     * Remove a node from head of queue.
     * Caller must hold takeLock and have checked that the queue is non-empty.
     * @return the item that was held by the removed node
     */
    private E extract() {
        Node<E> first = head.next;
        head = first;               // the old first node becomes the new dummy head
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Unlink interior node p, whose predecessor is trail.
     * Caller must hold both locks.
     * @param p the node to remove
     * @param trail the node immediately preceding p
     */
    private void unlink(Node<E> p, Node<E> trail) {
        p.item = null;
        // p.next is deliberately left intact so that an iterator currently
        // positioned on p can still advance.
        trail.next = p.next;
        // If the tail is removed, the next insert must link after trail,
        // not after the detached node.
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signalAll();
    }

    /**
     * Lock to prevent both puts and takes.
     */
    private void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    private void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }


    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue.
     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
     *         than zero.
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if <tt>c</tt> or any element within it
     * is <tt>null</tt>
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
            add(it.next());
    }

    // Have to override just to update the javadoc

    /**
     * Adds the specified element to the tail of this queue.
     * @return <tt>true</tt> (as per the general contract of
     * <tt>Collection.add</tt>).
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean add(E o) {
        return super.add(o);
    }

    /**
     * Adds all of the elements in the specified collection to this queue.
     * The behavior of this operation is undefined if
     * the specified collection is modified while the operation is in
     * progress.  (This implies that the behavior of this call is undefined if
     * the specified collection is this queue, and this queue is nonempty.)
     * <p>
     * This implementation iterates over the specified collection, and adds
     * each object returned by the iterator to this queue's tail, in turn.
     * @throws IllegalStateException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean addAll(Collection<? extends E> c) {
        return super.addAll(c);
    }

    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this collection.
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of elements that this queue can ideally (in
     * the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current <tt>size</tt> of this queue.
     * <p>Note that you <em>cannot</em> always tell if
     * an attempt to <tt>add</tt> an element will succeed by
     * inspecting <tt>remainingCapacity</tt> because it may be the
     * case that a waiting consumer is ready to <tt>take</tt> an
     * element out of an otherwise full queue.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary for space to become available.
     * @throws NullPointerException {@inheritDoc}
     * @throws InterruptedException if interrupted while waiting
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset
        // local var holding count negative to indicate failure unless set.
        int c = -1;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from
             * capacity. Similarly for all other uses of count in
             * other wait guards.
             */
            try {
                while (count.get() == capacity)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            insert(o);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        // Queue went from empty to non-empty: wake one waiting taker.
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Adds the specified element to the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     * @return <tt>true</tt> if the element was added, <tt>false</tt> if the
     *         wait time elapsed before space became available
     * @throws NullPointerException {@inheritDoc}
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean offer(E o, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        putLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() < capacity) {
                    insert(o);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                    break;
                }
                if (nanos <= 0)
                    return false;
                try {
                    nanos = notFull.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Adds the specified element to the tail of this queue if possible,
     * returning immediately if this queue is full.
     *
     * @return <tt>true</tt> if the element was added, else <tt>false</tt>
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        // Cheap unlocked pre-check; re-checked under the lock below.
        if (count.get() == capacity)
            return false;
        int c = -1;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(o);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * until an element becomes available.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            try {
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }

            x = extract();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        // Queue went from full to non-full: wake one waiting putter.
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * up to the specified wait time for an element to become available.
     * @return the head of this queue, or <tt>null</tt> if the wait time
     *         elapsed before an element became available
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        takeLock.lockInterruptibly();
        try {
            for (;;) {
                if (count.get() > 0) {
                    x = extract();
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                    break;
                }
                if (nanos <= 0)
                    return null;
                try {
                    nanos = notEmpty.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    notEmpty.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves and removes the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E poll() {
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        // Must really own the lock: an unchecked tryLock() would let the
        // body run unlocked and make unlock() throw when the lock is
        // held by another thread.
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = extract();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * Retrieves, but does not remove, the head of this queue, or returns
     * <tt>null</tt> if this queue is empty.
     * @return the head of this queue, or <tt>null</tt> if it is empty
     */
    public E peek() {
        if (count.get() == 0)
            return null;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Removes at most <tt>maxElements</tt> elements from this queue, in
     * FIFO order, and adds them to the given collection. Implemented by
     * repeated non-blocking {@link #poll()}; if <tt>c.add</tt> throws, the
     * element just polled is in neither collection.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is null
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        int n = 0;
        while (n < maxElements) {
            E x = poll();
            if (x == null)
                break;
            c.add(x);
            ++n;
        }
        return n;
    }

    /**
     * Removes all available elements from this queue and adds them to the
     * given collection.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is null
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Removes a single instance of the specified element from this
     * queue, if it is present. More formally,
     * removes an element <tt>e</tt> such that <tt>(o==null ? e==null :
     * o.equals(e))</tt>, if the queue contains one or more such
     * elements.  Returns <tt>true</tt> if the queue contained the
     * specified element (or equivalently, if the queue changed as a
     * result of the call).
     *
     * @param o the element to remove; <tt>null</tt> is never present
     * @return <tt>true</tt> if an element was removed
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            Node<E> trail = head;
            for (Node<E> p = trail.next; p != null; trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence, using the supplied array if it is large enough.
     * If the supplied array has room to spare, the slot immediately after
     * the last element is set to <tt>null</tt>, as specified by
     * <tt>Collection.toArray(T[])</tt>.
     *
     * @param a the array to fill, or whose runtime type is used to
     *        allocate a new array
     * @return an array holding the elements of this queue
     * @throws NullPointerException if <tt>a</tt> is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns a string representation of this queue, built while holding
     * both locks so that the contents cannot change underneath it.
     */
    public String toString() {
        fullyLock();
        try {
            return super.toString();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException},
     * and guarantees to traverse elements as they existed upon
     * construction of the iterator, and may (but is not guaranteed to)
     * reflect any modifications subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence.
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weak-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        Node<E> current;
        Node<E> lastRet;
        E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                // Advance past nodes whose item was cleared by a take or
                // remove since we last looked; handing those out would
                // make a later next() return null.
                Node<E> p = current.next;
                while (p != null && p.item == null)
                    p = p.next;
                current = p;
                if (p != null)
                    currentElement = p.item;
                return x;
            } finally {
                fullyUnlock();
            }
        }

        public void remove() {
            if (lastRet == null)
                throw new IllegalStateException();
            fullyLock();
            try {
                Node<E> node = lastRet;
                lastRet = null;
                // Find the predecessor; the node may already be gone, in
                // which case there is nothing to do.
                Node<E> trail = head;
                Node<E> p = head.next;
                while (p != null && p != node) {
                    trail = p;
                    p = p.next;
                }
                if (p == node)
                    unlink(p, trail);
            } finally {
                fullyUnlock();
            }
        }
    }

    /**
     * Save the state to a stream (that is, serialize it).
     *
     * @serialData The capacity is emitted (int), followed by all of
     * its elements (each an <tt>Object</tt>) in the proper order,
     * followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        fullyLock();
        try {
            // Write out any hidden stuff, plus capacity
            s.defaultWriteObject();

            // Write out all elements in the proper order.
            for (Node<E> p = head.next; p != null; p = p.next)
                s.writeObject(p.item);

            // Use trailing null as sentinel
            s.writeObject(null);
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Reconstitute this queue instance from a stream (that is,
     * deserialize it).
     * @param s the stream
     */
    @SuppressWarnings("unchecked")
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        // Read in capacity, and any hidden stuff
        s.defaultReadObject();

        // The element count is rebuilt by the add() calls below.
        count.set(0);
        last = head = new Node<E>(null);

        // Read in all elements and place in queue
        for (;;) {
            E item = (E)s.readObject();
            if (item == null)
                break;
            add(item);
        }
    }
}