ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.35
Committed: Fri Jan 2 21:02:32 2004 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.34: +157 -104 lines
Log Message:
Avoid timeout problems in fair modes; improve AQS method names

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.29 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.18 * A {@linkplain BlockingQueue blocking queue} in which each
13     * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14     * synchronous queue does not have any internal capacity - in
15     * particular it does not have a capacity of one. You cannot
16     * <tt>peek</tt> at a synchronous queue because an element is only
17     * present when you try to take it; you cannot add an element (using
18     * any method) unless another thread is trying to remove it; you
19     * cannot iterate as there is nothing to iterate. The <em>head</em>
20     * of the queue is the element that the first queued thread is trying
21     * to add to the queue; if there are no queued threads then no element
22     * is being added and the head is <tt>null</tt>. For purposes of
23     * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 dl 1.19 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 dl 1.18 * queue does not permit <tt>null</tt> elements.
26     *
27     * <p>Synchronous queues are similar to rendezvous channels used in
28     * CSP and Ada. They are well suited for handoff designs, in which an
29 dl 1.30 * object running in one thread must sync up with an object running
30 dl 1.18 * in another thread in order to hand it some information, event, or
31     * task.
32 dl 1.19 * <p>This class implements all of the <em>optional</em> methods
33     * of the {@link Collection} and {@link Iterator} interfaces.
34 dl 1.6 * @since 1.5
35     * @author Doug Lea
36 dl 1.24 * @param <E> the type of elements held in this collection
37 dl 1.23 */
38 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
39 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
40 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
41 tim 1.1
42 dl 1.2 /*
43     This implementation divides actions into two cases for puts:
44    
45 tim 1.10 * An arriving putter that does not already have a waiting taker
46 dl 1.2 creates a node holding item, and then waits for a taker to take it.
47     * An arriving putter that does already have a waiting taker fills
48     the slot node created by the taker, and notifies it to continue.
49    
50     And symmetrically, two for takes:
51    
52     * An arriving taker that does not already have a waiting putter
53     creates an empty slot node, and then waits for a putter to fill it.
54     * An arriving taker that does already have a waiting putter takes
55     item from the node created by the putter, and notifies it to continue.
56    
57     This requires keeping two simple queues: waitingPuts and waitingTakes.
58 tim 1.10
59 dl 1.2 When a put or take waiting for the actions of its counterpart
60     aborts due to interruption or timeout, it marks the node
61     it created as "CANCELLED", which causes its counterpart to retry
62     the entire put or take sequence.
63     */
64    
65    
66     /*
67     * Note that all fields are transient final, so there is
68     * no explicit serialization code.
69     */
70    
71     private transient final WaitQueue waitingPuts = new WaitQueue();
72     private transient final WaitQueue waitingTakes = new WaitQueue();
73     private transient final ReentrantLock qlock = new ReentrantLock();
74    
75     /**
76     * Nodes each maintain an item and handle waits and signals for
77 dl 1.31 * getting and setting it. The class extends
78     * AbstractQueuedSynchronizer to manage blocking, using AQS state
79     * 0 for waiting, 1 for ack, -1 for cancelled.
80 dl 1.2 */
81 dl 1.31 private static final class Node extends AbstractQueuedSynchronizer {
82 dl 1.35 /** Synchronization state value representing that node acked */
83     private static final int ACK = 1;
84     /** Synchronization state value representing that node cancelled */
85     private static final int CANCEL = -1;
86    
87 dl 1.6 /** The item being transferred */
88 dl 1.2 Object item;
89 dl 1.6 /** Next node in wait queue */
90 dl 1.2 Node next;
91 dl 1.35
92     /** Create node with initial item */
93 dl 1.31 Node(Object x) { item = x; }
94    
95     /**
96     * Implements AQS base acquire to succeed if not in WAITING state
97     */
98 dl 1.35 protected boolean tryAcquireExclusive(boolean b, int ignore) {
99     return getState() != 0;
100 dl 1.31 }
101    
102     /**
103 dl 1.34 * Implements AQS base release to signal if state changed
104 dl 1.31 */
105 dl 1.35 protected boolean tryReleaseExclusive(int newState) {
106     return compareAndSetState(0, newState);
107 dl 1.31 }
108    
109     /**
110 dl 1.35 * Take item and null out field (for sake of GC)
111 dl 1.31 */
112 dl 1.35 private Object extract() {
113     Object x = item;
114     item = null;
115     return x;
116 dl 1.31 }
117    
118     /**
119 dl 1.35 * Try to cancel on interrupt; if so rethrowing,
120     * else setting interrupt state
121 dl 1.31 */
122 dl 1.35 private void checkCancellationOnInterrupt(InterruptedException ie)
123     throws InterruptedException {
124     if (releaseExclusive(CANCEL))
125     throw ie;
126     Thread.currentThread().interrupt();
127 dl 1.31 }
128 dl 1.2
129     /**
130     * Fill in the slot created by the taker and signal taker to
131     * continue.
132     */
133 dl 1.31 boolean setItem(Object x) {
134 dl 1.35 item = x; // can place in slot even if cancelled
135     return releaseExclusive(ACK);
136 dl 1.2 }
137    
138     /**
139     * Remove item from slot created by putter and signal putter
140     * to continue.
141     */
142 dl 1.31 Object getItem() {
143 dl 1.35 return (releaseExclusive(ACK))? extract() : null;
144     }
145    
146     /**
147     * Wait for a taker to take item placed by putter.
148     */
149     void waitForTake() throws InterruptedException {
150     try {
151     acquireExclusiveInterruptibly(0);
152     } catch (InterruptedException ie) {
153     checkCancellationOnInterrupt(ie);
154     }
155     }
156    
157     /**
158     * Wait for a putter to put item placed by taker.
159     */
160     Object waitForPut() throws InterruptedException {
161     try {
162     acquireExclusiveInterruptibly(0);
163     } catch (InterruptedException ie) {
164     checkCancellationOnInterrupt(ie);
165     }
166     return extract();
167 dl 1.31 }
168    
169     /**
170 dl 1.33 * Wait for a taker to take item placed by putter or time out.
171 dl 1.31 */
172 dl 1.35 boolean waitForTake(long nanos) throws InterruptedException {
173 dl 1.2 try {
174 dl 1.35 if (!acquireExclusiveTimed(0, nanos) &&
175     releaseExclusive(CANCEL))
176 dl 1.33 return false;
177 dl 1.31 } catch (InterruptedException ie) {
178 dl 1.35 checkCancellationOnInterrupt(ie);
179 dl 1.2 }
180 dl 1.35 return true;
181 dl 1.2 }
182    
183     /**
184 dl 1.33 * Wait for a putter to put item placed by taker, or time out.
185 dl 1.31 */
186 dl 1.35 Object waitForPut(long nanos) throws InterruptedException {
187 dl 1.31 try {
188 dl 1.35 if (!acquireExclusiveTimed(0, nanos) &&
189     releaseExclusive(CANCEL))
190 dl 1.33 return null;
191 dl 1.31 } catch (InterruptedException ie) {
192 dl 1.35 checkCancellationOnInterrupt(ie);
193 dl 1.2 }
194 dl 1.35 return extract();
195 dl 1.2 }
196    
197     }
198    
199     /**
200     * Simple FIFO queue class to hold waiting puts/takes.
201     **/
202     private static class WaitQueue<E> {
203     Node head;
204     Node last;
205    
206 tim 1.10 Node enq(Object x) {
207 dl 1.2 Node p = new Node(x);
208 tim 1.10 if (last == null)
209 dl 1.2 last = head = p;
210 tim 1.10 else
211 dl 1.2 last = last.next = p;
212     return p;
213     }
214    
215     Node deq() {
216     Node p = head;
217 dl 1.35 if (p != null) {
218     if ((head = p.next) == null)
219     last = null;
220     p.next = null;
221     }
222 dl 1.2 return p;
223     }
224     }
225    
226     /**
227 dl 1.35 * Creates a <tt>SynchronousQueue</tt>.
228 tim 1.10 */
229 dl 1.35 public SynchronousQueue() {}
230 dl 1.2
231    
232     /**
233 dl 1.35 * Adds the specified element to this queue, waiting if necessary for
234     * another thread to receive it.
235     * @param o the element to add
236     * @throws InterruptedException if interrupted while waiting.
237     * @throws NullPointerException if the specified element is <tt>null</tt>.
238 tim 1.10 */
239 dl 1.35 public void put(E o) throws InterruptedException {
240     if (o == null) throw new NullPointerException();
241     final ReentrantLock qlock = this.qlock;
242    
243 dl 1.2 for (;;) {
244     Node node;
245     boolean mustWait;
246     qlock.lockInterruptibly();
247     try {
248 dl 1.35 node = waitingTakes.deq();
249 dl 1.2 if ( (mustWait = (node == null)) )
250 dl 1.35 node = waitingPuts.enq(o);
251 tim 1.14 } finally {
252 dl 1.2 qlock.unlock();
253     }
254    
255 dl 1.31 if (mustWait) {
256 dl 1.35 node.waitForTake();
257     return;
258 dl 1.2 }
259    
260 dl 1.35 else if (node.setItem(o))
261     return;
262 dl 1.2
263 dl 1.35 // else taker cancelled, so retry
264     }
265 tim 1.1 }
266    
267 dholmes 1.11 /**
268 dl 1.20 * Inserts the specified element into this queue, waiting if necessary
269 dl 1.18 * up to the specified wait time for another thread to receive it.
270     * @param o the element to add
271     * @param timeout how long to wait before giving up, in units of
272     * <tt>unit</tt>
273     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
274     * <tt>timeout</tt> parameter
275 dholmes 1.11 * @return <tt>true</tt> if successful, or <tt>false</tt> if
276     * the specified waiting time elapses before a taker appears.
277 dl 1.18 * @throws InterruptedException if interrupted while waiting.
278     * @throws NullPointerException if the specified element is <tt>null</tt>.
279 dholmes 1.11 */
280 dl 1.18 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
281 dl 1.35 if (o == null) throw new NullPointerException();
282     long nanos = unit.toNanos(timeout);
283     final ReentrantLock qlock = this.qlock;
284     for (;;) {
285     Node node;
286     boolean mustWait;
287     qlock.lockInterruptibly();
288     try {
289     node = waitingTakes.deq();
290     if ( (mustWait = (node == null)) )
291     node = waitingPuts.enq(o);
292     } finally {
293     qlock.unlock();
294     }
295    
296     if (mustWait)
297     return node.waitForTake(nanos);
298    
299     else if (node.setItem(o))
300     return true;
301    
302     // else taker cancelled, so retry
303     }
304    
305 tim 1.1 }
306    
307 dl 1.2
308 dholmes 1.11 /**
309     * Retrieves and removes the head of this queue, waiting if necessary
310     * for another thread to insert it.
311     * @return the head of this queue
312     */
313 dl 1.2 public E take() throws InterruptedException {
314 dl 1.35 final ReentrantLock qlock = this.qlock;
315     for (;;) {
316     Node node;
317     boolean mustWait;
318    
319     qlock.lockInterruptibly();
320     try {
321     node = waitingPuts.deq();
322     if ( (mustWait = (node == null)) )
323     node = waitingTakes.enq(null);
324     } finally {
325     qlock.unlock();
326     }
327    
328     if (mustWait)
329     return (E)node.waitForPut();
330    
331     else {
332     Object x = node.getItem();
333     if (x != null)
334     return (E)x;
335     // else cancelled, so retry
336     }
337     }
338 tim 1.1 }
339 dl 1.2
340 dholmes 1.11 /**
341     * Retrieves and removes the head of this queue, waiting
342     * if necessary up to the specified wait time, for another thread
343     * to insert it.
344 dl 1.18 * @param timeout how long to wait before giving up, in units of
345     * <tt>unit</tt>
346     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
347     * <tt>timeout</tt> parameter
348     * @return the head of this queue, or <tt>null</tt> if the
349     * specified waiting time elapses before an element is present.
350     * @throws InterruptedException if interrupted while waiting.
351 dholmes 1.11 */
352 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
353 dl 1.35 long nanos = unit.toNanos(timeout);
354     final ReentrantLock qlock = this.qlock;
355    
356     for (;;) {
357     Node node;
358     boolean mustWait;
359    
360     qlock.lockInterruptibly();
361     try {
362     node = waitingPuts.deq();
363     if ( (mustWait = (node == null)) )
364     node = waitingTakes.enq(null);
365     } finally {
366     qlock.unlock();
367     }
368    
369     if (mustWait)
370     return (E) node.waitForPut(nanos);
371    
372     else {
373     Object x = node.getItem();
374     if (x != null)
375     return (E)x;
376     // else cancelled, so retry
377     }
378     }
379 tim 1.1 }
380 dl 1.2
381     // Untimed nonblocking versions
382    
383 dl 1.18 /**
384 dl 1.20 * Inserts the specified element into this queue, if another thread is
385 dl 1.18 * waiting to receive it.
386     *
387     * @param o the element to add.
388     * @return <tt>true</tt> if it was possible to add the element to
389     * this queue, else <tt>false</tt>
390     * @throws NullPointerException if the specified element is <tt>null</tt>
391     */
392 dholmes 1.11 public boolean offer(E o) {
393     if (o == null) throw new NullPointerException();
394 dl 1.27 final ReentrantLock qlock = this.qlock;
395 tim 1.10
396     for (;;) {
397 tim 1.26 Node node;
398 dl 1.2 qlock.lock();
399     try {
400     node = waitingTakes.deq();
401 tim 1.14 } finally {
402 dl 1.2 qlock.unlock();
403     }
404     if (node == null)
405     return false;
406 tim 1.10
407 dl 1.31 else if (node.setItem(o))
408 dl 1.2 return true;
409     // else retry
410     }
411 tim 1.1 }
412 dl 1.2
413 dl 1.18 /**
414     * Retrieves and removes the head of this queue, if another thread
415     * is currently making an element available.
416     *
417     * @return the head of this queue, or <tt>null</tt> if no
418     * element is available.
419     */
420 dl 1.2 public E poll() {
421 dl 1.27 final ReentrantLock qlock = this.qlock;
422 dl 1.2 for (;;) {
423     Node node;
424     qlock.lock();
425     try {
426     node = waitingPuts.deq();
427 tim 1.14 } finally {
428 dl 1.2 qlock.unlock();
429     }
430     if (node == null)
431     return null;
432    
433     else {
434 dl 1.31 Object x = node.getItem();
435 dl 1.2 if (x != null)
436     return (E)x;
437     // else retry
438     }
439     }
440 tim 1.1 }
441 dl 1.2
442 dl 1.5 /**
443 dholmes 1.11 * Always returns <tt>true</tt>.
444     * A <tt>SynchronousQueue</tt> has no internal capacity.
445     * @return <tt>true</tt>
446 dl 1.5 */
447     public boolean isEmpty() {
448     return true;
449     }
450    
451     /**
452 dholmes 1.11 * Always returns zero.
453     * A <tt>SynchronousQueue</tt> has no internal capacity.
454 dl 1.5 * @return zero.
455     */
456     public int size() {
457     return 0;
458 tim 1.1 }
459 dl 1.2
460 dl 1.5 /**
461 dholmes 1.11 * Always returns zero.
462     * A <tt>SynchronousQueue</tt> has no internal capacity.
463 dl 1.5 * @return zero.
464     */
465     public int remainingCapacity() {
466     return 0;
467     }
468    
469     /**
470 dholmes 1.11 * Does nothing.
471     * A <tt>SynchronousQueue</tt> has no internal capacity.
472     */
473     public void clear() {}
474    
475     /**
476     * Always returns <tt>false</tt>.
477     * A <tt>SynchronousQueue</tt> has no internal capacity.
478 dl 1.18 * @param o the element
479 dholmes 1.11 * @return <tt>false</tt>
480     */
481     public boolean contains(Object o) {
482     return false;
483     }
484    
485     /**
486 dl 1.18 * Always returns <tt>false</tt>.
487     * A <tt>SynchronousQueue</tt> has no internal capacity.
488     *
489     * @param o the element to remove
490     * @return <tt>false</tt>
491     */
492     public boolean remove(Object o) {
493     return false;
494     }
495    
496     /**
497 dl 1.16 * Returns <tt>false</tt> unless given collection is empty.
498 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
499 dl 1.18 * @param c the collection
500 dl 1.16 * @return <tt>false</tt> unless given collection is empty
501 dholmes 1.11 */
502 dl 1.12 public boolean containsAll(Collection<?> c) {
503 dl 1.16 return c.isEmpty();
504 dholmes 1.11 }
505    
506     /**
507     * Always returns <tt>false</tt>.
508     * A <tt>SynchronousQueue</tt> has no internal capacity.
509 dl 1.18 * @param c the collection
510 dholmes 1.11 * @return <tt>false</tt>
511     */
512 dl 1.12 public boolean removeAll(Collection<?> c) {
513 dholmes 1.11 return false;
514     }
515    
516     /**
517     * Always returns <tt>false</tt>.
518     * A <tt>SynchronousQueue</tt> has no internal capacity.
519 dl 1.18 * @param c the collection
520 dholmes 1.11 * @return <tt>false</tt>
521     */
522 dl 1.12 public boolean retainAll(Collection<?> c) {
523 dholmes 1.11 return false;
524     }
525    
526     /**
527     * Always returns <tt>null</tt>.
528     * A <tt>SynchronousQueue</tt> does not return elements
529 dl 1.5 * unless actively waited on.
530 dholmes 1.11 * @return <tt>null</tt>
531 dl 1.5 */
532     public E peek() {
533     return null;
534     }
535    
536    
537     static class EmptyIterator<E> implements Iterator<E> {
538 dl 1.2 public boolean hasNext() {
539     return false;
540     }
541     public E next() {
542     throw new NoSuchElementException();
543     }
544     public void remove() {
545 dl 1.17 throw new IllegalStateException();
546 dl 1.2 }
547 tim 1.1 }
548 dl 1.2
549 dl 1.5 /**
550 dl 1.18 * Returns an empty iterator in which <tt>hasNext</tt> always returns
551 tim 1.13 * <tt>false</tt>.
552     *
553 dholmes 1.11 * @return an empty iterator
554 dl 1.5 */
555 dl 1.2 public Iterator<E> iterator() {
556 dl 1.5 return new EmptyIterator<E>();
557 tim 1.1 }
558    
559 dl 1.2
560 dl 1.5 /**
561 dholmes 1.11 * Returns a zero-length array.
562     * @return a zero-length array
563 dl 1.5 */
564 dl 1.3 public Object[] toArray() {
565 dl 1.25 return new Object[0];
566 tim 1.1 }
567    
568 dholmes 1.11 /**
569     * Sets the zeroeth element of the specified array to <tt>null</tt>
570     * (if the array has non-zero length) and returns it.
571     * @return the specified array
572     */
573 dl 1.2 public <T> T[] toArray(T[] a) {
574     if (a.length > 0)
575     a[0] = null;
576     return a;
577     }
578 dl 1.21
579    
580     public int drainTo(Collection<? super E> c) {
581     if (c == null)
582     throw new NullPointerException();
583     if (c == this)
584     throw new IllegalArgumentException();
585     int n = 0;
586     E e;
587     while ( (e = poll()) != null) {
588     c.add(e);
589     ++n;
590     }
591     return n;
592     }
593    
594     public int drainTo(Collection<? super E> c, int maxElements) {
595     if (c == null)
596     throw new NullPointerException();
597     if (c == this)
598     throw new IllegalArgumentException();
599     int n = 0;
600     E e;
601     while (n < maxElements && (e = poll()) != null) {
602     c.add(e);
603     ++n;
604     }
605     return n;
606     }
607 tim 1.1 }
608 dholmes 1.11
609    
610    
611    
612