ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.38
Committed: Fri Jan 9 14:45:17 2004 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.37: +2 -2 lines
Log Message:
Cosmetics

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.29 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 dl 1.2 */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
11     /**
12 dl 1.18 * A {@linkplain BlockingQueue blocking queue} in which each
13     * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14     * synchronous queue does not have any internal capacity - in
15     * particular it does not have a capacity of one. You cannot
16     * <tt>peek</tt> at a synchronous queue because an element is only
17     * present when you try to take it; you cannot add an element (using
18     * any method) unless another thread is trying to remove it; you
19     * cannot iterate as there is nothing to iterate. The <em>head</em>
20     * of the queue is the element that the first queued thread is trying
21     * to add to the queue; if there are no queued threads then no element
22     * is being added and the head is <tt>null</tt>. For purposes of
23     * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 dl 1.19 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 dl 1.18 * queue does not permit <tt>null</tt> elements.
26     *
27     * <p>Synchronous queues are similar to rendezvous channels used in
28     * CSP and Ada. They are well suited for handoff designs, in which an
29 dl 1.30 * object running in one thread must sync up with an object running
30 dl 1.18 * in another thread in order to hand it some information, event, or
31     * task.
32 dl 1.19 * <p>This class implements all of the <em>optional</em> methods
33     * of the {@link Collection} and {@link Iterator} interfaces.
34 dl 1.6 * @since 1.5
35     * @author Doug Lea
36 dl 1.24 * @param <E> the type of elements held in this collection
37 dl 1.23 */
38 dl 1.2 public class SynchronousQueue<E> extends AbstractQueue<E>
39 tim 1.1 implements BlockingQueue<E>, java.io.Serializable {
40 dl 1.15 private static final long serialVersionUID = -3223113410248163686L;
41 tim 1.1
42 dl 1.2 /*
43     This implementation divides actions into two cases for puts:
44    
45 tim 1.10 * An arriving putter that does not already have a waiting taker
46 dl 1.2 creates a node holding item, and then waits for a taker to take it.
47     * An arriving putter that does already have a waiting taker fills
48     the slot node created by the taker, and notifies it to continue.
49    
50     And symmetrically, two for takes:
51    
52     * An arriving taker that does not already have a waiting putter
53     creates an empty slot node, and then waits for a putter to fill it.
54     * An arriving taker that does already have a waiting putter takes
55     item from the node created by the putter, and notifies it to continue.
56    
57     This requires keeping two simple queues: waitingPuts and waitingTakes.
58 tim 1.10
59 dl 1.2 When a put or take waiting for the actions of its counterpart
60     aborts due to interruption or timeout, it marks the node
61     it created as "CANCELLED", which causes its counterpart to retry
62     the entire put or take sequence.
63     */
64    
65    
66     /*
67     * Note that all fields are transient final, so there is
68     * no explicit serialization code.
69     */
70    
71     private transient final WaitQueue waitingPuts = new WaitQueue();
72     private transient final WaitQueue waitingTakes = new WaitQueue();
73     private transient final ReentrantLock qlock = new ReentrantLock();
74    
75     /**
76     * Nodes each maintain an item and handle waits and signals for
77 dl 1.31 * getting and setting it. The class extends
78     * AbstractQueuedSynchronizer to manage blocking, using AQS state
79     * 0 for waiting, 1 for ack, -1 for cancelled.
80 dl 1.2 */
81 dl 1.31 private static final class Node extends AbstractQueuedSynchronizer {
82 dl 1.35 /** Synchronization state value representing that node acked */
83     private static final int ACK = 1;
84     /** Synchronization state value representing that node cancelled */
85     private static final int CANCEL = -1;
86    
87 dl 1.6 /** The item being transferred */
88 dl 1.2 Object item;
89 dl 1.6 /** Next node in wait queue */
90 dl 1.2 Node next;
91 dl 1.35
92     /** Create node with initial item */
93 dl 1.31 Node(Object x) { item = x; }
94    
95     /**
96     * Implements AQS base acquire to succeed if not in WAITING state
97     */
98 dl 1.37 protected boolean tryAcquireExclusive(int ignore) {
99 dl 1.35 return getState() != 0;
100 dl 1.31 }
101    
102     /**
103 dl 1.34 * Implements AQS base release to signal if state changed
104 dl 1.31 */
105 dl 1.35 protected boolean tryReleaseExclusive(int newState) {
106     return compareAndSetState(0, newState);
107 dl 1.31 }
108    
109     /**
110 dl 1.35 * Take item and null out field (for sake of GC)
111 dl 1.31 */
112 dl 1.35 private Object extract() {
113     Object x = item;
114     item = null;
115     return x;
116 dl 1.31 }
117    
118     /**
119 dl 1.35 * Try to cancel on interrupt; if so rethrowing,
120     * else setting interrupt state
121 dl 1.31 */
122 dl 1.35 private void checkCancellationOnInterrupt(InterruptedException ie)
123     throws InterruptedException {
124     if (releaseExclusive(CANCEL))
125     throw ie;
126     Thread.currentThread().interrupt();
127 dl 1.31 }
128 dl 1.2
129     /**
130     * Fill in the slot created by the taker and signal taker to
131     * continue.
132     */
133 dl 1.31 boolean setItem(Object x) {
134 dl 1.35 item = x; // can place in slot even if cancelled
135     return releaseExclusive(ACK);
136 dl 1.2 }
137    
138     /**
139     * Remove item from slot created by putter and signal putter
140     * to continue.
141     */
142 dl 1.31 Object getItem() {
143 dl 1.35 return (releaseExclusive(ACK))? extract() : null;
144     }
145    
146     /**
147     * Wait for a taker to take item placed by putter.
148     */
149     void waitForTake() throws InterruptedException {
150     try {
151     acquireExclusiveInterruptibly(0);
152     } catch (InterruptedException ie) {
153     checkCancellationOnInterrupt(ie);
154     }
155     }
156    
157     /**
158     * Wait for a putter to put item placed by taker.
159     */
160     Object waitForPut() throws InterruptedException {
161     try {
162     acquireExclusiveInterruptibly(0);
163     } catch (InterruptedException ie) {
164     checkCancellationOnInterrupt(ie);
165     }
166     return extract();
167 dl 1.31 }
168    
169     /**
170 dl 1.33 * Wait for a taker to take item placed by putter or time out.
171 dl 1.31 */
172 dl 1.35 boolean waitForTake(long nanos) throws InterruptedException {
173 dl 1.2 try {
174 dl 1.38 if (!acquireExclusiveNanos(0, nanos) &&
175 dl 1.35 releaseExclusive(CANCEL))
176 dl 1.33 return false;
177 dl 1.31 } catch (InterruptedException ie) {
178 dl 1.35 checkCancellationOnInterrupt(ie);
179 dl 1.2 }
180 dl 1.35 return true;
181 dl 1.2 }
182    
183     /**
184 dl 1.33 * Wait for a putter to put item placed by taker, or time out.
185 dl 1.31 */
186 dl 1.35 Object waitForPut(long nanos) throws InterruptedException {
187 dl 1.31 try {
188 dl 1.38 if (!acquireExclusiveNanos(0, nanos) &&
189 dl 1.35 releaseExclusive(CANCEL))
190 dl 1.33 return null;
191 dl 1.31 } catch (InterruptedException ie) {
192 dl 1.35 checkCancellationOnInterrupt(ie);
193 dl 1.2 }
194 dl 1.35 return extract();
195 dl 1.2 }
196    
197     }
198    
199     /**
200     * Simple FIFO queue class to hold waiting puts/takes.
201     **/
202     private static class WaitQueue<E> {
203     Node head;
204     Node last;
205    
206 tim 1.10 Node enq(Object x) {
207 dl 1.2 Node p = new Node(x);
208 tim 1.10 if (last == null)
209 dl 1.2 last = head = p;
210 tim 1.10 else
211 dl 1.2 last = last.next = p;
212     return p;
213     }
214    
215     Node deq() {
216     Node p = head;
217 dl 1.35 if (p != null) {
218     if ((head = p.next) == null)
219     last = null;
220     p.next = null;
221     }
222 dl 1.2 return p;
223     }
224     }
225    
226     /**
227 dl 1.35 * Creates a <tt>SynchronousQueue</tt>.
228 tim 1.10 */
229 dl 1.35 public SynchronousQueue() {}
230 dl 1.2
231    
232     /**
233 dl 1.35 * Adds the specified element to this queue, waiting if necessary for
234     * another thread to receive it.
235     * @param o the element to add
236     * @throws InterruptedException if interrupted while waiting.
237     * @throws NullPointerException if the specified element is <tt>null</tt>.
238 tim 1.10 */
239 dl 1.35 public void put(E o) throws InterruptedException {
240     if (o == null) throw new NullPointerException();
241     final ReentrantLock qlock = this.qlock;
242    
243 dl 1.2 for (;;) {
244     Node node;
245     boolean mustWait;
246     qlock.lockInterruptibly();
247     try {
248 dl 1.35 node = waitingTakes.deq();
249 dl 1.2 if ( (mustWait = (node == null)) )
250 dl 1.35 node = waitingPuts.enq(o);
251 tim 1.14 } finally {
252 dl 1.2 qlock.unlock();
253     }
254    
255 dl 1.31 if (mustWait) {
256 dl 1.35 node.waitForTake();
257     return;
258 dl 1.2 }
259    
260 dl 1.35 else if (node.setItem(o))
261     return;
262 dl 1.2
263 dl 1.35 // else taker cancelled, so retry
264     }
265 tim 1.1 }
266    
267 dholmes 1.11 /**
268 dl 1.20 * Inserts the specified element into this queue, waiting if necessary
269 dl 1.18 * up to the specified wait time for another thread to receive it.
270     * @param o the element to add
271     * @param timeout how long to wait before giving up, in units of
272     * <tt>unit</tt>
273     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
274     * <tt>timeout</tt> parameter
275 dholmes 1.11 * @return <tt>true</tt> if successful, or <tt>false</tt> if
276     * the specified waiting time elapses before a taker appears.
277 dl 1.18 * @throws InterruptedException if interrupted while waiting.
278     * @throws NullPointerException if the specified element is <tt>null</tt>.
279 dholmes 1.11 */
280 dl 1.18 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
281 dl 1.35 if (o == null) throw new NullPointerException();
282     long nanos = unit.toNanos(timeout);
283     final ReentrantLock qlock = this.qlock;
284     for (;;) {
285     Node node;
286     boolean mustWait;
287     qlock.lockInterruptibly();
288     try {
289     node = waitingTakes.deq();
290     if ( (mustWait = (node == null)) )
291     node = waitingPuts.enq(o);
292     } finally {
293     qlock.unlock();
294     }
295    
296     if (mustWait)
297     return node.waitForTake(nanos);
298    
299     else if (node.setItem(o))
300     return true;
301    
302     // else taker cancelled, so retry
303     }
304    
305 tim 1.1 }
306    
307 dl 1.2
308 dholmes 1.11 /**
309     * Retrieves and removes the head of this queue, waiting if necessary
310     * for another thread to insert it.
311     * @return the head of this queue
312     */
313 dl 1.2 public E take() throws InterruptedException {
314 dl 1.35 final ReentrantLock qlock = this.qlock;
315     for (;;) {
316     Node node;
317     boolean mustWait;
318    
319     qlock.lockInterruptibly();
320     try {
321     node = waitingPuts.deq();
322     if ( (mustWait = (node == null)) )
323     node = waitingTakes.enq(null);
324     } finally {
325     qlock.unlock();
326     }
327    
328 dl 1.36 if (mustWait) {
329     Object x = node.waitForPut();
330     return (E)x;
331     }
332 dl 1.35 else {
333     Object x = node.getItem();
334     if (x != null)
335     return (E)x;
336     // else cancelled, so retry
337     }
338     }
339 tim 1.1 }
340 dl 1.2
341 dholmes 1.11 /**
342     * Retrieves and removes the head of this queue, waiting
343     * if necessary up to the specified wait time, for another thread
344     * to insert it.
345 dl 1.18 * @param timeout how long to wait before giving up, in units of
346     * <tt>unit</tt>
347     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
348     * <tt>timeout</tt> parameter
349     * @return the head of this queue, or <tt>null</tt> if the
350     * specified waiting time elapses before an element is present.
351     * @throws InterruptedException if interrupted while waiting.
352 dholmes 1.11 */
353 dl 1.2 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
354 dl 1.35 long nanos = unit.toNanos(timeout);
355     final ReentrantLock qlock = this.qlock;
356    
357     for (;;) {
358     Node node;
359     boolean mustWait;
360    
361     qlock.lockInterruptibly();
362     try {
363     node = waitingPuts.deq();
364     if ( (mustWait = (node == null)) )
365     node = waitingTakes.enq(null);
366     } finally {
367     qlock.unlock();
368     }
369    
370 dl 1.36 if (mustWait) {
371     Object x = node.waitForPut(nanos);
372     return (E)x;
373     }
374 dl 1.35 else {
375     Object x = node.getItem();
376     if (x != null)
377     return (E)x;
378     // else cancelled, so retry
379     }
380     }
381 tim 1.1 }
382 dl 1.2
383     // Untimed nonblocking versions
384    
385 dl 1.18 /**
386 dl 1.20 * Inserts the specified element into this queue, if another thread is
387 dl 1.18 * waiting to receive it.
388     *
389     * @param o the element to add.
390     * @return <tt>true</tt> if it was possible to add the element to
391     * this queue, else <tt>false</tt>
392     * @throws NullPointerException if the specified element is <tt>null</tt>
393     */
394 dholmes 1.11 public boolean offer(E o) {
395     if (o == null) throw new NullPointerException();
396 dl 1.27 final ReentrantLock qlock = this.qlock;
397 tim 1.10
398     for (;;) {
399 tim 1.26 Node node;
400 dl 1.2 qlock.lock();
401     try {
402     node = waitingTakes.deq();
403 tim 1.14 } finally {
404 dl 1.2 qlock.unlock();
405     }
406     if (node == null)
407     return false;
408 tim 1.10
409 dl 1.31 else if (node.setItem(o))
410 dl 1.2 return true;
411     // else retry
412     }
413 tim 1.1 }
414 dl 1.2
415 dl 1.18 /**
416     * Retrieves and removes the head of this queue, if another thread
417     * is currently making an element available.
418     *
419     * @return the head of this queue, or <tt>null</tt> if no
420     * element is available.
421     */
422 dl 1.2 public E poll() {
423 dl 1.27 final ReentrantLock qlock = this.qlock;
424 dl 1.2 for (;;) {
425     Node node;
426     qlock.lock();
427     try {
428     node = waitingPuts.deq();
429 tim 1.14 } finally {
430 dl 1.2 qlock.unlock();
431     }
432     if (node == null)
433     return null;
434    
435     else {
436 dl 1.31 Object x = node.getItem();
437 dl 1.2 if (x != null)
438     return (E)x;
439     // else retry
440     }
441     }
442 tim 1.1 }
443 dl 1.2
444 dl 1.5 /**
445 dholmes 1.11 * Always returns <tt>true</tt>.
446     * A <tt>SynchronousQueue</tt> has no internal capacity.
447     * @return <tt>true</tt>
448 dl 1.5 */
449     public boolean isEmpty() {
450     return true;
451     }
452    
453     /**
454 dholmes 1.11 * Always returns zero.
455     * A <tt>SynchronousQueue</tt> has no internal capacity.
456 dl 1.5 * @return zero.
457     */
458     public int size() {
459     return 0;
460 tim 1.1 }
461 dl 1.2
462 dl 1.5 /**
463 dholmes 1.11 * Always returns zero.
464     * A <tt>SynchronousQueue</tt> has no internal capacity.
465 dl 1.5 * @return zero.
466     */
467     public int remainingCapacity() {
468     return 0;
469     }
470    
471     /**
472 dholmes 1.11 * Does nothing.
473     * A <tt>SynchronousQueue</tt> has no internal capacity.
474     */
475     public void clear() {}
476    
477     /**
478     * Always returns <tt>false</tt>.
479     * A <tt>SynchronousQueue</tt> has no internal capacity.
480 dl 1.18 * @param o the element
481 dholmes 1.11 * @return <tt>false</tt>
482     */
483     public boolean contains(Object o) {
484     return false;
485     }
486    
487     /**
488 dl 1.18 * Always returns <tt>false</tt>.
489     * A <tt>SynchronousQueue</tt> has no internal capacity.
490     *
491     * @param o the element to remove
492     * @return <tt>false</tt>
493     */
494     public boolean remove(Object o) {
495     return false;
496     }
497    
498     /**
499 dl 1.16 * Returns <tt>false</tt> unless given collection is empty.
500 dholmes 1.11 * A <tt>SynchronousQueue</tt> has no internal capacity.
501 dl 1.18 * @param c the collection
502 dl 1.16 * @return <tt>false</tt> unless given collection is empty
503 dholmes 1.11 */
504 dl 1.12 public boolean containsAll(Collection<?> c) {
505 dl 1.16 return c.isEmpty();
506 dholmes 1.11 }
507    
508     /**
509     * Always returns <tt>false</tt>.
510     * A <tt>SynchronousQueue</tt> has no internal capacity.
511 dl 1.18 * @param c the collection
512 dholmes 1.11 * @return <tt>false</tt>
513     */
514 dl 1.12 public boolean removeAll(Collection<?> c) {
515 dholmes 1.11 return false;
516     }
517    
518     /**
519     * Always returns <tt>false</tt>.
520     * A <tt>SynchronousQueue</tt> has no internal capacity.
521 dl 1.18 * @param c the collection
522 dholmes 1.11 * @return <tt>false</tt>
523     */
524 dl 1.12 public boolean retainAll(Collection<?> c) {
525 dholmes 1.11 return false;
526     }
527    
528     /**
529     * Always returns <tt>null</tt>.
530     * A <tt>SynchronousQueue</tt> does not return elements
531 dl 1.5 * unless actively waited on.
532 dholmes 1.11 * @return <tt>null</tt>
533 dl 1.5 */
534     public E peek() {
535     return null;
536     }
537    
538    
539     static class EmptyIterator<E> implements Iterator<E> {
540 dl 1.2 public boolean hasNext() {
541     return false;
542     }
543     public E next() {
544     throw new NoSuchElementException();
545     }
546     public void remove() {
547 dl 1.17 throw new IllegalStateException();
548 dl 1.2 }
549 tim 1.1 }
550 dl 1.2
551 dl 1.5 /**
552 dl 1.18 * Returns an empty iterator in which <tt>hasNext</tt> always returns
553 tim 1.13 * <tt>false</tt>.
554     *
555 dholmes 1.11 * @return an empty iterator
556 dl 1.5 */
557 dl 1.2 public Iterator<E> iterator() {
558 dl 1.5 return new EmptyIterator<E>();
559 tim 1.1 }
560    
561 dl 1.2
562 dl 1.5 /**
563 dholmes 1.11 * Returns a zero-length array.
564     * @return a zero-length array
565 dl 1.5 */
566 dl 1.3 public Object[] toArray() {
567 dl 1.25 return new Object[0];
568 tim 1.1 }
569    
570 dholmes 1.11 /**
571     * Sets the zeroeth element of the specified array to <tt>null</tt>
572     * (if the array has non-zero length) and returns it.
573     * @return the specified array
574     */
575 dl 1.2 public <T> T[] toArray(T[] a) {
576     if (a.length > 0)
577     a[0] = null;
578     return a;
579     }
580 dl 1.21
581    
582     public int drainTo(Collection<? super E> c) {
583     if (c == null)
584     throw new NullPointerException();
585     if (c == this)
586     throw new IllegalArgumentException();
587     int n = 0;
588     E e;
589     while ( (e = poll()) != null) {
590     c.add(e);
591     ++n;
592     }
593     return n;
594     }
595    
596     public int drainTo(Collection<? super E> c, int maxElements) {
597     if (c == null)
598     throw new NullPointerException();
599     if (c == this)
600     throw new IllegalArgumentException();
601     int n = 0;
602     E e;
603     while (n < maxElements && (e = poll()) != null) {
604     c.add(e);
605     ++n;
606     }
607     return n;
608     }
609 tim 1.1 }
610 dholmes 1.11
611    
612    
613    
614