ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.20
Committed: Mon Sep 15 12:02:46 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.19: +2 -2 lines
Log Message:
Fix some javadoc inconsistencies

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.18 * A {@linkplain BlockingQueue blocking queue} in which each
13     * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14     * synchronous queue does not have any internal capacity - in
15     * particular it does not have a capacity of one. You cannot
16     * <tt>peek</tt> at a synchronous queue because an element is only
17     * present when you try to take it; you cannot add an element (using
18     * any method) unless another thread is trying to remove it; you
19     * cannot iterate as there is nothing to iterate. The <em>head</em>
20     * of the queue is the element that the first queued thread is trying
21     * to add to the queue; if there are no queued threads then no element
22     * is being added and the head is <tt>null</tt>. For purposes of
23     * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 dl 1.19 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 dl 1.18 * queue does not permit <tt>null</tt> elements.
26     *
27     * <p>Synchronous queues are similar to rendezvous channels used in
28     * CSP and Ada. They are well suited for handoff designs, in which an
29     * object running in one thread must synch up with an object running
30     * in another thread in order to hand it some information, event, or
31     * task.
32 dl 1.19 * <p>This class implements all of the <em>optional</em> methods
33     * of the {@link Collection} and {@link Iterator} interfaces.
34 dl 1.6 * @since 1.5
35     * @author Doug Lea
36     **/
37 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
38 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
39 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
40 tim 1.1
41 dl 1.2 /*
42     This implementation divides actions into two cases for puts:
43    
44 tim 1.10 * An arriving putter that does not already have a waiting taker
45 dl 1.2 creates a node holding item, and then waits for a taker to take it.
46     * An arriving putter that does already have a waiting taker fills
47     the slot node created by the taker, and notifies it to continue.
48    
49     And symmetrically, two for takes:
50    
51     * An arriving taker that does not already have a waiting putter
52     creates an empty slot node, and then waits for a putter to fill it.
53     * An arriving taker that does already have a waiting putter takes
54     item from the node created by the putter, and notifies it to continue.
55    
56     This requires keeping two simple queues: waitingPuts and waitingTakes.
57 tim 1.10
58 dl 1.2 When a put or take waiting for the actions of its counterpart
59     aborts due to interruption or timeout, it marks the node
60     it created as "CANCELLED", which causes its counterpart to retry
61     the entire put or take sequence.
62     */
63    
64 tim 1.10 /**
65 dl 1.2 * Special marker used in queue nodes to indicate that
66     * the thread waiting for a change in the node has timed out
67     * or been interrupted.
68     **/
69     private static final Object CANCELLED = new Object();
70    
71     /*
72     * Note that all fields are transient final, so there is
73     * no explicit serialization code.
74     */
75    
76     private transient final WaitQueue waitingPuts = new WaitQueue();
77     private transient final WaitQueue waitingTakes = new WaitQueue();
78     private transient final ReentrantLock qlock = new ReentrantLock();
79    
80     /**
81     * Nodes each maintain an item and handle waits and signals for
82     * getting and setting it. The class opportunistically extends
83     * ReentrantLock to save an extra object allocation per
84     * rendezvous.
85     */
86     private static class Node extends ReentrantLock {
87 dl 1.6 /** Condition to wait on for other party; lazily constructed */
88 dl 1.2 Condition done;
89 dl 1.6 /** The item being transferred */
90 dl 1.2 Object item;
91 dl 1.6 /** Next node in wait queue */
92 dl 1.2 Node next;
93 dl 1.6
94 dl 1.2 Node(Object x) { item = x; }
95    
96     /**
97     * Fill in the slot created by the taker and signal taker to
98     * continue.
99     */
100     boolean set(Object x) {
101     this.lock();
102     try {
103     if (item != CANCELLED) {
104     item = x;
105     if (done != null)
106     done.signal();
107     return true;
108 tim 1.14 } else // taker has cancelled
109 dl 1.2 return false;
110 tim 1.14 } finally {
111 dl 1.2 this.unlock();
112     }
113     }
114    
115     /**
116     * Remove item from slot created by putter and signal putter
117     * to continue.
118     */
119     Object get() {
120     this.lock();
121     try {
122     Object x = item;
123     if (x != CANCELLED) {
124     item = null;
125     next = null;
126     if (done != null)
127     done.signal();
128     return x;
129 tim 1.14 } else
130 dl 1.2 return null;
131 tim 1.14 } finally {
132 dl 1.2 this.unlock();
133     }
134     }
135    
136     /**
137     * Wait for a taker to take item placed by putter, or time out.
138     */
139     boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
140     this.lock();
141     try {
142     for (;;) {
143     if (item == null)
144     return true;
145     if (timed) {
146     if (nanos <= 0) {
147     item = CANCELLED;
148     return false;
149     }
150     }
151 dl 1.9 if (done == null)
152     done = this.newCondition();
153     if (timed)
154     nanos = done.awaitNanos(nanos);
155     else
156     done.await();
157     }
158 tim 1.14 } catch (InterruptedException ie) {
159 dl 1.9 // If taken, return normally but set interrupt status
160     if (item == null) {
161     Thread.currentThread().interrupt();
162     return true;
163 tim 1.14 } else {
164 dl 1.9 item = CANCELLED;
165     done.signal(); // propagate signal
166     throw ie;
167 dl 1.2 }
168 tim 1.14 } finally {
169 dl 1.2 this.unlock();
170     }
171     }
172    
173     /**
174     * Wait for a putter to put item placed by taker, or time out.
175     */
176     Object waitForPut(boolean timed, long nanos) throws InterruptedException {
177     this.lock();
178     try {
179     for (;;) {
180     Object x = item;
181     if (x != null) {
182     item = null;
183     next = null;
184     return x;
185     }
186     if (timed) {
187     if (nanos <= 0) {
188     item = CANCELLED;
189     return null;
190     }
191     }
192 dl 1.9 if (done == null)
193     done = this.newCondition();
194     if (timed)
195     nanos = done.awaitNanos(nanos);
196     else
197     done.await();
198     }
199 tim 1.14 } catch (InterruptedException ie) {
200 dl 1.9 Object y = item;
201     if (y != null) {
202     item = null;
203     next = null;
204     Thread.currentThread().interrupt();
205     return y;
206 tim 1.14 } else {
207 dl 1.9 item = CANCELLED;
208     done.signal(); // propagate signal
209     throw ie;
210 dl 1.2 }
211 tim 1.14 } finally {
212 dl 1.2 this.unlock();
213     }
214     }
215     }
216    
217     /**
218     * Simple FIFO queue class to hold waiting puts/takes.
219     **/
220     private static class WaitQueue<E> {
221     Node head;
222     Node last;
223    
224 tim 1.10 Node enq(Object x) {
225 dl 1.2 Node p = new Node(x);
226 tim 1.10 if (last == null)
227 dl 1.2 last = head = p;
228 tim 1.10 else
229 dl 1.2 last = last.next = p;
230     return p;
231     }
232    
233     Node deq() {
234     Node p = head;
235 tim 1.10 if (p != null && (head = p.next) == null)
236 dl 1.2 last = null;
237     return p;
238     }
239     }
240    
241     /**
242     * Main put algorithm, used by put, timed offer
243 tim 1.10 */
244 dl 1.2 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
245 dl 1.6 if (x == null) throw new NullPointerException();
246 tim 1.10 for (;;) {
247 dl 1.2 Node node;
248     boolean mustWait;
249 tim 1.10
250 dl 1.2 qlock.lockInterruptibly();
251     try {
252     node = waitingTakes.deq();
253     if ( (mustWait = (node == null)) )
254     node = waitingPuts.enq(x);
255 tim 1.14 } finally {
256 dl 1.2 qlock.unlock();
257     }
258    
259     if (mustWait)
260     return node.waitForTake(timed, nanos);
261    
262     else if (node.set(x))
263     return true;
264    
265     // else taker cancelled, so retry
266     }
267 tim 1.1 }
268 dl 1.2
269     /**
270     * Main take algorithm, used by take, timed poll
271 tim 1.10 */
272 dl 1.2 private E doTake(boolean timed, long nanos) throws InterruptedException {
273     for (;;) {
274     Node node;
275     boolean mustWait;
276    
277     qlock.lockInterruptibly();
278     try {
279     node = waitingPuts.deq();
280     if ( (mustWait = (node == null)) )
281     node = waitingTakes.enq(null);
282 tim 1.14 } finally {
283 dl 1.2 qlock.unlock();
284     }
285    
286 dl 1.9 if (mustWait)
287 dl 1.2 return (E)node.waitForPut(timed, nanos);
288    
289     else {
290     E x = (E)node.get();
291     if (x != null)
292     return x;
293     // else cancelled, so retry
294     }
295     }
296 tim 1.1 }
297 dl 1.2
298 dholmes 1.11 /**
299 tim 1.13 * Creates a <tt>SynchronousQueue</tt>.
300 dholmes 1.11 */
301 dl 1.2 public SynchronousQueue() {}
302    
303    
304 dholmes 1.11 /**
305     * Adds the specified element to this queue, waiting if necessary for
306     * another thread to receive it.
307 dl 1.18 * @param o the element to add
308     * @throws InterruptedException if interrupted while waiting.
309     * @throws NullPointerException if the specified element is <tt>null</tt>.
310 dholmes 1.11 */
311     public void put(E o) throws InterruptedException {
312     doPut(o, false, 0);
313 tim 1.1 }
314    
315 dholmes 1.11 /**
316 dl 1.20 * Inserts the specified element into this queue, waiting if necessary
317 dl 1.18 * up to the specified wait time for another thread to receive it.
318     * @param o the element to add
319     * @param timeout how long to wait before giving up, in units of
320     * <tt>unit</tt>
321     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
322     * <tt>timeout</tt> parameter
323 dholmes 1.11 * @return <tt>true</tt> if successful, or <tt>false</tt> if
324     * the specified waiting time elapses before a taker appears.
325 dl 1.18 * @throws InterruptedException if interrupted while waiting.
326     * @throws NullPointerException if the specified element is <tt>null</tt>.
327 dholmes 1.11 */
328 dl 1.18 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
329     return doPut(o, true, unit.toNanos(timeout));
330 tim 1.1 }
331    
332 dl 1.2
333 dholmes 1.11 /**
334     * Retrieves and removes the head of this queue, waiting if necessary
335     * for another thread to insert it.
336     * @return the head of this queue
337     */
338 dl 1.2 public E take() throws InterruptedException {
339     return doTake(false, 0);
340 tim 1.1 }
341 dl 1.2
342 dholmes 1.11 /**
343     * Retrieves and removes the head of this queue, waiting
344     * if necessary up to the specified wait time, for another thread
345     * to insert it.
346 dl 1.18 * @param timeout how long to wait before giving up, in units of
347     * <tt>unit</tt>
348     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
349     * <tt>timeout</tt> parameter
350     * @return the head of this queue, or <tt>null</tt> if the
351     * specified waiting time elapses before an element is present.
352     * @throws InterruptedException if interrupted while waiting.
353 dholmes 1.11 */
354 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
355     return doTake(true, unit.toNanos(timeout));
356 tim 1.1 }
357 dl 1.2
358     // Untimed nonblocking versions
359    
360 dl 1.18 /**
361 dl 1.20 * Inserts the specified element into this queue, if another thread is
362 dl 1.18 * waiting to receive it.
363     *
364     * @param o the element to add.
365     * @return <tt>true</tt> if it was possible to add the element to
366     * this queue, else <tt>false</tt>
367     * @throws NullPointerException if the specified element is <tt>null</tt>
368     */
369 dholmes 1.11 public boolean offer(E o) {
370     if (o == null) throw new NullPointerException();
371 tim 1.10
372     for (;;) {
373 dl 1.2 qlock.lock();
374     Node node;
375     try {
376     node = waitingTakes.deq();
377 tim 1.14 } finally {
378 dl 1.2 qlock.unlock();
379     }
380     if (node == null)
381     return false;
382 tim 1.10
383 dholmes 1.11 else if (node.set(o))
384 dl 1.2 return true;
385     // else retry
386     }
387 tim 1.1 }
388 dl 1.2
389 dl 1.18 /**
390     * Retrieves and removes the head of this queue, if another thread
391     * is currently making an element available.
392     *
393     * @return the head of this queue, or <tt>null</tt> if no
394     * element is available.
395     */
396 dl 1.2 public E poll() {
397     for (;;) {
398     Node node;
399     qlock.lock();
400     try {
401     node = waitingPuts.deq();
402 tim 1.14 } finally {
403 dl 1.2 qlock.unlock();
404     }
405     if (node == null)
406     return null;
407    
408     else {
409     Object x = node.get();
410     if (x != null)
411     return (E)x;
412     // else retry
413     }
414     }
415 tim 1.1 }
416 dl 1.2
417 dl 1.5 /**
418 dholmes 1.11 * Always returns <tt>true</tt>.
419     * A <tt>SynchronousQueue</tt> has no internal capacity.
420     * @return <tt>true</tt>
421 dl 1.5 */
422     public boolean isEmpty() {
423     return true;
424     }
425    
426     /**
427 dholmes 1.11 * Always returns zero.
428     * A <tt>SynchronousQueue</tt> has no internal capacity.
429 dl 1.5 * @return zero.
430     */
431     public int size() {
432     return 0;
433 tim 1.1 }
434 dl 1.2
435 dl 1.5 /**
436 dholmes 1.11 * Always returns zero.
437     * A <tt>SynchronousQueue</tt> has no internal capacity.
438 dl 1.5 * @return zero.
439     */
440     public int remainingCapacity() {
441     return 0;
442     }
443    
444     /**
445 dholmes 1.11 * Does nothing.
446     * A <tt>SynchronousQueue</tt> has no internal capacity.
447     */
448     public void clear() {}
449    
450     /**
451     * Always returns <tt>false</tt>.
452     * A <tt>SynchronousQueue</tt> has no internal capacity.
453 dl 1.18 * @param o the element
454 dholmes 1.11 * @return <tt>false</tt>
455     */
456     public boolean contains(Object o) {
457     return false;
458     }
459    
460     /**
461 dl 1.18 * Always returns <tt>false</tt>.
462     * A <tt>SynchronousQueue</tt> has no internal capacity.
463     *
464     * @param o the element to remove
465     * @return <tt>false</tt>
466     */
467     public boolean remove(Object o) {
468     return false;
469     }
470    
471     /**
472 dl 1.16 * Returns <tt>false</tt> unless given collection is empty.
473 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
474 dl 1.18 * @param c the collection
475 dl 1.16 * @return <tt>false</tt> unless given collection is empty
476 dholmes 1.11 */
477 dl 1.12 public boolean containsAll(Collection<?> c) {
478 dl 1.16 return c.isEmpty();
479 dholmes 1.11 }
480    
481     /**
482     * Always returns <tt>false</tt>.
483     * A <tt>SynchronousQueue</tt> has no internal capacity.
484 dl 1.18 * @param c the collection
485 dholmes 1.11 * @return <tt>false</tt>
486     */
487 dl 1.12 public boolean removeAll(Collection<?> c) {
488 dholmes 1.11 return false;
489     }
490    
491     /**
492     * Always returns <tt>false</tt>.
493     * A <tt>SynchronousQueue</tt> has no internal capacity.
494 dl 1.18 * @param c the collection
495 dholmes 1.11 * @return <tt>false</tt>
496     */
497 dl 1.12 public boolean retainAll(Collection<?> c) {
498 dholmes 1.11 return false;
499     }
500    
501     /**
502     * Always returns <tt>null</tt>.
503     * A <tt>SynchronousQueue</tt> does not return elements
504 dl 1.5 * unless actively waited on.
505 dholmes 1.11 * @return <tt>null</tt>
506 dl 1.5 */
507     public E peek() {
508     return null;
509     }
510    
511    
512     static class EmptyIterator<E> implements Iterator<E> {
513 dl 1.2 public boolean hasNext() {
514     return false;
515     }
516     public E next() {
517     throw new NoSuchElementException();
518     }
519     public void remove() {
520 dl 1.17 throw new IllegalStateException();
521 dl 1.2 }
522 tim 1.1 }
523 dl 1.2
524 dl 1.5 /**
525 dl 1.18 * Returns an empty iterator in which <tt>hasNext</tt> always returns
526 tim 1.13 * <tt>false</tt>.
527     *
528 dholmes 1.11 * @return an empty iterator
529 dl 1.5 */
530 dl 1.2 public Iterator<E> iterator() {
531 dl 1.5 return new EmptyIterator<E>();
532 tim 1.1 }
533    
534 dl 1.2
535 dl 1.5 /**
536 dholmes 1.11 * Returns a zero-length array.
537     * @return a zero-length array
538 dl 1.5 */
539 dl 1.3 public Object[] toArray() {
540 tim 1.10 return (E[]) new Object[0];
541 tim 1.1 }
542    
543 dholmes 1.11 /**
544     * Sets the zeroeth element of the specified array to <tt>null</tt>
545     * (if the array has non-zero length) and returns it.
546     * @return the specified array
547     */
548 dl 1.2 public <T> T[] toArray(T[] a) {
549     if (a.length > 0)
550     a[0] = null;
551     return a;
552     }
553 tim 1.1 }
554 dholmes 1.11
555    
556    
557    
558