ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.14
Committed: Fri Aug 8 20:05:07 2003 UTC (20 years, 9 months ago) by tim
Branch: MAIN
Changes since 1.13: +14 -28 lines
Log Message:
Scrunched catch, finally, else clauses.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dholmes 1.11 * A {@linkplain BlockingQueue blocking queue} in which each <tt>put</tt>
13     * must wait for a <tt>take</tt>, and vice versa.
14     * A synchronous queue does not have any internal capacity, not even
15     * a capacity of one. You cannot <tt>peek</tt> at a
16     * synchronous queue because an element is only present when you try to take
17     * it; you cannot add an element (using any method) unless another thread is
18     * trying to remove it; you cannot iterate as there is nothing to iterate.
19     * The <em>head</em> of the queue is the element that the first queued thread
20     * is trying to add to the queue; if there are no queued threads then no
21     * element is being added and the head is <tt>null</tt>.
22     * Many of the <tt>Collection</tt> methods make little or no sense for a
23     * synchronous queue.
24     * This queue does not permit <tt>null</tt> elements.
25     * <p>Synchronous queues are similar to rendezvous channels used
26 dl 1.5 * in CSP and Ada. They are well suited for handoff designs, in which
27     * an object running in one thread must synch up with an object
28     * running in another thread in order to hand it some information,
29     * event, or task.
30 dl 1.6 * @since 1.5
31     * @author Doug Lea
32     **/
33 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
34 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
35    
36 dl 1.2 /*
37     This implementation divides actions into two cases for puts:
38    
39 tim 1.10 * An arriving putter that does not already have a waiting taker
40 dl 1.2 creates a node holding item, and then waits for a taker to take it.
41     * An arriving putter that does already have a waiting taker fills
42     the slot node created by the taker, and notifies it to continue.
43    
44     And symmetrically, two for takes:
45    
46     * An arriving taker that does not already have a waiting putter
47     creates an empty slot node, and then waits for a putter to fill it.
48     * An arriving taker that does already have a waiting putter takes
49     item from the node created by the putter, and notifies it to continue.
50    
51     This requires keeping two simple queues: waitingPuts and waitingTakes.
52 tim 1.10
53 dl 1.2 When a put or take waiting for the actions of its counterpart
54     aborts due to interruption or timeout, it marks the node
55     it created as "CANCELLED", which causes its counterpart to retry
56     the entire put or take sequence.
57     */
58    
59 tim 1.10 /**
60 dl 1.2 * Special marker used in queue nodes to indicate that
61     * the thread waiting for a change in the node has timed out
62     * or been interrupted.
     * Nodes store this object in their <tt>item</tt> field and test for
     * it by identity, so it can never be confused with a user element.
63     **/
64     private static final Object CANCELLED = new Object();
65    
66     /*
67     * Note that all fields are transient final, so there is
68     * no explicit serialization code.
     *
     * NOTE(review): transient fields are not written by default
     * serialization, and field initializers are not re-run when a
     * Serializable class is deserialized, so a deserialized queue would
     * presumably see all three fields below as null -- confirm, and
     * consider adding explicit serialization support.
69     */
70    
     // FIFO of nodes created by putters that found no waiting taker.
71     private transient final WaitQueue waitingPuts = new WaitQueue();
     // FIFO of empty slot nodes created by takers that found no waiting putter.
72     private transient final WaitQueue waitingTakes = new WaitQueue();
     // Held only while a node is enqueued on or dequeued from the two queues above.
