ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.18
Committed: Fri Sep 12 15:40:10 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.17: +73 -64 lines
Log Message:
Adapt AbstractQueue changes; Conditionalize CancellableTask.reset; new TimeUnit methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity - in
15 * particular it does not have a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to take it; you cannot add an element (using
18 * any method) unless another thread is trying to remove it; you
19 * cannot iterate as there is nothing to iterate. The <em>head</em>
20 * of the queue is the element that the first queued thread is trying
21 * to add to the queue; if there are no queued threads then no element
22 * is being added and the head is <tt>null</tt>. For purposes of
23 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 * a <tt>SynchronousQueue<//t> acts as an empty collection. This
25 * queue does not permit <tt>null</tt> elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must synch up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 * @since 1.5
33 * @author Doug Lea
34 **/
35 public class SynchronousQueue<E> extends AbstractQueue<E>
36 implements BlockingQueue<E>, java.io.Serializable {
37 private static final long serialVersionUID = -3223113410248163686L;
38
39 /*
40 This implementation divides actions into two cases for puts:
41
42 * An arriving putter that does not already have a waiting taker
43 creates a node holding item, and then waits for a taker to take it.
44 * An arriving putter that does already have a waiting taker fills
45 the slot node created by the taker, and notifies it to continue.
46
47 And symmetrically, two for takes:
48
49 * An arriving taker that does not already have a waiting putter
50 creates an empty slot node, and then waits for a putter to fill it.
51 * An arriving taker that does already have a waiting putter takes
52 item from the node created by the putter, and notifies it to continue.
53
54 This requires keeping two simple queues: waitingPuts and waitingTakes.
55
56 When a put or take waiting for the actions of its counterpart
57 aborts due to interruption or timeout, it marks the node
58 it created as "CANCELLED", which causes its counterpart to retry
59 the entire put or take sequence.
60 */
61
62 /**
63 * Special marker used in queue nodes to indicate that
64 * the thread waiting for a change in the node has timed out
65 * or been interrupted.
66 **/
67 private static final Object CANCELLED = new Object();
68
69 /*
70 * Note that all fields are transient final, so there is
71 * no explicit serialization code.
72 */
73
74 private transient final WaitQueue waitingPuts = new WaitQueue();
75 private transient final WaitQueue waitingTakes = new WaitQueue();
76 private transient final ReentrantLock qlock = new ReentrantLock();
77
78 /**
79 * Nodes each maintain an item and handle waits and signals for
80 * getting and setting it. The class opportunistically extends
81 * ReentrantLock to save an extra object allocation per
82 * rendezvous.
83 */
84 private static class Node extends ReentrantLock {
85 /** Condition to wait on for other party; lazily constructed */
86 Condition done;
87 /** The item being transferred */
88 Object item;
89 /** Next node in wait queue */
90 Node next;
91
92 Node(Object x) { item = x; }
93
94 /**
95 * Fill in the slot created by the taker and signal taker to
96 * continue.
97 */
98 boolean set(Object x) {
99 this.lock();
100 try {
101 if (item != CANCELLED) {
102 item = x;
103 if (done != null)
104 done.signal();
105 return true;
106 } else // taker has cancelled
107 return false;
108 } finally {
109 this.unlock();
110 }
111 }
112
113 /**
114 * Remove item from slot created by putter and signal putter
115 * to continue.
116 */
117 Object get() {
118 this.lock();
119 try {
120 Object x = item;
121 if (x != CANCELLED) {
122 item = null;
123 next = null;
124 if (done != null)
125 done.signal();
126 return x;
127 } else
128 return null;
129 } finally {
130 this.unlock();
131 }
132 }
133
134 /**
135 * Wait for a taker to take item placed by putter, or time out.
136 */
137 boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
138 this.lock();
139 try {
140 for (;;) {
141 if (item == null)
142 return true;
143 if (timed) {
144 if (nanos <= 0) {
145 item = CANCELLED;
146 return false;
147 }
148 }
149 if (done == null)
150 done = this.newCondition();
151 if (timed)
152 nanos = done.awaitNanos(nanos);
153 else
154 done.await();
155 }
156 } catch (InterruptedException ie) {
157 // If taken, return normally but set interrupt status
158 if (item == null) {
159 Thread.currentThread().interrupt();
160 return true;
161 } else {
162 item = CANCELLED;
163 done.signal(); // propagate signal
164 throw ie;
165 }
166 } finally {
167 this.unlock();
168 }
169 }
170
171 /**
172 * Wait for a putter to put item placed by taker, or time out.
173 */
174 Object waitForPut(boolean timed, long nanos) throws InterruptedException {
175 this.lock();
176 try {
177 for (;;) {
178 Object x = item;
179 if (x != null) {
180 item = null;
181 next = null;
182 return x;
183 }
184 if (timed) {
185 if (nanos <= 0) {
186 item = CANCELLED;
187 return null;
188 }
189 }
190 if (done == null)
191 done = this.newCondition();
192 if (timed)
193 nanos = done.awaitNanos(nanos);
194 else
195 done.await();
196 }
197 } catch (InterruptedException ie) {
198 Object y = item;
199 if (y != null) {
200 item = null;
201 next = null;
202 Thread.currentThread().interrupt();
203 return y;
204 } else {
205 item = CANCELLED;
206 done.signal(); // propagate signal
207 throw ie;
208 }
209 } finally {
210 this.unlock();
211 }
212 }
213 }
214
215 /**
216 * Simple FIFO queue class to hold waiting puts/takes.
217 **/
218 private static class WaitQueue<E> {
219 Node head;
220 Node last;
221
222 Node enq(Object x) {
223 Node p = new Node(x);
224 if (last == null)
225 last = head = p;
226 else
227 last = last.next = p;
228 return p;
229 }
230
231 Node deq() {
232 Node p = head;
233 if (p != null && (head = p.next) == null)
234 last = null;
235 return p;
236 }
237 }
238
239 /**
240 * Main put algorithm, used by put, timed offer
241 */
242 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
243 if (x == null) throw new NullPointerException();
244 for (;;) {
245 Node node;
246 boolean mustWait;
247
248 qlock.lockInterruptibly();
249 try {
250 node = waitingTakes.deq();
251 if ( (mustWait = (node == null)) )
252 node = waitingPuts.enq(x);
253 } finally {
254 qlock.unlock();
255 }
256
257 if (mustWait)
258 return node.waitForTake(timed, nanos);
259
260 else if (node.set(x))
261 return true;
262
263 // else taker cancelled, so retry
264 }
265 }
266
267 /**
268 * Main take algorithm, used by take, timed poll
269 */
270 private E doTake(boolean timed, long nanos) throws InterruptedException {
271 for (;;) {
272 Node node;
273 boolean mustWait;
274
275 qlock.lockInterruptibly();
276 try {
277 node = waitingPuts.deq();
278 if ( (mustWait = (node == null)) )
279 node = waitingTakes.enq(null);
280 } finally {
281 qlock.unlock();
282 }
283
284 if (mustWait)
285 return (E)node.waitForPut(timed, nanos);
286
287 else {
288 E x = (E)node.get();
289 if (x != null)
290 return x;
291 // else cancelled, so retry
292 }
293 }
294 }
295
296 /**
297 * Creates a <tt>SynchronousQueue</tt>.
298 */
299 public SynchronousQueue() {}
300
301
302 /**
303 * Adds the specified element to this queue, waiting if necessary for
304 * another thread to receive it.
305 * @param o the element to add
306 * @throws InterruptedException if interrupted while waiting.
307 * @throws NullPointerException if the specified element is <tt>null</tt>.
308 */
309 public void put(E o) throws InterruptedException {
310 doPut(o, false, 0);
311 }
312
313 /**
314 * Adds the specified element to this queue, waiting if necessary
315 * up to the specified wait time for another thread to receive it.
316 * @param o the element to add
317 * @param timeout how long to wait before giving up, in units of
318 * <tt>unit</tt>
319 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
320 * <tt>timeout</tt> parameter
321 * @return <tt>true</tt> if successful, or <tt>false</tt> if
322 * the specified waiting time elapses before a taker appears.
323 * @throws InterruptedException if interrupted while waiting.
324 * @throws NullPointerException if the specified element is <tt>null</tt>.
325 */
326 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
327 return doPut(o, true, unit.toNanos(timeout));
328 }
329
330
331 /**
332 * Retrieves and removes the head of this queue, waiting if necessary
333 * for another thread to insert it.
334 * @return the head of this queue
335 */
336 public E take() throws InterruptedException {
337 return doTake(false, 0);
338 }
339
340 /**
341 * Retrieves and removes the head of this queue, waiting
342 * if necessary up to the specified wait time, for another thread
343 * to insert it.
344 * @param timeout how long to wait before giving up, in units of
345 * <tt>unit</tt>
346 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
347 * <tt>timeout</tt> parameter
348 * @return the head of this queue, or <tt>null</tt> if the
349 * specified waiting time elapses before an element is present.
350 * @throws InterruptedException if interrupted while waiting.
351 */
352 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
353 return doTake(true, unit.toNanos(timeout));
354 }
355
356 // Untimed nonblocking versions
357
358 /**
359 * Adds the specified element to this queue, if another thread is
360 * waiting to receive it.
361 *
362 * @param o the element to add.
363 * @return <tt>true</tt> if it was possible to add the element to
364 * this queue, else <tt>false</tt>
365 * @throws NullPointerException if the specified element is <tt>null</tt>
366 */
367 public boolean offer(E o) {
368 if (o == null) throw new NullPointerException();
369
370 for (;;) {
371 qlock.lock();
372 Node node;
373 try {
374 node = waitingTakes.deq();
375 } finally {
376 qlock.unlock();
377 }
378 if (node == null)
379 return false;
380
381 else if (node.set(o))
382 return true;
383 // else retry
384 }
385 }
386
387 /**
388 * Retrieves and removes the head of this queue, if another thread
389 * is currently making an element available.
390 *
391 * @return the head of this queue, or <tt>null</tt> if no
392 * element is available.
393 */
394 public E poll() {
395 for (;;) {
396 Node node;
397 qlock.lock();
398 try {
399 node = waitingPuts.deq();
400 } finally {
401 qlock.unlock();
402 }
403 if (node == null)
404 return null;
405
406 else {
407 Object x = node.get();
408 if (x != null)
409 return (E)x;
410 // else retry
411 }
412 }
413 }
414
415 /**
416 * Always returns <tt>true</tt>.
417 * A <tt>SynchronousQueue</tt> has no internal capacity.
418 * @return <tt>true</tt>
419 */
420 public boolean isEmpty() {
421 return true;
422 }
423
424 /**
425 * Always returns zero.
426 * A <tt>SynchronousQueue</tt> has no internal capacity.
427 * @return zero.
428 */
429 public int size() {
430 return 0;
431 }
432
433 /**
434 * Always returns zero.
435 * A <tt>SynchronousQueue</tt> has no internal capacity.
436 * @return zero.
437 */
438 public int remainingCapacity() {
439 return 0;
440 }
441
442 /**
443 * Does nothing.
444 * A <tt>SynchronousQueue</tt> has no internal capacity.
445 */
446 public void clear() {}
447
448 /**
449 * Always returns <tt>false</tt>.
450 * A <tt>SynchronousQueue</tt> has no internal capacity.
451 * @param o the element
452 * @return <tt>false</tt>
453 */
454 public boolean contains(Object o) {
455 return false;
456 }
457
458 /**
459 * Always returns <tt>false</tt>.
460 * A <tt>SynchronousQueue</tt> has no internal capacity.
461 *
462 * @param o the element to remove
463 * @return <tt>false</tt>
464 */
465 public boolean remove(Object o) {
466 return false;
467 }
468
469 /**
470 * Returns <tt>false</tt> unless given collection is empty.
471 * A <tt>SynchronousQueue</tt> has no internal capacity.
472 * @param c the collection
473 * @return <tt>false</tt> unless given collection is empty
474 */
475 public boolean containsAll(Collection<?> c) {
476 return c.isEmpty();
477 }
478
479 /**
480 * Always returns <tt>false</tt>.
481 * A <tt>SynchronousQueue</tt> has no internal capacity.
482 * @param c the collection
483 * @return <tt>false</tt>
484 */
485 public boolean removeAll(Collection<?> c) {
486 return false;
487 }
488
489 /**
490 * Always returns <tt>false</tt>.
491 * A <tt>SynchronousQueue</tt> has no internal capacity.
492 * @param c the collection
493 * @return <tt>false</tt>
494 */
495 public boolean retainAll(Collection<?> c) {
496 return false;
497 }
498
499 /**
500 * Always returns <tt>null</tt>.
501 * A <tt>SynchronousQueue</tt> does not return elements
502 * unless actively waited on.
503 * @return <tt>null</tt>
504 */
505 public E peek() {
506 return null;
507 }
508
509
510 static class EmptyIterator<E> implements Iterator<E> {
511 public boolean hasNext() {
512 return false;
513 }
514 public E next() {
515 throw new NoSuchElementException();
516 }
517 public void remove() {
518 throw new IllegalStateException();
519 }
520 }
521
522 /**
523 * Returns an empty iterator in which <tt>hasNext</tt> always returns
524 * <tt>false</tt>.
525 *
526 * @return an empty iterator
527 */
528 public Iterator<E> iterator() {
529 return new EmptyIterator<E>();
530 }
531
532
533 /**
534 * Returns a zero-length array.
535 * @return a zero-length array
536 */
537 public Object[] toArray() {
538 return (E[]) new Object[0];
539 }
540
541 /**
542 * Sets the zeroeth element of the specified array to <tt>null</tt>
543 * (if the array has non-zero length) and returns it.
544 * @return the specified array
545 */
546 public <T> T[] toArray(T[] a) {
547 if (a.length > 0)
548 a[0] = null;
549 return a;
550 }
551 }
552
553
554
555
556