ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.10
Committed: Sat Jul 26 13:17:51 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.9: +15 -15 lines
Log Message:
Default compiler is now 2.2-ea. Some sources are not compatible with 2.0-ea.

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.5 * A {@link Queue} in which each put must wait for a take, and vice
13     * versa. SynchronousQueues are similar to rendezvous channels used
14     * in CSP and Ada. They are well suited for handoff designs, in which
15     * an object running in one thread must synch up with an object
16     * running in another thread in order to hand it some information,
17     * event, or task.
18 dl 1.6 * @since 1.5
19     * @author Doug Lea
20     **/
21 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
22 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
23    
24 dl 1.2 /*
25     This implementation divides actions into two cases for puts:
26    
27 tim 1.10 * An arriving putter that does not already have a waiting taker
28 dl 1.2 creates a node holding item, and then waits for a taker to take it.
29     * An arriving putter that does already have a waiting taker fills
30     the slot node created by the taker, and notifies it to continue.
31    
32     And symmetrically, two for takes:
33    
34     * An arriving taker that does not already have a waiting putter
35     creates an empty slot node, and then waits for a putter to fill it.
36     * An arriving taker that does already have a waiting putter takes
37     item from the node created by the putter, and notifies it to continue.
38    
39     This requires keeping two simple queues: waitingPuts and waitingTakes.
40 tim 1.10
41 dl 1.2 When a put or take waiting for the actions of its counterpart
42     aborts due to interruption or timeout, it marks the node
43     it created as "CANCELLED", which causes its counterpart to retry
44     the entire put or take sequence.
45     */
46    
47 tim 1.10 /**
48 dl 1.2 * Special marker used in queue nodes to indicate that
49     * the thread waiting for a change in the node has timed out
50     * or been interrupted.
51     **/
52     private static final Object CANCELLED = new Object();
53    
54     /*
55     * Note that all fields are transient final, so there is
56     * no explicit serialization code.
57     */
58    
59     private transient final WaitQueue waitingPuts = new WaitQueue();
60     private transient final WaitQueue waitingTakes = new WaitQueue();
61     private transient final ReentrantLock qlock = new ReentrantLock();
62    
63     /**
64     * Nodes each maintain an item and handle waits and signals for
65     * getting and setting it. The class opportunistically extends
66     * ReentrantLock to save an extra object allocation per
67     * rendezvous.
68     */
69     private static class Node extends ReentrantLock {
70 dl 1.6 /** Condition to wait on for other party; lazily constructed */
71 dl 1.2 Condition done;
72 dl 1.6 /** The item being transferred */
73 dl 1.2 Object item;
74 dl 1.6 /** Next node in wait queue */
75 dl 1.2 Node next;
76 dl 1.6
77 dl 1.2 Node(Object x) { item = x; }
78    
79     /**
80     * Fill in the slot created by the taker and signal taker to
81     * continue.
82     */
83     boolean set(Object x) {
84     this.lock();
85     try {
86     if (item != CANCELLED) {
87     item = x;
88     if (done != null)
89     done.signal();
90     return true;
91     }
92     else // taker has cancelled
93     return false;
94     }
95     finally {
96     this.unlock();
97     }
98     }
99    
100     /**
101     * Remove item from slot created by putter and signal putter
102     * to continue.
103     */
104     Object get() {
105     this.lock();
106     try {
107     Object x = item;
108     if (x != CANCELLED) {
109     item = null;
110     next = null;
111     if (done != null)
112     done.signal();
113     return x;
114     }
115     else
116     return null;
117     }
118     finally {
119     this.unlock();
120     }
121     }
122    
123     /**
124     * Wait for a taker to take item placed by putter, or time out.
125     */
126     boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
127     this.lock();
128     try {
129     for (;;) {
130     if (item == null)
131     return true;
132     if (timed) {
133     if (nanos <= 0) {
134     item = CANCELLED;
135     return false;
136     }
137     }
138 dl 1.9 if (done == null)
139     done = this.newCondition();
140     if (timed)
141     nanos = done.awaitNanos(nanos);
142     else
143     done.await();
144     }
145     }
146     catch (InterruptedException ie) {
147     // If taken, return normally but set interrupt status
148     if (item == null) {
149     Thread.currentThread().interrupt();
150     return true;
151     }
152     else {
153     item = CANCELLED;
154     done.signal(); // propagate signal
155     throw ie;
156 dl 1.2 }
157     }
158     finally {
159     this.unlock();
160     }
161     }
162    
163     /**
164     * Wait for a putter to put item placed by taker, or time out.
165     */
166     Object waitForPut(boolean timed, long nanos) throws InterruptedException {
167     this.lock();
168     try {
169     for (;;) {
170     Object x = item;
171     if (x != null) {
172     item = null;
173     next = null;
174     return x;
175     }
176     if (timed) {
177     if (nanos <= 0) {
178     item = CANCELLED;
179     return null;
180     }
181     }
182 dl 1.9 if (done == null)
183     done = this.newCondition();
184     if (timed)
185     nanos = done.awaitNanos(nanos);
186     else
187     done.await();
188     }
189     }
190     catch (InterruptedException ie) {
191     Object y = item;
192     if (y != null) {
193     item = null;
194     next = null;
195     Thread.currentThread().interrupt();
196     return y;
197     }
198     else {
199     item = CANCELLED;
200     done.signal(); // propagate signal
201     throw ie;
202 dl 1.2 }
203     }
204     finally {
205     this.unlock();
206     }
207     }
208     }
209    
210     /**
211     * Simple FIFO queue class to hold waiting puts/takes.
212     **/
213     private static class WaitQueue<E> {
214     Node head;
215     Node last;
216    
217 tim 1.10 Node enq(Object x) {
218 dl 1.2 Node p = new Node(x);
219 tim 1.10 if (last == null)
220 dl 1.2 last = head = p;
221 tim 1.10 else
222 dl 1.2 last = last.next = p;
223     return p;
224     }
225    
226     Node deq() {
227     Node p = head;
228 tim 1.10 if (p != null && (head = p.next) == null)
229 dl 1.2 last = null;
230     return p;
231     }
232     }
233    
234     /**
235     * Main put algorithm, used by put, timed offer
236 tim 1.10 */
237 dl 1.2 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
238 dl 1.6 if (x == null) throw new NullPointerException();
239 tim 1.10 for (;;) {
240 dl 1.2 Node node;
241     boolean mustWait;
242 tim 1.10
243 dl 1.2 qlock.lockInterruptibly();
244     try {
245     node = waitingTakes.deq();
246     if ( (mustWait = (node == null)) )
247     node = waitingPuts.enq(x);
248     }
249     finally {
250     qlock.unlock();
251     }
252    
253     if (mustWait)
254     return node.waitForTake(timed, nanos);
255    
256     else if (node.set(x))
257     return true;
258    
259     // else taker cancelled, so retry
260     }
261 tim 1.1 }
262 dl 1.2
263     /**
264     * Main take algorithm, used by take, timed poll
265 tim 1.10 */
266 dl 1.2 private E doTake(boolean timed, long nanos) throws InterruptedException {
267     for (;;) {
268     Node node;
269     boolean mustWait;
270    
271     qlock.lockInterruptibly();
272     try {
273     node = waitingPuts.deq();
274     if ( (mustWait = (node == null)) )
275     node = waitingTakes.enq(null);
276     }
277     finally {
278     qlock.unlock();
279     }
280    
281 dl 1.9 if (mustWait)
282 dl 1.2 return (E)node.waitForPut(timed, nanos);
283    
284     else {
285     E x = (E)node.get();
286     if (x != null)
287     return x;
288     // else cancelled, so retry
289     }
290     }
291 tim 1.1 }
292 dl 1.2
293     public SynchronousQueue() {}
294    
295    
296     public void put(E x) throws InterruptedException {
297     doPut(x, false, 0);
298 tim 1.1 }
299    
300 dl 1.2 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
301     return doPut(x, true, unit.toNanos(timeout));
302 tim 1.1 }
303    
304 dl 1.2
305    
306     public E take() throws InterruptedException {
307     return doTake(false, 0);
308 tim 1.1 }
309 dl 1.2
310     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
311     return doTake(true, unit.toNanos(timeout));
312 tim 1.1 }
313 dl 1.2
314     // Untimed nonblocking versions
315    
316     public boolean offer(E x) {
317 dl 1.6 if (x == null) throw new NullPointerException();
318 tim 1.10
319     for (;;) {
320 dl 1.2 qlock.lock();
321     Node node;
322     try {
323     node = waitingTakes.deq();
324     }
325     finally {
326     qlock.unlock();
327     }
328     if (node == null)
329     return false;
330 tim 1.10
331 dl 1.2 else if (node.set(x))
332     return true;
333     // else retry
334     }
335 tim 1.1 }
336 dl 1.2
337     public E poll() {
338     for (;;) {
339     Node node;
340     qlock.lock();
341     try {
342     node = waitingPuts.deq();
343     }
344     finally {
345     qlock.unlock();
346     }
347     if (node == null)
348     return null;
349    
350     else {
351     Object x = node.get();
352     if (x != null)
353     return (E)x;
354     // else retry
355     }
356     }
357 tim 1.1 }
358 dl 1.2
359 dl 1.5 /**
360     * Always returns true. SynchronousQueues have no internal capacity.
361     * @return true.
362     */
363     public boolean isEmpty() {
364     return true;
365     }
366    
367     /**
368     * Always returns 0. SynchronousQueues have no internal capacity.
369     * @return zero.
370     */
371     public int size() {
372     return 0;
373 tim 1.1 }
374 dl 1.2
375 dl 1.5 /**
376     * Always returns zero. SynchronousQueues have no internal capacity.
377     * @return zero.
378     */
379     public int remainingCapacity() {
380     return 0;
381     }
382    
383     /**
384     * Always returns null. SynchronousQueues do not return elements
385     * unless actively waited on.
386     * @return null.
387     */
388     public E peek() {
389     return null;
390     }
391    
392    
393     static class EmptyIterator<E> implements Iterator<E> {
394 dl 1.2 public boolean hasNext() {
395     return false;
396     }
397     public E next() {
398     throw new NoSuchElementException();
399     }
400     public void remove() {
401     throw new UnsupportedOperationException();
402     }
403 tim 1.1 }
404 dl 1.2
405 dl 1.5 /**
406     * Returns an empty iterator.
407     */
408 dl 1.2 public Iterator<E> iterator() {
409 dl 1.5 return new EmptyIterator<E>();
410 tim 1.1 }
411    
412 dl 1.2
413 dl 1.5 /**
414     * Returns an empty array.
415     */
416 dl 1.3 public Object[] toArray() {
417 tim 1.10 return (E[]) new Object[0];
418 tim 1.1 }
419    
420 dl 1.2 public <T> T[] toArray(T[] a) {
421     if (a.length > 0)
422     a[0] = null;
423     return a;
424     }
425 tim 1.1 }