ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.6
Committed: Tue Jun 24 14:34:49 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.5: +9 -3 lines
Log Message:
Added missing javadoc tags; minor reformatting

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8     import java.util.*;
9    
10     /**
11 dl 1.5 * A {@link Queue} in which each put must wait for a take, and vice
12     * versa. SynchronousQueues are similar to rendezvous channels used
13     * in CSP and Ada. They are well suited for handoff designs, in which
14     * an object running in one thread must synch up with an object
15     * running in another thread in order to hand it some information,
16     * event, or task.
17 dl 1.6 * @since 1.5
18     * @author Doug Lea
19     **/
20 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
21 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
22    
23 dl 1.2 /*
24     This implementation divides actions into two cases for puts:
25    
26     * An arriving putter that does not already have a waiting taker
27     creates a node holding item, and then waits for a taker to take it.
28     * An arriving putter that does already have a waiting taker fills
29     the slot node created by the taker, and notifies it to continue.
30    
31     And symmetrically, two for takes:
32    
33     * An arriving taker that does not already have a waiting putter
34     creates an empty slot node, and then waits for a putter to fill it.
35     * An arriving taker that does already have a waiting putter takes
36     item from the node created by the putter, and notifies it to continue.
37    
38     This requires keeping two simple queues: waitingPuts and waitingTakes.
39    
40     When a put or take waiting for the actions of its counterpart
41     aborts due to interruption or timeout, it marks the node
42     it created as "CANCELLED", which causes its counterpart to retry
43     the entire put or take sequence.
44     */
45    
46     /**
47     * Special marker used in queue nodes to indicate that
48     * the thread waiting for a change in the node has timed out
49     * or been interrupted.
50     **/
51     private static final Object CANCELLED = new Object();
52    
53     /*
54     * Note that all fields are transient final, so there is
55     * no explicit serialization code.
56     */
57    
58     private transient final WaitQueue waitingPuts = new WaitQueue();
59     private transient final WaitQueue waitingTakes = new WaitQueue();
60     private transient final ReentrantLock qlock = new ReentrantLock();
61    
62     /**
63     * Nodes each maintain an item and handle waits and signals for
64     * getting and setting it. The class opportunistically extends
65     * ReentrantLock to save an extra object allocation per
66     * rendezvous.
67     */
68     private static class Node extends ReentrantLock {
69 dl 1.6 /** Condition to wait on for other party; lazily constructed */
70 dl 1.2 Condition done;
71 dl 1.6 /** The item being transferred */
72 dl 1.2 Object item;
73 dl 1.6 /** Next node in wait queue */
74 dl 1.2 Node next;
75 dl 1.6
76 dl 1.2 Node(Object x) { item = x; }
77    
78     /**
79     * Fill in the slot created by the taker and signal taker to
80     * continue.
81     */
82     boolean set(Object x) {
83     this.lock();
84     try {
85     if (item != CANCELLED) {
86     item = x;
87     if (done != null)
88     done.signal();
89     return true;
90     }
91     else // taker has cancelled
92     return false;
93     }
94     finally {
95     this.unlock();
96     }
97     }
98    
99     /**
100     * Remove item from slot created by putter and signal putter
101     * to continue.
102     */
103     Object get() {
104     this.lock();
105     try {
106     Object x = item;
107     if (x != CANCELLED) {
108     item = null;
109     next = null;
110     if (done != null)
111     done.signal();
112     return x;
113     }
114     else
115     return null;
116     }
117     finally {
118     this.unlock();
119     }
120     }
121    
122    
123     /**
124     * Wait for a taker to take item placed by putter, or time out.
125     */
126     boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
127     this.lock();
128     try {
129     for (;;) {
130     if (item == null)
131     return true;
132     if (done == null)
133     done = this.newCondition();
134     if (timed) {
135     if (nanos <= 0) {
136     item = CANCELLED;
137     return false;
138     }
139     nanos = done.awaitNanos(nanos);
140     }
141     else
142     done.await();
143     }
144     }
145     catch (InterruptedException ie) {
146     // If taken, return normally but set interrupt status
147     if (item == null) {
148     Thread.currentThread().interrupt();
149     return true;
150     }
151     else {
152     item = CANCELLED;
153     done.signal(); // propagate signal
154     throw ie;
155     }
156     }
157     finally {
158     this.unlock();
159     }
160     }
161    
162    
163     /**
164     * Wait for a putter to put item placed by taker, or time out.
165     */
166     Object waitForPut(boolean timed, long nanos) throws InterruptedException {
167     this.lock();
168     try {
169     for (;;) {
170     Object x = item;
171     if (x != null) {
172     item = null;
173     next = null;
174     return x;
175     }
176     if (done == null)
177     done = this.newCondition();
178     if (timed) {
179     if (nanos <= 0) {
180     item = CANCELLED;
181     return null;
182     }
183     nanos = done.awaitNanos(nanos);
184     }
185     else
186     done.await();
187     }
188     }
189     catch(InterruptedException ie) {
190     Object x = item;
191     if (x != null) {
192     item = null;
193     next = null;
194     Thread.currentThread().interrupt();
195     return x;
196     }
197     else {
198     item = CANCELLED;
199     done.signal(); // propagate signal
200     throw ie;
201     }
202     }
203     finally {
204     this.unlock();
205     }
206     }
207     }
208    
209     /**
210     * Simple FIFO queue class to hold waiting puts/takes.
211     **/
212     private static class WaitQueue<E> {
213     Node head;
214     Node last;
215    
216     Node enq(Object x) {
217     Node p = new Node(x);
218     if (last == null)
219     last = head = p;
220     else
221     last = last.next = p;
222     return p;
223     }
224    
225     Node deq() {
226     Node p = head;
227     if (p != null && (head = p.next) == null)
228     last = null;
229     return p;
230     }
231     }
232    
233     /**
234     * Main put algorithm, used by put, timed offer
235     */
236     private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
237 dl 1.6 if (x == null) throw new NullPointerException();
238 dl 1.2 for (;;) {
239     Node node;
240     boolean mustWait;
241    
242     qlock.lockInterruptibly();
243     try {
244     node = waitingTakes.deq();
245     if ( (mustWait = (node == null)) )
246     node = waitingPuts.enq(x);
247     }
248     finally {
249     qlock.unlock();
250     }
251    
252     if (mustWait)
253     return node.waitForTake(timed, nanos);
254    
255     else if (node.set(x))
256     return true;
257    
258     // else taker cancelled, so retry
259     }
260 tim 1.1 }
261 dl 1.2
262     /**
263     * Main take algorithm, used by take, timed poll
264     */
265     private E doTake(boolean timed, long nanos) throws InterruptedException {
266     for (;;) {
267     Node node;
268     boolean mustWait;
269    
270     qlock.lockInterruptibly();
271     try {
272     node = waitingPuts.deq();
273     if ( (mustWait = (node == null)) )
274     node = waitingTakes.enq(null);
275     }
276     finally {
277     qlock.unlock();
278     }
279    
280     if (mustWait)
281     return (E)node.waitForPut(timed, nanos);
282    
283     else {
284     E x = (E)node.get();
285     if (x != null)
286     return x;
287     // else cancelled, so retry
288     }
289     }
290 tim 1.1 }
291 dl 1.2
292     public SynchronousQueue() {}
293    
294    
295     public void put(E x) throws InterruptedException {
296     doPut(x, false, 0);
297 tim 1.1 }
298    
299 dl 1.2 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
300     return doPut(x, true, unit.toNanos(timeout));
301 tim 1.1 }
302    
303 dl 1.2
304    
305     public E take() throws InterruptedException {
306     return doTake(false, 0);
307 tim 1.1 }
308 dl 1.2
309     public E poll(long timeout, TimeUnit unit) throws InterruptedException {
310     return doTake(true, unit.toNanos(timeout));
311 tim 1.1 }
312 dl 1.2
313     // Untimed nonblocking versions
314    
315     public boolean offer(E x) {
316 dl 1.6 if (x == null) throw new NullPointerException();
317 dl 1.2
318     for (;;) {
319     qlock.lock();
320     Node node;
321     try {
322     node = waitingTakes.deq();
323     }
324     finally {
325     qlock.unlock();
326     }
327     if (node == null)
328     return false;
329    
330     else if (node.set(x))
331     return true;
332     // else retry
333     }
334 tim 1.1 }
335 dl 1.2
336     public E poll() {
337     for (;;) {
338     Node node;
339     qlock.lock();
340     try {
341     node = waitingPuts.deq();
342     }
343     finally {
344     qlock.unlock();
345     }
346     if (node == null)
347     return null;
348    
349     else {
350     Object x = node.get();
351     if (x != null)
352     return (E)x;
353     // else retry
354     }
355     }
356 tim 1.1 }
357 dl 1.2
358 dl 1.5 /**
359     * Always returns true. SynchronousQueues have no internal capacity.
360     * @return true.
361     */
362     public boolean isEmpty() {
363     return true;
364     }
365    
366     /**
367     * Always returns 0. SynchronousQueues have no internal capacity.
368     * @return zero.
369     */
370     public int size() {
371     return 0;
372 tim 1.1 }
373 dl 1.2
374 dl 1.5 /**
375     * Always returns zero. SynchronousQueues have no internal capacity.
376     * @return zero.
377     */
378     public int remainingCapacity() {
379     return 0;
380     }
381    
382     /**
383     * Always returns null. SynchronousQueues do not return elements
384     * unless actively waited on.
385     * @return null.
386     */
387     public E peek() {
388     return null;
389     }
390    
391    
392     static class EmptyIterator<E> implements Iterator<E> {
393 dl 1.2 public boolean hasNext() {
394     return false;
395     }
396     public E next() {
397     throw new NoSuchElementException();
398     }
399     public void remove() {
400     throw new UnsupportedOperationException();
401     }
402 tim 1.1 }
403 dl 1.2
404 dl 1.5 /**
405     * Returns an empty iterator.
406     */
407 dl 1.2 public Iterator<E> iterator() {
408 dl 1.5 return new EmptyIterator<E>();
409 tim 1.1 }
410    
411 dl 1.2
412 dl 1.5 /**
413     * Returns an empty array.
414     */
415 dl 1.3 public Object[] toArray() {
416 dl 1.2 return new E[0];
417 tim 1.1 }
418    
419 dl 1.2 public <T> T[] toArray(T[] a) {
420     if (a.length > 0)
421     a[0] = null;
422     return a;
423     }
424 tim 1.1 }