73     private transient final ReentrantLock qlock = new ReentrantLock();
74    
75     /**
76     * Nodes each maintain an item and handle waits and signals for
77     * getting and setting it. The class opportunistically extends
78     * ReentrantLock to save an extra object allocation per
79     * rendezvous.
80     */
81     private static class Node extends ReentrantLock {
82 dl 1.6 /** Condition to wait on for other party; lazily constructed */
83 dl 1.2 Condition done;
84 dl 1.6 /** The item being transferred */
85 dl 1.2 Object item;
86 dl 1.6 /** Next node in wait queue */
87 dl 1.2 Node next;
88 dl 1.6
89 dl 1.2 Node(Object x) { item = x; }
90    
91     /**
92     * Fill in the slot created by the taker and signal taker to
93     * continue.
94     */
95     boolean set(Object x) {
96     this.lock();
97     try {
98     if (item != CANCELLED) {
99     item = x;
100     if (done != null)
101     done.signal();
102     return true;
103 tim 1.14 } else // taker has cancelled
104 dl 1.2 return false;
105 tim 1.14 } finally {
106 dl 1.2 this.unlock();
107     }
108     }
109    
110     /**
111     * Remove item from slot created by putter and signal putter
112     * to continue.
113     */
114     Object get() {
115     this.lock();
116     try {
117     Object x = item;
118     if (x != CANCELLED) {
119     item = null;
120     next = null;
121     if (done != null)
122     done.signal();
123     return x;
124 tim 1.14 } else
125 dl 1.2 return null;
126 tim 1.14 } finally {
127 dl 1.2 this.unlock();
128     }
129     }
130    
131     /**
132     * Wait for a taker to take item placed by putter, or time out.
133     */
134     boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
135     this.lock();
136     try {
137     for (;;) {
138     if (item == null)
139     return true;
140     if (timed) {
141     if (nanos <= 0) {
142     item = CANCELLED;
143     return false;
144     }
145     }
146 dl 1.9 if (done == null)
147     done = this.newCondition();
148     if (timed)
149     nanos = done.awaitNanos(nanos);
150     else
151     done.await();
152     }
153 tim 1.14 } catch (InterruptedException ie) {
154 dl 1.9 // If taken, return normally but set interrupt status
155     if (item == null) {
156     Thread.currentThread().interrupt();
157     return true;
158 tim 1.14 } else {
159 dl 1.9 item = CANCELLED;
160     done.signal(); // propagate signal
161     throw ie;
162 dl 1.2 }
163 tim 1.14 } finally {
164 dl 1.2 this.unlock();
165     }
166     }
167    
168     /**
169     * Wait for a putter to put item placed by taker, or time out.
170     */
171     Object waitForPut(boolean timed, long nanos) throws InterruptedException {
172     this.lock();
173     try {
174     for (;;) {
175     Object x = item;
176     if (x != null) {
177     item = null;
178     next = null;
179     return x;
180     }
181     if (timed) {
182     if (nanos <= 0) {
183     item = CANCELLED;
184     return null;
185     }
186     }
187 dl 1.9 if (done == null)
188     done = this.newCondition();
189     if (timed)
190     nanos = done.awaitNanos(nanos);
191     else
192     done.await();
193     }
194 tim 1.14 } catch (InterruptedException ie) {
195 dl 1.9 Object y = item;
196     if (y != null) {
197     item = null;
198     next = null;
199     Thread.currentThread().interrupt();
200     return y;
201 tim 1.14 } else {
202 dl 1.9 item = CANCELLED;
203     done.signal(); // propagate signal
204     throw ie;
205 dl 1.2 }
206 tim 1.14 } finally {
207 dl 1.2 this.unlock();
208     }
209     }
210     }
211    
212     /**
213     * Simple FIFO queue class to hold waiting puts/takes.
214     **/
215     private static class WaitQueue<E> {
216     Node head;
217     Node last;
218    
219 tim 1.10 Node enq(Object x) {
220 dl 1.2 Node p = new Node(x);
221 tim 1.10 if (last == null)
222 dl 1.2 last = head = p;
223 tim 1.10 else
224 dl 1.2 last = last.next = p;
225     return p;
226     }
227    
228     Node deq() {
229     Node p = head;
230 tim 1.10 if (p != null && (head = p.next) == null)
231 dl 1.2 last = null;
232     return p;
233     }
234     }
235    
236     /**
237     * Main put algorithm, used by put, timed offer
238 tim 1.10 */
239 dl 1.2 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
240 dl 1.6 if (x == null) throw new NullPointerException();
241 tim 1.10 for (;;) {
242 dl 1.2 Node node;
243     boolean mustWait;
244 tim 1.10
245 dl 1.2 qlock.lockInterruptibly();
246     try {
247     node = waitingTakes.deq();
248     if ( (mustWait = (node == null)) )
249     node = waitingPuts.enq(x);
250 tim 1.14 } finally {
251 dl 1.2 qlock.unlock();
252     }
253    
254     if (mustWait)
255     return node.waitForTake(timed, nanos);
256    
257     else if (node.set(x))
258     return true;
259    
260     // else taker cancelled, so retry
261     }
262 tim 1.1 }
263 dl 1.2
264     /**
265     * Main take algorithm, used by take, timed poll
266 tim 1.10 */
267 dl 1.2 private E doTake(boolean timed, long nanos) throws InterruptedException {
268     for (;;) {
269     Node node;
270     boolean mustWait;
271    
272     qlock.lockInterruptibly();
273     try {
274     node = waitingPuts.deq();
275     if ( (mustWait = (node == null)) )
276     node = waitingTakes.enq(null);
277 tim 1.14 } finally {
278 dl 1.2 qlock.unlock();
279     }
280    
281 dl 1.9 if (mustWait)
282 dl 1.2 return (E)node.waitForPut(timed, nanos);
283    
284     else {
285     E x = (E)node.get();
286     if (x != null)
287     return x;
288     // else cancelled, so retry
289     }
290     }
291 tim 1.1 }
292 dl 1.2
/**
 * Constructs a new <tt>SynchronousQueue</tt>. There is nothing to
 * configure.
 */
