ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.31
Committed: Tue Dec 30 23:55:43 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.30: +112 -111 lines
Log Message:
More responsive cancellation

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.29 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.18 * A {@linkplain BlockingQueue blocking queue} in which each
13     * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14     * synchronous queue does not have any internal capacity - in
15     * particular it does not have a capacity of one. You cannot
16     * <tt>peek</tt> at a synchronous queue because an element is only
17     * present when you try to take it; you cannot add an element (using
18     * any method) unless another thread is trying to remove it; you
19     * cannot iterate as there is nothing to iterate. The <em>head</em>
20     * of the queue is the element that the first queued thread is trying
21     * to add to the queue; if there are no queued threads then no element
22     * is being added and the head is <tt>null</tt>. For purposes of
23     * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 dl 1.19 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 dl 1.18 * queue does not permit <tt>null</tt> elements.
26     *
27     * <p>Synchronous queues are similar to rendezvous channels used in
28     * CSP and Ada. They are well suited for handoff designs, in which an
29 dl 1.30 * object running in one thread must sync up with an object running
30 dl 1.18 * in another thread in order to hand it some information, event, or
31     * task.
32 dl 1.19 * <p>This class implements all of the <em>optional</em> methods
33     * of the {@link Collection} and {@link Iterator} interfaces.
34 dl 1.6 * @since 1.5
35     * @author Doug Lea
36 dl 1.24 * @param <E> the type of elements held in this collection
37 dl 1.23 */
38 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
39 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
40 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
41 tim 1.1
42 dl 1.2 /*
43     This implementation divides actions into two cases for puts:
44    
45 tim 1.10 * An arriving putter that does not already have a waiting taker
46 dl 1.2 creates a node holding item, and then waits for a taker to take it.
47     * An arriving putter that does already have a waiting taker fills
48     the slot node created by the taker, and notifies it to continue.
49    
50     And symmetrically, two for takes:
51    
52     * An arriving taker that does not already have a waiting putter
53     creates an empty slot node, and then waits for a putter to fill it.
54     * An arriving taker that does already have a waiting putter takes
55     item from the node created by the putter, and notifies it to continue.
56    
57     This requires keeping two simple queues: waitingPuts and waitingTakes.
58 tim 1.10
59 dl 1.2 When a put or take waiting for the actions of its counterpart
60     aborts due to interruption or timeout, it marks the node
61     it created as "CANCELLED", which causes its counterpart to retry
62     the entire put or take sequence.
63     */
64    
65    
66     /*
67     * Note that all fields are transient final, so there is
68     * no explicit serialization code.
69     */
70    
71     private transient final WaitQueue waitingPuts = new WaitQueue();
72     private transient final WaitQueue waitingTakes = new WaitQueue();
73     private transient final ReentrantLock qlock = new ReentrantLock();
74    
75     /**
76     * Nodes each maintain an item and handle waits and signals for
77 dl 1.31 * getting and setting it. The class extends
78     * AbstractQueuedSynchronizer to manage blocking, using AQS state
79     * 0 for waiting, 1 for ack, -1 for cancelled.
80 dl 1.2 */
81 dl 1.31 private static final class Node extends AbstractQueuedSynchronizer {
82 dl 1.6 /** The item being transferred */
83 dl 1.2 Object item;
84 dl 1.6 /** Next node in wait queue */
85 dl 1.2 Node next;
86 dl 1.31 Node(Object x) { item = x; }
87    
88     private static final int WAITING = 0;
89     private static final int ACKED = 1;
90     private static final int CANCELLED = -1;
91    
92     /**
93     * Implements AQS base acquire to succeed if not in WAITING state
94     */
95     public int acquireExclusiveState(boolean b, int ignore) {
96     return get() == WAITING ? -1 : 0;
97     }
98    
99     /**
100     * Implements AQS base release to always signal.
101     * Status is changed in ack or cancel methods before calling,
102     * which is needed to ensure we win cancel race.
103     */
104     public boolean releaseExclusiveState(int ignore) {
105     return true;
106     }
107    
108     /**
109     * Try to acknowledge; fail if not waiting
110     */
111     private boolean ack() {
112     if (!compareAndSet(WAITING, ACKED))
113     return false;
114     releaseExclusive(0);
115     return true;
116     }
117    
118     /**
119     * Try to cancel; fail if not waiting
120     */
121     private boolean cancel() {
122     if (!compareAndSet(WAITING, CANCELLED))
123     return false;
124     releaseExclusive(0);
125     return true;
126     }
127 dl 1.6
128 dl 1.31 /**
129     * Take item and null out fields (for sake of GC)
130     */
131     private Object extract() {
132     Object x = item;
133     item = null;
134     next = null;
135     return x;
136     }
137 dl 1.2
138     /**
139     * Fill in the slot created by the taker and signal taker to
140     * continue.
141     */
142 dl 1.31 boolean setItem(Object x) {
143     item = x;
144     return ack();
145 dl 1.2 }
146    
147     /**
148     * Remove item from slot created by putter and signal putter
149     * to continue.
150     */
151 dl 1.31 Object getItem() {
152     if (!ack())
153     return null;
154     return extract();
155     }
156    
157     /**
158     * Wait for a taker to take item placed by putter.
159     */
160     boolean waitForTake() throws InterruptedException {
161 dl 1.2 try {
162 dl 1.31 acquireExclusiveInterruptibly(0);
163     return true;
164     } catch (InterruptedException ie) {
165     if (cancel())
166     throw ie;
167     Thread.currentThread().interrupt();
168     return true;
169 dl 1.2 }
170     }
171    
172     /**
173     * Wait for a taker to take item placed by putter, or time out.
174     */
175 dl 1.31 boolean waitForTake(long nanos) throws InterruptedException {
176 dl 1.2 try {
177 dl 1.31 return acquireExclusiveTimed(0, nanos) || !cancel();
178 tim 1.14 } catch (InterruptedException ie) {
179 dl 1.31 if (cancel())
180 dl 1.9 throw ie;
181 dl 1.31 Thread.currentThread().interrupt();
182     return true;
183     }
184     }
185    
186     /**
187     * Wait for a putter to put item placed by taker.
188     */
189     Object waitForPut() throws InterruptedException {
190     try {
191     acquireExclusiveInterruptibly(0);
192     return extract();
193     } catch (InterruptedException ie) {
194     if (cancel())
195     throw ie;
196     Thread.currentThread().interrupt();
197     return extract();
198 dl 1.2 }
199     }
200    
201     /**
202     * Wait for a putter to put item placed by taker, or time out.
203     */
204 dl 1.31 Object waitForPut(long nanos) throws InterruptedException {
205 dl 1.2 try {
206 dl 1.31 if (acquireExclusiveTimed(0, nanos) || !cancel())
207     return extract();
208     return null;
209 tim 1.14 } catch (InterruptedException ie) {
210 dl 1.31 if (cancel())
211 dl 1.9 throw ie;
212 dl 1.31 Thread.currentThread().interrupt();
213     return extract();
214 dl 1.2 }
215     }
216     }
217    
218     /**
219     * Simple FIFO queue class to hold waiting puts/takes.
220     **/
221     private static class WaitQueue<E> {
222     Node head;
223     Node last;
224    
225 tim 1.10 Node enq(Object x) {
226 dl 1.2 Node p = new Node(x);
227 tim 1.10 if (last == null)
228 dl 1.2 last = head = p;
229 tim 1.10 else
230 dl 1.2 last = last.next = p;
231     return p;
232     }
233    
234     Node deq() {
235     Node p = head;
236 tim 1.10 if (p != null && (head = p.next) == null)
237 dl 1.2 last = null;
238     return p;
239     }
240     }
241    
242     /**
243     * Main put algorithm, used by put, timed offer
244 tim 1.10 */
245 dl 1.2 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
246 dl 1.6 if (x == null) throw new NullPointerException();
247 tim 1.10 for (;;) {
248 dl 1.2 Node node;
249     boolean mustWait;
250 dl 1.27 final ReentrantLock qlock = this.qlock;
251 dl 1.2 qlock.lockInterruptibly();
252     try {
253     node = waitingTakes.deq();
254     if ( (mustWait = (node == null)) )
255     node = waitingPuts.enq(x);
256 tim 1.14 } finally {
257 dl 1.2 qlock.unlock();
258     }
259    
260 dl 1.31 if (mustWait)
261     return timed? node.waitForTake(nanos) : node.waitForTake();
262 dl 1.2
263 dl 1.31 else if (node.setItem(x))
264 dl 1.2 return true;
265    
266     // else taker cancelled, so retry
267     }
268 tim 1.1 }
269 dl 1.2
270     /**
271     * Main take algorithm, used by take, timed poll
272 tim 1.10 */
273 dl 1.2 private E doTake(boolean timed, long nanos) throws InterruptedException {
274     for (;;) {
275     Node node;
276     boolean mustWait;
277    
278 dl 1.27 final ReentrantLock qlock = this.qlock;
279 dl 1.2 qlock.lockInterruptibly();
280     try {
281     node = waitingPuts.deq();
282     if ( (mustWait = (node == null)) )
283     node = waitingTakes.enq(null);
284 tim 1.14 } finally {
285 dl 1.2 qlock.unlock();
286     }
287    
288 dl 1.31 if (mustWait) {
289     Object x = timed? node.waitForPut(nanos) : node.waitForPut();
290     return (E)x;
291     }
292 dl 1.2 else {
293 dl 1.31 Object x = node.getItem();
294 dl 1.2 if (x != null)
295 dl 1.31 return (E)x;
296 dl 1.2 // else cancelled, so retry
297     }
298     }
299 tim 1.1 }
300 dl 1.2
301 dholmes 1.11 /**
302 tim 1.13 * Creates a <tt>SynchronousQueue</tt>.
303 dholmes 1.11 */
304 dl 1.2 public SynchronousQueue() {}
305    
306    
307 dholmes 1.11 /**
308     * Adds the specified element to this queue, waiting if necessary for
309     * another thread to receive it.
310 dl 1.18 * @param o the element to add
311     * @throws InterruptedException if interrupted while waiting.
312     * @throws NullPointerException if the specified element is <tt>null</tt>.
313 dholmes 1.11 */
314     public void put(E o) throws InterruptedException {
315     doPut(o, false, 0);
316 tim 1.1 }
317    
318 dholmes 1.11 /**
319 dl 1.20 * Inserts the specified element into this queue, waiting if necessary
320 dl 1.18 * up to the specified wait time for another thread to receive it.
321     * @param o the element to add
322     * @param timeout how long to wait before giving up, in units of
323     * <tt>unit</tt>
324     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
325     * <tt>timeout</tt> parameter
326 dholmes 1.11 * @return <tt>true</tt> if successful, or <tt>false</tt> if
327     * the specified waiting time elapses before a taker appears.
328 dl 1.18 * @throws InterruptedException if interrupted while waiting.
329     * @throws NullPointerException if the specified element is <tt>null</tt>.
330 dholmes 1.11 */
331 dl 1.18 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
332     return doPut(o, true, unit.toNanos(timeout));
333 tim 1.1 }
334    
335 dl 1.2
336 dholmes 1.11 /**
337     * Retrieves and removes the head of this queue, waiting if necessary
338     * for another thread to insert it.
339     * @return the head of this queue
340     */
341 dl 1.2 public E take() throws InterruptedException {
342     return doTake(false, 0);
343 tim 1.1 }
344 dl 1.2
345 dholmes 1.11 /**
346     * Retrieves and removes the head of this queue, waiting
347     * if necessary up to the specified wait time, for another thread
348     * to insert it.
349 dl 1.18 * @param timeout how long to wait before giving up, in units of
350     * <tt>unit</tt>
351     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
352     * <tt>timeout</tt> parameter
353     * @return the head of this queue, or <tt>null</tt> if the
354     * specified waiting time elapses before an element is present.
355     * @throws InterruptedException if interrupted while waiting.
356 dholmes 1.11 */
357 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
358     return doTake(true, unit.toNanos(timeout));
359 tim 1.1 }
360 dl 1.2
361     // Untimed nonblocking versions
362    
363 dl 1.18 /**
364 dl 1.20 * Inserts the specified element into this queue, if another thread is
365 dl 1.18 * waiting to receive it.
366     *
367     * @param o the element to add.
368     * @return <tt>true</tt> if it was possible to add the element to
369     * this queue, else <tt>false</tt>
370     * @throws NullPointerException if the specified element is <tt>null</tt>
371     */
372 dholmes 1.11 public boolean offer(E o) {
373     if (o == null) throw new NullPointerException();
374 dl 1.27 final ReentrantLock qlock = this.qlock;
375 tim 1.10
376     for (;;) {
377 tim 1.26 Node node;
378 dl 1.2 qlock.lock();
379     try {
380     node = waitingTakes.deq();
381 tim 1.14 } finally {
382 dl 1.2 qlock.unlock();
383     }
384     if (node == null)
385     return false;
386 tim 1.10
387 dl 1.31 else if (node.setItem(o))
388 dl 1.2 return true;
389     // else retry
390     }
391 tim 1.1 }
392 dl 1.2
393 dl 1.18 /**
394     * Retrieves and removes the head of this queue, if another thread
395     * is currently making an element available.
396     *
397     * @return the head of this queue, or <tt>null</tt> if no
398     * element is available.
399     */
400 dl 1.2 public E poll() {
401 dl 1.27 final ReentrantLock qlock = this.qlock;
402 dl 1.2 for (;;) {
403     Node node;
404     qlock.lock();
405     try {
406     node = waitingPuts.deq();
407 tim 1.14 } finally {
408 dl 1.2 qlock.unlock();
409     }
410     if (node == null)
411     return null;
412    
413     else {
414 dl 1.31 Object x = node.getItem();
415 dl 1.2 if (x != null)
416     return (E)x;
417     // else retry
418     }
419     }
420 tim 1.1 }
421 dl 1.2
422 dl 1.5 /**
423 dholmes 1.11 * Always returns <tt>true</tt>.
424     * A <tt>SynchronousQueue</tt> has no internal capacity.
425     * @return <tt>true</tt>
426 dl 1.5 */
427     public boolean isEmpty() {
428     return true;
429     }
430    
431     /**
432 dholmes 1.11 * Always returns zero.
433     * A <tt>SynchronousQueue</tt> has no internal capacity.
434 dl 1.5 * @return zero.
435     */
436     public int size() {
437     return 0;
438 tim 1.1 }
439 dl 1.2
440 dl 1.5 /**
441 dholmes 1.11 * Always returns zero.
442     * A <tt>SynchronousQueue</tt> has no internal capacity.
443 dl 1.5 * @return zero.
444     */
445     public int remainingCapacity() {
446     return 0;
447     }
448    
449     /**
450 dholmes 1.11 * Does nothing.
451     * A <tt>SynchronousQueue</tt> has no internal capacity.
452     */
453     public void clear() {}
454    
455     /**
456     * Always returns <tt>false</tt>.
457     * A <tt>SynchronousQueue</tt> has no internal capacity.
458 dl 1.18 * @param o the element
459 dholmes 1.11 * @return <tt>false</tt>
460     */
461     public boolean contains(Object o) {
462     return false;
463     }
464    
465     /**
466 dl 1.18 * Always returns <tt>false</tt>.
467     * A <tt>SynchronousQueue</tt> has no internal capacity.
468     *
469     * @param o the element to remove
470     * @return <tt>false</tt>
471     */
472     public boolean remove(Object o) {
473     return false;
474     }
475    
476     /**
477 dl 1.16 * Returns <tt>false</tt> unless given collection is empty.
478 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
479 dl 1.18 * @param c the collection
480 dl 1.16 * @return <tt>false</tt> unless given collection is empty
481 dholmes 1.11 */
482 dl 1.12 public boolean containsAll(Collection<?> c) {
483 dl 1.16 return c.isEmpty();
484 dholmes 1.11 }
485    
486     /**
487     * Always returns <tt>false</tt>.
488     * A <tt>SynchronousQueue</tt> has no internal capacity.
489 dl 1.18 * @param c the collection
490 dholmes 1.11 * @return <tt>false</tt>
491     */
492 dl 1.12 public boolean removeAll(Collection<?> c) {
493 dholmes 1.11 return false;
494     }
495    
496     /**
497     * Always returns <tt>false</tt>.
498     * A <tt>SynchronousQueue</tt> has no internal capacity.
499 dl 1.18 * @param c the collection
500 dholmes 1.11 * @return <tt>false</tt>
501     */
502 dl 1.12 public boolean retainAll(Collection<?> c) {
503 dholmes 1.11 return false;
504     }
505    
506     /**
507     * Always returns <tt>null</tt>.
508     * A <tt>SynchronousQueue</tt> does not return elements
509 dl 1.5 * unless actively waited on.
510 dholmes 1.11 * @return <tt>null</tt>
511 dl 1.5 */
512     public E peek() {
513     return null;
514     }
515    
516    
517     static class EmptyIterator<E> implements Iterator<E> {
518 dl 1.2 public boolean hasNext() {
519     return false;
520     }
521     public E next() {
522     throw new NoSuchElementException();
523     }
524     public void remove() {
525 dl 1.17 throw new IllegalStateException();
526 dl 1.2 }
527 tim 1.1 }
528 dl 1.2
529 dl 1.5 /**
530 dl 1.18 * Returns an empty iterator in which <tt>hasNext</tt> always returns
531 tim 1.13 * <tt>false</tt>.
532     *
533 dholmes 1.11 * @return an empty iterator
534 dl 1.5 */
535 dl 1.2 public Iterator<E> iterator() {
536 dl 1.5 return new EmptyIterator<E>();
537 tim 1.1 }
538    
539 dl 1.2
540 dl 1.5 /**
541 dholmes 1.11 * Returns a zero-length array.
542     * @return a zero-length array
543 dl 1.5 */
544 dl 1.3 public Object[] toArray() {
545 dl 1.25 return new Object[0];
546 tim 1.1 }
547    
548 dholmes 1.11 /**
549     * Sets the zeroeth element of the specified array to <tt>null</tt>
550     * (if the array has non-zero length) and returns it.
551     * @return the specified array
552     */
553 dl 1.2 public <T> T[] toArray(T[] a) {
554     if (a.length > 0)
555     a[0] = null;
556     return a;
557     }
558 dl 1.21
559    
560     public int drainTo(Collection<? super E> c) {
561     if (c == null)
562     throw new NullPointerException();
563     if (c == this)
564     throw new IllegalArgumentException();
565     int n = 0;
566     E e;
567     while ( (e = poll()) != null) {
568     c.add(e);
569     ++n;
570     }
571     return n;
572     }
573    
574     public int drainTo(Collection<? super E> c, int maxElements) {
575     if (c == null)
576     throw new NullPointerException();
577     if (c == this)
578     throw new IllegalArgumentException();
579     int n = 0;
580     E e;
581     while (n < maxElements && (e = poll()) != null) {
582     c.add(e);
583     ++n;
584     }
585     return n;
586     }
587 tim 1.1 }
588 dholmes 1.11
589    
590    
591    
592