ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.5
Committed: Sat Jun 7 18:20:21 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_1
Changes since 1.4: +46 -30 lines
Log Message:
Misc documentation updates

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8     import java.util.*;
9    
10     /**
11 dl 1.5 * A {@link Queue} in which each put must wait for a take, and vice
12     * versa. SynchronousQueues are similar to rendezvous channels used
13     * in CSP and Ada. They are well suited for handoff designs, in which
14     * an object running in one thread must synch up with an object
15     * running in another thread in order to hand it some information,
16     * event, or task.
17 tim 1.1 **/
18 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
19 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
20    
21 dl 1.2 /*
22     This implementation divides actions into two cases for puts:
23    
24     * An arriving putter that does not already have a waiting taker
25     creates a node holding item, and then waits for a taker to take it.
26     * An arriving putter that does already have a waiting taker fills
27     the slot node created by the taker, and notifies it to continue.
28    
29     And symmetrically, two for takes:
30    
31     * An arriving taker that does not already have a waiting putter
32     creates an empty slot node, and then waits for a putter to fill it.
33     * An arriving taker that does already have a waiting putter takes
34     item from the node created by the putter, and notifies it to continue.
35    
36     This requires keeping two simple queues: waitingPuts and waitingTakes.
37    
38     When a put or take waiting for the actions of its counterpart
39     aborts due to interruption or timeout, it marks the node
40     it created as "CANCELLED", which causes its counterpart to retry
41     the entire put or take sequence.
42     */
43    
/**
 * Special marker stored in a queue node's item slot to indicate that
 * the thread waiting for a change in the node has timed out
 * or been interrupted. It is a private sentinel compared by identity
 * ({@code item != CANCELLED}), never by {@code equals}.
 */
private static final Object CANCELLED = new Object();
50    
51     /*
52     * Note that all fields are transient final, so there is
53     * no explicit serialization code.
54     */
55    
56     private transient final WaitQueue waitingPuts = new WaitQueue();
57     private transient final WaitQueue waitingTakes = new WaitQueue();
58     private transient final ReentrantLock qlock = new ReentrantLock();
59    
/**
 * Nodes each maintain an item and handle waits and signals for
 * getting and setting it. The class opportunistically extends
 * ReentrantLock to save an extra object allocation per
 * rendezvous.
 *
 * A node is created by whichever party arrives first. A putter's
 * node starts with the offered item and is finished when the item
 * becomes null; a taker's node starts with a null item and is
 * finished when the item becomes non-null. In either case the
 * creator may instead store CANCELLED to withdraw. All reads and
 * writes of the item performed by the methods below happen while
 * holding this node's own lock.
 */
private static class Node extends ReentrantLock {
    /** Condition the creating thread waits on; created lazily by the waiter, so it may be null. */
    Condition done;
    /** The item being handed off, null, or the CANCELLED marker. */
    Object item;
    /** Link to the next node in the WaitQueue that holds this node. */
    Node next;

    /** Creates a node whose slot initially holds x (null for a taker's empty slot). */
    Node(Object x) { item = x; }

    /**
     * Fill in the slot created by the taker and signal taker to
     * continue.
     *
     * @param x the item to hand to the waiting taker
     * @return true if the item was stored; false if the taker had
     *         already marked the node CANCELLED
     */
    boolean set(Object x) {
        this.lock();
        try {
            if (item != CANCELLED) {
                item = x;
                // done is null if the taker has not started waiting yet;
                // it will then see the item on its first check.
                if (done != null)
                    done.signal();
                return true;
            }
            else // taker has cancelled
                return false;
        }
        finally {
            this.unlock();
        }
    }

    /**
     * Remove item from slot created by putter and signal putter
     * to continue.
     *
     * @return the putter's item, or null if the putter had already
     *         marked the node CANCELLED
     */
    Object get() {
        this.lock();
        try {
            Object x = item;
            if (x != CANCELLED) {
                // A null item is what waitForTake treats as "taken".
                item = null;
                next = null;
                if (done != null)
                    done.signal();
                return x;
            }
            else
                return null;
        }
        finally {
            this.unlock();
        }
    }


