ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/LinkedBlockingQueue.java
Revision: 1.12
Committed: Mon Aug 4 16:14:48 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.11: +47 -47 lines
Log Message:
Make atomics emulation classes match the main atomics.
Fix docs for atomics (both in main and emulation).
Restored more specific iterator types in both blocking queue impls.
Fix unchecked cast warning in PQ.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.2 import java.util.concurrent.atomic.*;
9 dl 1.7 import java.util.concurrent.locks.*;
10 tim 1.1 import java.util.*;
11    
12     /**
13 tim 1.12 * An optionally-bounded {@link BlockingQueue blocking queue} based on
14 dholmes 1.8 * linked nodes.
15     * This queue orders elements FIFO (first-in-first-out).
16 tim 1.12 * The <em>head</em> of the queue is that element that has been on the
17 dholmes 1.8 * queue the longest time.
18     * The <em>tail</em> of the queue is that element that has been on the
19     * queue the shortest time.
20     * Linked queues typically have higher throughput than array-based queues but
21     * less predictable performance in most concurrent applications.
22 tim 1.12 *
23 dl 1.3 * <p> The optional capacity bound constructor argument serves as a
24 dholmes 1.8 * way to prevent excessive queue expansion. The capacity, if unspecified,
25     * is equal to {@link Integer#MAX_VALUE}. Linked nodes are
26 dl 1.3 * dynamically created upon each insertion unless this would bring the
27     * queue above capacity.
28 dholmes 1.8 *
29 dl 1.6 * @since 1.5
30     * @author Doug Lea
31 tim 1.12 *
32 tim 1.1 **/
33 dl 1.2 public class LinkedBlockingQueue<E> extends AbstractQueue<E>
34 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
35    
36 dl 1.2 /*
37     * A variant of the "two lock queue" algorithm. The putLock gates
38     * entry to put (and offer), and has an associated condition for
39     * waiting puts. Similarly for the takeLock. The "count" field
40     * that they both rely on is maintained as an atomic to avoid
41     * needing to get both locks in most cases. Also, to minimize need
42     * for puts to get takeLock and vice-versa, cascading notifies are
43     * used. When a put notices that it has enabled at least one take,
44     * it signals taker. That taker in turn signals others if more
45     * items have been entered since the signal. And symmetrically for
46 tim 1.12 * takes signalling puts. Operations such as remove(Object) and
47 dl 1.2 * iterators acquire both locks.
48     */
49    
50 dl 1.6 /**
51     * Linked list node class
52     */
53 dl 1.2 static class Node<E> {
54 dl 1.6 /** The item, volatile to ensure barrier separating write and read */
55 dl 1.2 volatile E item;
56     Node<E> next;
57     Node(E x) { item = x; }
58     }
59    
60 dl 1.6 /** The capacity bound, or Integer.MAX_VALUE if none */
61 dl 1.2 private final int capacity;
62 dl 1.6
63     /** Current number of elements */
64 dl 1.2 private transient final AtomicInteger count = new AtomicInteger(0);
65    
66 dl 1.6 /** Head of linked list */
67     private transient Node<E> head;
68    
69 dholmes 1.8 /** Tail of linked list */
70 dl 1.6 private transient Node<E> last;
71 dl 1.2
72 dl 1.6 /** Lock held by take, poll, etc */
73 dl 1.5 private final ReentrantLock takeLock = new ReentrantLock();
74 dl 1.6
75     /** Wait queue for waiting takes */
76 dl 1.5 private final Condition notEmpty = takeLock.newCondition();
77 dl 1.2
78 dl 1.6 /** Lock held by put, offer, etc */
79 dl 1.5 private final ReentrantLock putLock = new ReentrantLock();
80 dl 1.6
81     /** Wait queue for waiting puts */
82 dl 1.5 private final Condition notFull = putLock.newCondition();
83 dl 1.2
84     /**
85     * Signal a waiting take. Called only from put/offer (which do not
86 dl 1.4 * otherwise ordinarily lock takeLock.)
87 dl 1.2 */
88     private void signalNotEmpty() {
89     takeLock.lock();
90     try {
91     notEmpty.signal();
92     }
93     finally {
94     takeLock.unlock();
95     }
96     }
97    
98     /**
99     * Signal a waiting put. Called only from take/poll.
100     */
101     private void signalNotFull() {
102     putLock.lock();
103     try {
104     notFull.signal();
105     }
106     finally {
107     putLock.unlock();
108     }
109     }
110    
111     /**
112 dholmes 1.8 * Create a node and link it at end of queue
113 dl 1.6 * @param x the item
114 dl 1.2 */
115     private void insert(E x) {
116     last = last.next = new Node<E>(x);
117     }
118    
119     /**
120     * Remove a node from head of queue,
121 dl 1.6 * @return the node
122 dl 1.2 */
123     private E extract() {
124     Node<E> first = head.next;
125     head = first;
126     E x = (E)first.item;
127     first.item = null;
128     return x;
129     }
130    
131     /**
132 tim 1.12 * Lock to prevent both puts and takes.
133 dl 1.2 */
134     private void fullyLock() {
135     putLock.lock();
136     takeLock.lock();
137 tim 1.1 }
138 dl 1.2
139     /**
140 tim 1.12 * Unlock to allow both puts and takes.
141 dl 1.2 */
142     private void fullyUnlock() {
143     takeLock.unlock();
144     putLock.unlock();
145     }
146    
147    
148     /**
149 dholmes 1.8 * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
150     * {@link Integer#MAX_VALUE}.
151 dl 1.2 */
152     public LinkedBlockingQueue() {
153     this(Integer.MAX_VALUE);
154     }
155    
156     /**
157 tim 1.12 * Create a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity
158 dholmes 1.8 * @param capacity the capacity of this queue.
159     * @throws IllegalArgumentException if <tt>capacity</tt> is not greater
160     * than zero.
161 dl 1.2 */
162     public LinkedBlockingQueue(int capacity) {
163 dholmes 1.8 if (capacity <= 0) throw new IllegalArgumentException();
164 dl 1.2 this.capacity = capacity;
165 dl 1.6 last = head = new Node<E>(null);
166 dl 1.2 }
167    
168     /**
169 dholmes 1.9 * Create a <tt>LinkedBlockingQueue</tt> with a capacity of
170 tim 1.12 * {@link Integer#MAX_VALUE}, initially holding the elements of the
171     * given collection,
172 dholmes 1.8 * added in traversal order of the collection's iterator.
173 dholmes 1.9 * @param c the collection of elements to initially contain
174     * @throws NullPointerException if <tt>c</tt> or any element within it
175     * is <tt>null</tt>
176 dl 1.2 */
177 dholmes 1.10 public LinkedBlockingQueue(Collection<? extends E> c) {
178 dl 1.2 this(Integer.MAX_VALUE);
179 tim 1.12 for (Iterator<? extends E> it = c.iterator(); it.hasNext();)
180     add(it.next());
181 dl 1.2 }
182    
183 dholmes 1.9
184     // Have to override just to update the javadoc for @throws
185    
186     /**
187     * @throws IllegalStateException {@inheritDoc}
188     * @throws NullPointerException {@inheritDoc}
189     */
190     public boolean add(E o) {
191     return super.add(o);
192     }
193    
194     /**
195     * @throws IllegalStateException {@inheritDoc}
196     * @throws NullPointerException {@inheritDoc}
197     */
198     public boolean addAll(Collection<? extends E> c) {
199     return super.addAll(c);
200     }
201    
202    
203 dholmes 1.8 // this doc comment is overridden to remove the reference to collections
204     // greater in size than Integer.MAX_VALUE
205 tim 1.12 /**
206     * Return the number of elements in this collection.
207 dholmes 1.8 */
208 dl 1.2 public int size() {
209     return count.get();
210 tim 1.1 }
211 dl 1.2
212 dholmes 1.8 // this doc comment is a modified copy of the inherited doc comment,
213     // without the reference to unlimited queues.
214 tim 1.12 /**
215 dholmes 1.8 * Return the number of elements that this queue can ideally (in
216     * the absence of memory or resource constraints) accept without
217     * blocking. This is always equal to the initial capacity of this queue
218     * less the current <tt>size</tt> of this queue.
219     * <p>Note that you <em>cannot</em> always tell if
220     * an attempt to <tt>add</tt> an element will succeed by
221     * inspecting <tt>remainingCapacity</tt> because it may be the
222     * case that a waiting consumer is ready to <tt>take</tt> an
223     * element out of an otherwise full queue.
224     */
225 dl 1.2 public int remainingCapacity() {
226     return capacity - count.get();
227     }
228    
229 dholmes 1.8 /**
230 tim 1.12 * Add the specified element to the tail of this queue, waiting if
231 dholmes 1.8 * necessary for space to become available.
232     * @throws NullPointerException {@inheritDoc}
233     */
234 dl 1.2 public void put(E x) throws InterruptedException {
235 dl 1.6 if (x == null) throw new NullPointerException();
236 dl 1.2 // Note: convention in all put/take/etc is to preset
237     // local var holding count negative to indicate failure unless set.
238 tim 1.12 int c = -1;
239 dl 1.2 putLock.lockInterruptibly();
240     try {
241     /*
242     * Note that count is used in wait guard even though it is
243     * not protected by lock. This works because count can
244     * only decrease at this point (all other puts are shut
245     * out by lock), and we (or some other waiting put) are
246     * signalled if it ever changes from
247     * capacity. Similarly for all other uses of count in
248     * other wait guards.
249     */
250     try {
251 tim 1.12 while (count.get() == capacity)
252 dl 1.2 notFull.await();
253     }
254     catch (InterruptedException ie) {
255     notFull.signal(); // propagate to a non-interrupted thread
256     throw ie;
257     }
258     insert(x);
259     c = count.getAndIncrement();
260 dl 1.6 if (c + 1 < capacity)
261 dl 1.2 notFull.signal();
262     }
263     finally {
264     putLock.unlock();
265     }
266 tim 1.12 if (c == 0)
267 dl 1.2 signalNotEmpty();
268 tim 1.1 }
269 dl 1.2
270 dholmes 1.8 /**
271 tim 1.12 * Add the specified element to the tail of this queue, waiting if
272 dholmes 1.8 * necessary up to the specified wait time for space to become available.
273     * @throws NullPointerException {@inheritDoc}
274     */
275 tim 1.12 public boolean offer(E x, long timeout, TimeUnit unit)
276 dholmes 1.8 throws InterruptedException {
277 tim 1.12
278 dl 1.6 if (x == null) throw new NullPointerException();
279 dl 1.2 long nanos = unit.toNanos(timeout);
280     int c = -1;
281 dholmes 1.8 putLock.lockInterruptibly();
282 dl 1.2 try {
283     for (;;) {
284     if (count.get() < capacity) {
285     insert(x);
286     c = count.getAndIncrement();
287 dl 1.6 if (c + 1 < capacity)
288 dl 1.2 notFull.signal();
289     break;
290     }
291     if (nanos <= 0)
292     return false;
293     try {
294     nanos = notFull.awaitNanos(nanos);
295     }
296     catch (InterruptedException ie) {
297     notFull.signal(); // propagate to a non-interrupted thread
298     throw ie;
299     }
300     }
301     }
302     finally {
303     putLock.unlock();
304     }
305 tim 1.12 if (c == 0)
306 dl 1.2 signalNotEmpty();
307     return true;
308 tim 1.1 }
309 dl 1.2
310 tim 1.12 /**
311 dholmes 1.8 * Add the specified element to the tail of this queue if possible,
312     * returning immediately if this queue is full.
313     *
314     * @throws NullPointerException {@inheritDoc}
315     */
316 tim 1.1 public boolean offer(E x) {
317 dl 1.6 if (x == null) throw new NullPointerException();
318 dl 1.2 if (count.get() == capacity)
319     return false;
320 tim 1.12 int c = -1;
321 dholmes 1.8 putLock.lock();
322 dl 1.2 try {
323     if (count.get() < capacity) {
324     insert(x);
325     c = count.getAndIncrement();
326 dl 1.6 if (c + 1 < capacity)
327 dl 1.2 notFull.signal();
328     }
329     }
330     finally {
331     putLock.unlock();
332     }
333 tim 1.12 if (c == 0)
334 dl 1.2 signalNotEmpty();
335     return c >= 0;
336 tim 1.1 }
337 dl 1.2
338    
339     public E take() throws InterruptedException {
340     E x;
341     int c = -1;
342     takeLock.lockInterruptibly();
343     try {
344     try {
345 tim 1.12 while (count.get() == 0)
346 dl 1.2 notEmpty.await();
347     }
348     catch (InterruptedException ie) {
349     notEmpty.signal(); // propagate to a non-interrupted thread
350     throw ie;
351     }
352    
353     x = extract();
354     c = count.getAndDecrement();
355     if (c > 1)
356     notEmpty.signal();
357     }
358     finally {
359     takeLock.unlock();
360     }
361 tim 1.12 if (c == capacity)
362 dl 1.2 signalNotFull();
363     return x;
364     }
365    
366     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
367     E x = null;
368     int c = -1;
369 dholmes 1.8 long nanos = unit.toNanos(timeout);
370 dl 1.2 takeLock.lockInterruptibly();
371     try {
372     for (;;) {
373     if (count.get() > 0) {
374     x = extract();
375     c = count.getAndDecrement();
376     if (c > 1)
377     notEmpty.signal();
378     break;
379     }
380     if (nanos <= 0)
381     return null;
382     try {
383     nanos = notEmpty.awaitNanos(nanos);
384     }
385     catch (InterruptedException ie) {
386     notEmpty.signal(); // propagate to a non-interrupted thread
387     throw ie;
388     }
389     }
390     }
391     finally {
392     takeLock.unlock();
393     }
394 tim 1.12 if (c == capacity)
395 dl 1.2 signalNotFull();
396     return x;
397     }
398    
399     public E poll() {
400     if (count.get() == 0)
401     return null;
402     E x = null;
403 tim 1.12 int c = -1;
404 dl 1.2 takeLock.tryLock();
405     try {
406     if (count.get() > 0) {
407     x = extract();
408     c = count.getAndDecrement();
409     if (c > 1)
410     notEmpty.signal();
411     }
412     }
413     finally {
414     takeLock.unlock();
415     }
416 tim 1.12 if (c == capacity)
417 dl 1.2 signalNotFull();
418     return x;
419 tim 1.1 }
420 dl 1.2
421    
422     public E peek() {
423     if (count.get() == 0)
424     return null;
425 dholmes 1.8 takeLock.lock();
426 dl 1.2 try {
427     Node<E> first = head.next;
428     if (first == null)
429     return null;
430     else
431     return first.item;
432     }
433     finally {
434     takeLock.unlock();
435     }
436 tim 1.1 }
437    
438 dholmes 1.9 public boolean remove(Object o) {
439     if (o == null) return false;
440 dl 1.2 boolean removed = false;
441     fullyLock();
442     try {
443     Node<E> trail = head;
444     Node<E> p = head.next;
445     while (p != null) {
446 dholmes 1.9 if (o.equals(p.item)) {
447 dl 1.2 removed = true;
448     break;
449     }
450     trail = p;
451     p = p.next;
452     }
453     if (removed) {
454     p.item = null;
455     trail.next = p.next;
456     if (count.getAndDecrement() == capacity)
457     notFull.signalAll();
458     }
459     }
460     finally {
461     fullyUnlock();
462     }
463     return removed;
464 tim 1.1 }
465 dl 1.2
466     public Object[] toArray() {
467     fullyLock();
468     try {
469     int size = count.get();
470 tim 1.12 Object[] a = new Object[size];
471 dl 1.2 int k = 0;
472 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
473 dl 1.2 a[k++] = p.item;
474     return a;
475     }
476     finally {
477     fullyUnlock();
478     }
479 tim 1.1 }
480 dl 1.2
481     public <T> T[] toArray(T[] a) {
482     fullyLock();
483     try {
484     int size = count.get();
485     if (a.length < size)
486 dl 1.4 a = (T[])java.lang.reflect.Array.newInstance
487     (a.getClass().getComponentType(), size);
488 tim 1.12
489 dl 1.2 int k = 0;
490 tim 1.12 for (Node p = head.next; p != null; p = p.next)
491 dl 1.2 a[k++] = (T)p.item;
492     return a;
493     }
494     finally {
495     fullyUnlock();
496     }
497 tim 1.1 }
498 dl 1.2
499     public String toString() {
500     fullyLock();
501     try {
502     return super.toString();
503     }
504     finally {
505     fullyUnlock();
506     }
507 tim 1.1 }
508 dl 1.2
509     public Iterator<E> iterator() {
510     return new Itr();
511 tim 1.1 }
512 dl 1.2
513     private class Itr implements Iterator<E> {
514 tim 1.12 /*
515 dl 1.4 * Basic weak-consistent iterator. At all times hold the next
516     * item to hand out so that if hasNext() reports true, we will
517     * still have it to return even if lost race with a take etc.
518     */
519 dl 1.2 Node<E> current;
520     Node<E> lastRet;
521 dl 1.4 E currentElement;
522 tim 1.12
523 dl 1.2 Itr() {
524     fullyLock();
525     try {
526     current = head.next;
527 dl 1.4 if (current != null)
528     currentElement = current.item;
529 dl 1.2 }
530     finally {
531     fullyUnlock();
532     }
533     }
534 tim 1.12
535     public boolean hasNext() {
536 dl 1.2 return current != null;
537     }
538    
539 tim 1.12 public E next() {
540 dl 1.2 fullyLock();
541     try {
542     if (current == null)
543     throw new NoSuchElementException();
544 dl 1.4 E x = currentElement;
545 dl 1.2 lastRet = current;
546     current = current.next;
547 dl 1.4 if (current != null)
548     currentElement = current.item;
549 dl 1.2 return x;
550     }
551     finally {
552     fullyUnlock();
553     }
554 tim 1.12
555 dl 1.2 }
556    
557 tim 1.12 public void remove() {
558 dl 1.2 if (lastRet == null)
559 tim 1.12 throw new IllegalStateException();
560 dl 1.2 fullyLock();
561     try {
562     Node<E> node = lastRet;
563     lastRet = null;
564     Node<E> trail = head;
565     Node<E> p = head.next;
566     while (p != null && p != node) {
567     trail = p;
568     p = p.next;
569     }
570     if (p == node) {
571     p.item = null;
572     trail.next = p.next;
573     int c = count.getAndDecrement();
574     if (c == capacity)
575     notFull.signalAll();
576     }
577     }
578     finally {
579     fullyUnlock();
580     }
581     }
582 tim 1.1 }
583 dl 1.2
584     /**
585     * Save the state to a stream (that is, serialize it).
586     *
587     * @serialData The capacity is emitted (int), followed by all of
588     * its elements (each an <tt>Object</tt>) in the proper order,
589     * followed by a null
590 dl 1.6 * @param s the stream
591 dl 1.2 */
592     private void writeObject(java.io.ObjectOutputStream s)
593     throws java.io.IOException {
594    
595 tim 1.12 fullyLock();
596 dl 1.2 try {
597     // Write out any hidden stuff, plus capacity
598     s.defaultWriteObject();
599    
600     // Write out all elements in the proper order.
601 tim 1.12 for (Node<E> p = head.next; p != null; p = p.next)
602 dl 1.2 s.writeObject(p.item);
603    
604     // Use trailing null as sentinel
605     s.writeObject(null);
606     }
607     finally {
608     fullyUnlock();
609     }
610 tim 1.1 }
611    
612 dl 1.2 /**
613 dholmes 1.8 * Reconstitute this queue instance from a stream (that is,
614 dl 1.2 * deserialize it).
615 dl 1.6 * @param s the stream
616 dl 1.2 */
617     private void readObject(java.io.ObjectInputStream s)
618     throws java.io.IOException, ClassNotFoundException {
619 tim 1.12 // Read in capacity, and any hidden stuff
620     s.defaultReadObject();
621 dl 1.2
622 dl 1.6 // Read in all elements and place in queue
623 dl 1.2 for (;;) {
624     E item = (E)s.readObject();
625     if (item == null)
626     break;
627     add(item);
628     }
629 tim 1.1 }
630     }
631 dholmes 1.8
632    
633    
634    
635 tim 1.1