ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.44
Committed: Mon Feb 9 13:28:48 2004 UTC (20 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.43: +29 -30 lines
Log Message:
Wording fixes and improvements

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.29 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.18 * A {@linkplain BlockingQueue blocking queue} in which each
13     * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 dl 1.44 * synchronous queue does not have any internal capacity, not even a
15     * capacity of one. You cannot <tt>peek</tt> at a synchronous queue
16     * because an element is only present when you try to take it; you
17     * cannot add an element (using any method) unless another thread is
18     * trying to remove it; you cannot iterate as there is nothing to
19     * iterate. The <em>head</em> of the queue is the element that the
20     * first queued thread is trying to add to the queue; if there are no
21     * queued threads then no element is being added and the head is
22     * <tt>null</tt>. For purposes of other <tt>Collection</tt> methods
23     * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
24     * as an empty collection. This queue does not permit <tt>null</tt>
25     * elements.
26 dl 1.18 *
27     * <p>Synchronous queues are similar to rendezvous channels used in
28     * CSP and Ada. They are well suited for handoff designs, in which an
29 dl 1.30 * object running in one thread must sync up with an object running
30 dl 1.18 * in another thread in order to hand it some information, event, or
31     * task.
32 dl 1.43 *
33     * <p> This class supports an optional fairness policy for ordering
34     * waiting producer and consumer threads. By default, this ordering
35     * is not guaranteed. However, a queue constructed with fairness set
36     * to <tt>true</tt> grants threads access in FIFO order. Fairness
37     * generally decreases throughput but reduces variability and avoids
38     * starvation.
39     *
40 dl 1.19 * <p>This class implements all of the <em>optional</em> methods
41     * of the {@link Collection} and {@link Iterator} interfaces.
42 dl 1.42 *
43     * <p>This class is a member of the
44     * <a href="{@docRoot}/../guide/collections/index.html">
45     * Java Collections Framework</a>.
46     *
47 dl 1.6 * @since 1.5
48     * @author Doug Lea
49 dl 1.24 * @param <E> the type of elements held in this collection
50 dl 1.23 */
51 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
52 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
53 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
54 tim 1.1
55 dl 1.2 /*
56     This implementation divides actions into two cases for puts:
57    
58 dl 1.43 * An arriving producer that does not already have a waiting consumer
59     creates a node holding item, and then waits for a consumer to take it.
60     * An arriving producer that does already have a waiting consumer fills
61     the slot node created by the consumer, and notifies it to continue.
62 dl 1.2
63     And symmetrically, two for takes:
64    
65 dl 1.43 * An arriving consumer that does not already have a waiting producer
66     creates an empty slot node, and then waits for a producer to fill it.
67     * An arriving consumer that does already have a waiting producer takes
68     item from the node created by the producer, and notifies it to continue.
69 tim 1.10
70 dl 1.2 When a put or take waiting for the actions of its counterpart
71     aborts due to interruption or timeout, it marks the node
72     it created as "CANCELLED", which causes its counterpart to retry
73     the entire put or take sequence.
74 dl 1.43
75     This requires keeping two simple queues, waitingProducers and
76     waitingConsumers. Each of these can be FIFO (preserves fairness)
77     or LIFO (improves throughput).
78 dl 1.2 */
79    
80 dl 1.43 /** Lock protecting both wait queues */
81     private final ReentrantLock qlock;
82     /** Queue holding waiting puts */
83     private final WaitQueue waitingProducers;
84     /** Queue holding waiting takes */
85     private final WaitQueue waitingConsumers;
86 dl 1.2
87 dl 1.43 /**
88     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
89     */
90     public SynchronousQueue() {
91 dl 1.44 this(false);
92 dl 1.43 }
93    
94     /**
95     * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
96 dl 1.44 * @param fair if true, threads contend in FIFO order for access;
97     * otherwise the order is unspecified.
98 dl 1.43 */
99     public SynchronousQueue(boolean fair) {
100     if (fair) {
101     qlock = new ReentrantLock(true);
102     waitingProducers = new FifoWaitQueue();
103     waitingConsumers = new FifoWaitQueue();
104     }
105     else {
106     qlock = new ReentrantLock();
107     waitingProducers = new LifoWaitQueue();
108     waitingConsumers = new LifoWaitQueue();
109     }
110     }
111    
112     /**
113     * Queue to hold waiting puts/takes; specialized to FiFo/Lifo below.
114     * These queues have all transient fields, but are serializable
115 dl 1.44 * in order to recover fairness settings when deserialized.
116 dl 1.43 */
117     static abstract class WaitQueue implements java.io.Serializable {
118     /** Create, add, and return node for x */
119     abstract Node enq(Object x);
120     /** Remove and return node, or null if empty */
121     abstract Node deq();
122     }
123    
124     /**
125     * FIFO queue to hold waiting puts/takes.
126     */
127     static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
128     private static final long serialVersionUID = -3623113410248163686L;
129     private transient Node head;
130     private transient Node last;
131    
132     Node enq(Object x) {
133     Node p = new Node(x);
134     if (last == null)
135     last = head = p;
136     else
137     last = last.next = p;
138     return p;
139     }
140    
141     Node deq() {
142     Node p = head;
143     if (p != null) {
144     if ((head = p.next) == null)
145     last = null;
146     p.next = null;
147     }
148     return p;
149     }
150     }
151    
152     /**
153     * LIFO queue to hold waiting puts/takes.
154 dl 1.2 */
155 dl 1.43 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
156     private static final long serialVersionUID = -3633113410248163686L;
157     private transient Node head;
158 dl 1.2
159 dl 1.43 Node enq(Object x) {
160     return head = new Node(x, head);
161     }
162    
163     Node deq() {
164     Node p = head;
165 dl 1.44 if (p != null) {
166 dl 1.43 head = p.next;
167 dl 1.44 p.next = null;
168     }
169 dl 1.43 return p;
170     }
171     }
172 dl 1.2
173     /**
174     * Nodes each maintain an item and handle waits and signals for
175 dl 1.31 * getting and setting it. The class extends
176     * AbstractQueuedSynchronizer to manage blocking, using AQS state
177     * 0 for waiting, 1 for ack, -1 for cancelled.
178 dl 1.2 */
179 dl 1.41 static final class Node extends AbstractQueuedSynchronizer {
180 dl 1.35 /** Synchronization state value representing that node acked */
181     private static final int ACK = 1;
182     /** Synchronization state value representing that node cancelled */
183     private static final int CANCEL = -1;
184    
185 dl 1.6 /** The item being transferred */
186 dl 1.2 Object item;
187 dl 1.6 /** Next node in wait queue */
188 dl 1.2 Node next;
189 dl 1.35
190 dl 1.44 /** Creates a node with initial item */
191 dl 1.31 Node(Object x) { item = x; }
192    
193 dl 1.44 /** Creates a node with initial item and next */
194 dl 1.43 Node(Object x, Node n) { item = x; next = n; }
195    
196 dl 1.31 /**
197     * Implements AQS base acquire to succeed if not in WAITING state
198     */
199 dl 1.39 protected boolean tryAcquire(int ignore) {
200 dl 1.35 return getState() != 0;
201 dl 1.31 }
202    
203     /**
204 dl 1.34 * Implements AQS base release to signal if state changed
205 dl 1.31 */
206 dl 1.39 protected boolean tryRelease(int newState) {
207 dl 1.35 return compareAndSetState(0, newState);
208 dl 1.31 }
209    
210     /**
211 dl 1.44 * Takes item and nulls out field (for sake of GC)
212 dl 1.31 */
213 dl 1.35 private Object extract() {
214     Object x = item;
215     item = null;
216     return x;
217 dl 1.31 }
218    
219     /**
220 dl 1.44 * Tries to cancel on interrupt; if so rethrowing,
221 dl 1.35 * else setting interrupt state
222 dl 1.31 */
223 dl 1.35 private void checkCancellationOnInterrupt(InterruptedException ie)
224     throws InterruptedException {
225 dl 1.39 if (release(CANCEL))
226 dl 1.35 throw ie;
227     Thread.currentThread().interrupt();
228 dl 1.31 }
229 dl 1.2
230     /**
231 dl 1.44 * Fills in the slot created by the consumer and signal consumer to
232 dl 1.2 * continue.
233     */
234 dl 1.31 boolean setItem(Object x) {
235 dl 1.35 item = x; // can place in slot even if cancelled
236 dl 1.39 return release(ACK);
237 dl 1.2 }
238    
239     /**
240 dl 1.44 * Removes item from slot created by producer and signal producer
241 dl 1.2 * to continue.
242     */
243 dl 1.31 Object getItem() {
244 dl 1.39 return (release(ACK))? extract() : null;
245 dl 1.35 }
246    
247     /**
248 dl 1.44 * Waits for a consumer to take item placed by producer.
249 dl 1.35 */
250     void waitForTake() throws InterruptedException {
251     try {
252 dl 1.39 acquireInterruptibly(0);
253 dl 1.35 } catch (InterruptedException ie) {
254     checkCancellationOnInterrupt(ie);
255     }
256     }
257    
258     /**
259 dl 1.44 * Waits for a producer to put item placed by consumer.
260 dl 1.35 */
261     Object waitForPut() throws InterruptedException {
262     try {
263 dl 1.39 acquireInterruptibly(0);
264 dl 1.35 } catch (InterruptedException ie) {
265     checkCancellationOnInterrupt(ie);
266     }
267     return extract();
268 dl 1.31 }
269    
270     /**
271 dl 1.44 * Waits for a consumer to take item placed by producer or time out.
272 dl 1.31 */
273 dl 1.35 boolean waitForTake(long nanos) throws InterruptedException {
274 dl 1.2 try {
275 dl 1.39 if (!tryAcquireNanos(0, nanos) &&
276     release(CANCEL))
277 dl 1.33 return false;
278 dl 1.31 } catch (InterruptedException ie) {
279 dl 1.35 checkCancellationOnInterrupt(ie);
280 dl 1.2 }
281 dl 1.35 return true;
282 dl 1.2 }
283    
284     /**
285 dl 1.44 * Waits for a producer to put item placed by consumer, or time out.
286 dl 1.31 */
287 dl 1.35 Object waitForPut(long nanos) throws InterruptedException {
288 dl 1.31 try {
289 dl 1.39 if (!tryAcquireNanos(0, nanos) &&
290     release(CANCEL))
291 dl 1.33 return null;
292 dl 1.31 } catch (InterruptedException ie) {
293 dl 1.35 checkCancellationOnInterrupt(ie);
294 dl 1.2 }
295 dl 1.35 return extract();
296 dl 1.2 }
297     }
298    
299     /**
300 dl 1.35 * Adds the specified element to this queue, waiting if necessary for
301     * another thread to receive it.
302     * @param o the element to add
303     * @throws InterruptedException if interrupted while waiting.
304     * @throws NullPointerException if the specified element is <tt>null</tt>.
305 tim 1.10 */
306 dl 1.35 public void put(E o) throws InterruptedException {
307     if (o == null) throw new NullPointerException();
308     final ReentrantLock qlock = this.qlock;
309    
310 dl 1.2 for (;;) {
311     Node node;
312     boolean mustWait;
313 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
314     qlock.lock();
315 dl 1.2 try {
316 dl 1.43 node = waitingConsumers.deq();
317 dl 1.2 if ( (mustWait = (node == null)) )
318 dl 1.43 node = waitingProducers.enq(o);
319 tim 1.14 } finally {
320 dl 1.2 qlock.unlock();
321     }
322    
323 dl 1.31 if (mustWait) {
324 dl 1.35 node.waitForTake();
325     return;
326 dl 1.2 }
327    
328 dl 1.35 else if (node.setItem(o))
329     return;
330 dl 1.2
331 dl 1.43 // else consumer cancelled, so retry
332 dl 1.35 }
333 tim 1.1 }
334    
335 dholmes 1.11 /**
336 dl 1.20 * Inserts the specified element into this queue, waiting if necessary
337 dl 1.18 * up to the specified wait time for another thread to receive it.
338     * @param o the element to add
339     * @param timeout how long to wait before giving up, in units of
340     * <tt>unit</tt>
341     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
342     * <tt>timeout</tt> parameter
343 dholmes 1.11 * @return <tt>true</tt> if successful, or <tt>false</tt> if
344 dl 1.43 * the specified waiting time elapses before a consumer appears.
345 dl 1.18 * @throws InterruptedException if interrupted while waiting.
346     * @throws NullPointerException if the specified element is <tt>null</tt>.
347 dholmes 1.11 */
348 dl 1.18 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
349 dl 1.35 if (o == null) throw new NullPointerException();
350     long nanos = unit.toNanos(timeout);
351     final ReentrantLock qlock = this.qlock;
352     for (;;) {
353     Node node;
354     boolean mustWait;
355 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
356     qlock.lock();
357 dl 1.35 try {
358 dl 1.43 node = waitingConsumers.deq();
359 dl 1.35 if ( (mustWait = (node == null)) )
360 dl 1.43 node = waitingProducers.enq(o);
361 dl 1.35 } finally {
362     qlock.unlock();
363     }
364    
365     if (mustWait)
366     return node.waitForTake(nanos);
367    
368     else if (node.setItem(o))
369     return true;
370    
371 dl 1.43 // else consumer cancelled, so retry
372 dl 1.35 }
373 tim 1.1 }
374    
375 dholmes 1.11 /**
376     * Retrieves and removes the head of this queue, waiting if necessary
377     * for another thread to insert it.
378 dl 1.40 * @throws InterruptedException if interrupted while waiting.
379 dholmes 1.11 * @return the head of this queue
380     */
381 dl 1.2 public E take() throws InterruptedException {
382 dl 1.35 final ReentrantLock qlock = this.qlock;
383     for (;;) {
384     Node node;
385     boolean mustWait;
386    
387 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
388     qlock.lock();
389 dl 1.35 try {
390 dl 1.43 node = waitingProducers.deq();
391 dl 1.35 if ( (mustWait = (node == null)) )
392 dl 1.43 node = waitingConsumers.enq(null);
393 dl 1.35 } finally {
394     qlock.unlock();
395     }
396    
397 dl 1.36 if (mustWait) {
398     Object x = node.waitForPut();
399     return (E)x;
400     }
401 dl 1.35 else {
402     Object x = node.getItem();
403     if (x != null)
404     return (E)x;
405     // else cancelled, so retry
406     }
407     }
408 tim 1.1 }
409 dl 1.2
410 dholmes 1.11 /**
411     * Retrieves and removes the head of this queue, waiting
412     * if necessary up to the specified wait time, for another thread
413     * to insert it.
414 dl 1.18 * @param timeout how long to wait before giving up, in units of
415     * <tt>unit</tt>
416     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
417     * <tt>timeout</tt> parameter
418     * @return the head of this queue, or <tt>null</tt> if the
419     * specified waiting time elapses before an element is present.
420     * @throws InterruptedException if interrupted while waiting.
421 dholmes 1.11 */
422 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
423 dl 1.35 long nanos = unit.toNanos(timeout);
424     final ReentrantLock qlock = this.qlock;
425    
426     for (;;) {
427     Node node;
428     boolean mustWait;
429    
430 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
431     qlock.lock();
432 dl 1.35 try {
433 dl 1.43 node = waitingProducers.deq();
434 dl 1.35 if ( (mustWait = (node == null)) )
435 dl 1.43 node = waitingConsumers.enq(null);
436 dl 1.35 } finally {
437     qlock.unlock();
438     }
439    
440 dl 1.36 if (mustWait) {
441     Object x = node.waitForPut(nanos);
442     return (E)x;
443     }
444 dl 1.35 else {
445     Object x = node.getItem();
446     if (x != null)
447     return (E)x;
448     // else cancelled, so retry
449     }
450     }
451 tim 1.1 }
452 dl 1.2
453     // Untimed nonblocking versions
454    
455 dl 1.18 /**
456 dl 1.20 * Inserts the specified element into this queue, if another thread is
457 dl 1.18 * waiting to receive it.
458     *
459     * @param o the element to add.
460     * @return <tt>true</tt> if it was possible to add the element to
461     * this queue, else <tt>false</tt>
462     * @throws NullPointerException if the specified element is <tt>null</tt>
463     */
464 dholmes 1.11 public boolean offer(E o) {
465     if (o == null) throw new NullPointerException();
466 dl 1.27 final ReentrantLock qlock = this.qlock;
467 tim 1.10
468     for (;;) {
469 tim 1.26 Node node;
470 dl 1.2 qlock.lock();
471     try {
472 dl 1.43 node = waitingConsumers.deq();
473 tim 1.14 } finally {
474 dl 1.2 qlock.unlock();
475     }
476     if (node == null)
477     return false;
478 tim 1.10
479 dl 1.31 else if (node.setItem(o))
480 dl 1.2 return true;
481     // else retry
482     }
483 tim 1.1 }
484 dl 1.2
485 dl 1.18 /**
486     * Retrieves and removes the head of this queue, if another thread
487     * is currently making an element available.
488     *
489     * @return the head of this queue, or <tt>null</tt> if no
490     * element is available.
491     */
492 dl 1.2 public E poll() {
493 dl 1.27 final ReentrantLock qlock = this.qlock;
494 dl 1.2 for (;;) {
495     Node node;
496     qlock.lock();
497     try {
498 dl 1.43 node = waitingProducers.deq();
499 tim 1.14 } finally {
500 dl 1.2 qlock.unlock();
501     }
502     if (node == null)
503     return null;
504    
505     else {
506 dl 1.31 Object x = node.getItem();
507 dl 1.2 if (x != null)
508     return (E)x;
509     // else retry
510     }
511     }
512 tim 1.1 }
513 dl 1.2
514 dl 1.5 /**
515 dholmes 1.11 * Always returns <tt>true</tt>.
516     * A <tt>SynchronousQueue</tt> has no internal capacity.
517     * @return <tt>true</tt>
518 dl 1.5 */
519     public boolean isEmpty() {
520     return true;
521     }
522    
523     /**
524 dholmes 1.11 * Always returns zero.
525     * A <tt>SynchronousQueue</tt> has no internal capacity.
526 dl 1.5 * @return zero.
527     */
528     public int size() {
529     return 0;
530 tim 1.1 }
531 dl 1.2
532 dl 1.5 /**
533 dholmes 1.11 * Always returns zero.
534     * A <tt>SynchronousQueue</tt> has no internal capacity.
535 dl 1.5 * @return zero.
536     */
537     public int remainingCapacity() {
538     return 0;
539     }
540    
541     /**
542 dholmes 1.11 * Does nothing.
543     * A <tt>SynchronousQueue</tt> has no internal capacity.
544     */
545     public void clear() {}
546    
547     /**
548     * Always returns <tt>false</tt>.
549     * A <tt>SynchronousQueue</tt> has no internal capacity.
550 dl 1.18 * @param o the element
551 dholmes 1.11 * @return <tt>false</tt>
552     */
553     public boolean contains(Object o) {
554     return false;
555     }
556    
557     /**
558 dl 1.18 * Always returns <tt>false</tt>.
559     * A <tt>SynchronousQueue</tt> has no internal capacity.
560     *
561     * @param o the element to remove
562     * @return <tt>false</tt>
563     */
564     public boolean remove(Object o) {
565     return false;
566     }
567    
568     /**
569 dl 1.16 * Returns <tt>false</tt> unless given collection is empty.
570 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
571 dl 1.18 * @param c the collection
572 dl 1.16 * @return <tt>false</tt> unless given collection is empty
573 dholmes 1.11 */
574 dl 1.12 public boolean containsAll(Collection<?> c) {
575 dl 1.16 return c.isEmpty();
576 dholmes 1.11 }
577    
578     /**
579     * Always returns <tt>false</tt>.
580     * A <tt>SynchronousQueue</tt> has no internal capacity.
581 dl 1.18 * @param c the collection
582 dholmes 1.11 * @return <tt>false</tt>
583     */
584 dl 1.12 public boolean removeAll(Collection<?> c) {
585 dholmes 1.11 return false;
586     }
587    
588     /**
589     * Always returns <tt>false</tt>.
590     * A <tt>SynchronousQueue</tt> has no internal capacity.
591 dl 1.18 * @param c the collection
592 dholmes 1.11 * @return <tt>false</tt>
593     */
594 dl 1.12 public boolean retainAll(Collection<?> c) {
595 dholmes 1.11 return false;
596     }
597    
598     /**
599     * Always returns <tt>null</tt>.
600     * A <tt>SynchronousQueue</tt> does not return elements
601 dl 1.5 * unless actively waited on.
602 dholmes 1.11 * @return <tt>null</tt>
603 dl 1.5 */
604     public E peek() {
605     return null;
606     }
607    
608    
609     static class EmptyIterator<E> implements Iterator<E> {
610 dl 1.2 public boolean hasNext() {
611     return false;
612     }
613     public E next() {
614     throw new NoSuchElementException();
615     }
616     public void remove() {
617 dl 1.17 throw new IllegalStateException();
618 dl 1.2 }
619 tim 1.1 }
620 dl 1.2
621 dl 1.5 /**
622 dl 1.18 * Returns an empty iterator in which <tt>hasNext</tt> always returns
623 tim 1.13 * <tt>false</tt>.
624     *
625 dholmes 1.11 * @return an empty iterator
626 dl 1.5 */
627 dl 1.2 public Iterator<E> iterator() {
628 dl 1.5 return new EmptyIterator<E>();
629 tim 1.1 }
630    
631 dl 1.2
632 dl 1.5 /**
633 dholmes 1.11 * Returns a zero-length array.
634     * @return a zero-length array
635 dl 1.5 */
636 dl 1.3 public Object[] toArray() {
637 dl 1.25 return new Object[0];
638 tim 1.1 }
639    
640 dholmes 1.11 /**
641     * Sets the zeroeth element of the specified array to <tt>null</tt>
642     * (if the array has non-zero length) and returns it.
643 dl 1.40 * @param a the array
644 dholmes 1.11 * @return the specified array
645     */
646 dl 1.2 public <T> T[] toArray(T[] a) {
647     if (a.length > 0)
648     a[0] = null;
649     return a;
650     }
651 dl 1.21
652    
653     public int drainTo(Collection<? super E> c) {
654     if (c == null)
655     throw new NullPointerException();
656     if (c == this)
657     throw new IllegalArgumentException();
658     int n = 0;
659     E e;
660     while ( (e = poll()) != null) {
661     c.add(e);
662     ++n;
663     }
664     return n;
665     }
666    
667     public int drainTo(Collection<? super E> c, int maxElements) {
668     if (c == null)
669     throw new NullPointerException();
670     if (c == this)
671     throw new IllegalArgumentException();
672     int n = 0;
673     E e;
674     while (n < maxElements && (e = poll()) != null) {
675     c.add(e);
676     ++n;
677     }
678     return n;
679     }
680 tim 1.1 }
681 dholmes 1.11
682    
683    
684    
685