    /**
     * Wait for a taker to take item placed by putter, or time out.
     *
     * @param timed whether to give up once nanos is exhausted
     * @param nanos remaining wait budget in nanoseconds; consulted only when timed
     * @return true if the item was taken; false if the wait timed out,
     *         in which case the node is marked CANCELLED
     * @throws InterruptedException if interrupted before the item was
     *         taken; the node is marked CANCELLED first
     */
    boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
        this.lock();
        try {
            for (;;) {
                if (item == null)
                    return true;
                if (done == null)
                    done = this.newCondition();
                if (timed) {
                    if (nanos <= 0) {
                        item = CANCELLED;
                        return false;
                    }
                    // the value returned by awaitNanos becomes the new budget
                    nanos = done.awaitNanos(nanos);
                }
                else
                    done.await();
            }
        }
        catch (InterruptedException ie) {
            // If taken, return normally but set interrupt status
            if (item == null) {
                Thread.currentThread().interrupt();
                return true;
            }
            else {
                item = CANCELLED;
                done.signal(); // propagate signal
                throw ie;
            }
        }
        finally {
            this.unlock();
        }
    }


    /**
     * Wait for a putter to put item placed by taker, or time out.
     *
     * @param timed whether to give up once nanos is exhausted
     * @param nanos remaining wait budget in nanoseconds; consulted only when timed
     * @return the item supplied by a putter, or null if the wait timed
     *         out, in which case the node is marked CANCELLED
     * @throws InterruptedException if interrupted before any item
     *         arrived; the node is marked CANCELLED first
     */
    Object waitForPut(boolean timed, long nanos) throws InterruptedException {
        this.lock();
        try {
            for (;;) {
                Object x = item;
                if (x != null) {
                    item = null;
                    next = null;
                    return x;
                }
                if (done == null)
                    done = this.newCondition();
                if (timed) {
                    if (nanos <= 0) {
                        item = CANCELLED;
                        return null;
                    }
                    // the value returned by awaitNanos becomes the new budget
                    nanos = done.awaitNanos(nanos);
                }
                else
                    done.await();
            }
        }
        catch(InterruptedException ie) {
            // If an item already arrived, accept it and return normally,
            // re-asserting the interrupt status for the caller.
            Object x = item;
            if (x != null) {
                item = null;
                next = null;
                Thread.currentThread().interrupt();
                return x;
            }
            else {
                item = CANCELLED;
                done.signal(); // propagate signal
                throw ie;
            }
        }
        finally {
            this.unlock();
        }
    }
}
202    
203     /**
204     * Simple FIFO queue class to hold waiting puts/takes.
205     **/
206     private static class WaitQueue<E> {
207     Node head;
208     Node last;
209    
210     Node enq(Object x) {
211     Node p = new Node(x);
212     if (last == null)
213     last = head = p;
214     else
215     last = last.next = p;
216     return p;
217     }
218    
219     Node deq() {
220     Node p = head;
221     if (p != null && (head = p.next) == null)
222     last = null;
223     return p;
224     }
225     }
226    
227     /**
228     * Main put algorithm, used by put, timed offer
229     */
230     private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
231     if (x == null) throw new IllegalArgumentException();
232     for (;;) {
233     Node node;
234     boolean mustWait;
235    
236     qlock.lockInterruptibly();
237     try {
238     node = waitingTakes.deq();
239     if ( (mustWait = (node == null)) )
240     node = waitingPuts.enq(x);
241     }
242     finally {
243     qlock.unlock();
244     }
245    
246     if (mustWait)
247     return node.waitForTake(timed, nanos);
248    
249     else if (node.set(x))
250     return true;
251    
252     // else taker cancelled, so retry
253     }
254 tim 1.1 }
255 dl 1.2
256     /**
257     * Main take algorithm, used by take, timed poll
258     */
259     private E doTake(boolean timed, long nanos) throws InterruptedException {
260     for (;;) {
261     Node node;
262     boolean mustWait;
263    
264     qlock.lockInterruptibly();
265     try {
266     node = waitingPuts.deq();
267     if ( (mustWait = (node == null)) )
268     node = waitingTakes.enq(null);
269     }
270     finally {
271     qlock.unlock();
272     }
273    
274     if (mustWait)
275     return (E)node.waitForPut(timed, nanos);
276    
277     else {
278     E x = (E)node.get();
279     if (x != null)
280     return x;
281     // else cancelled, so retry
282     }
283     }
284 tim 1.1 }
285 dl 1.2
/**
 * Creates a SynchronousQueue with no waiting putters or takers.
 */
