ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.4
Committed: Sat Jun 7 11:54:20 2003 UTC (21 years ago) by dl
Branch: MAIN
Changes since 1.3: +4 -0 lines
Log Message:
More javadoc fixes

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8     import java.util.*;
9    
10     /**
11     * A Queue in which each put must wait for a take, and vice versa.
12     * SynchronousQueues are similar to rendezvous channels used in CSP
13     * and Ada. They are well suited for handoff designs, in which an
14     * object running in one thread must synch up with an object running
15     * in another thread in order to hand it some information, event, or
16     * task.
17     **/
18 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
19 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
20    
21 dl 1.2 /*
22     This implementation divides actions into two cases for puts:
23    
24     * An arriving putter that does not already have a waiting taker
25     creates a node holding item, and then waits for a taker to take it.
26     * An arriving putter that does already have a waiting taker fills
27     the slot node created by the taker, and notifies it to continue.
28    
29     And symmetrically, two for takes:
30    
31     * An arriving taker that does not already have a waiting putter
32     creates an empty slot node, and then waits for a putter to fill it.
33     * An arriving taker that does already have a waiting putter takes
34     item from the node created by the putter, and notifies it to continue.
35    
36     This requires keeping two simple queues: waitingPuts and waitingTakes.
37    
38     When a put or take waiting for the actions of its counterpart
39     aborts due to interruption or timeout, it marks the node
40     it created as "CANCELLED", which causes its counterpart to retry
41     the entire put or take sequence.
42     */
43    
44     /**
45     * Special marker used in queue nodes to indicate that
46     * the thread waiting for a change in the node has timed out
47     * or been interrupted.
48     **/
49     private static final Object CANCELLED = new Object();
50    
51     /*
52     * Note that all fields are transient final, so there is
53     * no explicit serialization code.
54     */
55    
56     private transient final WaitQueue waitingPuts = new WaitQueue();
57     private transient final WaitQueue waitingTakes = new WaitQueue();
58     private transient final ReentrantLock qlock = new ReentrantLock();
59    
60     /**
61     * Nodes each maintain an item and handle waits and signals for
62     * getting and setting it. The class opportunistically extends
63     * ReentrantLock to save an extra object allocation per
64     * rendezvous.
65     */
66     private static class Node extends ReentrantLock {
67     Condition done;
68     Object item;
69     Node next;
70     Node(Object x) { item = x; }
71    
72     /**
73     * Fill in the slot created by the taker and signal taker to
74     * continue.
75     */
76     boolean set(Object x) {
77     this.lock();
78     try {
79     if (item != CANCELLED) {
80     item = x;
81     if (done != null)
82     done.signal();
83     return true;
84     }
85     else // taker has cancelled
86     return false;
87     }
88     finally {
89     this.unlock();
90     }
91     }
92    
93     /**
94     * Remove item from slot created by putter and signal putter
95     * to continue.
96     */
97     Object get() {
98     this.lock();
99     try {
100     Object x = item;
101     if (x != CANCELLED) {
102     item = null;
103     next = null;
104     if (done != null)
105     done.signal();
106     return x;
107     }
108     else
109     return null;
110     }
111     finally {
112     this.unlock();
113     }
114     }
115    
116    
117     /**
118     * Wait for a taker to take item placed by putter, or time out.
119     */
120     boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
121     this.lock();
122     try {
123     for (;;) {
124     if (item == null)
125     return true;
126     if (done == null)
127     done = this.newCondition();
128     if (timed) {
129     if (nanos <= 0) {
130     item = CANCELLED;
131     return false;
132     }
133     nanos = done.awaitNanos(nanos);
134     }
135     else
136     done.await();
137     }
138     }
139     catch (InterruptedException ie) {
140     // If taken, return normally but set interrupt status
141     if (item == null) {
142     Thread.currentThread().interrupt();
143     return true;
144     }
145     else {
146     item = CANCELLED;
147     done.signal(); // propagate signal
148     throw ie;
149     }
150     }
151     finally {
152     this.unlock();
153     }
154     }
155    
156    
157     /**
158     * Wait for a putter to put item placed by taker, or time out.
159     */
160     Object waitForPut(boolean timed, long nanos) throws InterruptedException {
161     this.lock();
162     try {
163     for (;;) {
164     Object x = item;
165     if (x != null) {
166     item = null;
167     next = null;
168     return x;
169     }
170     if (done == null)
171     done = this.newCondition();
172     if (timed) {
173     if (nanos <= 0) {
174     item = CANCELLED;
175     return null;
176     }
177     nanos = done.awaitNanos(nanos);
178     }
179     else
180     done.await();
181     }
182     }
183     catch(InterruptedException ie) {
184     Object x = item;
185     if (x != null) {
186     item = null;
187     next = null;
188     Thread.currentThread().interrupt();
189     return x;
190     }
191     else {
192     item = CANCELLED;
193     done.signal(); // propagate signal
194     throw ie;
195     }
196     }
197     finally {
198     this.unlock();
199     }
200     }
201     }
202    
203     /**
204     * Simple FIFO queue class to hold waiting puts/takes.
205     **/
206     private static class WaitQueue<E> {
207     Node head;
208     Node last;
209    
210     Node enq(Object x) {
211     Node p = new Node(x);
212     if (last == null)
213     last = head = p;
214     else
215     last = last.next = p;
216     return p;
217     }
218    
219     Node deq() {
220     Node p = head;
221     if (p != null && (head = p.next) == null)
222     last = null;
223     return p;
224     }
225     }
226    
227     /**
228     * Main put algorithm, used by put, timed offer
229     */
230     private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
231     if (x == null) throw new IllegalArgumentException();
232     for (;;) {
233     Node node;
234     boolean mustWait;
235    
236     qlock.lockInterruptibly();
237     try {
238     node = waitingTakes.deq();
239     if ( (mustWait = (node == null)) )
240     node = waitingPuts.enq(x);
241     }
242     finally {
243     qlock.unlock();
244     }
245    
246     if (mustWait)
247     return node.waitForTake(timed, nanos);
248    
249     else if (node.set(x))
250     return true;
251    
252     // else taker cancelled, so retry
253     }
254 tim 1.1 }
255 dl 1.2
256     /**
257     * Main take algorithm, used by take, timed poll
258     */
259     private E doTake(boolean timed, long nanos) throws InterruptedException {
260     for (;;) {
261     Node node;
262     boolean mustWait;
263    
264     qlock.lockInterruptibly();
265     try {
266     node = waitingPuts.deq();
267     if ( (mustWait = (node == null)) )
268     node = waitingTakes.enq(null);
269     }
270     finally {
271     qlock.unlock();
272     }
273    
274     if (mustWait)
275     return (E)node.waitForPut(timed, nanos);
276    
277     else {
278     E x = (E)node.get();
279     if (x != null)
280     return x;
281     // else cancelled, so retry
282     }
283     }
284 tim 1.1 }
285 dl 1.2
286     public SynchronousQueue() {}
287    
288     public boolean isEmpty() {
289     return true;
290 tim 1.1 }
291 dl 1.2
292     public int size() {
293     return 0;
294 tim 1.1 }
295 dl 1.2
296 dl 1.4 /**
297     * Always returns zero. SynchronousQueues have no internal capacity.
298     * @return zero.
299     */
300 dl 1.2 public int remainingCapacity() {
301     return 0;
302 tim 1.1 }
303 dl 1.2
304     public E peek() {
305 tim 1.1 return null;
306     }
307 dl 1.2
308    
309     public void put(E x) throws InterruptedException {
310     doPut(x, false, 0);
311 tim 1.1 }
312    
313 dl 1.2 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
314     return doPut(x, true, unit.toNanos(timeout));
315 tim 1.1 }
316    
317 dl 1.2
318    
319     public E take() throws InterruptedException {
320     return doTake(false, 0);
321 tim 1.1 }
322 dl 1.2
323     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
324     return doTake(true, unit.toNanos(timeout));
325 tim 1.1 }
326 dl 1.2
327     // Untimed nonblocking versions
328    
329     public boolean offer(E x) {
330     if (x == null) throw new IllegalArgumentException();
331    
332     for (;;) {
333     qlock.lock();
334     Node node;
335     try {
336     node = waitingTakes.deq();
337     }
338     finally {
339     qlock.unlock();
340     }
341     if (node == null)
342     return false;
343    
344     else if (node.set(x))
345     return true;
346     // else retry
347     }
348 tim 1.1 }
349 dl 1.2
350     public E poll() {
351     for (;;) {
352     Node node;
353     qlock.lock();
354     try {
355     node = waitingPuts.deq();
356     }
357     finally {
358     qlock.unlock();
359     }
360     if (node == null)
361     return null;
362    
363     else {
364     Object x = node.get();
365     if (x != null)
366     return (E)x;
367     // else retry
368     }
369     }
370 tim 1.1 }
371 dl 1.2
372     public boolean remove(Object x) {
373 tim 1.1 return false;
374     }
375 dl 1.2
376     static class EmptyIterator<E> implements Iterator {
377     public boolean hasNext() {
378     return false;
379     }
380     public E next() {
381     throw new NoSuchElementException();
382     }
383     public void remove() {
384     throw new UnsupportedOperationException();
385     }
386 tim 1.1 }
387 dl 1.2
388     public Iterator<E> iterator() {
389     return new EmptyIterator();
390 tim 1.1 }
391    
392 dl 1.2
393 dl 1.3 public Object[] toArray() {
394 dl 1.2 return new E[0];
395 tim 1.1 }
396    
397 dl 1.2 public <T> T[] toArray(T[] a) {
398     if (a.length > 0)
399     a[0] = null;
400     return a;
401     }
402 tim 1.1 }