ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.42
Committed: Tue Jan 27 11:36:31 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.41: +5 -0 lines
Log Message:
Add Collection framework membership doc

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.29 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
/**
 * A {@linkplain BlockingQueue blocking queue} in which each
 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa.  A
 * synchronous queue does not have any internal capacity - in
 * particular it does not have a capacity of one. You cannot
 * <tt>peek</tt> at a synchronous queue because an element is only
 * present when you try to take it; you cannot add an element (using
 * any method) unless another thread is trying to remove it; you
 * cannot iterate as there is nothing to iterate.  The <em>head</em>
 * of the queue is the element that the first queued thread is trying
 * to add to the queue; if there are no queued threads then no element
 * is being added and the head is <tt>null</tt>.  For purposes of
 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
 * a <tt>SynchronousQueue</tt> acts as an empty collection.  This
 * queue does not permit <tt>null</tt> elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
      This implementation divides actions into two cases for puts:

      * An arriving putter that does not already have a waiting taker
        creates a node holding item, and then waits for a taker to take it.
      * An arriving putter that does already have a waiting taker fills
        the slot node created by the taker, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving taker that does not already have a waiting putter
        creates an empty slot node, and then waits for a putter to fill it.
      * An arriving taker that does already have a waiting putter takes
        item from the node created by the putter, and notifies it to continue.

      This requires keeping two simple queues: waitingPuts and waitingTakes.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as "CANCELLED", which causes its counterpart to retry
      the entire put or take sequence.  The aborting thread then unlinks
      its node from the wait queue, so that cancelled nodes (and, for
      puts, the items they hold) do not pile up while no counterpart
      arrives to dequeue them.
    */

    /*
     * All fields are transient, so the serialized form carries no
     * state.  Field initializers do not run during deserialization,
     * which would leave these final fields null; readResolve()
     * therefore substitutes a freshly constructed queue.
     */

    /** Nodes of putters waiting for a taker; guarded by qlock. */
    private transient final WaitQueue<E> waitingPuts = new WaitQueue<E>();
    /** Nodes of takers waiting for a putter; guarded by qlock. */
    private transient final WaitQueue<E> waitingTakes = new WaitQueue<E>();
    /** Lock held while either wait queue is read or modified. */
    private transient final ReentrantLock qlock = new ReentrantLock();

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it. The class extends
     * AbstractQueuedSynchronizer to manage blocking, using AQS state
     * 0 for waiting, 1 for ack, -1 for cancelled.  The state leaves 0
     * at most once (see {@link #tryRelease}), so exactly one of
     * "acked by the counterpart" and "cancelled by the waiter" wins.
     */
    static final class Node extends AbstractQueuedSynchronizer {
        /** Synchronization state value representing that node acked */
        private static final int ACK    =  1;
        /** Synchronization state value representing that node cancelled */
        private static final int CANCEL = -1;

        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        /** Create node with initial item */
        Node(Object x) { item = x; }

        /**
         * Implements AQS base acquire to succeed if not in WAITING
         * state, i.e. once the node has been acked or cancelled.
         */
        protected boolean tryAcquire(int ignore) {
            return getState() != 0;
        }

        /**
         * Implements AQS base release to signal if state changed.
         * Only the first transition out of the waiting state (0)
         * succeeds; later attempts return false.
         */
        protected boolean tryRelease(int newState) {
            return compareAndSetState(0, newState);
        }

        /**
         * Take item and null out field (for sake of GC)
         */
        private Object extract() {
            Object x = item;
            item = null;
            return x;
        }

        /**
         * Try to cancel on interrupt; if so rethrowing,
         * else setting interrupt state.  The else case means the
         * counterpart already acked the node, so the transfer stands
         * and the interrupt is only recorded for the caller.
         */
        private void checkCancellationOnInterrupt(InterruptedException ie)
            throws InterruptedException {
            if (release(CANCEL))
                throw ie;
            Thread.currentThread().interrupt();
        }

        /**
         * Fill in the slot created by the taker and signal taker to
         * continue.
         *
         * @return true if the taker was acked, false if it had
         * already cancelled
         */
        boolean setItem(Object x) {
            // Must be written before release(ACK) publishes the node.
            item = x; // can place in slot even if cancelled
            return release(ACK);
        }

        /**
         * Remove item from slot created by putter and signal putter
         * to continue.
         *
         * @return the item, or null if the putter had already cancelled
         */
        Object getItem() {
            return (release(ACK)) ? extract() : null;
        }

        /**
         * Wait for a taker to take item placed by putter.
         *
         * @throws InterruptedException if interrupted and the node was
         * successfully cancelled
         */
        void waitForTake() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
        }

        /**
         * Wait for a putter to put item placed by taker.
         *
         * @return the item supplied by the putter
         * @throws InterruptedException if interrupted and the node was
         * successfully cancelled
         */
        Object waitForPut() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }

        /**
         * Wait for a taker to take item placed by putter or time out.
         *
         * @return false if the wait timed out and the node was
         * cancelled, else true
         * @throws InterruptedException if interrupted and the node was
         * successfully cancelled
         */
        boolean waitForTake(long nanos) throws InterruptedException {
            try {
                // If cancelling fails, a taker acked in the meantime.
                if (!tryAcquireNanos(0, nanos) &&
                    release(CANCEL))
                    return false;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return true;
        }

        /**
         * Wait for a putter to put item placed by taker, or time out.
         *
         * @return the item supplied by the putter, or null if the wait
         * timed out and the node was cancelled
         * @throws InterruptedException if interrupted and the node was
         * successfully cancelled
         */
        Object waitForPut(long nanos) throws InterruptedException {
            try {
                // If cancelling fails, a putter acked in the meantime.
                if (!tryAcquireNanos(0, nanos) &&
                    release(CANCEL))
                    return null;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }

    }

    /**
     * Simple FIFO queue class to hold waiting puts/takes.  It does no
     * locking of its own.
     */
    static final class WaitQueue<E> {
        /** First node in the queue, or null if the queue is empty. */
        Node head;
        /** Last node in the queue, or null if the queue is empty. */
        Node last;

        /** Appends a new node holding x and returns it. */
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        /** Removes and returns the first node, or null if empty. */
        Node deq() {
            Node p = head;
            if (p != null) {
                if ((head = p.next) == null)
                    last = null;
                p.next = null;
            }
            return p;
        }

        /**
         * Removes the given node if it is still linked.
         *
         * @return true if the node was found and removed
         */
        boolean unlink(Node node) {
            Node prev = null;
            for (Node p = head; p != null; prev = p, p = p.next) {
                if (p == node) {
                    if (prev == null)
                        head = p.next;
                    else
                        prev.next = p.next;
                    if (last == p)
                        last = prev;
                    p.next = null;
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Creates a <tt>SynchronousQueue</tt>.
     */
    public SynchronousQueue() {}

    /**
     * Replaces a deserialized instance, whose transient final fields
     * were left null, with a freshly constructed queue.
     */
    private Object readResolve() {
        return new SynchronousQueue<E>();
    }

    /**
     * Removes a node that its own waiter has just cancelled from the
     * given wait queue.  Does nothing if a counterpart already
     * dequeued the node.
     */
    private void unlinkCancelled(WaitQueue<E> queue, Node node) {
        final ReentrantLock qlock = this.qlock;
        // Not lockInterruptibly: the cleanup must not be skipped.
        qlock.lock();
        try {
            queue.unlink(node);
        } finally {
            qlock.unlock();
        }
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;
            qlock.lockInterruptibly();
            try {
                node = waitingTakes.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingPuts.enq(o);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    node.waitForTake();
                } catch (InterruptedException ie) {
                    // Thrown only after the node was cancelled.
                    unlinkCancelled(waitingPuts, node);
                    throw ie;
                }
                return;
            }

            else if (node.setItem(o))
                return;

            // else taker cancelled, so retry
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before a taker appears.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;
            qlock.lockInterruptibly();
            try {
                node = waitingTakes.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingPuts.enq(o);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                boolean taken;
                try {
                    taken = node.waitForTake(nanos);
                } catch (InterruptedException ie) {
                    // Thrown only after the node was cancelled.
                    unlinkCancelled(waitingPuts, node);
                    throw ie;
                }
                if (!taken) // timed out and cancelled
                    unlinkCancelled(waitingPuts, node);
                return taken;
            }

            else if (node.setItem(o))
                return true;

            // else taker cancelled, so retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     * @throws InterruptedException if interrupted while waiting.
     * @return the head of this queue
     */
    @SuppressWarnings("unchecked") // nodes only ever hold items supplied as E
    public E take() throws InterruptedException {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;

            qlock.lockInterruptibly();
            try {
                node = waitingPuts.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingTakes.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    Object x = node.waitForPut();
                    return (E)x;
                } catch (InterruptedException ie) {
                    // Thrown only after the node was cancelled.
                    unlinkCancelled(waitingTakes, node);
                    throw ie;
                }
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return (E)x;
                // else cancelled, so retry
            }
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    @SuppressWarnings("unchecked") // nodes only ever hold items supplied as E
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;

            qlock.lockInterruptibly();
            try {
                node = waitingPuts.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingTakes.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                Object x;
                try {
                    x = node.waitForPut(nanos);
                } catch (InterruptedException ie) {
                    // Thrown only after the node was cancelled.
                    unlinkCancelled(waitingTakes, node);
                    throw ie;
                }
                if (x == null) // timed out and cancelled
                    unlinkCancelled(waitingTakes, node);
                return (E)x;
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return (E)x;
                // else cancelled, so retry
            }
        }
    }

    // Untimed nonblocking versions

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     *         this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingTakes.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return false;

            else if (node.setItem(o))
                return true;
            // else retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    @SuppressWarnings("unchecked") // nodes only ever hold items supplied as E
    public E poll() {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingPuts.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return null;

            else {
                Object x = node.getItem();
                if (x != null)
                    return (E)x;
                // else retry
            }
        }
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {}

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /** Iterator over nothing, as returned by {@link #iterator}. */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            // next() can never have succeeded, so there is nothing to remove.
            throw new IllegalStateException();
        }
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     * @param a the array
     * @return the specified array
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * Transfers to the given collection every element that waiting
     * putters are currently offering, by polling until no element is
     * available.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Transfers to the given collection at most <tt>maxElements</tt>
     * elements that waiting putters are currently offering, by polling
     * until the limit is reached or no element is available.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }
}
617 dholmes 1.11
618    
619    
620    
621