ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.46
Committed: Wed Jun 2 23:49:07 2004 UTC (20 years ago) by dl
Branch: MAIN
Changes since 1.45: +3 -2 lines
Log Message:
CopyOnWriteArraySet and ConcurrentHashMap no longer implement Cloneable
Improve javadoc wording in other classes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.29 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.18 * A {@linkplain BlockingQueue blocking queue} in which each
13     * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 dl 1.44 * synchronous queue does not have any internal capacity, not even a
15     * capacity of one. You cannot <tt>peek</tt> at a synchronous queue
16     * because an element is only present when you try to take it; you
17     * cannot add an element (using any method) unless another thread is
18     * trying to remove it; you cannot iterate as there is nothing to
19     * iterate. The <em>head</em> of the queue is the element that the
20     * first queued thread is trying to add to the queue; if there are no
21     * queued threads then no element is being added and the head is
22     * <tt>null</tt>. For purposes of other <tt>Collection</tt> methods
23     * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
24     * as an empty collection. This queue does not permit <tt>null</tt>
25     * elements.
26 dl 1.18 *
27     * <p>Synchronous queues are similar to rendezvous channels used in
28     * CSP and Ada. They are well suited for handoff designs, in which an
29 dl 1.30 * object running in one thread must sync up with an object running
30 dl 1.18 * in another thread in order to hand it some information, event, or
31     * task.
32 dl 1.43 *
33     * <p> This class supports an optional fairness policy for ordering
34     * waiting producer and consumer threads. By default, this ordering
35     * is not guaranteed. However, a queue constructed with fairness set
36     * to <tt>true</tt> grants threads access in FIFO order. Fairness
37     * generally decreases throughput but reduces variability and avoids
38     * starvation.
39     *
40 dl 1.46 * <p>This class and its iterator implement all of the
41     * <em>optional</em> methods of the {@link Collection} and {@link
42     * Iterator} interfaces.
43 dl 1.42 *
44     * <p>This class is a member of the
45     * <a href="{@docRoot}/../guide/collections/index.html">
46     * Java Collections Framework</a>.
47     *
48 dl 1.6 * @since 1.5
49     * @author Doug Lea
50 dl 1.24 * @param <E> the type of elements held in this collection
51 dl 1.23 */
52 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
53 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
54 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
55 tim 1.1
56 dl 1.2 /*
57     This implementation divides actions into two cases for puts:
58    
59 dl 1.43 * An arriving producer that does not already have a waiting consumer
60     creates a node holding item, and then waits for a consumer to take it.
61     * An arriving producer that does already have a waiting consumer fills
62     the slot node created by the consumer, and notifies it to continue.
63 dl 1.2
64     And symmetrically, two for takes:
65    
66 dl 1.43 * An arriving consumer that does not already have a waiting producer
67     creates an empty slot node, and then waits for a producer to fill it.
68     * An arriving consumer that does already have a waiting producer takes
69     item from the node created by the producer, and notifies it to continue.
70 tim 1.10
71 dl 1.2 When a put or take waiting for the actions of its counterpart
72     aborts due to interruption or timeout, it marks the node
73     it created as "CANCELLED", which causes its counterpart to retry
74     the entire put or take sequence.
75 dl 1.43
76     This requires keeping two simple queues, waitingProducers and
77     waitingConsumers. Each of these can be FIFO (preserves fairness)
78     or LIFO (improves throughput).
79 dl 1.2 */
80    
81 dl 1.43 /** Lock protecting both wait queues */
82     private final ReentrantLock qlock;
83     /** Queue holding waiting puts */
84     private final WaitQueue waitingProducers;
85     /** Queue holding waiting takes */
86     private final WaitQueue waitingConsumers;
87 dl 1.2
88 dl 1.43 /**
89     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
90     */
91     public SynchronousQueue() {
92 dl 1.44 this(false);
93 dl 1.43 }
94    
95     /**
96     * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
97 dl 1.44 * @param fair if true, threads contend in FIFO order for access;
98     * otherwise the order is unspecified.
99 dl 1.43 */
100     public SynchronousQueue(boolean fair) {
101     if (fair) {
102     qlock = new ReentrantLock(true);
103     waitingProducers = new FifoWaitQueue();
104     waitingConsumers = new FifoWaitQueue();
105     }
106     else {
107     qlock = new ReentrantLock();
108     waitingProducers = new LifoWaitQueue();
109     waitingConsumers = new LifoWaitQueue();
110     }
111     }
112    
113     /**
114 dl 1.45 * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
115 dl 1.43 * These queues have all transient fields, but are serializable
116 dl 1.44 * in order to recover fairness settings when deserialized.
117 dl 1.43 */
118     static abstract class WaitQueue implements java.io.Serializable {
119     /** Create, add, and return node for x */
120     abstract Node enq(Object x);
121     /** Remove and return node, or null if empty */
122     abstract Node deq();
123     }
124    
125     /**
126     * FIFO queue to hold waiting puts/takes.
127     */
128     static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
129     private static final long serialVersionUID = -3623113410248163686L;
130     private transient Node head;
131     private transient Node last;
132    
133     Node enq(Object x) {
134     Node p = new Node(x);
135     if (last == null)
136     last = head = p;
137     else
138     last = last.next = p;
139     return p;
140     }
141    
142     Node deq() {
143     Node p = head;
144     if (p != null) {
145     if ((head = p.next) == null)
146     last = null;
147     p.next = null;
148     }
149     return p;
150     }
151     }
152    
153     /**
154     * LIFO queue to hold waiting puts/takes.
155 dl 1.2 */
156 dl 1.43 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
157     private static final long serialVersionUID = -3633113410248163686L;
158     private transient Node head;
159 dl 1.2
160 dl 1.43 Node enq(Object x) {
161     return head = new Node(x, head);
162     }
163    
164     Node deq() {
165     Node p = head;
166 dl 1.44 if (p != null) {
167 dl 1.43 head = p.next;
168 dl 1.44 p.next = null;
169     }
170 dl 1.43 return p;
171     }
172     }
173 dl 1.2
174     /**
175     * Nodes each maintain an item and handle waits and signals for
176 dl 1.31 * getting and setting it. The class extends
177     * AbstractQueuedSynchronizer to manage blocking, using AQS state
178     * 0 for waiting, 1 for ack, -1 for cancelled.
179 dl 1.2 */
180 dl 1.41 static final class Node extends AbstractQueuedSynchronizer {
181 dl 1.35 /** Synchronization state value representing that node acked */
182     private static final int ACK = 1;
183     /** Synchronization state value representing that node cancelled */
184     private static final int CANCEL = -1;
185    
186 dl 1.6 /** The item being transferred */
187 dl 1.2 Object item;
188 dl 1.6 /** Next node in wait queue */
189 dl 1.2 Node next;
190 dl 1.35
191 dl 1.44 /** Creates a node with initial item */
192 dl 1.31 Node(Object x) { item = x; }
193    
194 dl 1.44 /** Creates a node with initial item and next */
195 dl 1.43 Node(Object x, Node n) { item = x; next = n; }
196    
197 dl 1.31 /**
198     * Implements AQS base acquire to succeed if not in WAITING state
199     */
200 dl 1.39 protected boolean tryAcquire(int ignore) {
201 dl 1.35 return getState() != 0;
202 dl 1.31 }
203    
204     /**
205 dl 1.34 * Implements AQS base release to signal if state changed
206 dl 1.31 */
207 dl 1.39 protected boolean tryRelease(int newState) {
208 dl 1.35 return compareAndSetState(0, newState);
209 dl 1.31 }
210    
211     /**
212 dl 1.44 * Takes item and nulls out field (for sake of GC)
213 dl 1.31 */
214 dl 1.35 private Object extract() {
215     Object x = item;
216     item = null;
217     return x;
218 dl 1.31 }
219    
220     /**
221 dl 1.44 * Tries to cancel on interrupt; if so rethrowing,
222 dl 1.35 * else setting interrupt state
223 dl 1.31 */
224 dl 1.35 private void checkCancellationOnInterrupt(InterruptedException ie)
225     throws InterruptedException {
226 dl 1.39 if (release(CANCEL))
227 dl 1.35 throw ie;
228     Thread.currentThread().interrupt();
229 dl 1.31 }
230 dl 1.2
231     /**
232 dl 1.44 * Fills in the slot created by the consumer and signal consumer to
233 dl 1.2 * continue.
234     */
235 dl 1.31 boolean setItem(Object x) {
236 dl 1.35 item = x; // can place in slot even if cancelled
237 dl 1.39 return release(ACK);
238 dl 1.2 }
239    
240     /**
241 dl 1.44 * Removes item from slot created by producer and signal producer
242 dl 1.2 * to continue.
243     */
244 dl 1.31 Object getItem() {
245 dl 1.39 return (release(ACK))? extract() : null;
246 dl 1.35 }
247    
248     /**
249 dl 1.44 * Waits for a consumer to take item placed by producer.
250 dl 1.35 */
251     void waitForTake() throws InterruptedException {
252     try {
253 dl 1.39 acquireInterruptibly(0);
254 dl 1.35 } catch (InterruptedException ie) {
255     checkCancellationOnInterrupt(ie);
256     }
257     }
258    
259     /**
260 dl 1.44 * Waits for a producer to put item placed by consumer.
261 dl 1.35 */
262     Object waitForPut() throws InterruptedException {
263     try {
264 dl 1.39 acquireInterruptibly(0);
265 dl 1.35 } catch (InterruptedException ie) {
266     checkCancellationOnInterrupt(ie);
267     }
268     return extract();
269 dl 1.31 }
270    
271     /**
272 dl 1.44 * Waits for a consumer to take item placed by producer or time out.
273 dl 1.31 */
274 dl 1.35 boolean waitForTake(long nanos) throws InterruptedException {
275 dl 1.2 try {
276 dl 1.39 if (!tryAcquireNanos(0, nanos) &&
277     release(CANCEL))
278 dl 1.33 return false;
279 dl 1.31 } catch (InterruptedException ie) {
280 dl 1.35 checkCancellationOnInterrupt(ie);
281 dl 1.2 }
282 dl 1.35 return true;
283 dl 1.2 }
284    
285     /**
286 dl 1.44 * Waits for a producer to put item placed by consumer, or time out.
287 dl 1.31 */
288 dl 1.35 Object waitForPut(long nanos) throws InterruptedException {
289 dl 1.31 try {
290 dl 1.39 if (!tryAcquireNanos(0, nanos) &&
291     release(CANCEL))
292 dl 1.33 return null;
293 dl 1.31 } catch (InterruptedException ie) {
294 dl 1.35 checkCancellationOnInterrupt(ie);
295 dl 1.2 }
296 dl 1.35 return extract();
297 dl 1.2 }
298     }
299    
300     /**
301 dl 1.35 * Adds the specified element to this queue, waiting if necessary for
302     * another thread to receive it.
303     * @param o the element to add
304     * @throws InterruptedException if interrupted while waiting.
305     * @throws NullPointerException if the specified element is <tt>null</tt>.
306 tim 1.10 */
307 dl 1.35 public void put(E o) throws InterruptedException {
308     if (o == null) throw new NullPointerException();
309     final ReentrantLock qlock = this.qlock;
310    
311 dl 1.2 for (;;) {
312     Node node;
313     boolean mustWait;
314 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
315     qlock.lock();
316 dl 1.2 try {
317 dl 1.43 node = waitingConsumers.deq();
318 dl 1.2 if ( (mustWait = (node == null)) )
319 dl 1.43 node = waitingProducers.enq(o);
320 tim 1.14 } finally {
321 dl 1.2 qlock.unlock();
322     }
323    
324 dl 1.31 if (mustWait) {
325 dl 1.35 node.waitForTake();
326     return;
327 dl 1.2 }
328    
329 dl 1.35 else if (node.setItem(o))
330     return;
331 dl 1.2
332 dl 1.43 // else consumer cancelled, so retry
333 dl 1.35 }
334 tim 1.1 }
335    
336 dholmes 1.11 /**
337 dl 1.20 * Inserts the specified element into this queue, waiting if necessary
338 dl 1.18 * up to the specified wait time for another thread to receive it.
339     * @param o the element to add
340     * @param timeout how long to wait before giving up, in units of
341     * <tt>unit</tt>
342     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
343     * <tt>timeout</tt> parameter
344 dholmes 1.11 * @return <tt>true</tt> if successful, or <tt>false</tt> if
345 dl 1.43 * the specified waiting time elapses before a consumer appears.
346 dl 1.18 * @throws InterruptedException if interrupted while waiting.
347     * @throws NullPointerException if the specified element is <tt>null</tt>.
348 dholmes 1.11 */
349 dl 1.18 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
350 dl 1.35 if (o == null) throw new NullPointerException();
351     long nanos = unit.toNanos(timeout);
352     final ReentrantLock qlock = this.qlock;
353     for (;;) {
354     Node node;
355     boolean mustWait;
356 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
357     qlock.lock();
358 dl 1.35 try {
359 dl 1.43 node = waitingConsumers.deq();
360 dl 1.35 if ( (mustWait = (node == null)) )
361 dl 1.43 node = waitingProducers.enq(o);
362 dl 1.35 } finally {
363     qlock.unlock();
364     }
365    
366     if (mustWait)
367     return node.waitForTake(nanos);
368    
369     else if (node.setItem(o))
370     return true;
371    
372 dl 1.43 // else consumer cancelled, so retry
373 dl 1.35 }
374 tim 1.1 }
375    
376 dholmes 1.11 /**
377     * Retrieves and removes the head of this queue, waiting if necessary
378     * for another thread to insert it.
379 dl 1.40 * @throws InterruptedException if interrupted while waiting.
380 dholmes 1.11 * @return the head of this queue
381     */
382 dl 1.2 public E take() throws InterruptedException {
383 dl 1.35 final ReentrantLock qlock = this.qlock;
384     for (;;) {
385     Node node;
386     boolean mustWait;
387    
388 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
389     qlock.lock();
390 dl 1.35 try {
391 dl 1.43 node = waitingProducers.deq();
392 dl 1.35 if ( (mustWait = (node == null)) )
393 dl 1.43 node = waitingConsumers.enq(null);
394 dl 1.35 } finally {
395     qlock.unlock();
396     }
397    
398 dl 1.36 if (mustWait) {
399     Object x = node.waitForPut();
400     return (E)x;
401     }
402 dl 1.35 else {
403     Object x = node.getItem();
404     if (x != null)
405     return (E)x;
406     // else cancelled, so retry
407     }
408     }
409 tim 1.1 }
410 dl 1.2
411 dholmes 1.11 /**
412     * Retrieves and removes the head of this queue, waiting
413     * if necessary up to the specified wait time, for another thread
414     * to insert it.
415 dl 1.18 * @param timeout how long to wait before giving up, in units of
416     * <tt>unit</tt>
417     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
418     * <tt>timeout</tt> parameter
419     * @return the head of this queue, or <tt>null</tt> if the
420     * specified waiting time elapses before an element is present.
421     * @throws InterruptedException if interrupted while waiting.
422 dholmes 1.11 */
423 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
424 dl 1.35 long nanos = unit.toNanos(timeout);
425     final ReentrantLock qlock = this.qlock;
426    
427     for (;;) {
428     Node node;
429     boolean mustWait;
430    
431 dl 1.43 if (Thread.interrupted()) throw new InterruptedException();
432     qlock.lock();
433 dl 1.35 try {
434 dl 1.43 node = waitingProducers.deq();
435 dl 1.35 if ( (mustWait = (node == null)) )
436 dl 1.43 node = waitingConsumers.enq(null);
437 dl 1.35 } finally {
438     qlock.unlock();
439     }
440    
441 dl 1.36 if (mustWait) {
442     Object x = node.waitForPut(nanos);
443     return (E)x;
444     }
445 dl 1.35 else {
446     Object x = node.getItem();
447     if (x != null)
448     return (E)x;
449     // else cancelled, so retry
450     }
451     }
452 tim 1.1 }
453 dl 1.2
454     // Untimed nonblocking versions
455    
456 dl 1.18 /**
457 dl 1.20 * Inserts the specified element into this queue, if another thread is
458 dl 1.18 * waiting to receive it.
459     *
460     * @param o the element to add.
461     * @return <tt>true</tt> if it was possible to add the element to
462     * this queue, else <tt>false</tt>
463     * @throws NullPointerException if the specified element is <tt>null</tt>
464     */
465 dholmes 1.11 public boolean offer(E o) {
466     if (o == null) throw new NullPointerException();
467 dl 1.27 final ReentrantLock qlock = this.qlock;
468 tim 1.10
469     for (;;) {
470 tim 1.26 Node node;
471 dl 1.2 qlock.lock();
472     try {
473 dl 1.43 node = waitingConsumers.deq();
474 tim 1.14 } finally {
475 dl 1.2 qlock.unlock();
476     }
477     if (node == null)
478     return false;
479 tim 1.10
480 dl 1.31 else if (node.setItem(o))
481 dl 1.2 return true;
482     // else retry
483     }
484 tim 1.1 }
485 dl 1.2
486 dl 1.18 /**
487     * Retrieves and removes the head of this queue, if another thread
488     * is currently making an element available.
489     *
490     * @return the head of this queue, or <tt>null</tt> if no
491     * element is available.
492     */
493 dl 1.2 public E poll() {
494 dl 1.27 final ReentrantLock qlock = this.qlock;
495 dl 1.2 for (;;) {
496     Node node;
497     qlock.lock();
498     try {
499 dl 1.43 node = waitingProducers.deq();
500 tim 1.14 } finally {
501 dl 1.2 qlock.unlock();
502     }
503     if (node == null)
504     return null;
505    
506     else {
507 dl 1.31 Object x = node.getItem();
508 dl 1.2 if (x != null)
509     return (E)x;
510     // else retry
511     }
512     }
513 tim 1.1 }
514 dl 1.2
515 dl 1.5 /**
516 dholmes 1.11 * Always returns <tt>true</tt>.
517     * A <tt>SynchronousQueue</tt> has no internal capacity.
518     * @return <tt>true</tt>
519 dl 1.5 */
520     public boolean isEmpty() {
521     return true;
522     }
523    
524     /**
525 dholmes 1.11 * Always returns zero.
526     * A <tt>SynchronousQueue</tt> has no internal capacity.
527 dl 1.5 * @return zero.
528     */
529     public int size() {
530     return 0;
531 tim 1.1 }
532 dl 1.2
533 dl 1.5 /**
534 dholmes 1.11 * Always returns zero.
535     * A <tt>SynchronousQueue</tt> has no internal capacity.
536 dl 1.5 * @return zero.
537     */
538     public int remainingCapacity() {
539     return 0;
540     }
541    
542     /**
543 dholmes 1.11 * Does nothing.
544     * A <tt>SynchronousQueue</tt> has no internal capacity.
545     */
546     public void clear() {}
547    
548     /**
549     * Always returns <tt>false</tt>.
550     * A <tt>SynchronousQueue</tt> has no internal capacity.
551 dl 1.18 * @param o the element
552 dholmes 1.11 * @return <tt>false</tt>
553     */
554     public boolean contains(Object o) {
555     return false;
556     }
557    
558     /**
559 dl 1.18 * Always returns <tt>false</tt>.
560     * A <tt>SynchronousQueue</tt> has no internal capacity.
561     *
562     * @param o the element to remove
563     * @return <tt>false</tt>
564     */
565     public boolean remove(Object o) {
566     return false;
567     }
568    
569     /**
570 dl 1.16 * Returns <tt>false</tt> unless given collection is empty.
571 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
572 dl 1.18 * @param c the collection
573 dl 1.16 * @return <tt>false</tt> unless given collection is empty
574 dholmes 1.11 */
575 dl 1.12 public boolean containsAll(Collection<?> c) {
576 dl 1.16 return c.isEmpty();
577 dholmes 1.11 }
578    
579     /**
580     * Always returns <tt>false</tt>.
581     * A <tt>SynchronousQueue</tt> has no internal capacity.
582 dl 1.18 * @param c the collection
583 dholmes 1.11 * @return <tt>false</tt>
584     */
585 dl 1.12 public boolean removeAll(Collection<?> c) {
586 dholmes 1.11 return false;
587     }
588    
589     /**
590     * Always returns <tt>false</tt>.
591     * A <tt>SynchronousQueue</tt> has no internal capacity.
592 dl 1.18 * @param c the collection
593 dholmes 1.11 * @return <tt>false</tt>
594     */
595 dl 1.12 public boolean retainAll(Collection<?> c) {
596 dholmes 1.11 return false;
597     }
598    
599     /**
600     * Always returns <tt>null</tt>.
601     * A <tt>SynchronousQueue</tt> does not return elements
602 dl 1.5 * unless actively waited on.
603 dholmes 1.11 * @return <tt>null</tt>
604 dl 1.5 */
605     public E peek() {
606     return null;
607     }
608    
609    
610     static class EmptyIterator<E> implements Iterator<E> {
611 dl 1.2 public boolean hasNext() {
612     return false;
613     }
614     public E next() {
615     throw new NoSuchElementException();
616     }
617     public void remove() {
618 dl 1.17 throw new IllegalStateException();
619 dl 1.2 }
620 tim 1.1 }
621 dl 1.2
622 dl 1.5 /**
623 dl 1.18 * Returns an empty iterator in which <tt>hasNext</tt> always returns
624 tim 1.13 * <tt>false</tt>.
625     *
626 dholmes 1.11 * @return an empty iterator
627 dl 1.5 */
628 dl 1.2 public Iterator<E> iterator() {
629 dl 1.5 return new EmptyIterator<E>();
630 tim 1.1 }
631    
632 dl 1.2
633 dl 1.5 /**
634 dholmes 1.11 * Returns a zero-length array.
635     * @return a zero-length array
636 dl 1.5 */
637 dl 1.3 public Object[] toArray() {
638 dl 1.25 return new Object[0];
639 tim 1.1 }
640    
641 dholmes 1.11 /**
642     * Sets the zeroeth element of the specified array to <tt>null</tt>
643     * (if the array has non-zero length) and returns it.
644 dl 1.40 * @param a the array
645 dholmes 1.11 * @return the specified array
646     */
647 dl 1.2 public <T> T[] toArray(T[] a) {
648     if (a.length > 0)
649     a[0] = null;
650     return a;
651     }
652 dl 1.21
653    
654     public int drainTo(Collection<? super E> c) {
655     if (c == null)
656     throw new NullPointerException();
657     if (c == this)
658     throw new IllegalArgumentException();
659     int n = 0;
660     E e;
661     while ( (e = poll()) != null) {
662     c.add(e);
663     ++n;
664     }
665     return n;
666     }
667    
668     public int drainTo(Collection<? super E> c, int maxElements) {
669     if (c == null)
670     throw new NullPointerException();
671     if (c == this)
672     throw new IllegalArgumentException();
673     int n = 0;
674     E e;
675     while (n < maxElements && (e = poll()) != null) {
676     c.add(e);
677     ++n;
678     }
679     return n;
680     }
681 tim 1.1 }
682 dholmes 1.11
683    
684    
685    
686