ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.14
Committed: Wed Aug 6 01:57:53 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.13: +42 -14 lines
Log Message:
Final major updates to Collection related classes.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 dholmes 1.14 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
14 dholmes 1.8 * linked nodes.
15     * This queue orders elements FIFO (first-in-first-out).
16 tim 1.12 * The <em>head</em> of the queue is that element that has been on the
17 dholmes 1.8 * queue the longest time.
18     * The <em>tail</em> of the queue is that element that has been on the
19     * queue the shortest time.
20     * Linked queues typically have higher throughput than array-based queues but
21     * less predictable performance in most concurrent applications.
22 tim 1.12 *
23 dl 1.3 * <p> The optional capacity bound constructor argument serves as a
24 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
25     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
26 dl 1.3 * dynamically created upon each insertion unless this would bring the
27     * queue above capacity.
28 dholmes 1.8 *
29 dl 1.6 * @since 1.5
30     * @author Doug Lea
31 tim 1.12 *
32 tim 1.1 **/
33 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
34 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
35    
36 dl 1.2 /*
37     * A variant of the "two lock queue" algorithm. The putLock gates
38     * entry to put (and offer), and has an associated condition for
39     * waiting puts. Similarly for the takeLock. The "count" field
40     * that they both rely on is maintained as an atomic to avoid
41     * needing to get both locks in most cases. Also, to minimize need
42     * for puts to get takeLock and vice-versa, cascading notifies are
43     * used. When a put notices that it has enabled at least one take,
44     * it signals taker. That taker in turn signals others if more
45     * items have been entered since the signal. And symmetrically for
46 tim 1.12 * takes signalling puts. Operations such as remove(Object) and
47 dl 1.2 * iterators acquire both locks.
48     */
49    
50 dl 1.6 /**
51     * Linked list node class
52     */
53 dl 1.2 static class Node<E> {
54 dl 1.6 /** The item, volatile to ensure barrier separating write and read */
55 dl 1.2 volatile E item;
56     Node<E> next;
57     Node(E x) { item = x; }
58     }
59    
60 dl 1.6 /** The capacity bound, or Integer.MAX_VALUE if none */
61 dl 1.2 private final int capacity;
62 dl 1.6
63     /** Current number of elements */
64 dl 1.2 private transient final AtomicInteger count = new AtomicInteger(0);
65    
66 dl 1.6 /** Head of linked list */
67     private transient Node<E> head;
68    
69 dholmes 1.8 /** Tail of linked list */
70 dl 1.6 private transient Node<E> last;
71 dl 1.2
72 dl 1.6 /** Lock held by take, poll, etc */
73 dl 1.5 private final ReentrantLock takeLock = new ReentrantLock();
74 dl 1.6
75     /** Wait queue for waiting takes */
76 dl 1.5 private final Condition notEmpty = takeLock.newCondition();
77 dl 1.2
78 dl 1.6 /** Lock held by put, offer, etc */
79 dl 1.5 private final ReentrantLock putLock = new ReentrantLock();
80 dl 1.6
81     /** Wait queue for waiting puts */
82 dl 1.5 private final Condition notFull = putLock.newCondition();
83 dl 1.2
84     /**
85     * Signal a waiting take. Called only from put/offer (which do not
86 dl 1.4 * otherwise ordinarily lock takeLock.)
87 dl 1.2 */
88     private void signalNotEmpty() {
89     takeLock.lock();
90     try {
91     notEmpty.signal();
92     }
93     finally {
94     takeLock.unlock();
95     }
96     }
97    
98     /**
99     * Signal a waiting put. Called only from take/poll.
100     */
101     private void signalNotFull() {
102     putLock.lock();
103     try {
104     notFull.signal();
105     }
106     finally {
107     putLock.unlock();
108     }
109     }
110    
111     /**
112 dholmes 1.8 * Create a node and link it at end of queue
113 dl 1.6 * @param x the item
114 dl 1.2 */
115     private void insert(E x) {
116     last = last.next = new Node<E>(x);
117     }
118    
119     /**
120     * Remove a node from head of queue,
121 dl 1.6 * @return the node
122 dl 1.2 */
123     private E extract() {
124     Node<E> first = head.next;
125     head = first;
126     E x = (E)first.item;
127     first.item = null;
128     return x;
129     }
130    
131     /**
132 tim 1.12 * Lock to prevent both puts and takes.
133 dl 1.2 */
134     private void fullyLock() {
135     putLock.lock();
136     takeLock.lock();
137 tim 1.1 }
138 dl 1.2
139     /**
140 tim 1.12 * Unlock to allow both puts and takes.
141 dl 1.2 */
142     private void fullyUnlock() {
143     takeLock.unlock();
144     putLock.unlock();
145     }
146    
147    
148     /**
149 dholmes 1.13 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
150 dholmes 1.8 * {@link Integer#MAX_VALUE}.
151 dl 1.2 */
152     public LinkedBlockingQueue() {
153     this(Integer.MAX_VALUE);
154     }
155    
156     /**
157 dholmes 1.13 * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity
158 dholmes 1.8 * @param capacity the capacity of this queue.
159     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
160     * than zero.
161 dl 1.2 */
162     public LinkedBlockingQueue(int capacity) {
163 dholmes 1.8 if (capacity <= 0) throw new IllegalArgumentException();
164 dl 1.2 this.capacity = capacity;
165 dl 1.6 last = head = new Node<E>(null);
166 dl 1.2 }
167    
168     /**
169 dholmes 1.13 * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of
170 dholmes 1.14 * {@link Integer#MAX_VALUE}, initially containing the elements of the
171 tim 1.12 * given collection,
172 dholmes 1.8 * added in traversal order of the collection's iterator.
173 dholmes 1.9 * @param c the collection of elements to initially contain
174     * @throws NullPointerException if <tt>c</tt> or any element within it
175     * is <tt>null</tt>
176 dl 1.2 */
177 dholmes 1.10 public LinkedBlockingQueue(Collection<? extends E> c) {
178 dl 1.2 this(Integer.MAX_VALUE);
179 tim 1.12 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
180     add(it.next());
181 dl 1.2 }
182    
183 dholmes 1.14 // Have to override just to update the javadoc
184 dholmes 1.9
185     /**
186 dholmes 1.14 * Adds the specified element to the tail of this queue.
187     * @return <tt>true</tt> (as per the general contract of
188     * <tt>Collection.add</tt>).
189 dholmes 1.9 * @throws IllegalStateException {@inheritDoc}
190     * @throws NullPointerException {@inheritDoc}
191     */
192     public boolean add(E o) {
193     return super.add(o);
194     }
195    
196     /**
197 dholmes 1.14 * Adds all of the elements in the specified collection to this queue.
198     * The behavior of this operation is undefined if
199     * the specified collection is modified while the operation is in
200     * progress. (This implies that the behavior of this call is undefined if
201     * the specified collection is this queue, and this queue is nonempty.)
202     * <p>
203     * This implementation iterates over the specified collection, and adds
204     * each object returned by the iterator to this queue's tail, in turn.
205 dholmes 1.9 * @throws IllegalStateException {@inheritDoc}
206     * @throws NullPointerException {@inheritDoc}
207     */
208     public boolean addAll(Collection<? extends E> c) {
209     return super.addAll(c);
210     }
211    
212 dholmes 1.8 // this doc comment is overridden to remove the reference to collections
213     // greater in size than Integer.MAX_VALUE
214 tim 1.12 /**
215 dholmes 1.13 * Returns the number of elements in this collection.
216 dholmes 1.8 */
217 dl 1.2 public int size() {
218     return count.get();
219 tim 1.1 }
220 dl 1.2
221 dholmes 1.8 // this doc comment is a modified copy of the inherited doc comment,
222     // without the reference to unlimited queues.
223 tim 1.12 /**
224 dholmes 1.13 * Returns the number of elements that this queue can ideally (in
225 dholmes 1.8 * the absence of memory or resource constraints) accept without
226     * blocking. This is always equal to the initial capacity of this queue
227     * less the current <tt>size</tt> of this queue.
228     * <p>Note that you <em>cannot</em> always tell if
229     * an attempt to <tt>add</tt> an element will succeed by
230     * inspecting <tt>remainingCapacity</tt> because it may be the
231     * case that a waiting consumer is ready to <tt>take</tt> an
232     * element out of an otherwise full queue.
233     */
234 dl 1.2 public int remainingCapacity() {
235     return capacity - count.get();
236     }
237    
238 dholmes 1.8 /**
239 dholmes 1.13 * Adds the specified element to the tail of this queue, waiting if
240 dholmes 1.8 * necessary for space to become available.
241     * @throws NullPointerException {@inheritDoc}
242     */
243 dholmes 1.14 public void put(E o) throws InterruptedException {
244     if (o == null) throw new NullPointerException();
245 dl 1.2 // Note: convention in all put/take/etc is to preset
246     // local var holding count negative to indicate failure unless set.
247 tim 1.12 int c = -1;
248 dl 1.2 putLock.lockInterruptibly();
249     try {
250     /*
251     * Note that count is used in wait guard even though it is
252     * not protected by lock. This works because count can
253     * only decrease at this point (all other puts are shut
254     * out by lock), and we (or some other waiting put) are
255     * signalled if it ever changes from
256     * capacity. Similarly for all other uses of count in
257     * other wait guards.
258     */
259     try {
260 tim 1.12 while (count.get() == capacity)
261 dl 1.2 notFull.await();
262     }
263     catch (InterruptedException ie) {
264     notFull.signal(); // propagate to a non-interrupted thread
265     throw ie;
266     }
267 dholmes 1.14 insert(o);
268 dl 1.2 c = count.getAndIncrement();
269 dl 1.6 if (c + 1 < capacity)
270 dl 1.2 notFull.signal();
271     }
272     finally {
273     putLock.unlock();
274     }
275 tim 1.12 if (c == 0)
276 dl 1.2 signalNotEmpty();
277 tim 1.1 }
278 dl 1.2
279 dholmes 1.8 /**
280 dholmes 1.13 * Adds the specified element to the tail of this queue, waiting if
281 dholmes 1.8 * necessary up to the specified wait time for space to become available.
282     * @throws NullPointerException {@inheritDoc}
283     */
284 dholmes 1.14 public boolean offer(E o, long timeout, TimeUnit unit)
285 dholmes 1.8 throws InterruptedException {
286 tim 1.12
287 dholmes 1.14 if (o == null) throw new NullPointerException();
288 dl 1.2 long nanos = unit.toNanos(timeout);
289     int c = -1;
290 dholmes 1.8 putLock.lockInterruptibly();
291 dl 1.2 try {
292     for (;;) {
293     if (count.get() < capacity) {
294 dholmes 1.14 insert(o);
295 dl 1.2 c = count.getAndIncrement();
296 dl 1.6 if (c + 1 < capacity)
297 dl 1.2 notFull.signal();
298     break;
299     }
300     if (nanos <= 0)
301     return false;
302     try {
303     nanos = notFull.awaitNanos(nanos);
304     }
305     catch (InterruptedException ie) {
306     notFull.signal(); // propagate to a non-interrupted thread
307     throw ie;
308     }
309     }
310     }
311     finally {
312     putLock.unlock();
313     }
314 tim 1.12 if (c == 0)
315 dl 1.2 signalNotEmpty();
316     return true;
317 tim 1.1 }
318 dl 1.2
319 tim 1.12 /**
320 dholmes 1.13 * Adds the specified element to the tail of this queue if possible,
321 dholmes 1.8 * returning immediately if this queue is full.
322     *
323     * @throws NullPointerException {@inheritDoc}
324     */
325 dholmes 1.14 public boolean offer(E o) {
326     if (o == null) throw new NullPointerException();
327 dl 1.2 if (count.get() == capacity)
328     return false;
329 tim 1.12 int c = -1;
330 dholmes 1.8 putLock.lock();
331 dl 1.2 try {
332     if (count.get() < capacity) {
333 dholmes 1.14 insert(o);
334 dl 1.2 c = count.getAndIncrement();
335 dl 1.6 if (c + 1 < capacity)
336 dl 1.2 notFull.signal();
337     }
338     }
339     finally {
340     putLock.unlock();
341     }
342 tim 1.12 if (c == 0)
343 dl 1.2 signalNotEmpty();
344     return c >= 0;
345 tim 1.1 }
346 dl 1.2
347    
348     public E take() throws InterruptedException {
349     E x;
350     int c = -1;
351     takeLock.lockInterruptibly();
352     try {
353     try {
354 tim 1.12 while (count.get() == 0)
355 dl 1.2 notEmpty.await();
356     }
357     catch (InterruptedException ie) {
358     notEmpty.signal(); // propagate to a non-interrupted thread
359     throw ie;
360     }
361    
362     x = extract();
363     c = count.getAndDecrement();
364     if (c > 1)
365     notEmpty.signal();
366     }
367     finally {
368     takeLock.unlock();
369     }
370 tim 1.12 if (c == capacity)
371 dl 1.2 signalNotFull();
372     return x;
373     }
374    
375     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
376     E x = null;
377     int c = -1;
378 dholmes 1.8 long nanos = unit.toNanos(timeout);
379 dl 1.2 takeLock.lockInterruptibly();
380     try {
381     for (;;) {
382     if (count.get() > 0) {
383     x = extract();
384     c = count.getAndDecrement();
385     if (c > 1)
386     notEmpty.signal();
387     break;
388     }
389     if (nanos <= 0)
390     return null;
391     try {
392     nanos = notEmpty.awaitNanos(nanos);
393     }
394     catch (InterruptedException ie) {
395     notEmpty.signal(); // propagate to a non-interrupted thread
396     throw ie;
397     }
398     }
399     }
400     finally {
401     takeLock.unlock();
402     }
403 tim 1.12 if (c == capacity)
404 dl 1.2 signalNotFull();
405     return x;
406     }
407    
408     public E poll() {
409     if (count.get() == 0)
410     return null;
411     E x = null;
412 tim 1.12 int c = -1;
413 dl 1.2 takeLock.tryLock();
414     try {
415     if (count.get() > 0) {
416     x = extract();
417     c = count.getAndDecrement();
418     if (c > 1)
419     notEmpty.signal();
420     }
421     }
422     finally {
423     takeLock.unlock();
424     }
425 tim 1.12 if (c == capacity)
426 dl 1.2 signalNotFull();
427     return x;
428 tim 1.1 }
429 dl 1.2
430    
431     public E peek() {
432     if (count.get() == 0)
433     return null;
434 dholmes 1.8 takeLock.lock();
435 dl 1.2 try {
436     Node<E> first = head.next;
437     if (first == null)
438     return null;
439     else
440     return first.item;
441     }
442     finally {
443     takeLock.unlock();
444     }
445 tim 1.1 }
446    
447 dholmes 1.14 /**
448     * Removes a single instance of the specified element from this
449     * queue, if it is present. More formally,
450     * removes an element <tt>e</tt> such that <tt>(o==null ? e==null :
451     * o.equals(e))</tt>, if the queue contains one or more such
452     * elements. Returns <tt>true</tt> if the queue contained the
453     * specified element (or equivalently, if the queue changed as a
454     * result of the call).
455     *
456     * <p>This implementation iterates over the queue looking for the
457     * specified element. If it finds the element, it removes the element
458     * from the queue using the iterator's remove method.<p>
459     *
460     */
461 dholmes 1.9 public boolean remove(Object o) {
462     if (o == null) return false;
463 dl 1.2 boolean removed = false;
464     fullyLock();
465     try {
466     Node<E> trail = head;
467     Node<E> p = head.next;
468     while (p != null) {
469 dholmes 1.9 if (o.equals(p.item)) {
470 dl 1.2 removed = true;
471     break;
472     }
473     trail = p;
474     p = p.next;
475     }
476     if (removed) {
477     p.item = null;
478     trail.next = p.next;
479     if (count.getAndDecrement() == capacity)
480     notFull.signalAll();
481     }
482     }
483     finally {
484     fullyUnlock();
485     }
486     return removed;
487 tim 1.1 }
488 dl 1.2
489     public Object[] toArray() {
490     fullyLock();
491     try {
492     int size = count.get();
493 tim 1.12 Object[] a = new Object[size];
494 dl 1.2 int k = 0;
495 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
496 dl 1.2 a[k++] = p.item;
497     return a;
498     }
499     finally {
500     fullyUnlock();
501     }
502 tim 1.1 }
503 dl 1.2
504     public <T> T[] toArray(T[] a) {
505     fullyLock();
506     try {
507     int size = count.get();
508     if (a.length < size)
509 dl 1.4 a = (T[])java.lang.reflect.Array.newInstance
510     (a.getClass().getComponentType(), size);
511 tim 1.12
512 dl 1.2 int k = 0;
513 tim 1.12 for (Node p = head.next; p != null; p = p.next)
514 dl 1.2 a[k++] = (T)p.item;
515     return a;
516     }
517     finally {
518     fullyUnlock();
519     }
520 tim 1.1 }
521 dl 1.2
522     public String toString() {
523     fullyLock();
524     try {
525     return super.toString();
526     }
527     finally {
528     fullyUnlock();
529     }
530 tim 1.1 }
531 dl 1.2
532 dholmes 1.14 /**
533     * Returns an iterator over the elements in this queue in proper sequence.
534     *
535     * @return an iterator over the elements in this queue in proper sequence.
536     */
537 dl 1.2 public Iterator<E> iterator() {
538     return new Itr();
539 tim 1.1 }
540 dl 1.2
541     private class Itr implements Iterator<E> {
542 tim 1.12 /*
543 dl 1.4 * Basic weak-consistent iterator. At all times hold the next
544     * item to hand out so that if hasNext() reports true, we will
545     * still have it to return even if lost race with a take etc.
546     */
547 dl 1.2 Node<E> current;
548     Node<E> lastRet;
549 dl 1.4 E currentElement;
550 tim 1.12
551 dl 1.2 Itr() {
552     fullyLock();
553     try {
554     current = head.next;
555 dl 1.4 if (current != null)
556     currentElement = current.item;
557 dl 1.2 }
558     finally {
559     fullyUnlock();
560     }
561     }
562 tim 1.12
563     public boolean hasNext() {
564 dl 1.2 return current != null;
565     }
566    
567 tim 1.12 public E next() {
568 dl 1.2 fullyLock();
569     try {
570     if (current == null)
571     throw new NoSuchElementException();
572 dl 1.4 E x = currentElement;
573 dl 1.2 lastRet = current;
574     current = current.next;
575 dl 1.4 if (current != null)
576     currentElement = current.item;
577 dl 1.2 return x;
578     }
579     finally {
580     fullyUnlock();
581     }
582 tim 1.12
583 dl 1.2 }
584    
585 tim 1.12 public void remove() {
586 dl 1.2 if (lastRet == null)
587 tim 1.12 throw new IllegalStateException();
588 dl 1.2 fullyLock();
589     try {
590     Node<E> node = lastRet;
591     lastRet = null;
592     Node<E> trail = head;
593     Node<E> p = head.next;
594     while (p != null && p != node) {
595     trail = p;
596     p = p.next;
597     }
598     if (p == node) {
599     p.item = null;
600     trail.next = p.next;
601     int c = count.getAndDecrement();
602     if (c == capacity)
603     notFull.signalAll();
604     }
605     }
606     finally {
607     fullyUnlock();
608     }
609     }
610 tim 1.1 }
611 dl 1.2
612     /**
613     * Save the state to a stream (that is, serialize it).
614     *
615     * @serialData The capacity is emitted (int), followed by all of
616     * its elements (each an <tt>Object</tt>) in the proper order,
617     * followed by a null
618 dl 1.6 * @param s the stream
619 dl 1.2 */
620     private void writeObject(java.io.ObjectOutputStream s)
621     throws java.io.IOException {
622    
623 tim 1.12 fullyLock();
624 dl 1.2 try {
625     // Write out any hidden stuff, plus capacity
626     s.defaultWriteObject();
627    
628     // Write out all elements in the proper order.
629 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
630 dl 1.2 s.writeObject(p.item);
631    
632     // Use trailing null as sentinel
633     s.writeObject(null);
634     }
635     finally {
636     fullyUnlock();
637     }
638 tim 1.1 }
639    
640 dl 1.2 /**
641 dholmes 1.8 * Reconstitute this queue instance from a stream (that is,
642 dl 1.2 * deserialize it).
643 dl 1.6 * @param s the stream
644 dl 1.2 */
645     private void readObject(java.io.ObjectInputStream s)
646     throws java.io.IOException, ClassNotFoundException {
647 tim 1.12 // Read in capacity, and any hidden stuff
648     s.defaultReadObject();
649 dl 1.2
650 dl 1.6 // Read in all elements and place in queue
651 dl 1.2 for (;;) {
652     E item = (E)s.readObject();
653     if (item == null)
654     break;
655     add(item);
656     }
657 tim 1.1 }
658     }
659 dholmes 1.8
660    
661    
662    
663 tim 1.1