public SynchronousQueue() {
    super();
}
297    
298    
299 dholmes 1.11 /**
300     * Adds the specified element to this queue, waiting if necessary for
301     * another thread to receive it.
302     * @throws NullPointerException {@inheritDoc}
303     */
304     public void put(E o) throws InterruptedException {
305     doPut(o, false, 0);
306 tim 1.1 }
307    
308 dholmes 1.11 /**
309     * Adds the specified element to this queue, waiting if necessary up to the
310     * specified wait time for another thread to receive it.
311     * @return <tt>true</tt> if successful, or <tt>false</tt> if
312     * the specified waiting time elapses before a taker appears.
313     * @throws NullPointerException {@inheritDoc}
314     */
315 dl 1.2 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
316     return doPut(x, true, unit.toNanos(timeout));
317 tim 1.1 }
318    
319 dl 1.2
320 dholmes 1.11 /**
321     * Retrieves and removes the head of this queue, waiting if necessary
322     * for another thread to insert it.
323     * @return the head of this queue
324     */
325 dl 1.2 public E take() throws InterruptedException {
326     return doTake(false, 0);
327 tim 1.1 }
328 dl 1.2
329 dholmes 1.11 /**
330     * Retrieves and removes the head of this queue, waiting
331     * if necessary up to the specified wait time, for another thread
332     * to insert it.
333     */
334 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
335     return doTake(true, unit.toNanos(timeout));
336 tim 1.1 }
337 dl 1.2
338     // Untimed nonblocking versions
339    
340 dholmes 1.11 /**
341     * Adds the specified element to this queue, if another thread is
342     * waiting to receive it.
343     *
344     * @throws NullpointerException {@inheritDoc}
345     */
346     public boolean offer(E o) {
347     if (o == null) throw new NullPointerException();
348 tim 1.10
349     for (;;) {
350 dl 1.2 qlock.lock();
351     Node node;
352     try {
353     node = waitingTakes.deq();
354 tim 1.14 } finally {
355 dl 1.2 qlock.unlock();
356     }
357     if (node == null)
358     return false;
359 tim 1.10
360 dholmes 1.11 else if (node.set(o))
361 dl 1.2 return true;
362     // else retry
363     }
364 tim 1.1 }
365 dl 1.2
366 dholmes 1.11
367 dl 1.2 public E poll() {
368     for (;;) {
369     Node node;
370     qlock.lock();
371     try {
372     node = waitingPuts.deq();
373 tim 1.14 } finally {
374 dl 1.2 qlock.unlock();
375     }
376     if (node == null)
377     return null;
378    
379     else {
380     Object x = node.get();
381     if (x != null)
382     return (E)x;
383     // else retry
384     }
385     }
386 tim 1.1 }
387 dl 1.2
388 dholmes 1.11
389     /**
390     * Adds the specified element to this queue.
391     * @return <tt>true</tt> (as per the general contract of
392     * <tt>Collection.add</tt>).
393     *
394     * @throws NullPointerException {@inheritDoc}
395     * @throws IllegalStateException if no thread is waiting to receive the
396     * element being added
397     */
398     public boolean add(E o) {
399     return super.add(o);
400     }
401    
402    
403     /**
404     * Adds all of the elements in the specified collection to this queue.
405     * The behavior of this operation is undefined if
406     * the specified collection is modified while the operation is in
407     * progress. (This implies that the behavior of this call is undefined if
408     * the specified collection is this queue, and this queue is nonempty.)
409     * <p>
410     * This implementation iterates over the specified collection, and adds
411     * each object returned by the iterator to this collection, in turn.
412     * @throws NullPointerException {@inheritDoc}
413     * @throws IllegalStateException if no thread is waiting to receive the
414     * element being added
415     */
416     public boolean addAll(Collection<? extends E> c) {
417     return super.addAll(c);
418     }
419    
420 dl 1.5 /**
421 dholmes 1.11 * Always returns <tt>true</tt>.
422     * A <tt>SynchronousQueue</tt> has no internal capacity.
423     * @return <tt>true</tt>
424 dl 1.5 */
425     public boolean isEmpty() {
426     return true;
427     }
428    
429     /**
430 dholmes 1.11 * Always returns zero.
431     * A <tt>SynchronousQueue</tt> has no internal capacity.
432 dl 1.5 * @return zero.
433     */
434     public int size() {
435     return 0;
436 tim 1.1 }
437 dl 1.2
438 dl 1.5 /**
439 dholmes 1.11 * Always returns zero.
440     * A <tt>SynchronousQueue</tt> has no internal capacity.
441 dl 1.5 * @return zero.
442     */
443     public int remainingCapacity() {
444     return 0;
445     }
446    
447     /**
448 dholmes 1.11 * Does nothing.
449     * A <tt>SynchronousQueue</tt> has no internal capacity.
450     */
451     public void clear() {}
452    
453     /**
454     * Always returns <tt>false</tt>.
455     * A <tt>SynchronousQueue</tt> has no internal capacity.
456     * @return <tt>false</tt>
457     */
458     public boolean contains(Object o) {
459     return false;
460     }
461    
462     /**
463     * Always returns <tt>false</tt>.
464     * A <tt>SynchronousQueue</tt> has no internal capacity.
465     * @return <tt>false</tt>
466     */
467 dl 1.12 public boolean containsAll(Collection<?> c) {
468 dholmes 1.11 return false;
469     }
470    
471     /**
472     * Always returns <tt>false</tt>.
473     * A <tt>SynchronousQueue</tt> has no internal capacity.
474     * @return <tt>false</tt>
475     */
476 dl 1.12 public boolean removeAll(Collection<?> c) {
477 dholmes 1.11 return false;
478     }
479    
480     /**
481     * Always returns <tt>false</tt>.
482     * A <tt>SynchronousQueue</tt> has no internal capacity.
483     * @return <tt>false</tt>
484     */
485 dl 1.12 public boolean retainAll(Collection<?> c) {
486 dholmes 1.11 return false;
487     }
488    
489     /**
490     * Always returns <tt>null</tt>.
491     * A <tt>SynchronousQueue</tt> does not return elements
492 dl 1.5 * unless actively waited on.
493 dholmes 1.11 * @return <tt>null</tt>
494 dl 1.5 */
495     public E peek() {
496     return null;
497     }
498    
499    
500     static class EmptyIterator<E> implements Iterator<E> {
501 dl 1.2 public boolean hasNext() {
502     return false;
503     }
504     public E next() {
505     throw new NoSuchElementException();
506     }
507     public void remove() {
508     throw new UnsupportedOperationException();
509     }
510 tim 1.1 }
511 dl 1.2
512 dl 1.5 /**
513 dholmes 1.11 * Returns an empty iterator: <tt>hasNext</tt> always returns
514 tim 1.13 * <tt>false</tt>.
515     *
516 dholmes 1.11 * @return an empty iterator
517 dl 1.5 */
518 dl 1.2 public Iterator<E> iterator() {
519 dl 1.5 return new EmptyIterator<E>();
520 tim 1.1 }
521    
522 dl 1.2
523 dl 1.5 /**
524 dholmes 1.11 * Returns a zero-length array.
525     * @return a zero-length array
526 dl 1.5 */
527 dl 1.3 public Object[] toArray() {
528 tim 1.10 return (E[]) new Object[0];
529 tim 1.1 }
530    
531 dholmes 1.11 /**
532     * Sets the zeroeth element of the specified array to <tt>null</tt>
533     * (if the array has non-zero length) and returns it.
534     * @return the specified array
535     */
536 dl 1.2 public <T> T[] toArray(T[] a) {
537     if (a.length > 0)
538     a[0] = null;
539     return a;
540     }
541 tim 1.1 }
542 dholmes 1.11
543    
544    
545    
546