ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.28
Committed: Sat Dec 27 17:19:03 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.27: +1 -1 lines
Log Message:
Adapt to AbstractQueuedSynchronizer

File Contents

# User Rev Content
1 dl 1.2 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7 tim 1.1 package java.util.concurrent;
8 dl 1.8 import java.util.concurrent.locks.*;
9 tim 1.1 import java.util.*;
10    
/**
 * A {@linkplain BlockingQueue blocking queue} in which each
 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa.  A
 * synchronous queue does not have any internal capacity - in
 * particular it does not have a capacity of one. You cannot
 * <tt>peek</tt> at a synchronous queue because an element is only
 * present when you try to take it; you cannot add an element (using
 * any method) unless another thread is trying to remove it; you
 * cannot iterate as there is nothing to iterate.  The <em>head</em>
 * of the queue is the element that the first queued thread is trying
 * to add to the queue; if there are no queued threads then no element
 * is being added and the head is <tt>null</tt>.  For purposes of
 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
 * a <tt>SynchronousQueue</tt> acts as an empty collection.  This
 * queue does not permit <tt>null</tt> elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada. They are well suited for handoff designs, in which an
 * object running in one thread must synch up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
      This implementation divides actions into two cases for puts:

      * An arriving putter that does not already have a waiting taker
        creates a node holding item, and then waits for a taker to take it.
      * An arriving putter that does already have a waiting taker fills
        the slot node created by the taker, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving taker that does not already have a waiting putter
        creates an empty slot node, and then waits for a putter to fill it.
      * An arriving taker that does already have a waiting putter takes
        item from the node created by the putter, and notifies it to continue.

      This requires keeping two simple queues: waitingPuts and waitingTakes.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as "CANCELLED", which causes its counterpart to retry
      the entire put or take sequence.  The aborting thread then unlinks
      its own node from the wait queue so that cancelled nodes do not
      pile up when no counterpart ever arrives.

      Locking discipline: qlock guards the two wait queues; each node is
      its own lock guarding its item/done fields.  The two kinds of lock
      are never held at the same time.
    */

    /**
     * Special marker used in queue nodes to indicate that
     * the thread waiting for a change in the node has timed out
     * or been interrupted.
     */
    private static final Object CANCELLED = new Object();

    /*
     * All fields are transient, so the serialized form carries no
     * state (the queue never holds elements).  Because deserialization
     * does not run field initializers, readResolve() below substitutes
     * a freshly constructed queue for the deserialized shell.
     */

    /** Nodes of putters waiting for a taker; guarded by qlock. */
    private transient final WaitQueue waitingPuts = new WaitQueue();
    /** Nodes of takers waiting for a putter; guarded by qlock. */
    private transient final WaitQueue waitingTakes = new WaitQueue();
    /** Lock guarding both wait queues. */
    private transient final ReentrantLock qlock = new ReentrantLock();

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it. The class opportunistically extends
     * ReentrantLock to save an extra object allocation per
     * rendezvous.
     */
    private static class Node extends ReentrantLock {
        /** Condition to wait on for other party; lazily constructed */
        Condition done;
        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        Node(Object x) { item = x; }

        /**
         * Fill in the slot created by the taker and signal taker to
         * continue.
         *
         * @param x the item to hand to the waiting taker
         * @return <tt>true</tt> if the item was handed over, or
         * <tt>false</tt> if the taker had already cancelled
         */
        boolean set(Object x) {
            this.lock();
            try {
                if (item == CANCELLED) // taker has cancelled
                    return false;
                item = x;
                // done is null if the taker has not started waiting yet;
                // it will see the item as soon as it acquires the lock.
                if (done != null)
                    done.signal();
                return true;
            } finally {
                this.unlock();
            }
        }

        /**
         * Remove item from slot created by putter and signal putter
         * to continue.
         *
         * @return the item, or <tt>null</tt> if the putter had
         * already cancelled
         */
        Object get() {
            this.lock();
            try {
                Object x = item;
                if (x == CANCELLED)
                    return null;
                item = null;   // null item tells the putter it was taken
                next = null;
                if (done != null)
                    done.signal();
                return x;
            } finally {
                this.unlock();
            }
        }

        /**
         * Wait for a taker to take item placed by putter, or time out.
         *
         * @param timed whether <tt>nanos</tt> bounds the wait
         * @param nanos remaining wait time in nanoseconds (if timed)
         * @return <tt>true</tt> if the item was taken, <tt>false</tt>
         * if the wait timed out (the node is then marked cancelled)
         * @throws InterruptedException if interrupted before the item
         * was taken (the node is then marked cancelled)
         */
        boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
            this.lock();
            try {
                for (;;) {
                    if (item == null)
                        return true;
                    if (timed && nanos <= 0) {
                        item = CANCELLED;
                        return false;
                    }
                    if (done == null)
                        done = this.newCondition();
                    if (timed)
                        nanos = done.awaitNanos(nanos);
                    else
                        done.await();
                }
            } catch (InterruptedException ie) {
                // Only the awaits above can throw, so done is non-null
                // here and the node lock has been reacquired.
                // If taken, return normally but set interrupt status
                if (item == null) {
                    Thread.currentThread().interrupt();
                    return true;
                }
                item = CANCELLED;
                done.signal(); // propagate signal
                throw ie;
            } finally {
                this.unlock();
            }
        }

        /**
         * Wait for a putter to put item placed by taker, or time out.
         *
         * @param timed whether <tt>nanos</tt> bounds the wait
         * @param nanos remaining wait time in nanoseconds (if timed)
         * @return the item supplied by a putter, or <tt>null</tt> if
         * the wait timed out (the node is then marked cancelled)
         * @throws InterruptedException if interrupted before an item
         * arrived (the node is then marked cancelled)
         */
        Object waitForPut(boolean timed, long nanos) throws InterruptedException {
            this.lock();
            try {
                for (;;) {
                    Object x = item;
                    if (x != null) {
                        item = null;
                        next = null;
                        return x;
                    }
                    if (timed && nanos <= 0) {
                        item = CANCELLED;
                        return null;
                    }
                    if (done == null)
                        done = this.newCondition();
                    if (timed)
                        nanos = done.awaitNanos(nanos);
                    else
                        done.await();
                }
            } catch (InterruptedException ie) {
                // If an item arrived anyway, deliver it and just
                // restore the interrupt status.
                Object y = item;
                if (y != null) {
                    item = null;
                    next = null;
                    Thread.currentThread().interrupt();
                    return y;
                }
                item = CANCELLED;
                done.signal(); // propagate signal
                throw ie;
            } finally {
                this.unlock();
            }
        }
    }

    /**
     * Simple FIFO queue class to hold waiting puts/takes.
     * Not thread-safe by itself; callers hold qlock.
     */
    private static class WaitQueue {
        Node head;
        Node last;

        /** Appends a new node holding x and returns it. */
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        /** Removes and returns the first node, or null if empty. */
        Node deq() {
            Node p = head;
            if (p != null && (head = p.next) == null)
                last = null;
            return p;
        }

        /**
         * Removes the given node if it is still linked; does nothing
         * if it has already been dequeued.  Linear in queue length.
         */
        void unlink(Node p) {
            Node pred = null;
            for (Node q = head; q != null; pred = q, q = q.next) {
                if (q == p) {
                    Node succ = q.next;
                    if (pred == null)
                        head = succ;
                    else
                        pred.next = succ;
                    if (last == q)
                        last = pred;
                    return;
                }
            }
        }
    }

    /**
     * Unlinks a node whose waiter has cancelled, so that abandoned
     * nodes are not retained until a counterpart happens to arrive.
     * Must be called without holding the node's own lock.
     */
    private void unlinkCancelled(WaitQueue queue, Node node) {
        final ReentrantLock qlock = this.qlock;
        qlock.lock();
        try {
            queue.unlink(node);
        } finally {
            qlock.unlock();
        }
    }

    /**
     * Main put algorithm, used by put, timed offer
     *
     * @param x the element to hand off
     * @param timed whether <tt>nanos</tt> bounds the wait
     * @param nanos maximum wait in nanoseconds (if timed)
     * @return <tt>true</tt> if a taker received the element,
     * <tt>false</tt> on timeout
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if x is <tt>null</tt>
     */
    private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        for (;;) {
            Node node;
            boolean mustWait;
            final ReentrantLock qlock = this.qlock;
            qlock.lockInterruptibly();
            try {
                node = waitingTakes.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingPuts.enq(x);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                boolean taken;
                try {
                    taken = node.waitForTake(timed, nanos);
                } catch (InterruptedException ie) {
                    unlinkCancelled(waitingPuts, node);
                    throw ie;
                }
                if (!taken)
                    unlinkCancelled(waitingPuts, node);
                return taken;
            }

            if (node.set(x))
                return true;

            // else taker cancelled, so retry
        }
    }

    /**
     * Main take algorithm, used by take, timed poll
     *
     * @param timed whether <tt>nanos</tt> bounds the wait
     * @param nanos maximum wait in nanoseconds (if timed)
     * @return the element received, or <tt>null</tt> on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    @SuppressWarnings("unchecked") // nodes only ever carry elements of type E
    private E doTake(boolean timed, long nanos) throws InterruptedException {
        for (;;) {
            Node node;
            boolean mustWait;

            final ReentrantLock qlock = this.qlock;
            qlock.lockInterruptibly();
            try {
                node = waitingPuts.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingTakes.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                Object received;
                try {
                    received = node.waitForPut(timed, nanos);
                } catch (InterruptedException ie) {
                    unlinkCancelled(waitingTakes, node);
                    throw ie;
                }
                if (received == null) // timed out
                    unlinkCancelled(waitingTakes, node);
                return (E) received;
            }

            E x = (E) node.get();
            if (x != null)
                return x;
            // else cancelled, so retry
        }
    }

    /**
     * Creates a <tt>SynchronousQueue</tt>.
     */
    public SynchronousQueue() {}

    /**
     * Replaces a deserialized instance, whose transient fields were
     * never initialized, with a freshly constructed (and equally
     * empty) queue.
     *
     * @return a new, usable <tt>SynchronousQueue</tt>
     */
    private Object readResolve() {
        return new SynchronousQueue<E>();
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        doPut(o, false, 0);
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before a taker appears.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        return doPut(o, true, unit.toNanos(timeout));
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    public E take() throws InterruptedException {
        return doTake(false, 0);
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return doTake(true, unit.toNanos(timeout));
    }

    // Untimed nonblocking versions

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     * this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingTakes.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return false;
            if (node.set(o))
                return true;
            // else that taker cancelled; retry with the next one
        }
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     * element is available.
     */
    @SuppressWarnings("unchecked") // nodes only ever carry elements of type E
    public E poll() {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingPuts.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return null;

            Object x = node.get();
            if (x != null)
                return (E) x;
            // else that putter cancelled; retry with the next one
        }
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {}

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /**
     * Iterator over nothing: <tt>hasNext</tt> is always
     * <tt>false</tt>, <tt>next</tt> always throws
     * <tt>NoSuchElementException</tt>, and <tt>remove</tt> always
     * throws <tt>IllegalStateException</tt> because <tt>next</tt>
     * can never have succeeded.
     */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new IllegalStateException();
        }
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroeth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is <tt>null</tt>
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * Transfers to the given collection every element that a waiting
     * putter is currently offering, without blocking.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if c is <tt>null</tt>
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Transfers to the given collection at most <tt>maxElements</tt>
     * elements that waiting putters are currently offering, without
     * blocking.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if c is <tt>null</tt>
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }
}
587 dholmes 1.11
588    
589    
590    
591