ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.24
Committed: Sun Oct 19 13:38:34 2003 UTC (20 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.23: +1 -1 lines
Log Message:
Changed doc strings for generic params

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
/**
 * A {@linkplain BlockingQueue blocking queue} in which each
 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa.  A
 * synchronous queue does not have any internal capacity - in
 * particular it does not have a capacity of one.  You cannot
 * <tt>peek</tt> at a synchronous queue because an element is only
 * present when you try to take it; you cannot add an element (using
 * any method) unless another thread is trying to remove it; you
 * cannot iterate as there is nothing to iterate.  The <em>head</em>
 * of the queue is the element that the first queued thread is trying
 * to add to the queue; if there are no queued threads then no element
 * is being added and the head is <tt>null</tt>.  For purposes of
 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
 * a <tt>SynchronousQueue</tt> acts as an empty collection.  This
 * queue does not permit <tt>null</tt> elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada.  They are well suited for handoff designs, in which an
 * object running in one thread must synch up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * <p>Because a synchronous queue never holds elements, serializing
 * one writes no queue state; deserializing yields a new, idle queue.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
      This implementation divides actions into two cases for puts:

      * An arriving putter that does not already have a waiting taker
        creates a node holding item, and then waits for a taker to take it.
      * An arriving putter that does already have a waiting taker fills
        the slot node created by the taker, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving taker that does not already have a waiting putter
        creates an empty slot node, and then waits for a putter to fill it.
      * An arriving taker that does already have a waiting putter takes
        item from the node created by the putter, and notifies it to continue.

      This requires keeping two simple queues: waitingPuts and waitingTakes.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as "CANCELLED", which causes its counterpart to retry
      the entire put or take sequence.  A cancelled node is not unlinked
      by the thread that cancelled it; it stays in its wait queue until a
      counterpart dequeues it, sees the marker, and moves on.
    */

    /**
     * Special marker used in queue nodes to indicate that
     * the thread waiting for a change in the node has timed out
     * or been interrupted.
     */
    private static final Object CANCELLED = new Object();

    /*
     * All fields are transient: there is never any element state worth
     * writing to a stream.  Deserialization does not run these field
     * initializers, so a raw deserialized instance would have null
     * queues and a null lock; readResolve() below substitutes a freshly
     * constructed queue to avoid that.
     */

    /** Putters that arrived with no taker waiting; guarded by qlock. */
    private transient final WaitQueue waitingPuts = new WaitQueue();
    /** Takers that arrived with no putter waiting; guarded by qlock. */
    private transient final WaitQueue waitingTakes = new WaitQueue();
    /** Lock held around every enq/deq on the two wait queues. */
    private transient final ReentrantLock qlock = new ReentrantLock();

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it.  The class opportunistically extends
     * ReentrantLock to save an extra object allocation per
     * rendezvous.
     */
    private static class Node extends ReentrantLock {
        /** Condition to wait on for other party; lazily constructed */
        Condition done;
        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        Node(Object x) { item = x; }

        /**
         * Fill in the slot created by the taker and signal taker to
         * continue.
         *
         * @param x the item to hand to the taker
         * @return <tt>true</tt> if the item was stored, <tt>false</tt>
         * if the taker had already cancelled this node
         */
        boolean set(Object x) {
            this.lock();
            try {
                if (item == CANCELLED)  // taker has cancelled
                    return false;
                item = x;
                // done is null if the taker has not started waiting yet;
                // it will then see the item on its first check.
                if (done != null)
                    done.signal();
                return true;
            } finally {
                this.unlock();
            }
        }

        /**
         * Remove item from slot created by putter and signal putter
         * to continue.
         *
         * @return the item, or <tt>null</tt> if the putter had already
         * cancelled this node
         */
        Object get() {
            this.lock();
            try {
                Object x = item;
                if (x == CANCELLED)  // putter has cancelled
                    return null;
                item = null;  // a null item tells the putter it was taken
                next = null;
                if (done != null)
                    done.signal();
                return x;
            } finally {
                this.unlock();
            }
        }

        /**
         * Wait for a taker to take item placed by putter, or time out.
         *
         * @param timed whether <tt>nanos</tt> bounds the wait
         * @param nanos remaining wait time in nanoseconds (used only
         * if <tt>timed</tt>)
         * @return <tt>true</tt> if the item was taken, <tt>false</tt> if
         * the wait timed out (the node is then marked cancelled)
         * @throws InterruptedException if interrupted before the item
         * was taken (the node is then marked cancelled)
         */
        boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
            this.lock();
            try {
                for (;;) {
                    if (item == null)  // get() cleared it: taken
                        return true;
                    if (timed && nanos <= 0) {
                        item = CANCELLED;
                        return false;
                    }
                    if (done == null)
                        done = this.newCondition();
                    if (timed)
                        nanos = done.awaitNanos(nanos);
                    else
                        done.await();
                }
            } catch (InterruptedException ie) {
                // Only the await calls above can throw, so done is
                // non-null here and the node lock is held again.
                // If taken, return normally but set interrupt status
                if (item == null) {
                    Thread.currentThread().interrupt();
                    return true;
                }
                item = CANCELLED;
                done.signal(); // propagate signal
                throw ie;
            } finally {
                this.unlock();
            }
        }

        /**
         * Wait for a putter to put item placed by taker, or time out.
         *
         * @param timed whether <tt>nanos</tt> bounds the wait
         * @param nanos remaining wait time in nanoseconds (used only
         * if <tt>timed</tt>)
         * @return the item supplied by a putter, or <tt>null</tt> if the
         * wait timed out (the node is then marked cancelled)
         * @throws InterruptedException if interrupted before an item
         * arrived (the node is then marked cancelled)
         */
        Object waitForPut(boolean timed, long nanos) throws InterruptedException {
            this.lock();
            try {
                for (;;) {
                    Object x = item;
                    if (x != null) {  // set() filled the slot
                        item = null;
                        next = null;
                        return x;
                    }
                    if (timed && nanos <= 0) {
                        item = CANCELLED;
                        return null;
                    }
                    if (done == null)
                        done = this.newCondition();
                    if (timed)
                        nanos = done.awaitNanos(nanos);
                    else
                        done.await();
                }
            } catch (InterruptedException ie) {
                // Only the await calls above can throw, so done is
                // non-null here and the node lock is held again.
                // If an item arrived, return it but set interrupt status
                Object y = item;
                if (y != null) {
                    item = null;
                    next = null;
                    Thread.currentThread().interrupt();
                    return y;
                }
                item = CANCELLED;
                done.signal(); // propagate signal
                throw ie;
            } finally {
                this.unlock();
            }
        }
    }

    /**
     * Simple FIFO queue class to hold waiting puts/takes.  It does no
     * locking of its own; every call in this file is made while
     * holding <tt>qlock</tt>.
     */
    private static class WaitQueue {
        Node head;
        Node last;

        /**
         * Appends a new node holding <tt>x</tt> and returns it.
         */
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        /**
         * Removes and returns the first node, or <tt>null</tt> if the
         * queue is empty.
         */
        Node deq() {
            Node p = head;
            // Advance head; if that empties the queue, clear last too.
            if (p != null && (head = p.next) == null)
                last = null;
            return p;
        }
    }

    /**
     * Main put algorithm, used by put, timed offer.
     *
     * @param x the element to hand off
     * @param timed whether <tt>nanos</tt> bounds the wait
     * @param nanos maximum wait in nanoseconds (used only if
     * <tt>timed</tt>)
     * @return <tt>true</tt> if a taker received the element,
     * <tt>false</tt> if a timed wait elapsed first
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if <tt>x</tt> is <tt>null</tt>
     */
    private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        for (;;) {
            Node node;
            boolean mustWait;

            qlock.lockInterruptibly();
            try {
                node = waitingTakes.deq();
                mustWait = (node == null);
                if (mustWait)
                    node = waitingPuts.enq(x);
            } finally {
                qlock.unlock();
            }

            if (mustWait)
                return node.waitForTake(timed, nanos);

            if (node.set(x))
                return true;

            // else taker cancelled, so retry
        }
    }

    /**
     * Main take algorithm, used by take, timed poll.
     *
     * @param timed whether <tt>nanos</tt> bounds the wait
     * @param nanos maximum wait in nanoseconds (used only if
     * <tt>timed</tt>)
     * @return the element received from a putter, or <tt>null</tt> if a
     * timed wait elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    private E doTake(boolean timed, long nanos) throws InterruptedException {
        for (;;) {
            Node node;
            boolean mustWait;

            qlock.lockInterruptibly();
            try {
                node = waitingPuts.deq();
                mustWait = (node == null);
                if (mustWait)
                    node = waitingTakes.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                // Safe cast: nodes are only ever filled by doPut/offer,
                // which store values of type E.
                @SuppressWarnings("unchecked")
                E received = (E) node.waitForPut(timed, nanos);
                return received;
            }

            @SuppressWarnings("unchecked") // same reasoning as above
            E x = (E) node.get();
            if (x != null)
                return x;
            // else cancelled, so retry
        }
    }

    /**
     * Creates a <tt>SynchronousQueue</tt>.
     */
    public SynchronousQueue() {}

    /**
     * Replaces a deserialized instance with a newly constructed queue.
     * The transient lock and wait queues are not restored from the
     * stream (they would be <tt>null</tt>), and a synchronous queue has
     * no elements to restore, so a fresh queue is an exact substitute.
     * As a private method this is only applied to instances whose class
     * is exactly <tt>SynchronousQueue</tt>.
     *
     * @return a new, idle <tt>SynchronousQueue</tt>
     */
    private Object readResolve() {
        return new SynchronousQueue<E>();
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        doPut(o, false, 0);
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before a taker appears.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        return doPut(o, true, unit.toNanos(timeout));
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        return doTake(false, 0);
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return doTake(true, unit.toNanos(timeout));
    }

    // Untimed nonblocking versions

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     * this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingTakes.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)   // no taker is waiting
                return false;
            if (node.set(o))
                return true;
            // else that taker cancelled; retry with the next one
        }
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     * element is available.
     */
    public E poll() {
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingPuts.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)   // no putter is waiting
                return null;

            // Safe cast: waitingPuts nodes are created by doPut with an E.
            @SuppressWarnings("unchecked")
            E x = (E) node.get();
            if (x != null)
                return x;
            // else that putter cancelled; retry with the next one
        }
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {}

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     * @throws NullPointerException if the given collection is <tt>null</tt>
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /**
     * Iterator over nothing: <tt>hasNext</tt> is always <tt>false</tt>,
     * <tt>next</tt> always throws <tt>NoSuchElementException</tt>, and
     * <tt>remove</tt> always throws <tt>IllegalStateException</tt>.
     */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new IllegalStateException();
        }
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroeth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is <tt>null</tt>
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * Transfers to the given collection every element that a waiting
     * putter is currently offering, by calling {@link #poll()} until it
     * returns <tt>null</tt>.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Transfers to the given collection at most <tt>maxElements</tt>
     * elements that waiting putters are currently offering, by calling
     * {@link #poll()} until it returns <tt>null</tt> or the limit is
     * reached.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        // Check the limit first so no element is polled and then dropped.
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }
}
584 dholmes 1.11
585    
586    
587    
588