ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.17
Committed: Fri Aug 29 22:38:12 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.16: +1 -1 lines
Log Message:
Fix grammar

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dholmes 1.11 * A {@linkplain BlockingQueue blocking queue} in which each <tt>put</tt>
13     * must wait for a <tt>take</tt>, and vice versa.
14     * A synchronous queue does not have any internal capacity - in particular
15     * it does not have a capacity of one. You cannot <tt>peek</tt> at a
16     * synchronous queue because an element is only present when you try to take
17     * it; you cannot add an element (using any method) unless another thread is
18     * trying to remove it; you cannot iterate as there is nothing to iterate.
19     * The <em>head</em> of the queue is the element that the first queued thread
20     * is trying to add to the queue; if there are no queued threads then no
21     * element is being added and the head is <tt>null</tt>.
22     * Many of the <tt>Collection</tt> methods make little or no sense for a
23     * synchronous queue.
24     * This queue does not permit <tt>null</tt> elements.
25     * <p>Synchronous queues are similar to rendezvous channels used
26 dl 1.5 * in CSP and Ada. They are well suited for handoff designs, in which
27     * an object running in one thread must synch up with an object
28     * running in another thread in order to hand it some information,
29     * event, or task.
30 dl 1.6 * @since 1.5
31     * @author Doug Lea
32     **/
33 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
34 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
35 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
36 tim 1.1
37 dl 1.2 /*
38     This implementation divides actions into two cases for puts:
39    
40 tim 1.10 * An arriving putter that does not already have a waiting taker
41 dl 1.2 creates a node holding item, and then waits for a taker to take it.
42     * An arriving putter that does already have a waiting taker fills
43     the slot node created by the taker, and notifies it to continue.
44    
45     And symmetrically, two for takes:
46    
47     * An arriving taker that does not already have a waiting putter
48     creates an empty slot node, and then waits for a putter to fill it.
49     * An arriving taker that does already have a waiting putter takes
50     item from the node created by the putter, and notifies it to continue.
51    
52     This requires keeping two simple queues: waitingPuts and waitingTakes.
53 tim 1.10
54 dl 1.2 When a put or take waiting for the actions of its counterpart
55     aborts due to interruption or timeout, it marks the node
56     it created as "CANCELLED", which causes its counterpart to retry
57     the entire put or take sequence.
58     */
59    
60 tim 1.10 /**
61 dl 1.2 * Special marker used in queue nodes to indicate that
62     * the thread waiting for a change in the node has timed out
63     * or been interrupted.
64     **/
65     private static final Object CANCELLED = new Object();
66    
67     /*
68     * Note that all fields are transient final, so there is
69     * no explicit serialization code.
70     */
71    
72     private transient final WaitQueue waitingPuts = new WaitQueue();
73     private transient final WaitQueue waitingTakes = new WaitQueue();
74     private transient final ReentrantLock qlock = new ReentrantLock();
75    
76     /**
77     * Nodes each maintain an item and handle waits and signals for
78     * getting and setting it. The class opportunistically extends
79     * ReentrantLock to save an extra object allocation per
80     * rendezvous.
81     */
82     private static class Node extends ReentrantLock {
83 dl 1.6 /** Condition to wait on for other party; lazily constructed */
84 dl 1.2 Condition done;
85 dl 1.6 /** The item being transferred */
86 dl 1.2 Object item;
87 dl 1.6 /** Next node in wait queue */
88 dl 1.2 Node next;
89 dl 1.6
90 dl 1.2 Node(Object x) { item = x; }
91    
92     /**
93     * Fill in the slot created by the taker and signal taker to
94     * continue.
95     */
96     boolean set(Object x) {
97     this.lock();
98     try {
99     if (item != CANCELLED) {
100     item = x;
101     if (done != null)
102     done.signal();
103     return true;
104 tim 1.14 } else // taker has cancelled
105 dl 1.2 return false;
106 tim 1.14 } finally {
107 dl 1.2 this.unlock();
108     }
109     }
110    
111     /**
112     * Remove item from slot created by putter and signal putter
113     * to continue.
114     */
115     Object get() {
116     this.lock();
117     try {
118     Object x = item;
119     if (x != CANCELLED) {
120     item = null;
121     next = null;
122     if (done != null)
123     done.signal();
124     return x;
125 tim 1.14 } else
126 dl 1.2 return null;
127 tim 1.14 } finally {
128 dl 1.2 this.unlock();
129     }
130     }
131    
132     /**
133     * Wait for a taker to take item placed by putter, or time out.
134     */
135     boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
136     this.lock();
137     try {
138     for (;;) {
139     if (item == null)
140     return true;
141     if (timed) {
142     if (nanos <= 0) {
143     item = CANCELLED;
144     return false;
145     }
146     }
147 dl 1.9 if (done == null)
148     done = this.newCondition();
149     if (timed)
150     nanos = done.awaitNanos(nanos);
151     else
152     done.await();
153     }
154 tim 1.14 } catch (InterruptedException ie) {
155 dl 1.9 // If taken, return normally but set interrupt status
156     if (item == null) {
157     Thread.currentThread().interrupt();
158     return true;
159 tim 1.14 } else {
160 dl 1.9 item = CANCELLED;
161     done.signal(); // propagate signal
162     throw ie;
163 dl 1.2 }
164 tim 1.14 } finally {
165 dl 1.2 this.unlock();
166     }
167     }
168    
169     /**
170     * Wait for a putter to put item placed by taker, or time out.
171     */
172     Object waitForPut(boolean timed, long nanos) throws InterruptedException {
173     this.lock();
174     try {
175     for (;;) {
176     Object x = item;
177     if (x != null) {
178     item = null;
179     next = null;
180     return x;
181     }
182     if (timed) {
183     if (nanos <= 0) {
184     item = CANCELLED;
185     return null;
186     }
187     }
188 dl 1.9 if (done == null)
189     done = this.newCondition();
190     if (timed)
191     nanos = done.awaitNanos(nanos);
192     else
193     done.await();
194     }
195 tim 1.14 } catch (InterruptedException ie) {
196 dl 1.9 Object y = item;
197     if (y != null) {
198     item = null;
199     next = null;
200     Thread.currentThread().interrupt();
201     return y;
202 tim 1.14 } else {
203 dl 1.9 item = CANCELLED;
204     done.signal(); // propagate signal
205     throw ie;
206 dl 1.2 }
207 tim 1.14 } finally {
208 dl 1.2 this.unlock();
209     }
210     }
211     }
212    
213     /**
214     * Simple FIFO queue class to hold waiting puts/takes.
215     **/
216     private static class WaitQueue<E> {
217     Node head;
218     Node last;
219    
220 tim 1.10 Node enq(Object x) {
221 dl 1.2 Node p = new Node(x);
222 tim 1.10 if (last == null)
223 dl 1.2 last = head = p;
224 tim 1.10 else
225 dl 1.2 last = last.next = p;
226     return p;
227     }
228    
229     Node deq() {
230     Node p = head;
231 tim 1.10 if (p != null && (head = p.next) == null)
232 dl 1.2 last = null;
233     return p;
234     }
235     }
236    
237     /**
238     * Main put algorithm, used by put, timed offer
239 tim 1.10 */
240 dl 1.2 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
241 dl 1.6 if (x == null) throw new NullPointerException();
242 tim 1.10 for (;;) {
243 dl 1.2 Node node;
244     boolean mustWait;
245 tim 1.10
246 dl 1.2 qlock.lockInterruptibly();
247     try {
248     node = waitingTakes.deq();
249     if ( (mustWait = (node == null)) )
250     node = waitingPuts.enq(x);
251 tim 1.14 } finally {
252 dl 1.2 qlock.unlock();
253     }
254    
255     if (mustWait)
256     return node.waitForTake(timed, nanos);
257    
258     else if (node.set(x))
259     return true;
260    
261     // else taker cancelled, so retry
262     }
263 tim 1.1 }
264 dl 1.2
265     /**
266     * Main take algorithm, used by take, timed poll
267 tim 1.10 */
268 dl 1.2 private E doTake(boolean timed, long nanos) throws InterruptedException {
269     for (;;) {
270     Node node;
271     boolean mustWait;
272    
273     qlock.lockInterruptibly();
274     try {
275     node = waitingPuts.deq();
276     if ( (mustWait = (node == null)) )
277     node = waitingTakes.enq(null);
278 tim 1.14 } finally {
279 dl 1.2 qlock.unlock();
280     }
281    
282 dl 1.9 if (mustWait)
283 dl 1.2 return (E)node.waitForPut(timed, nanos);
284    
285     else {
286     E x = (E)node.get();
287     if (x != null)
288     return x;
289     // else cancelled, so retry
290     }
291     }
292 tim 1.1 }
293 dl 1.2
294 dholmes 1.11 /**
295 tim 1.13 * Creates a <tt>SynchronousQueue</tt>.
296 dholmes 1.11 */
297 dl 1.2 public SynchronousQueue() {}
298    
299    
300 dholmes 1.11 /**
301     * Adds the specified element to this queue, waiting if necessary for
302     * another thread to receive it.
303     * @throws NullPointerException {@inheritDoc}
304     */
305     public void put(E o) throws InterruptedException {
306     doPut(o, false, 0);
307 tim 1.1 }
308    
309 dholmes 1.11 /**
310     * Adds the specified element to this queue, waiting if necessary up to the
311     * specified wait time for another thread to receive it.
312     * @return <tt>true</tt> if successful, or <tt>false</tt> if
313     * the specified waiting time elapses before a taker appears.
314     * @throws NullPointerException {@inheritDoc}
315     */
316 dl 1.2 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
317     return doPut(x, true, unit.toNanos(timeout));
318 tim 1.1 }
319    
320 dl 1.2
321 dholmes 1.11 /**
322     * Retrieves and removes the head of this queue, waiting if necessary
323     * for another thread to insert it.
324     * @return the head of this queue
325     */
326 dl 1.2 public E take() throws InterruptedException {
327     return doTake(false, 0);
328 tim 1.1 }
329 dl 1.2
330 dholmes 1.11 /**
331     * Retrieves and removes the head of this queue, waiting
332     * if necessary up to the specified wait time, for another thread
333     * to insert it.
334     */
335 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
336     return doTake(true, unit.toNanos(timeout));
337 tim 1.1 }
338 dl 1.2
339     // Untimed nonblocking versions
340    
341 dholmes 1.11 /**
342     * Adds the specified element to this queue, if another thread is
343     * waiting to receive it.
344     *
345     * @throws NullpointerException {@inheritDoc}
346     */
347     public boolean offer(E o) {
348     if (o == null) throw new NullPointerException();
349 tim 1.10
350     for (;;) {
351 dl 1.2 qlock.lock();
352     Node node;
353     try {
354     node = waitingTakes.deq();
355 tim 1.14 } finally {
356 dl 1.2 qlock.unlock();
357     }
358     if (node == null)
359     return false;
360 tim 1.10
361 dholmes 1.11 else if (node.set(o))
362 dl 1.2 return true;
363     // else retry
364     }
365 tim 1.1 }
366 dl 1.2
367 dholmes 1.11
368 dl 1.2 public E poll() {
369     for (;;) {
370     Node node;
371     qlock.lock();
372     try {
373     node = waitingPuts.deq();
374 tim 1.14 } finally {
375 dl 1.2 qlock.unlock();
376     }
377     if (node == null)
378     return null;
379    
380     else {
381     Object x = node.get();
382     if (x != null)
383     return (E)x;
384     // else retry
385     }
386     }
387 tim 1.1 }
388 dl 1.2
389 dholmes 1.11
390     /**
391     * Adds the specified element to this queue.
392     * @return <tt>true</tt> (as per the general contract of
393     * <tt>Collection.add</tt>).
394     *
395     * @throws NullPointerException {@inheritDoc}
396     * @throws IllegalStateException if no thread is waiting to receive the
397     * element being added
398     */
399     public boolean add(E o) {
400     return super.add(o);
401     }
402    
403    
404     /**
405     * Adds all of the elements in the specified collection to this queue.
406     * The behavior of this operation is undefined if
407     * the specified collection is modified while the operation is in
408     * progress. (This implies that the behavior of this call is undefined if
409     * the specified collection is this queue, and this queue is nonempty.)
410     * <p>
411     * This implementation iterates over the specified collection, and adds
412     * each object returned by the iterator to this collection, in turn.
413     * @throws NullPointerException {@inheritDoc}
414     * @throws IllegalStateException if no thread is waiting to receive the
415     * element being added
416     */
417     public boolean addAll(Collection<? extends E> c) {
418     return super.addAll(c);
419     }
420    
421 dl 1.5 /**
422 dholmes 1.11 * Always returns <tt>true</tt>.
423     * A <tt>SynchronousQueue</tt> has no internal capacity.
424     * @return <tt>true</tt>
425 dl 1.5 */
426     public boolean isEmpty() {
427     return true;
428     }
429    
430     /**
431 dholmes 1.11 * Always returns zero.
432     * A <tt>SynchronousQueue</tt> has no internal capacity.
433 dl 1.5 * @return zero.
434     */
435     public int size() {
436     return 0;
437 tim 1.1 }
438 dl 1.2
439 dl 1.5 /**
440 dholmes 1.11 * Always returns zero.
441     * A <tt>SynchronousQueue</tt> has no internal capacity.
442 dl 1.5 * @return zero.
443     */
444     public int remainingCapacity() {
445     return 0;
446     }
447    
448     /**
449 dholmes 1.11 * Does nothing.
450     * A <tt>SynchronousQueue</tt> has no internal capacity.
451     */
452     public void clear() {}
453    
454     /**
455     * Always returns <tt>false</tt>.
456     * A <tt>SynchronousQueue</tt> has no internal capacity.
457     * @return <tt>false</tt>
458     */
459     public boolean contains(Object o) {
460     return false;
461     }
462    
463     /**
464 dl 1.16 * Returns <tt>false</tt> unless given collection is empty.
465 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
466 dl 1.16 * @return <tt>false</tt> unless given collection is empty
467 dholmes 1.11 */
468 dl 1.12 public boolean containsAll(Collection<?> c) {
469 dl 1.16 return c.isEmpty();
470 dholmes 1.11 }
471    
472     /**
473     * Always returns <tt>false</tt>.
474     * A <tt>SynchronousQueue</tt> has no internal capacity.
475     * @return <tt>false</tt>
476     */
477 dl 1.12 public boolean removeAll(Collection<?> c) {
478 dholmes 1.11 return false;
479     }
480    
481     /**
482     * Always returns <tt>false</tt>.
483     * A <tt>SynchronousQueue</tt> has no internal capacity.
484     * @return <tt>false</tt>
485     */
486 dl 1.12 public boolean retainAll(Collection<?> c) {
487 dholmes 1.11 return false;
488     }
489    
490     /**
491     * Always returns <tt>null</tt>.
492     * A <tt>SynchronousQueue</tt> does not return elements
493 dl 1.5 * unless actively waited on.
494 dholmes 1.11 * @return <tt>null</tt>
495 dl 1.5 */
496     public E peek() {
497     return null;
498     }
499    
500    
501     static class EmptyIterator<E> implements Iterator<E> {
502 dl 1.2 public boolean hasNext() {
503     return false;
504     }
505     public E next() {
506     throw new NoSuchElementException();
507     }
508     public void remove() {
509 dl 1.17 throw new IllegalStateException();
510 dl 1.2 }
511 tim 1.1 }
512 dl 1.2
513 dl 1.5 /**
514 dholmes 1.11 * Returns an empty iterator: <tt>hasNext</tt> always returns
515 tim 1.13 * <tt>false</tt>.
516     *
517 dholmes 1.11 * @return an empty iterator
518 dl 1.5 */
519 dl 1.2 public Iterator<E> iterator() {
520 dl 1.5 return new EmptyIterator<E>();
521 tim 1.1 }
522    
523 dl 1.2
524 dl 1.5 /**
525 dholmes 1.11 * Returns a zero-length array.
526     * @return a zero-length array
527 dl 1.5 */
528 dl 1.3 public Object[] toArray() {
529 tim 1.10 return (E[]) new Object[0];
530 tim 1.1 }
531    
532 dholmes 1.11 /**
533     * Sets the zeroeth element of the specified array to <tt>null</tt>
534     * (if the array has non-zero length) and returns it.
535     * @return the specified array
536     */
537 dl 1.2 public <T> T[] toArray(T[] a) {
538     if (a.length > 0)
539     a[0] = null;
540     return a;
541     }
542 tim 1.1 }
543 dholmes 1.11
544    
545    
546    
547