ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.41
Committed: Wed Jan 21 15:20:35 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.40: +2 -2 lines
Log Message:
doc improvements; consistent conventions for nested classes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.29 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.18 * A {@linkplain BlockingQueue blocking queue} in which each
13     * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14     * synchronous queue does not have any internal capacity - in
15     * particular it does not have a capacity of one. You cannot
16     * <tt>peek</tt> at a synchronous queue because an element is only
17     * present when you try to take it; you cannot add an element (using
18     * any method) unless another thread is trying to remove it; you
19     * cannot iterate as there is nothing to iterate. The <em>head</em>
20     * of the queue is the element that the first queued thread is trying
21     * to add to the queue; if there are no queued threads then no element
22     * is being added and the head is <tt>null</tt>. For purposes of
23     * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 dl 1.19 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 dl 1.18 * queue does not permit <tt>null</tt> elements.
26     *
27     * <p>Synchronous queues are similar to rendezvous channels used in
28     * CSP and Ada. They are well suited for handoff designs, in which an
29 dl 1.30 * object running in one thread must sync up with an object running
30 dl 1.18 * in another thread in order to hand it some information, event, or
31     * task.
32 dl 1.19 * <p>This class implements all of the <em>optional</em> methods
33     * of the {@link Collection} and {@link Iterator} interfaces.
34 dl 1.6 * @since 1.5
35     * @author Doug Lea
36 dl 1.24 * @param <E> the type of elements held in this collection
37 dl 1.23 */
38 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
39 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
40 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
41 tim 1.1
42 dl 1.2 /*
43     This implementation divides actions into two cases for puts:
44    
45 tim 1.10 * An arriving putter that does not already have a waiting taker
46 dl 1.2 creates a node holding item, and then waits for a taker to take it.
47     * An arriving putter that does already have a waiting taker fills
48     the slot node created by the taker, and notifies it to continue.
49    
50     And symmetrically, two for takes:
51    
52     * An arriving taker that does not already have a waiting putter
53     creates an empty slot node, and then waits for a putter to fill it.
54     * An arriving taker that does already have a waiting putter takes
55     item from the node created by the putter, and notifies it to continue.
56    
57     This requires keeping two simple queues: waitingPuts and waitingTakes.
58 tim 1.10
59 dl 1.2 When a put or take waiting for the actions of its counterpart
60     aborts due to interruption or timeout, it marks the node
61     it created as "CANCELLED", which causes its counterpart to retry
62     the entire put or take sequence.
63     */
64    
65    
66     /*
67     * Note that all fields are transient final, so there is
68     * no explicit serialization code.
69     */
70    
71     private transient final WaitQueue waitingPuts = new WaitQueue();
72     private transient final WaitQueue waitingTakes = new WaitQueue();
73     private transient final ReentrantLock qlock = new ReentrantLock();
74    
75     /**
76     * Nodes each maintain an item and handle waits and signals for
77 dl 1.31 * getting and setting it. The class extends
78     * AbstractQueuedSynchronizer to manage blocking, using AQS state
79     * 0 for waiting, 1 for ack, -1 for cancelled.
80 dl 1.2 */
81 dl 1.41 static final class Node extends AbstractQueuedSynchronizer {
82 dl 1.35 /** Synchronization state value representing that node acked */
83     private static final int ACK = 1;
84     /** Synchronization state value representing that node cancelled */
85     private static final int CANCEL = -1;
86    
87 dl 1.6 /** The item being transferred */
88 dl 1.2 Object item;
89 dl 1.6 /** Next node in wait queue */
90 dl 1.2 Node next;
91 dl 1.35
92     /** Create node with initial item */
93 dl 1.31 Node(Object x) { item = x; }
94    
95     /**
96     * Implements AQS base acquire to succeed if not in WAITING state
97     */
98 dl 1.39 protected boolean tryAcquire(int ignore) {
99 dl 1.35 return getState() != 0;
100 dl 1.31 }
101    
102     /**
103 dl 1.34 * Implements AQS base release to signal if state changed
104 dl 1.31 */
105 dl 1.39 protected boolean tryRelease(int newState) {
106 dl 1.35 return compareAndSetState(0, newState);
107 dl 1.31 }
108    
109     /**
110 dl 1.35 * Take item and null out field (for sake of GC)
111 dl 1.31 */
112 dl 1.35 private Object extract() {
113     Object x = item;
114     item = null;
115     return x;
116 dl 1.31 }
117    
118     /**
119 dl 1.35 * Try to cancel on interrupt; if so rethrowing,
120     * else setting interrupt state
121 dl 1.31 */
122 dl 1.35 private void checkCancellationOnInterrupt(InterruptedException ie)
123     throws InterruptedException {
124 dl 1.39 if (release(CANCEL))
125 dl 1.35 throw ie;
126     Thread.currentThread().interrupt();
127 dl 1.31 }
128 dl 1.2
129     /**
130     * Fill in the slot created by the taker and signal taker to
131     * continue.
132     */
133 dl 1.31 boolean setItem(Object x) {
134 dl 1.35 item = x; // can place in slot even if cancelled
135 dl 1.39 return release(ACK);
136 dl 1.2 }
137    
138     /**
139     * Remove item from slot created by putter and signal putter
140     * to continue.
141     */
142 dl 1.31 Object getItem() {
143 dl 1.39 return (release(ACK))? extract() : null;
144 dl 1.35 }
145    
146     /**
147     * Wait for a taker to take item placed by putter.
148     */
149     void waitForTake() throws InterruptedException {
150     try {
151 dl 1.39 acquireInterruptibly(0);
152 dl 1.35 } catch (InterruptedException ie) {
153     checkCancellationOnInterrupt(ie);
154     }
155     }
156    
157     /**
158     * Wait for a putter to put item placed by taker.
159     */
160     Object waitForPut() throws InterruptedException {
161     try {
162 dl 1.39 acquireInterruptibly(0);
163 dl 1.35 } catch (InterruptedException ie) {
164     checkCancellationOnInterrupt(ie);
165     }
166     return extract();
167 dl 1.31 }
168    
169     /**
170 dl 1.33 * Wait for a taker to take item placed by putter or time out.
171 dl 1.31 */
172 dl 1.35 boolean waitForTake(long nanos) throws InterruptedException {
173 dl 1.2 try {
174 dl 1.39 if (!tryAcquireNanos(0, nanos) &&
175     release(CANCEL))
176 dl 1.33 return false;
177 dl 1.31 } catch (InterruptedException ie) {
178 dl 1.35 checkCancellationOnInterrupt(ie);
179 dl 1.2 }
180 dl 1.35 return true;
181 dl 1.2 }
182    
183     /**
184 dl 1.33 * Wait for a putter to put item placed by taker, or time out.
185 dl 1.31 */
186 dl 1.35 Object waitForPut(long nanos) throws InterruptedException {
187 dl 1.31 try {
188 dl 1.39 if (!tryAcquireNanos(0, nanos) &&
189     release(CANCEL))
190 dl 1.33 return null;
191 dl 1.31 } catch (InterruptedException ie) {
192 dl 1.35 checkCancellationOnInterrupt(ie);
193 dl 1.2 }
194 dl 1.35 return extract();
195 dl 1.2 }
196    
197     }
198    
199     /**
200     * Simple FIFO queue class to hold waiting puts/takes.
201     **/
202 dl 1.41 static final class WaitQueue<E> {
203 dl 1.2 Node head;
204     Node last;
205    
206 tim 1.10 Node enq(Object x) {
207 dl 1.2 Node p = new Node(x);
208 tim 1.10 if (last == null)
209 dl 1.2 last = head = p;
210 tim 1.10 else
211 dl 1.2 last = last.next = p;
212     return p;
213     }
214    
215     Node deq() {
216     Node p = head;
217 dl 1.35 if (p != null) {
218     if ((head = p.next) == null)
219     last = null;
220     p.next = null;
221     }
222 dl 1.2 return p;
223     }
224     }
225    
226     /**
227 dl 1.35 * Creates a <tt>SynchronousQueue</tt>.
228 tim 1.10 */
229 dl 1.35 public SynchronousQueue() {}
230 dl 1.2
231    
232     /**
233 dl 1.35 * Adds the specified element to this queue, waiting if necessary for
234     * another thread to receive it.
235     * @param o the element to add
236     * @throws InterruptedException if interrupted while waiting.
237     * @throws NullPointerException if the specified element is <tt>null</tt>.
238 tim 1.10 */
239 dl 1.35 public void put(E o) throws InterruptedException {
240     if (o == null) throw new NullPointerException();
241     final ReentrantLock qlock = this.qlock;
242    
243 dl 1.2 for (;;) {
244     Node node;
245     boolean mustWait;
246     qlock.lockInterruptibly();
247     try {
248 dl 1.35 node = waitingTakes.deq();
249 dl 1.2 if ( (mustWait = (node == null)) )
250 dl 1.35 node = waitingPuts.enq(o);
251 tim 1.14 } finally {
252 dl 1.2 qlock.unlock();
253     }
254    
255 dl 1.31 if (mustWait) {
256 dl 1.35 node.waitForTake();
257     return;
258 dl 1.2 }
259    
260 dl 1.35 else if (node.setItem(o))
261     return;
262 dl 1.2
263 dl 1.35 // else taker cancelled, so retry
264     }
265 tim 1.1 }
266    
267 dholmes 1.11 /**
268 dl 1.20 * Inserts the specified element into this queue, waiting if necessary
269 dl 1.18 * up to the specified wait time for another thread to receive it.
270     * @param o the element to add
271     * @param timeout how long to wait before giving up, in units of
272     * <tt>unit</tt>
273     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
274     * <tt>timeout</tt> parameter
275 dholmes 1.11 * @return <tt>true</tt> if successful, or <tt>false</tt> if
276     * the specified waiting time elapses before a taker appears.
277 dl 1.18 * @throws InterruptedException if interrupted while waiting.
278     * @throws NullPointerException if the specified element is <tt>null</tt>.
279 dholmes 1.11 */
280 dl 1.18 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
281 dl 1.35 if (o == null) throw new NullPointerException();
282     long nanos = unit.toNanos(timeout);
283     final ReentrantLock qlock = this.qlock;
284     for (;;) {
285     Node node;
286     boolean mustWait;
287     qlock.lockInterruptibly();
288     try {
289     node = waitingTakes.deq();
290     if ( (mustWait = (node == null)) )
291     node = waitingPuts.enq(o);
292     } finally {
293     qlock.unlock();
294     }
295    
296     if (mustWait)
297     return node.waitForTake(nanos);
298    
299     else if (node.setItem(o))
300     return true;
301    
302     // else taker cancelled, so retry
303     }
304    
305 tim 1.1 }
306    
307 dl 1.2
308 dholmes 1.11 /**
309     * Retrieves and removes the head of this queue, waiting if necessary
310     * for another thread to insert it.
311 dl 1.40 * @throws InterruptedException if interrupted while waiting.
312 dholmes 1.11 * @return the head of this queue
313     */
314 dl 1.2 public E take() throws InterruptedException {
315 dl 1.35 final ReentrantLock qlock = this.qlock;
316     for (;;) {
317     Node node;
318     boolean mustWait;
319    
320     qlock.lockInterruptibly();
321     try {
322     node = waitingPuts.deq();
323     if ( (mustWait = (node == null)) )
324     node = waitingTakes.enq(null);
325     } finally {
326     qlock.unlock();
327     }
328    
329 dl 1.36 if (mustWait) {
330     Object x = node.waitForPut();
331     return (E)x;
332     }
333 dl 1.35 else {
334     Object x = node.getItem();
335     if (x != null)
336     return (E)x;
337     // else cancelled, so retry
338     }
339     }
340 tim 1.1 }
341 dl 1.2
342 dholmes 1.11 /**
343     * Retrieves and removes the head of this queue, waiting
344     * if necessary up to the specified wait time, for another thread
345     * to insert it.
346 dl 1.18 * @param timeout how long to wait before giving up, in units of
347     * <tt>unit</tt>
348     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
349     * <tt>timeout</tt> parameter
350     * @return the head of this queue, or <tt>null</tt> if the
351     * specified waiting time elapses before an element is present.
352     * @throws InterruptedException if interrupted while waiting.
353 dholmes 1.11 */
354 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
355 dl 1.35 long nanos = unit.toNanos(timeout);
356     final ReentrantLock qlock = this.qlock;
357    
358     for (;;) {
359     Node node;
360     boolean mustWait;
361    
362     qlock.lockInterruptibly();
363     try {
364     node = waitingPuts.deq();
365     if ( (mustWait = (node == null)) )
366     node = waitingTakes.enq(null);
367     } finally {
368     qlock.unlock();
369     }
370    
371 dl 1.36 if (mustWait) {
372     Object x = node.waitForPut(nanos);
373     return (E)x;
374     }
375 dl 1.35 else {
376     Object x = node.getItem();
377     if (x != null)
378     return (E)x;
379     // else cancelled, so retry
380     }
381     }
382 tim 1.1 }
383 dl 1.2
384     // Untimed nonblocking versions
385    
386 dl 1.18 /**
387 dl 1.20 * Inserts the specified element into this queue, if another thread is
388 dl 1.18 * waiting to receive it.
389     *
390     * @param o the element to add.
391     * @return <tt>true</tt> if it was possible to add the element to
392     * this queue, else <tt>false</tt>
393     * @throws NullPointerException if the specified element is <tt>null</tt>
394     */
395 dholmes 1.11 public boolean offer(E o) {
396     if (o == null) throw new NullPointerException();
397 dl 1.27 final ReentrantLock qlock = this.qlock;
398 tim 1.10
399     for (;;) {
400 tim 1.26 Node node;
401 dl 1.2 qlock.lock();
402     try {
403     node = waitingTakes.deq();
404 tim 1.14 } finally {
405 dl 1.2 qlock.unlock();
406     }
407     if (node == null)
408     return false;
409 tim 1.10
410 dl 1.31 else if (node.setItem(o))
411 dl 1.2 return true;
412     // else retry
413     }
414 tim 1.1 }
415 dl 1.2
416 dl 1.18 /**
417     * Retrieves and removes the head of this queue, if another thread
418     * is currently making an element available.
419     *
420     * @return the head of this queue, or <tt>null</tt> if no
421     * element is available.
422     */
423 dl 1.2 public E poll() {
424 dl 1.27 final ReentrantLock qlock = this.qlock;
425 dl 1.2 for (;;) {
426     Node node;
427     qlock.lock();
428     try {
429     node = waitingPuts.deq();
430 tim 1.14 } finally {
431 dl 1.2 qlock.unlock();
432     }
433     if (node == null)
434     return null;
435    
436     else {
437 dl 1.31 Object x = node.getItem();
438 dl 1.2 if (x != null)
439     return (E)x;
440     // else retry
441     }
442     }
443 tim 1.1 }
444 dl 1.2
445 dl 1.5 /**
446 dholmes 1.11 * Always returns <tt>true</tt>.
447     * A <tt>SynchronousQueue</tt> has no internal capacity.
448     * @return <tt>true</tt>
449 dl 1.5 */
450     public boolean isEmpty() {
451     return true;
452     }
453    
454     /**
455 dholmes 1.11 * Always returns zero.
456     * A <tt>SynchronousQueue</tt> has no internal capacity.
457 dl 1.5 * @return zero.
458     */
459     public int size() {
460     return 0;
461 tim 1.1 }
462 dl 1.2
463 dl 1.5 /**
464 dholmes 1.11 * Always returns zero.
465     * A <tt>SynchronousQueue</tt> has no internal capacity.
466 dl 1.5 * @return zero.
467     */
468     public int remainingCapacity() {
469     return 0;
470     }
471    
472     /**
473 dholmes 1.11 * Does nothing.
474     * A <tt>SynchronousQueue</tt> has no internal capacity.
475     */
476     public void clear() {}
477    
478     /**
479     * Always returns <tt>false</tt>.
480     * A <tt>SynchronousQueue</tt> has no internal capacity.
481 dl 1.18 * @param o the element
482 dholmes 1.11 * @return <tt>false</tt>
483     */
484     public boolean contains(Object o) {
485     return false;
486     }
487    
488     /**
489 dl 1.18 * Always returns <tt>false</tt>.
490     * A <tt>SynchronousQueue</tt> has no internal capacity.
491     *
492     * @param o the element to remove
493     * @return <tt>false</tt>
494     */
495     public boolean remove(Object o) {
496     return false;
497     }
498    
499     /**
500 dl 1.16 * Returns <tt>false</tt> unless given collection is empty.
501 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
502 dl 1.18 * @param c the collection
503 dl 1.16 * @return <tt>false</tt> unless given collection is empty
504 dholmes 1.11 */
505 dl 1.12 public boolean containsAll(Collection<?> c) {
506 dl 1.16 return c.isEmpty();
507 dholmes 1.11 }
508    
509     /**
510     * Always returns <tt>false</tt>.
511     * A <tt>SynchronousQueue</tt> has no internal capacity.
512 dl 1.18 * @param c the collection
513 dholmes 1.11 * @return <tt>false</tt>
514     */
515 dl 1.12 public boolean removeAll(Collection<?> c) {
516 dholmes 1.11 return false;
517     }
518    
519     /**
520     * Always returns <tt>false</tt>.
521     * A <tt>SynchronousQueue</tt> has no internal capacity.
522 dl 1.18 * @param c the collection
523 dholmes 1.11 * @return <tt>false</tt>
524     */
525 dl 1.12 public boolean retainAll(Collection<?> c) {
526 dholmes 1.11 return false;
527     }
528    
529     /**
530     * Always returns <tt>null</tt>.
531     * A <tt>SynchronousQueue</tt> does not return elements
532 dl 1.5 * unless actively waited on.
533 dholmes 1.11 * @return <tt>null</tt>
534 dl 1.5 */
535     public E peek() {
536     return null;
537     }
538    
539    
540     static class EmptyIterator<E> implements Iterator<E> {
541 dl 1.2 public boolean hasNext() {
542     return false;
543     }
544     public E next() {
545     throw new NoSuchElementException();
546     }
547     public void remove() {
548 dl 1.17 throw new IllegalStateException();
549 dl 1.2 }
550 tim 1.1 }
551 dl 1.2
552 dl 1.5 /**
553 dl 1.18 * Returns an empty iterator in which <tt>hasNext</tt> always returns
554 tim 1.13 * <tt>false</tt>.
555     *
556 dholmes 1.11 * @return an empty iterator
557 dl 1.5 */
558 dl 1.2 public Iterator<E> iterator() {
559 dl 1.5 return new EmptyIterator<E>();
560 tim 1.1 }
561    
562 dl 1.2
563 dl 1.5 /**
564 dholmes 1.11 * Returns a zero-length array.
565     * @return a zero-length array
566 dl 1.5 */
567 dl 1.3 public Object[] toArray() {
568 dl 1.25 return new Object[0];
569 tim 1.1 }
570    
571 dholmes 1.11 /**
572     * Sets the zeroeth element of the specified array to <tt>null</tt>
573     * (if the array has non-zero length) and returns it.
574 dl 1.40 * @param a the array
575 dholmes 1.11 * @return the specified array
576     */
577 dl 1.2 public <T> T[] toArray(T[] a) {
578     if (a.length > 0)
579     a[0] = null;
580     return a;
581     }
582 dl 1.21
583    
584     public int drainTo(Collection<? super E> c) {
585     if (c == null)
586     throw new NullPointerException();
587     if (c == this)
588     throw new IllegalArgumentException();
589     int n = 0;
590     E e;
591     while ( (e = poll()) != null) {
592     c.add(e);
593     ++n;
594     }
595     return n;
596     }
597    
598     public int drainTo(Collection<? super E> c, int maxElements) {
599     if (c == null)
600     throw new NullPointerException();
601     if (c == this)
602     throw new IllegalArgumentException();
603     int n = 0;
604     E e;
605     while (n < maxElements && (e = poll()) != null) {
606     c.add(e);
607     ++n;
608     }
609     return n;
610     }
611 tim 1.1 }
612 dholmes 1.11
613    
614    
615    
616