ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.11
Committed: Wed Aug 6 01:57:53 2003 UTC (20 years, 10 months ago) by dholmes
Branch: MAIN
Changes since 1.10: +150 -16 lines
Log Message:
Final major updates to Collection related classes.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dholmes 1.11 * A {@linkplain BlockingQueue blocking queue} in which each <tt>put</tt>
13     * must wait for a <tt>take</tt>, and vice versa.
14     * A synchronous queue does not have any internal capacity - in particular
15     * it does not have a capacity of one. You cannot <tt>peek</tt> at a
16     * synchronous queue because an element is only present when you try to take
17     * it; you cannot add an element (using any method) unless another thread is
18     * trying to remove it; you cannot iterate as there is nothing to iterate.
19     * The <em>head</em> of the queue is the element that the first queued thread
20     * is trying to add to the queue; if there are no queued threads then no
21     * element is being added and the head is <tt>null</tt>.
22     * Many of the <tt>Collection</tt> methods make little or no sense for a
23     * synchronous queue.
24     * This queue does not permit <tt>null</tt> elements.
25     * <p>Synchronous queues are similar to rendezvous channels used
26 dl 1.5 * in CSP and Ada. They are well suited for handoff designs, in which
27     * an object running in one thread must synch up with an object
28     * running in another thread in order to hand it some information,
29     * event, or task.
30 dl 1.6 * @since 1.5
31     * @author Doug Lea
32     **/
33 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
34 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
35    
36 dl 1.2 /*
37     This implementation divides actions into two cases for puts:
38    
39 tim 1.10 * An arriving putter that does not already have a waiting taker
40 dl 1.2 creates a node holding item, and then waits for a taker to take it.
41     * An arriving putter that does already have a waiting taker fills
42     the slot node created by the taker, and notifies it to continue.
43    
44     And symmetrically, two for takes:
45    
46     * An arriving taker that does not already have a waiting putter
47     creates an empty slot node, and then waits for a putter to fill it.
48     * An arriving taker that does already have a waiting putter takes
49     item from the node created by the putter, and notifies it to continue.
50    
51     This requires keeping two simple queues: waitingPuts and waitingTakes.
52 tim 1.10
53 dl 1.2 When a put or take waiting for the actions of its counterpart
54     aborts due to interruption or timeout, it marks the node
55     it created as "CANCELLED", which causes its counterpart to retry
56     the entire put or take sequence.
57     */
58    
59 tim 1.10 /**
60 dl 1.2 * Special marker used in queue nodes to indicate that
61     * the thread waiting for a change in the node has timed out
62     * or been interrupted.
63     **/
64     private static final Object CANCELLED = new Object();
65    
66     /*
67     * Note that all fields are transient final, so there is
68     * no explicit serialization code.
69     */
70    
71     private transient final WaitQueue waitingPuts = new WaitQueue();
72     private transient final WaitQueue waitingTakes = new WaitQueue();
73     private transient final ReentrantLock qlock = new ReentrantLock();
74    
75     /**
76     * Nodes each maintain an item and handle waits and signals for
77     * getting and setting it. The class opportunistically extends
78     * ReentrantLock to save an extra object allocation per
79     * rendezvous.
80     */
81     private static class Node extends ReentrantLock {
82 dl 1.6 /** Condition to wait on for other party; lazily constructed */
83 dl 1.2 Condition done;
84 dl 1.6 /** The item being transferred */
85 dl 1.2 Object item;
86 dl 1.6 /** Next node in wait queue */
87 dl 1.2 Node next;
88 dl 1.6
89 dl 1.2 Node(Object x) { item = x; }
90    
91     /**
92     * Fill in the slot created by the taker and signal taker to
93     * continue.
94     */
95     boolean set(Object x) {
96     this.lock();
97     try {
98     if (item != CANCELLED) {
99     item = x;
100     if (done != null)
101     done.signal();
102     return true;
103     }
104     else // taker has cancelled
105     return false;
106     }
107     finally {
108     this.unlock();
109     }
110     }
111    
112     /**
113     * Remove item from slot created by putter and signal putter
114     * to continue.
115     */
116     Object get() {
117     this.lock();
118     try {
119     Object x = item;
120     if (x != CANCELLED) {
121     item = null;
122     next = null;
123     if (done != null)
124     done.signal();
125     return x;
126     }
127     else
128     return null;
129     }
130     finally {
131     this.unlock();
132     }
133     }
134    
135     /**
136     * Wait for a taker to take item placed by putter, or time out.
137     */
138     boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
139     this.lock();
140     try {
141     for (;;) {
142     if (item == null)
143     return true;
144     if (timed) {
145     if (nanos <= 0) {
146     item = CANCELLED;
147     return false;
148     }
149     }
150 dl 1.9 if (done == null)
151     done = this.newCondition();
152     if (timed)
153     nanos = done.awaitNanos(nanos);
154     else
155     done.await();
156     }
157     }
158     catch (InterruptedException ie) {
159     // If taken, return normally but set interrupt status
160     if (item == null) {
161     Thread.currentThread().interrupt();
162     return true;
163     }
164     else {
165     item = CANCELLED;
166     done.signal(); // propagate signal
167     throw ie;
168 dl 1.2 }
169     }
170     finally {
171     this.unlock();
172     }
173     }
174    
175     /**
176     * Wait for a putter to put item placed by taker, or time out.
177     */
178     Object waitForPut(boolean timed, long nanos) throws InterruptedException {
179     this.lock();
180     try {
181     for (;;) {
182     Object x = item;
183     if (x != null) {
184     item = null;
185     next = null;
186     return x;
187     }
188     if (timed) {
189     if (nanos <= 0) {
190     item = CANCELLED;
191     return null;
192     }
193     }
194 dl 1.9 if (done == null)
195     done = this.newCondition();
196     if (timed)
197     nanos = done.awaitNanos(nanos);
198     else
199     done.await();
200     }
201     }
202     catch (InterruptedException ie) {
203     Object y = item;
204     if (y != null) {
205     item = null;
206     next = null;
207     Thread.currentThread().interrupt();
208     return y;
209     }
210     else {
211     item = CANCELLED;
212     done.signal(); // propagate signal
213     throw ie;
214 dl 1.2 }
215     }
216     finally {
217     this.unlock();
218     }
219     }
220     }
221    
222     /**
223     * Simple FIFO queue class to hold waiting puts/takes.
224     **/
225     private static class WaitQueue<E> {
226     Node head;
227     Node last;
228    
229 tim 1.10 Node enq(Object x) {
230 dl 1.2 Node p = new Node(x);
231 tim 1.10 if (last == null)
232 dl 1.2 last = head = p;
233 tim 1.10 else
234 dl 1.2 last = last.next = p;
235     return p;
236     }
237    
238     Node deq() {
239     Node p = head;
240 tim 1.10 if (p != null && (head = p.next) == null)
241 dl 1.2 last = null;
242     return p;
243     }
244     }
245    
246     /**
247     * Main put algorithm, used by put, timed offer
248 tim 1.10 */
249 dl 1.2 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
250 dl 1.6 if (x == null) throw new NullPointerException();
251 tim 1.10 for (;;) {
252 dl 1.2 Node node;
253     boolean mustWait;
254 tim 1.10
255 dl 1.2 qlock.lockInterruptibly();
256     try {
257     node = waitingTakes.deq();
258     if ( (mustWait = (node == null)) )
259     node = waitingPuts.enq(x);
260     }
261     finally {
262     qlock.unlock();
263     }
264    
265     if (mustWait)
266     return node.waitForTake(timed, nanos);
267    
268     else if (node.set(x))
269     return true;
270    
271     // else taker cancelled, so retry
272     }
273 tim 1.1 }
274 dl 1.2
275     /**
276     * Main take algorithm, used by take, timed poll
277 tim 1.10 */
278 dl 1.2 private E doTake(boolean timed, long nanos) throws InterruptedException {
279     for (;;) {
280     Node node;
281     boolean mustWait;
282    
283     qlock.lockInterruptibly();
284     try {
285     node = waitingPuts.deq();
286     if ( (mustWait = (node == null)) )
287     node = waitingTakes.enq(null);
288     }
289     finally {
290     qlock.unlock();
291     }
292    
293 dl 1.9 if (mustWait)
294 dl 1.2 return (E)node.waitForPut(timed, nanos);
295    
296     else {
297     E x = (E)node.get();
298     if (x != null)
299     return x;
300     // else cancelled, so retry
301     }
302     }
303 tim 1.1 }
304 dl 1.2
/**
 * Creates a <tt>SynchronousQueue</tt> with no waiting putters or takers.
 */
