ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.43
Committed: Sun Feb 8 15:35:10 2004 UTC (20 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.42: +138 -73 lines
Log Message:
Wording cleanups; Improve SynchronousQueue serialization and fairness support

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.29 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.18 * A {@linkplain BlockingQueue blocking queue} in which each
13     * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14     * synchronous queue does not have any internal capacity - in
15     * particular it does not have a capacity of one. You cannot
16     * <tt>peek</tt> at a synchronous queue because an element is only
17     * present when you try to take it; you cannot add an element (using
18     * any method) unless another thread is trying to remove it; you
19     * cannot iterate as there is nothing to iterate. The <em>head</em>
20     * of the queue is the element that the first queued thread is trying
21     * to add to the queue; if there are no queued threads then no element
22     * is being added and the head is <tt>null</tt>. For purposes of
23     * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 dl 1.19 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 dl 1.18 * queue does not permit <tt>null</tt> elements.
26     *
27     * <p>Synchronous queues are similar to rendezvous channels used in
28     * CSP and Ada. They are well suited for handoff designs, in which an
29 dl 1.30 * object running in one thread must sync up with an object running
30 dl 1.18 * in another thread in order to hand it some information, event, or
31     * task.
32 dl 1.43 *
33     * <p> This class supports an optional fairness policy for ordering
34     * waiting producer and consumer threads. By default, this ordering
35     * is not guaranteed. However, a queue constructed with fairness set
36     * to <tt>true</tt> grants threads access in FIFO order. Fairness
37     * generally decreases throughput but reduces variability and avoids
38     * starvation.
39     *
40 dl 1.19 * <p>This class implements all of the <em>optional</em> methods
41     * of the {@link Collection} and {@link Iterator} interfaces.
42 dl 1.42 *
43     * <p>This class is a member of the
44     * <a href="{@docRoot}/../guide/collections/index.html">
45     * Java Collections Framework</a>.
46     *
47 dl 1.6 * @since 1.5
48     * @author Doug Lea
49 dl 1.24 * @param <E> the type of elements held in this collection
50 dl 1.23 */
51 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
52 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
53 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
54 tim 1.1
55 dl 1.2 /*
56     This implementation divides actions into two cases for puts:
57    
58 dl 1.43 * An arriving producer that does not already have a waiting consumer
59     creates a node holding item, and then waits for a consumer to take it.
60     * An arriving producer that does already have a waiting consumer fills
61     the slot node created by the consumer, and notifies it to continue.
62 dl 1.2
63     And symmetrically, two for takes:
64    
65 dl 1.43 * An arriving consumer that does not already have a waiting producer
66     creates an empty slot node, and then waits for a producer to fill it.
67     * An arriving consumer that does already have a waiting producer takes
68     item from the node created by the producer, and notifies it to continue.
69 tim 1.10
70 dl 1.2 When a put or take waiting for the actions of its counterpart
71     aborts due to interruption or timeout, it marks the node
72     it created as "CANCELLED", which causes its counterpart to retry
73     the entire put or take sequence.
74 dl 1.43
75     This requires keeping two simple queues, waitingProducers and
76     waitingConsumers. Each of these can be FIFO (preserves fairness)
77     or LIFO (improves throughput).
78 dl 1.2 */
79    
/** Single lock guarding every access to the two wait queues below. */
private final ReentrantLock qlock;
/** Producers (puts/offers) that are parked until a consumer arrives. */
private final WaitQueue waitingProducers;
/** Consumers (takes/polls) that are parked until a producer arrives. */
private final WaitQueue waitingConsumers;

/**
 * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
 */
public SynchronousQueue() {
    this(false);
}

/**
 * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
 * A fair queue uses a fair lock and FIFO wait queues; a nonfair
 * queue uses a nonfair lock and LIFO wait queues.
 * @param fair if true, threads contend in FIFO order for access.
 */
public SynchronousQueue(boolean fair) {
    qlock = new ReentrantLock(fair);
    waitingProducers = newWaitQueue(fair);
    waitingConsumers = newWaitQueue(fair);
}

/** Creates an empty wait queue of the flavor matching the fairness policy. */
private static WaitQueue newWaitQueue(boolean fair) {
    if (fair)
        return new FifoWaitQueue();
    return new LifoWaitQueue();
}
112    
113     /**
114     * Queue to hold waiting puts/takes; specialized to FiFo/Lifo below.
115     * These queues have all transient fields, but are serializable
116     * in order to retain fairness settings when deserialized.
117     */
118     static abstract class WaitQueue implements java.io.Serializable {
119     /** Create, add, and return node for x */
120     abstract Node enq(Object x);
121     /** Remove and return node, or null if empty */
122     abstract Node deq();
123     }
124    
125     /**
126     * FIFO queue to hold waiting puts/takes.
127     */
128     static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
129     private static final long serialVersionUID = -3623113410248163686L;
130     private transient Node head;
131     private transient Node last;
132    
133     Node enq(Object x) {
134     Node p = new Node(x);
135     if (last == null)
136     last = head = p;
137     else
138     last = last.next = p;
139     return p;
140     }
141    
142     Node deq() {
143     Node p = head;
144     if (p != null) {
145     if ((head = p.next) == null)
146     last = null;
147     p.next = null;
148     }
149     return p;
150     }
151     }
152    
153     /**
154     * LIFO queue to hold waiting puts/takes.
155 dl 1.2 */
156 dl 1.43 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
157     private static final long serialVersionUID = -3633113410248163686L;
158     private transient Node head;
159 dl 1.2
160 dl 1.43 Node enq(Object x) {
161     return head = new Node(x, head);
162     }
163    
164     Node deq() {
165     Node p = head;
166     if (p != null)
167     head = p.next;
168     return p;
169     }
170     }
171 dl 1.2
172     /**
173     * Nodes each maintain an item and handle waits and signals for
174 dl 1.31 * getting and setting it. The class extends
175     * AbstractQueuedSynchronizer to manage blocking, using AQS state
176     * 0 for waiting, 1 for ack, -1 for cancelled.
177 dl 1.2 */
178 dl 1.41 static final class Node extends AbstractQueuedSynchronizer {
179 dl 1.35 /** Synchronization state value representing that node acked */
180     private static final int ACK = 1;
181     /** Synchronization state value representing that node cancelled */
182     private static final int CANCEL = -1;
183    
184 dl 1.6 /** The item being transferred */
185 dl 1.2 Object item;
186 dl 1.6 /** Next node in wait queue */
187 dl 1.2 Node next;
188 dl 1.35
189     /** Create node with initial item */
190 dl 1.31 Node(Object x) { item = x; }
191    
192 dl 1.43 /** Create node with initial item and next */
193     Node(Object x, Node n) { item = x; next = n; }
194    
195 dl 1.31 /**
196     * Implements AQS base acquire to succeed if not in WAITING state
197     */
198 dl 1.39 protected boolean tryAcquire(int ignore) {
199 dl 1.35 return getState() != 0;
200 dl 1.31 }
201    
202     /**
203 dl 1.34 * Implements AQS base release to signal if state changed
204 dl 1.31 */
205 dl 1.39 protected boolean tryRelease(int newState) {
206 dl 1.35 return compareAndSetState(0, newState);
207 dl 1.31 }
208    
209     /**
210 dl 1.35 * Take item and null out field (for sake of GC)
211 dl 1.31 */
212 dl 1.35 private Object extract() {
213     Object x = item;
214     item = null;
215     return x;
216 dl 1.31 }
217    
218     /**
219 dl 1.35 * Try to cancel on interrupt; if so rethrowing,
220     * else setting interrupt state
221 dl 1.31 */
222 dl 1.35 private void checkCancellationOnInterrupt(InterruptedException ie)
223     throws InterruptedException {
224 dl 1.39 if (release(CANCEL))
225 dl 1.35 throw ie;
226     Thread.currentThread().interrupt();
227 dl 1.31 }
228 dl 1.2
229     /**
230 dl 1.43 * Fill in the slot created by the consumer and signal consumer to
231 dl 1.2 * continue.
232     */
233 dl 1.31 boolean setItem(Object x) {
234 dl 1.35 item = x; // can place in slot even if cancelled
235 dl 1.39 return release(ACK);
236 dl 1.2 }
237    
238     /**
239 dl 1.43 * Remove item from slot created by producer and signal producer
240 dl 1.2 * to continue.
241     */
242 dl 1.31 Object getItem() {
243 dl 1.39 return (release(ACK))? extract() : null;
244 dl 1.35 }
245    
246     /**
247 dl 1.43 * Wait for a consumer to take item placed by producer.
248 dl 1.35 */
249     void waitForTake() throws InterruptedException {
250     try {
251 dl 1.39 acquireInterruptibly(0);
252 dl 1.35 } catch (InterruptedException ie) {
253     checkCancellationOnInterrupt(ie);
254     }
255     }
256    
257     /**
258 dl 1.43 * Wait for a producer to put item placed by consumer.
259 dl 1.35 */
260     Object waitForPut() throws InterruptedException {
261     try {
262 dl 1.39 acquireInterruptibly(0);
263 dl 1.35 } catch (InterruptedException ie) {
264     checkCancellationOnInterrupt(ie);
265     }
266     return extract();
267 dl 1.31 }
268    
269     /**
270 dl 1.43 * Wait for a consumer to take item placed by producer or time out.
271 dl 1.31 */
272 dl 1.35 boolean waitForTake(long nanos) throws InterruptedException {
273 dl 1.2 try {
274 dl 1.39 if (!tryAcquireNanos(0, nanos) &&
275     release(CANCEL))
276 dl 1.33 return false;
277 dl 1.31 } catch (InterruptedException ie) {
278 dl 1.35 checkCancellationOnInterrupt(ie);
279 dl 1.2 }
280 dl 1.35 return true;
281 dl 1.2 }
282    
283     /**
284 dl 1.43 * Wait for a producer to put item placed by consumer, or time out.
285 dl 1.31 */
286 dl 1.35 Object waitForPut(long nanos) throws InterruptedException {
287 dl 1.31 try {
288 dl 1.39 if (!tryAcquireNanos(0, nanos) &&
289     release(CANCEL))
290 dl 1.33 return null;
291 dl 1.31 } catch (InterruptedException ie) {
292 dl 1.35 checkCancellationOnInterrupt(ie);
293 dl 1.2 }
294 dl 1.35 return extract();
295 dl 1.2 }
296     }
297    
298    
299    
300     /**
301 dl 1.35 * Adds the specified element to this queue, waiting if necessary for
302     * another thread to receive it.
303     * @param o the element to add
304     * @throws InterruptedException if interrupted while waiting.
305     * @throws NullPointerException if the specified element is <tt>null</tt>.
306 tim 1.10 */
307 dl 1.35 public void put(E o) throws InterruptedException {
308     if (o == null) throw new NullPointerException();
309     final ReentrantLock qlock = this.qlock;
310    
311 dl 1.2 for (;;) {
312     Node node;
313     boolean mustWait;
314 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
315     qlock.lock();
316 dl 1.2 try {
317 dl 1.43 node = waitingConsumers.deq();
318 dl 1.2 if ( (mustWait = (node == null)) )
319 dl 1.43 node = waitingProducers.enq(o);
320 tim 1.14 } finally {
321 dl 1.2 qlock.unlock();
322     }
323    
324 dl 1.31 if (mustWait) {
325 dl 1.35 node.waitForTake();
326     return;
327 dl 1.2 }
328    
329 dl 1.35 else if (node.setItem(o))
330     return;
331 dl 1.2
332 dl 1.43 // else consumer cancelled, so retry
333 dl 1.35 }
334 tim 1.1 }
335    
336 dholmes 1.11 /**
337 dl 1.20 * Inserts the specified element into this queue, waiting if necessary
338 dl 1.18 * up to the specified wait time for another thread to receive it.
339     * @param o the element to add
340     * @param timeout how long to wait before giving up, in units of
341     * <tt>unit</tt>
342     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
343     * <tt>timeout</tt> parameter
344 dholmes 1.11 * @return <tt>true</tt> if successful, or <tt>false</tt> if
345 dl 1.43 * the specified waiting time elapses before a consumer appears.
346 dl 1.18 * @throws InterruptedException if interrupted while waiting.
347     * @throws NullPointerException if the specified element is <tt>null</tt>.
348 dholmes 1.11 */
349 dl 1.18 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
350 dl 1.35 if (o == null) throw new NullPointerException();
351     long nanos = unit.toNanos(timeout);
352     final ReentrantLock qlock = this.qlock;
353     for (;;) {
354     Node node;
355     boolean mustWait;
356 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
357     qlock.lock();
358 dl 1.35 try {
359 dl 1.43 node = waitingConsumers.deq();
360 dl 1.35 if ( (mustWait = (node == null)) )
361 dl 1.43 node = waitingProducers.enq(o);
362 dl 1.35 } finally {
363     qlock.unlock();
364     }
365    
366     if (mustWait)
367     return node.waitForTake(nanos);
368    
369     else if (node.setItem(o))
370     return true;
371    
372 dl 1.43 // else consumer cancelled, so retry
373 dl 1.35 }
374 tim 1.1 }
375    
376 dholmes 1.11 /**
377     * Retrieves and removes the head of this queue, waiting if necessary
378     * for another thread to insert it.
379 dl 1.40 * @throws InterruptedException if interrupted while waiting.
380 dholmes 1.11 * @return the head of this queue
381     */
382 dl 1.2 public E take() throws InterruptedException {
383 dl 1.35 final ReentrantLock qlock = this.qlock;
384     for (;;) {
385     Node node;
386     boolean mustWait;
387    
388 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
389     qlock.lock();
390 dl 1.35 try {
391 dl 1.43 node = waitingProducers.deq();
392 dl 1.35 if ( (mustWait = (node == null)) )
393 dl 1.43 node = waitingConsumers.enq(null);
394 dl 1.35 } finally {
395     qlock.unlock();
396     }
397    
398 dl 1.36 if (mustWait) {
399     Object x = node.waitForPut();
400     return (E)x;
401     }
402 dl 1.35 else {
403     Object x = node.getItem();
404     if (x != null)
405     return (E)x;
406     // else cancelled, so retry
407     }
408     }
409 tim 1.1 }
410 dl 1.2
411 dholmes 1.11 /**
412     * Retrieves and removes the head of this queue, waiting
413     * if necessary up to the specified wait time, for another thread
414     * to insert it.
415 dl 1.18 * @param timeout how long to wait before giving up, in units of
416     * <tt>unit</tt>
417     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
418     * <tt>timeout</tt> parameter
419     * @return the head of this queue, or <tt>null</tt> if the
420     * specified waiting time elapses before an element is present.
421     * @throws InterruptedException if interrupted while waiting.
422 dholmes 1.11 */
423 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
424 dl 1.35 long nanos = unit.toNanos(timeout);
425     final ReentrantLock qlock = this.qlock;
426    
427     for (;;) {
428     Node node;
429     boolean mustWait;
430    
431 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
432     qlock.lock();
433 dl 1.35 try {
434 dl 1.43 node = waitingProducers.deq();
435 dl 1.35 if ( (mustWait = (node == null)) )
436 dl 1.43 node = waitingConsumers.enq(null);
437 dl 1.35 } finally {
438     qlock.unlock();
439     }
440    
441 dl 1.36 if (mustWait) {
442     Object x = node.waitForPut(nanos);
443     return (E)x;
444     }
445 dl 1.35 else {
446     Object x = node.getItem();
447     if (x != null)
448     return (E)x;
449     // else cancelled, so retry
450     }
451     }
452 tim 1.1 }
453 dl 1.2
454     // Untimed nonblocking versions
455    
456 dl 1.18 /**
457 dl 1.20 * Inserts the specified element into this queue, if another thread is
458 dl 1.18 * waiting to receive it.
459     *
460     * @param o the element to add.
461     * @return <tt>true</tt> if it was possible to add the element to
462     * this queue, else <tt>false</tt>
463     * @throws NullPointerException if the specified element is <tt>null</tt>
464     */
465 dholmes 1.11 public boolean offer(E o) {
466     if (o == null) throw new NullPointerException();
467 dl 1.27 final ReentrantLock qlock = this.qlock;
468 tim 1.10
469     for (;;) {
470 tim 1.26 Node node;
471 dl 1.2 qlock.lock();
472     try {
473 dl 1.43 node = waitingConsumers.deq();
474 tim 1.14 } finally {
475 dl 1.2 qlock.unlock();
476     }
477     if (node == null)
478     return false;
479 tim 1.10
480 dl 1.31 else if (node.setItem(o))
481 dl 1.2 return true;
482     // else retry
483     }
484 tim 1.1 }
485 dl 1.2
486 dl 1.18 /**
487     * Retrieves and removes the head of this queue, if another thread
488     * is currently making an element available.
489     *
490     * @return the head of this queue, or <tt>null</tt> if no
491     * element is available.
492     */
493 dl 1.2 public E poll() {
494 dl 1.27 final ReentrantLock qlock = this.qlock;
495 dl 1.2 for (;;) {
496     Node node;
497     qlock.lock();
498     try {
499 dl 1.43 node = waitingProducers.deq();
500 tim 1.14 } finally {
501 dl 1.2 qlock.unlock();
502     }
503     if (node == null)
504     return null;
505    
506     else {
507 dl 1.31 Object x = node.getItem();
508 dl 1.2 if (x != null)
509     return (E)x;
510     // else retry
511     }
512     }
513 tim 1.1 }
514 dl 1.2
515 dl 1.5 /**
516 dholmes 1.11 * Always returns <tt>true</tt>.
517     * A <tt>SynchronousQueue</tt> has no internal capacity.
518     * @return <tt>true</tt>
519 dl 1.5 */
520     public boolean isEmpty() {
521     return true;
522     }
523    
524     /**
525 dholmes 1.11 * Always returns zero.
526     * A <tt>SynchronousQueue</tt> has no internal capacity.
527 dl 1.5 * @return zero.
528     */
529     public int size() {
530     return 0;
531 tim 1.1 }
532 dl 1.2
533 dl 1.5 /**
534 dholmes 1.11 * Always returns zero.
535     * A <tt>SynchronousQueue</tt> has no internal capacity.
536 dl 1.5 * @return zero.
537     */
538     public int remainingCapacity() {
539     return 0;
540     }
541    
542     /**
543 dholmes 1.11 * Does nothing.
544     * A <tt>SynchronousQueue</tt> has no internal capacity.
545     */
546     public void clear() {}
547    
548     /**
549     * Always returns <tt>false</tt>.
550     * A <tt>SynchronousQueue</tt> has no internal capacity.
551 dl 1.18 * @param o the element
552 dholmes 1.11 * @return <tt>false</tt>
553     */
554     public boolean contains(Object o) {
555     return false;
556     }
557    
558     /**
559 dl 1.18 * Always returns <tt>false</tt>.
560     * A <tt>SynchronousQueue</tt> has no internal capacity.
561     *
562     * @param o the element to remove
563     * @return <tt>false</tt>
564     */
565     public boolean remove(Object o) {
566     return false;
567     }
568    
569     /**
570 dl 1.16 * Returns <tt>false</tt> unless given collection is empty.
571 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
572 dl 1.18 * @param c the collection
573 dl 1.16 * @return <tt>false</tt> unless given collection is empty
574 dholmes 1.11 */
575 dl 1.12 public boolean containsAll(Collection<?> c) {
576 dl 1.16 return c.isEmpty();
577 dholmes 1.11 }
578    
579     /**
580     * Always returns <tt>false</tt>.
581     * A <tt>SynchronousQueue</tt> has no internal capacity.
582 dl 1.18 * @param c the collection
583 dholmes 1.11 * @return <tt>false</tt>
584     */
585 dl 1.12 public boolean removeAll(Collection<?> c) {
586 dholmes 1.11 return false;
587     }
588    
589     /**
590     * Always returns <tt>false</tt>.
591     * A <tt>SynchronousQueue</tt> has no internal capacity.
592 dl 1.18 * @param c the collection
593 dholmes 1.11 * @return <tt>false</tt>
594     */
595 dl 1.12 public boolean retainAll(Collection<?> c) {
596 dholmes 1.11 return false;
597     }
598    
599     /**
600     * Always returns <tt>null</tt>.
601     * A <tt>SynchronousQueue</tt> does not return elements
602 dl 1.5 * unless actively waited on.
603 dholmes 1.11 * @return <tt>null</tt>
604 dl 1.5 */
605     public E peek() {
606     return null;
607     }
608    
609    
610     static class EmptyIterator<E> implements Iterator<E> {
611 dl 1.2 public boolean hasNext() {
612     return false;
613     }
614     public E next() {
615     throw new NoSuchElementException();
616     }
617     public void remove() {
618 dl 1.17 throw new IllegalStateException();
619 dl 1.2 }
620 tim 1.1 }
621 dl 1.2
622 dl 1.5 /**
623 dl 1.18 * Returns an empty iterator in which <tt>hasNext</tt> always returns
624 tim 1.13 * <tt>false</tt>.
625     *
626 dholmes 1.11 * @return an empty iterator
627 dl 1.5 */
628 dl 1.2 public Iterator<E> iterator() {
629 dl 1.5 return new EmptyIterator<E>();
630 tim 1.1 }
631    
632 dl 1.2
633 dl 1.5 /**
634 dholmes 1.11 * Returns a zero-length array.
635     * @return a zero-length array
636 dl 1.5 */
637 dl 1.3 public Object[] toArray() {
638 dl 1.25 return new Object[0];
639 tim 1.1 }
640    
641 dholmes 1.11 /**
642     * Sets the zeroeth element of the specified array to <tt>null</tt>
643     * (if the array has non-zero length) and returns it.
644 dl 1.40 * @param a the array
645 dholmes 1.11 * @return the specified array
646     */
647 dl 1.2 public <T> T[] toArray(T[] a) {
648     if (a.length > 0)
649     a[0] = null;
650     return a;
651     }
652 dl 1.21
653    
654     public int drainTo(Collection<? super E> c) {
655     if (c == null)
656     throw new NullPointerException();
657     if (c == this)
658     throw new IllegalArgumentException();
659     int n = 0;
660     E e;
661     while ( (e = poll()) != null) {
662     c.add(e);
663     ++n;
664     }
665     return n;
666     }
667    
668     public int drainTo(Collection<? super E> c, int maxElements) {
669     if (c == null)
670     throw new NullPointerException();
671     if (c == this)
672     throw new IllegalArgumentException();
673     int n = 0;
674     E e;
675     while (n < maxElements && (e = poll()) != null) {
676     c.add(e);
677     ++n;
678     }
679     return n;
680     }
681 tim 1.1 }
682 dholmes 1.11
683    
684    
685    
686