ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.34
Committed: Fri Jan 2 01:31:12 2004 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.33: +6 -14 lines
Log Message:
release relays return value

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.29 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
/**
 * A {@linkplain BlockingQueue blocking queue} in which each
 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa.  A
 * synchronous queue does not have any internal capacity - in
 * particular it does not have a capacity of one. You cannot
 * <tt>peek</tt> at a synchronous queue because an element is only
 * present when you try to take it; you cannot add an element (using
 * any method) unless another thread is trying to remove it; you
 * cannot iterate as there is nothing to iterate.  The <em>head</em>
 * of the queue is the element that the first queued thread is trying
 * to add to the queue; if there are no queued threads then no element
 * is being added and the head is <tt>null</tt>.  For purposes of
 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
 * a <tt>SynchronousQueue</tt> acts as an empty collection.  This
 * queue does not permit <tt>null</tt> elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
      This implementation divides actions into two cases for puts:

      * An arriving putter that does not already have a waiting taker
        creates a node holding the item, and then waits for a taker
        to take it.
      * An arriving putter that does already have a waiting taker fills
        the slot node created by the taker, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving taker that does not already have a waiting putter
        creates an empty slot node, and then waits for a putter to
        fill it.
      * An arriving taker that does already have a waiting putter takes
        the item from the node created by the putter, and notifies it
        to continue.

      This requires keeping two simple queues: waitingPuts and
      waitingTakes, both guarded by qlock.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as CANCELLED, which causes its counterpart to retry
      the entire put or take sequence.
    */

    /*
     * All fields are transient: a synchronous queue never holds
     * elements, so there is no state worth writing to a stream.
     * Deserialization does not run field initializers, however, so
     * readResolve() below substitutes a freshly constructed queue
     * for the (otherwise unusable) deserialized instance.
     */

    /** Nodes of putters waiting for a taker; guarded by qlock. */
    private transient final WaitQueue waitingPuts = new WaitQueue();
    /** Nodes of takers waiting for a putter; guarded by qlock. */
    private transient final WaitQueue waitingTakes = new WaitQueue();
    /** Lock protecting both wait queues. */
    private transient final ReentrantLock qlock = new ReentrantLock();

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it. The class extends
     * AbstractQueuedSynchronizer to manage blocking, using AQS state
     * 0 for waiting, 1 for ack, -1 for cancelled.
     */
    private static final class Node extends AbstractQueuedSynchronizer {
        private static final int WAITING   =  0;
        private static final int ACKED     =  1;
        private static final int CANCELLED = -1;

        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        Node(Object x) { item = x; }

        /**
         * Implements AQS base acquire to succeed if not in WAITING
         * state, i.e. once the node has been acked or cancelled.
         * The argument is unused.
         */
        @Override
        protected boolean tryAcquire(int ignore) {
            return getState() != WAITING;
        }

        /**
         * Implements AQS base release to signal if the state changed.
         * The transition only succeeds from WAITING, so exactly one
         * of ack/cancel can win; the result is relayed by
         * {@code release}.
         */
        @Override
        protected boolean tryRelease(int newState) {
            return compareAndSetState(WAITING, newState);
        }

        /**
         * Try to acknowledge; fail if not waiting
         */
        private boolean ack() {
            return release(ACKED);
        }

        /**
         * Try to cancel; fail if not waiting
         */
        private boolean cancel() {
            return release(CANCELLED);
        }

        /**
         * Take item and null out fields (for sake of GC)
         */
        private Object extract() {
            Object x = item;
            item = null;
            next = null;
            return x;
        }

        /**
         * Fill in the slot created by the taker and signal taker to
         * continue.
         *
         * @return true if the taker was still waiting, false if it
         * had already cancelled
         */
        boolean setItem(Object x) {
            item = x; // harmless if the node turns out to be cancelled
            return ack();
        }

        /**
         * Remove item from slot created by putter and signal putter
         * to continue.
         *
         * @return the item, or null if the putter had already cancelled
         */
        Object getItem() {
            return ack() ? extract() : null;
        }

        /**
         * Wait for a taker to take item placed by putter or time out.
         *
         * @return true if the item was taken, false if the wait timed
         * out and this node was cancelled
         * @throws InterruptedException if interrupted before a taker
         * acknowledged this node
         */
        boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
            try {
                if (!timed)
                    acquireInterruptibly(0);
                else if (!tryAcquireNanos(0, nanos) && cancel())
                    return false;
                // Either acquired, or timed out but lost the race to an ack.
                return true;
            } catch (InterruptedException ie) {
                if (cancel())
                    throw ie;
                // Already acked: the transfer happened, so report success
                // and leave the interrupt pending for the caller.
                Thread.currentThread().interrupt();
                return true;
            }
        }

        /**
         * Wait for a putter to put item placed by taker, or time out.
         *
         * @return the item, or null if the wait timed out and this
         * node was cancelled
         * @throws InterruptedException if interrupted before a putter
         * acknowledged this node
         */
        Object waitForPut(boolean timed, long nanos) throws InterruptedException {
            try {
                if (!timed)
                    acquireInterruptibly(0);
                else if (!tryAcquireNanos(0, nanos) && cancel())
                    return null;
                // Either acquired, or timed out but lost the race to an ack.
                return extract();
            } catch (InterruptedException ie) {
                if (cancel())
                    throw ie;
                // Already acked: deliver the item and leave the
                // interrupt pending for the caller.
                Thread.currentThread().interrupt();
                return extract();
            }
        }
    }

    /**
     * Simple FIFO queue class to hold waiting puts/takes.
     * Not thread-safe on its own; callers hold qlock.
     */
    private static final class WaitQueue {
        Node head;
        Node last;

        /** Appends a new node holding x and returns it. */
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        /** Removes and returns the first node, or null if empty. */
        Node deq() {
            Node p = head;
            if (p != null && (head = p.next) == null)
                last = null;
            return p;
        }
    }

    /**
     * Main put algorithm, used by put, timed offer
     *
     * @return true if the element was handed to a taker, false if a
     * timed wait elapsed first
     */
    private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        final ReentrantLock lock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;
            lock.lockInterruptibly();
            try {
                node = waitingTakes.deq();
                mustWait = (node == null);
                if (mustWait)
                    node = waitingPuts.enq(x);
            } finally {
                lock.unlock();
            }

            if (mustWait)
                return node.waitForTake(timed, nanos);
            else if (node.setItem(x))
                return true;

            // else taker cancelled, so retry
        }
    }

    /**
     * Main take algorithm, used by take, timed poll
     *
     * @return the element received, or null if a timed wait elapsed
     * first
     */
    private E doTake(boolean timed, long nanos) throws InterruptedException {
        final ReentrantLock lock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;
            lock.lockInterruptibly();
            try {
                node = waitingPuts.deq();
                mustWait = (node == null);
                if (mustWait)
                    node = waitingTakes.enq(null);
            } finally {
                lock.unlock();
            }

            Object x = mustWait ? node.waitForPut(timed, nanos)
                                : node.getItem();
            if (mustWait || x != null) {
                // Items only enter nodes through doPut/offer, which accept E.
                @SuppressWarnings("unchecked")
                E e = (E) x;
                return e;
            }
            // else putter cancelled, so retry
        }
    }

    /**
     * Creates a <tt>SynchronousQueue</tt>.
     */
    public SynchronousQueue() {}

    /**
     * Replaces a deserialized instance, whose transient fields were
     * never initialized, with a freshly constructed (and therefore
     * usable) queue. No information is lost because a synchronous
     * queue has no contents.
     */
    private Object readResolve() {
        return new SynchronousQueue<E>();
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        doPut(o, false, 0);
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before a taker appears.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        return doPut(o, true, unit.toNanos(timeout));
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        return doTake(false, 0);
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return doTake(true, unit.toNanos(timeout));
    }

    // Untimed nonblocking versions

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     * this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock lock = this.qlock;

        for (;;) {
            Node node;
            lock.lock();
            try {
                node = waitingTakes.deq();
            } finally {
                lock.unlock();
            }
            if (node == null)
                return false;
            if (node.setItem(o))
                return true;
            // else taker cancelled, so retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     * element is available.
     */
    public E poll() {
        final ReentrantLock lock = this.qlock;
        for (;;) {
            Node node;
            lock.lock();
            try {
                node = waitingPuts.deq();
            } finally {
                lock.unlock();
            }
            if (node == null)
                return null;

            Object x = node.getItem();
            if (x != null) {
                // Items only enter nodes through doPut/offer, which accept E.
                @SuppressWarnings("unchecked")
                E e = (E) x;
                return e;
            }
            // else putter cancelled, so retry
        }
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {}

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /**
     * Iterator over nothing: never has a next element, and
     * <tt>remove</tt> always fails because <tt>next</tt> can never
     * have succeeded.
     */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new IllegalStateException();
        }
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroeth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is <tt>null</tt>
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * Transfers to the given collection every element that a waiting
     * putter is currently offering, by polling until no putter is
     * waiting.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ((e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Transfers to the given collection at most <tt>maxElements</tt>
     * elements that waiting putters are currently offering.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }
}
555 dholmes 1.11
556    
557    
558    
559