public SynchronousQueue() {
    super();
}
309    
310    
311 dholmes 1.11 /**
312     * Adds the specified element to this queue, waiting if necessary for
313     * another thread to receive it.
314     * @throws NullPointerException {@inheritDoc}
315     */
316     public void put(E o) throws InterruptedException {
317     doPut(o, false, 0);
318 tim 1.1 }
319    
320 dholmes 1.11 /**
321     * Adds the specified element to this queue, waiting if necessary up to the
322     * specified wait time for another thread to receive it.
323     * @return <tt>true</tt> if successful, or <tt>false</tt> if
324     * the specified waiting time elapses before a taker appears.
325     * @throws NullPointerException {@inheritDoc}
326     */
327 dl 1.2 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
328     return doPut(x, true, unit.toNanos(timeout));
329 tim 1.1 }
330    
331 dl 1.2
332 dholmes 1.11 /**
333     * Retrieves and removes the head of this queue, waiting if necessary
334     * for another thread to insert it.
335     * @return the head of this queue
336     */
337 dl 1.2 public E take() throws InterruptedException {
338     return doTake(false, 0);
339 tim 1.1 }
340 dl 1.2
341 dholmes 1.11 /**
342     * Retrieves and removes the head of this queue, waiting
343     * if necessary up to the specified wait time, for another thread
344     * to insert it.
345     */
346 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
347     return doTake(true, unit.toNanos(timeout));
348 tim 1.1 }
349 dl 1.2
350     // Untimed nonblocking versions
351    
352 dholmes 1.11 /**
353     * Adds the specified element to this queue, if another thread is
354     * waiting to receive it.
355     *
356     * @throws NullpointerException {@inheritDoc}
357     */
358     public boolean offer(E o) {
359     if (o == null) throw new NullPointerException();
360 tim 1.10
361     for (;;) {
362 dl 1.2 qlock.lock();
363     Node node;
364     try {
365     node = waitingTakes.deq();
366     }
367     finally {
368     qlock.unlock();
369     }
370     if (node == null)
371     return false;
372 tim 1.10
373 dholmes 1.11 else if (node.set(o))
374 dl 1.2 return true;
375     // else retry
376     }
377 tim 1.1 }
378 dl 1.2
379 dholmes 1.11
380 dl 1.2 public E poll() {
381     for (;;) {
382     Node node;
383     qlock.lock();
384     try {
385     node = waitingPuts.deq();
386     }
387     finally {
388     qlock.unlock();
389     }
390     if (node == null)
391     return null;
392    
393     else {
394     Object x = node.get();
395     if (x != null)
396     return (E)x;
397     // else retry
398     }
399     }
400 tim 1.1 }
401 dl 1.2
402 dholmes 1.11
403     /**
404     * Adds the specified element to this queue.
405     * @return <tt>true</tt> (as per the general contract of
406     * <tt>Collection.add</tt>).
407     *
408     * @throws NullPointerException {@inheritDoc}
409     * @throws IllegalStateException if no thread is waiting to receive the
410     * element being added
411     */
412     public boolean add(E o) {
413     return super.add(o);
414     }
415    
416    
417     /**
418     * Adds all of the elements in the specified collection to this queue.
419     * The behavior of this operation is undefined if
420     * the specified collection is modified while the operation is in
421     * progress. (This implies that the behavior of this call is undefined if
422     * the specified collection is this queue, and this queue is nonempty.)
423     * <p>
424     * This implementation iterates over the specified collection, and adds
425     * each object returned by the iterator to this collection, in turn.
426     * @throws NullPointerException {@inheritDoc}
427     * @throws IllegalStateException if no thread is waiting to receive the
428     * element being added
429     */
430     public boolean addAll(Collection<? extends E> c) {
431     return super.addAll(c);
432     }
433    
434 dl 1.5 /**
435 dholmes 1.11 * Always returns <tt>true</tt>.
436     * A <tt>SynchronousQueue</tt> has no internal capacity.
437     * @return <tt>true</tt>
438 dl 1.5 */
439     public boolean isEmpty() {
440     return true;
441     }
442    
443     /**
444 dholmes 1.11 * Always returns zero.
445     * A <tt>SynchronousQueue</tt> has no internal capacity.
446 dl 1.5 * @return zero.
447     */
448     public int size() {
449     return 0;
450 tim 1.1 }
451 dl 1.2
452 dl 1.5 /**
453 dholmes 1.11 * Always returns zero.
454     * A <tt>SynchronousQueue</tt> has no internal capacity.
455 dl 1.5 * @return zero.
456     */
457     public int remainingCapacity() {
458     return 0;
459     }
460    
461     /**
462 dholmes 1.11 * Does nothing.
463     * A <tt>SynchronousQueue</tt> has no internal capacity.
464     */
465     public void clear() {}
466    
467     /**
468     * Always returns <tt>false</tt>.
469     * A <tt>SynchronousQueue</tt> has no internal capacity.
470     * @return <tt>false</tt>
471     */
472     public boolean contains(Object o) {
473     return false;
474     }
475    
476     /**
477     * Always returns <tt>false</tt>.
478     * A <tt>SynchronousQueue</tt> has no internal capacity.
479     * @return <tt>false</tt>
480     */
481     public boolean containsAll(Collection<? extends E> c) {
482     return false;
483     }
484    
485     /**
486     * Always returns <tt>false</tt>.
487     * A <tt>SynchronousQueue</tt> has no internal capacity.
488     * @return <tt>false</tt>
489     */
490     public boolean removeAll(Collection<? extends E> c) {
491     return false;
492     }
493    
494     /**
495     * Always returns <tt>false</tt>.
496     * A <tt>SynchronousQueue</tt> has no internal capacity.
497     * @return <tt>false</tt>
498     */
499     public boolean retainAll(Collection<? extends E> c) {
500     return false;
501     }
502    
503     /**
504     * Always returns <tt>null</tt>.
505     * A <tt>SynchronousQueue</tt> does not return elements
506 dl 1.5 * unless actively waited on.
507 dholmes 1.11 * @return <tt>null</tt>
508 dl 1.5 */
509     public E peek() {
510     return null;
511     }
512    
513    
514     static class EmptyIterator<E> implements Iterator<E> {
515 dl 1.2 public boolean hasNext() {
516     return false;
517     }
518     public E next() {
519     throw new NoSuchElementException();
520     }
521     public void remove() {
522     throw new UnsupportedOperationException();
523     }
524 tim 1.1 }
525 dl 1.2
526 dl 1.5 /**
527 dholmes 1.11 * Returns an empty iterator: <tt>hasNext</tt> always returns
528     * <tt>false</tt>
529     * @return an empty iterator
530 dl 1.5 */
531 dl 1.2 public Iterator<E> iterator() {
532 dl 1.5 return new EmptyIterator<E>();
533 tim 1.1 }
534    
535 dl 1.2
536 dl 1.5 /**
537 dholmes 1.11 * Returns a zero-length array.
538     * @return a zero-length array
539 dl 1.5 */
540 dl 1.3 public Object[] toArray() {
541 tim 1.10 return (E[]) new Object[0];
542 tim 1.1 }
543    
544 dholmes 1.11 /**
545     * Sets the zeroeth element of the specified array to <tt>null</tt>
546     * (if the array has non-zero length) and returns it.
547     * @return the specified array
548     */
549 dl 1.2 public <T> T[] toArray(T[] a) {
550     if (a.length > 0)
551     a[0] = null;
552     return a;
553     }
554 tim 1.1 }
555 dholmes 1.11
556    
557    
558    
559