ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.2
Committed: Tue May 27 18:14:40 2003 UTC (21 years ago) by dl
Branch: MAIN
Changes since 1.1: +362 -33 lines
Log Message:
re-check-in initial implementations

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8     import java.util.*;
9    
10     /**
11     * A Queue in which each put must wait for a take, and vice versa.
12     * SynchronousQueues are similar to rendezvous channels used in CSP
13     * and Ada. They are well suited for handoff designs, in which an
14     * object running in one thread must synch up with an object running
15     * in another thread in order to hand it some information, event, or
16     * task.
17     **/
18 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
19 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
20    
21 dl 1.2 /*
22     This implementation divides actions into two cases for puts:
23    
24     * An arriving putter that does not already have a waiting taker
25     creates a node holding item, and then waits for a taker to take it.
26     * An arriving putter that does already have a waiting taker fills
27     the slot node created by the taker, and notifies it to continue.
28    
29     And symmetrically, two for takes:
30    
31     * An arriving taker that does not already have a waiting putter
32     creates an empty slot node, and then waits for a putter to fill it.
33     * An arriving taker that does already have a waiting putter takes
34     item from the node created by the putter, and notifies it to continue.
35    
36     This requires keeping two simple queues: waitingPuts and waitingTakes.
37    
38     When a put or take waiting for the actions of its counterpart
39     aborts due to interruption or timeout, it marks the node
40     it created as "CANCELLED", which causes its counterpart to retry
41     the entire put or take sequence.
42     */
43    
44     /**
45     * Special marker used in queue nodes to indicate that
46     * the thread waiting for a change in the node has timed out
47     * or been interrupted.
48     **/
49     private static final Object CANCELLED = new Object();
50    
51     /*
52     * Note that all fields are transient final, so there is
53     * no explicit serialization code.
54     */
55    
56     private transient final WaitQueue waitingPuts = new WaitQueue();
57     private transient final WaitQueue waitingTakes = new WaitQueue();
58     private transient final ReentrantLock qlock = new ReentrantLock();
59    
60     /**
61     * Nodes each maintain an item and handle waits and signals for
62     * getting and setting it. The class opportunistically extends
63     * ReentrantLock to save an extra object allocation per
64     * rendezvous.
65     */
66     private static class Node extends ReentrantLock {
67     Condition done;
68     Object item;
69     Node next;
70     Node(Object x) { item = x; }
71    
72     /**
73     * Fill in the slot created by the taker and signal taker to
74     * continue.
75     */
76     boolean set(Object x) {
77     this.lock();
78     try {
79     if (item != CANCELLED) {
80     item = x;
81     if (done != null)
82     done.signal();
83     return true;
84     }
85     else // taker has cancelled
86     return false;
87     }
88     finally {
89     this.unlock();
90     }
91     }
92    
93     /**
94     * Remove item from slot created by putter and signal putter
95     * to continue.
96     */
97     Object get() {
98     this.lock();
99     try {
100     Object x = item;
101     if (x != CANCELLED) {
102     item = null;
103     next = null;
104     if (done != null)
105     done.signal();
106     return x;
107     }
108     else
109     return null;
110     }
111     finally {
112     this.unlock();
113     }
114     }
115    
116    
117     /**
118     * Wait for a taker to take item placed by putter, or time out.
119     */
120     boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
121     this.lock();
122     try {
123     for (;;) {
124     if (item == null)
125     return true;
126     if (done == null)
127     done = this.newCondition();
128     if (timed) {
129     if (nanos <= 0) {
130     item = CANCELLED;
131     return false;
132     }
133     nanos = done.awaitNanos(nanos);
134     }
135     else
136     done.await();
137     }
138     }
139     catch (InterruptedException ie) {
140     // If taken, return normally but set interrupt status
141     if (item == null) {
142     Thread.currentThread().interrupt();
143     return true;
144     }
145     else {
146     item = CANCELLED;
147     done.signal(); // propagate signal
148     throw ie;
149     }
150     }
151     finally {
152     this.unlock();
153     }
154     }
155    
156    
157     /**
158     * Wait for a putter to put item placed by taker, or time out.
159     */
160     Object waitForPut(boolean timed, long nanos) throws InterruptedException {
161     this.lock();
162     try {
163     for (;;) {
164     Object x = item;
165     if (x != null) {
166     item = null;
167     next = null;
168     return x;
169     }
170     if (done == null)
171     done = this.newCondition();
172     if (timed) {
173     if (nanos <= 0) {
174     item = CANCELLED;
175     return null;
176     }
177     nanos = done.awaitNanos(nanos);
178     }
179     else
180     done.await();
181     }
182     }
183     catch(InterruptedException ie) {
184     Object x = item;
185     if (x != null) {
186     item = null;
187     next = null;
188     Thread.currentThread().interrupt();
189     return x;
190     }
191     else {
192     item = CANCELLED;
193     done.signal(); // propagate signal
194     throw ie;
195     }
196     }
197     finally {
198     this.unlock();
199     }
200     }
201     }
202    
203     /**
204     * Simple FIFO queue class to hold waiting puts/takes.
205     **/
206     private static class WaitQueue<E> {
207     Node head;
208     Node last;
209    
210     Node enq(Object x) {
211     Node p = new Node(x);
212     if (last == null)
213     last = head = p;
214     else
215     last = last.next = p;
216     return p;
217     }
218    
219     Node deq() {
220     Node p = head;
221     if (p != null && (head = p.next) == null)
222     last = null;
223     return p;
224     }
225     }
226    
227     /**
228     * Main put algorithm, used by put, timed offer
229     */
230     private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
231     if (x == null) throw new IllegalArgumentException();
232     for (;;) {
233     Node node;
234     boolean mustWait;
235    
236     qlock.lockInterruptibly();
237     try {
238     node = waitingTakes.deq();
239     if ( (mustWait = (node == null)) )
240     node = waitingPuts.enq(x);
241     }
242     finally {
243     qlock.unlock();
244     }
245    
246     if (mustWait)
247     return node.waitForTake(timed, nanos);
248    
249     else if (node.set(x))
250     return true;
251    
252     // else taker cancelled, so retry
253     }
254 tim 1.1 }
255 dl 1.2
256     /**
257     * Main take algorithm, used by take, timed poll
258     */
259     private E doTake(boolean timed, long nanos) throws InterruptedException {
260     for (;;) {
261     Node node;
262     boolean mustWait;
263    
264     qlock.lockInterruptibly();
265     try {
266     node = waitingPuts.deq();
267     if ( (mustWait = (node == null)) )
268     node = waitingTakes.enq(null);
269     }
270     finally {
271     qlock.unlock();
272     }
273    
274     if (mustWait)
275     return (E)node.waitForPut(timed, nanos);
276    
277     else {
278     E x = (E)node.get();
279     if (x != null)
280     return x;
281     // else cancelled, so retry
282     }
283     }
284 tim 1.1 }
285 dl 1.2
286     public SynchronousQueue() {}
287    
288     public boolean isEmpty() {
289     return true;
290 tim 1.1 }
291 dl 1.2
292     public int size() {
293     return 0;
294 tim 1.1 }
295 dl 1.2
296     public int remainingCapacity() {
297     return 0;
298 tim 1.1 }
299 dl 1.2
300     public E peek() {
301 tim 1.1 return null;
302     }
303 dl 1.2
304    
305     public void put(E x) throws InterruptedException {
306     doPut(x, false, 0);
307 tim 1.1 }
308    
309 dl 1.2 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
310     return doPut(x, true, unit.toNanos(timeout));
311 tim 1.1 }
312    
313 dl 1.2
314    
315     public E take() throws InterruptedException {
316     return doTake(false, 0);
317 tim 1.1 }
318 dl 1.2
319     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
320     return doTake(true, unit.toNanos(timeout));
321 tim 1.1 }
322 dl 1.2
323     // Untimed nonblocking versions
324    
325     public boolean offer(E x) {
326     if (x == null) throw new IllegalArgumentException();
327    
328     for (;;) {
329     qlock.lock();
330     Node node;
331     try {
332     node = waitingTakes.deq();
333     }
334     finally {
335     qlock.unlock();
336     }
337     if (node == null)
338     return false;
339    
340     else if (node.set(x))
341     return true;
342     // else retry
343     }
344 tim 1.1 }
345 dl 1.2
346     public E poll() {
347     for (;;) {
348     Node node;
349     qlock.lock();
350     try {
351     node = waitingPuts.deq();
352     }
353     finally {
354     qlock.unlock();
355     }
356     if (node == null)
357     return null;
358    
359     else {
360     Object x = node.get();
361     if (x != null)
362     return (E)x;
363     // else retry
364     }
365     }
366 tim 1.1 }
367 dl 1.2
368     public boolean remove(Object x) {
369 tim 1.1 return false;
370     }
371 dl 1.2
372     static class EmptyIterator<E> implements Iterator {
373     public boolean hasNext() {
374     return false;
375     }
376     public E next() {
377     throw new NoSuchElementException();
378     }
379     public void remove() {
380     throw new UnsupportedOperationException();
381     }
382 tim 1.1 }
383 dl 1.2
384     public Iterator<E> iterator() {
385     return new EmptyIterator();
386 tim 1.1 }
387    
388 dl 1.2
389     public E[] toArray() {
390     return new E[0];
391 tim 1.1 }
392    
393 dl 1.2 public <T> T[] toArray(T[] a) {
394     if (a.length > 0)
395     a[0] = null;
396     return a;
397     }
398 tim 1.1 }