public SynchronousQueue() {}
287    
288    
/**
 * Hands x to a taker, waiting without a time limit until one accepts it.
 * Delegates to doPut with timing disabled.
 *
 * @param x the item to hand off
 * @throws IllegalArgumentException if x is null (checked in doPut)
 * @throws InterruptedException if interrupted while waiting
 */
public void put(E x) throws InterruptedException {
    doPut(x, false, 0);
}
292    
/**
 * Hands x to a taker, waiting up to the given time for one to accept it.
 * The timeout is converted to nanoseconds and passed to doPut.
 *
 * @param x the item to hand off
 * @param timeout how long to wait, in units of unit
 * @param unit the time unit of timeout
 * @return true if a taker received the item, false if the wait timed out
 * @throws IllegalArgumentException if x is null (checked in doPut)
 * @throws InterruptedException if interrupted while waiting
 */
public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
    return doPut(x, true, unit.toNanos(timeout));
}
296    
297 dl 1.2
298    
/**
 * Receives an item from a putter, waiting without a time limit until
 * one supplies it. Delegates to doTake with timing disabled.
 *
 * @return the item handed off by a putter
 * @throws InterruptedException if interrupted while waiting
 */
public E take() throws InterruptedException {
    return doTake(false, 0);
}
302 dl 1.2
/**
 * Receives an item from a putter, waiting up to the given time for one
 * to supply it. The timeout is converted to nanoseconds and passed to
 * doTake.
 *
 * @param timeout how long to wait, in units of unit
 * @param unit the time unit of timeout
 * @return the item handed off by a putter, or null if the wait timed out
 * @throws InterruptedException if interrupted while waiting
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    return doTake(true, unit.toNanos(timeout));
}
306 dl 1.2
307     // Untimed nonblocking versions
308    
309     public boolean offer(E x) {
310     if (x == null) throw new IllegalArgumentException();
311    
312     for (;;) {
313     qlock.lock();
314     Node node;
315     try {
316     node = waitingTakes.deq();
317     }
318     finally {
319     qlock.unlock();
320     }
321     if (node == null)
322     return false;
323    
324     else if (node.set(x))
325     return true;
326     // else retry
327     }
328 tim 1.1 }
329 dl 1.2
330     public E poll() {
331     for (;;) {
332     Node node;
333     qlock.lock();
334     try {
335     node = waitingPuts.deq();
336     }
337     finally {
338     qlock.unlock();
339     }
340     if (node == null)
341     return null;
342    
343     else {
344     Object x = node.get();
345     if (x != null)
346     return (E)x;
347     // else retry
348     }
349     }
350 tim 1.1 }
351 dl 1.2
/**
 * Always returns true. A SynchronousQueue has no internal capacity,
 * so it never reports holding an element.
 * @return true.
 */
public boolean isEmpty() {
    return true;
}
359    
/**
 * Always returns zero. A SynchronousQueue has no internal capacity;
 * waiting putters and takers are not counted.
 * @return zero.
 */
public int size() {
    return 0;
}
367 dl 1.2
/**
 * Always returns zero. A SynchronousQueue has no internal capacity,
 * so there is never room to store an element without a taker.
 * @return zero.
 */
public int remainingCapacity() {
    return 0;
}
375    
/**
 * Always returns null. A SynchronousQueue does not return elements
 * unless actively waited on, so there is never an element to
 * inspect without removing it.
 * @return null.
 */
public E peek() {
    return null;
}
384    
385    
/**
 * Iterator over no elements, returned by iterator() because the
 * queue never reports containing anything.
 */
static class EmptyIterator<E> implements Iterator<E> {
    /** Always returns false: there is nothing to iterate over. */
    public boolean hasNext() {
        return false;
    }
    /** Always throws NoSuchElementException, since hasNext() is never true. */
    public E next() {
        throw new NoSuchElementException();
    }
    /** Always throws UnsupportedOperationException; removal is not supported. */
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
397 dl 1.2
/**
 * Returns an empty iterator: its hasNext() is always false.
 * A new EmptyIterator is created on each call.
 *
 * @return an iterator over no elements
 */
public Iterator<E> iterator() {
    return new EmptyIterator<E>();
}
404    
405 dl 1.2
406 dl 1.5 /**
407     * Returns an empty array.
408     */
409 dl 1.3 public Object[] toArray() {
410 dl 1.2 return new E[0];
411 tim 1.1 }
412    
413 dl 1.2 public <T> T[] toArray(T[] a) {
414     if (a.length > 0)
415     a[0] = null;
416     return a;
417     }
418 tim 1.1 }