ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.21
Committed: Sun Oct 5 23:00:18 2003 UTC (20 years, 8 months ago) by dl
Branch: MAIN
Changes since 1.20: +29 -0 lines
Log Message:
added drainTo; clarified various exception specs

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity - in
15 * particular it does not have a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to take it; you cannot add an element (using
18 * any method) unless another thread is trying to remove it; you
19 * cannot iterate as there is nothing to iterate. The <em>head</em>
20 * of the queue is the element that the first queued thread is trying
21 * to add to the queue; if there are no queued threads then no element
22 * is being added and the head is <tt>null</tt>. For purposes of
23 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 * queue does not permit <tt>null</tt> elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must synch up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 * <p>This class implements all of the <em>optional</em> methods
33 * of the {@link Collection} and {@link Iterator} interfaces.
34 * @since 1.5
35 * @author Doug Lea
36 **/
37 public class SynchronousQueue<E> extends AbstractQueue<E>
38 implements BlockingQueue<E>, java.io.Serializable {
39 private static final long serialVersionUID = -3223113410248163686L;
40
41 /*
42 This implementation divides actions into two cases for puts:
43
44 * An arriving putter that does not already have a waiting taker
45 creates a node holding item, and then waits for a taker to take it.
46 * An arriving putter that does already have a waiting taker fills
47 the slot node created by the taker, and notifies it to continue.
48
49 And symmetrically, two for takes:
50
51 * An arriving taker that does not already have a waiting putter
52 creates an empty slot node, and then waits for a putter to fill it.
53 * An arriving taker that does already have a waiting putter takes
54 item from the node created by the putter, and notifies it to continue.
55
56 This requires keeping two simple queues: waitingPuts and waitingTakes.
57
58 When a put or take waiting for the actions of its counterpart
59 aborts due to interruption or timeout, it marks the node
60 it created as "CANCELLED", which causes its counterpart to retry
61 the entire put or take sequence.
62 */
63
64 /**
65 * Special marker used in queue nodes to indicate that
66 * the thread waiting for a change in the node has timed out
67 * or been interrupted.
68 **/
69 private static final Object CANCELLED = new Object();
70
71 /*
72 * Note that all fields are transient final, so there is
73 * no explicit serialization code.
74 */
75
76 private transient final WaitQueue waitingPuts = new WaitQueue();
77 private transient final WaitQueue waitingTakes = new WaitQueue();
78 private transient final ReentrantLock qlock = new ReentrantLock();
79
80 /**
81 * Nodes each maintain an item and handle waits and signals for
82 * getting and setting it. The class opportunistically extends
83 * ReentrantLock to save an extra object allocation per
84 * rendezvous.
85 */
86 private static class Node extends ReentrantLock {
87 /** Condition to wait on for other party; lazily constructed */
88 Condition done;
89 /** The item being transferred */
90 Object item;
91 /** Next node in wait queue */
92 Node next;
93
94 Node(Object x) { item = x; }
95
96 /**
97 * Fill in the slot created by the taker and signal taker to
98 * continue.
99 */
100 boolean set(Object x) {
101 this.lock();
102 try {
103 if (item != CANCELLED) {
104 item = x;
105 if (done != null)
106 done.signal();
107 return true;
108 } else // taker has cancelled
109 return false;
110 } finally {
111 this.unlock();
112 }
113 }
114
115 /**
116 * Remove item from slot created by putter and signal putter
117 * to continue.
118 */
119 Object get() {
120 this.lock();
121 try {
122 Object x = item;
123 if (x != CANCELLED) {
124 item = null;
125 next = null;
126 if (done != null)
127 done.signal();
128 return x;
129 } else
130 return null;
131 } finally {
132 this.unlock();
133 }
134 }
135
136 /**
137 * Wait for a taker to take item placed by putter, or time out.
138 */
139 boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
140 this.lock();
141 try {
142 for (;;) {
143 if (item == null)
144 return true;
145 if (timed) {
146 if (nanos <= 0) {
147 item = CANCELLED;
148 return false;
149 }
150 }
151 if (done == null)
152 done = this.newCondition();
153 if (timed)
154 nanos = done.awaitNanos(nanos);
155 else
156 done.await();
157 }
158 } catch (InterruptedException ie) {
159 // If taken, return normally but set interrupt status
160 if (item == null) {
161 Thread.currentThread().interrupt();
162 return true;
163 } else {
164 item = CANCELLED;
165 done.signal(); // propagate signal
166 throw ie;
167 }
168 } finally {
169 this.unlock();
170 }
171 }
172
173 /**
174 * Wait for a putter to put item placed by taker, or time out.
175 */
176 Object waitForPut(boolean timed, long nanos) throws InterruptedException {
177 this.lock();
178 try {
179 for (;;) {
180 Object x = item;
181 if (x != null) {
182 item = null;
183 next = null;
184 return x;
185 }
186 if (timed) {
187 if (nanos <= 0) {
188 item = CANCELLED;
189 return null;
190 }
191 }
192 if (done == null)
193 done = this.newCondition();
194 if (timed)
195 nanos = done.awaitNanos(nanos);
196 else
197 done.await();
198 }
199 } catch (InterruptedException ie) {
200 Object y = item;
201 if (y != null) {
202 item = null;
203 next = null;
204 Thread.currentThread().interrupt();
205 return y;
206 } else {
207 item = CANCELLED;
208 done.signal(); // propagate signal
209 throw ie;
210 }
211 } finally {
212 this.unlock();
213 }
214 }
215 }
216
217 /**
218 * Simple FIFO queue class to hold waiting puts/takes.
219 **/
220 private static class WaitQueue<E> {
221 Node head;
222 Node last;
223
224 Node enq(Object x) {
225 Node p = new Node(x);
226 if (last == null)
227 last = head = p;
228 else
229 last = last.next = p;
230 return p;
231 }
232
233 Node deq() {
234 Node p = head;
235 if (p != null && (head = p.next) == null)
236 last = null;
237 return p;
238 }
239 }
240
241 /**
242 * Main put algorithm, used by put, timed offer
243 */
244 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
245 if (x == null) throw new NullPointerException();
246 for (;;) {
247 Node node;
248 boolean mustWait;
249
250 qlock.lockInterruptibly();
251 try {
252 node = waitingTakes.deq();
253 if ( (mustWait = (node == null)) )
254 node = waitingPuts.enq(x);
255 } finally {
256 qlock.unlock();
257 }
258
259 if (mustWait)
260 return node.waitForTake(timed, nanos);
261
262 else if (node.set(x))
263 return true;
264
265 // else taker cancelled, so retry
266 }
267 }
268
269 /**
270 * Main take algorithm, used by take, timed poll
271 */
272 private E doTake(boolean timed, long nanos) throws InterruptedException {
273 for (;;) {
274 Node node;
275 boolean mustWait;
276
277 qlock.lockInterruptibly();
278 try {
279 node = waitingPuts.deq();
280 if ( (mustWait = (node == null)) )
281 node = waitingTakes.enq(null);
282 } finally {
283 qlock.unlock();
284 }
285
286 if (mustWait)
287 return (E)node.waitForPut(timed, nanos);
288
289 else {
290 E x = (E)node.get();
291 if (x != null)
292 return x;
293 // else cancelled, so retry
294 }
295 }
296 }
297
298 /**
299 * Creates a <tt>SynchronousQueue</tt>.
300 */
301 public SynchronousQueue() {}
302
303
304 /**
305 * Adds the specified element to this queue, waiting if necessary for
306 * another thread to receive it.
307 * @param o the element to add
308 * @throws InterruptedException if interrupted while waiting.
309 * @throws NullPointerException if the specified element is <tt>null</tt>.
310 */
311 public void put(E o) throws InterruptedException {
312 doPut(o, false, 0);
313 }
314
315 /**
316 * Inserts the specified element into this queue, waiting if necessary
317 * up to the specified wait time for another thread to receive it.
318 * @param o the element to add
319 * @param timeout how long to wait before giving up, in units of
320 * <tt>unit</tt>
321 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
322 * <tt>timeout</tt> parameter
323 * @return <tt>true</tt> if successful, or <tt>false</tt> if
324 * the specified waiting time elapses before a taker appears.
325 * @throws InterruptedException if interrupted while waiting.
326 * @throws NullPointerException if the specified element is <tt>null</tt>.
327 */
328 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
329 return doPut(o, true, unit.toNanos(timeout));
330 }
331
332
333 /**
334 * Retrieves and removes the head of this queue, waiting if necessary
335 * for another thread to insert it.
336 * @return the head of this queue
337 */
338 public E take() throws InterruptedException {
339 return doTake(false, 0);
340 }
341
342 /**
343 * Retrieves and removes the head of this queue, waiting
344 * if necessary up to the specified wait time, for another thread
345 * to insert it.
346 * @param timeout how long to wait before giving up, in units of
347 * <tt>unit</tt>
348 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
349 * <tt>timeout</tt> parameter
350 * @return the head of this queue, or <tt>null</tt> if the
351 * specified waiting time elapses before an element is present.
352 * @throws InterruptedException if interrupted while waiting.
353 */
354 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
355 return doTake(true, unit.toNanos(timeout));
356 }
357
358 // Untimed nonblocking versions
359
360 /**
361 * Inserts the specified element into this queue, if another thread is
362 * waiting to receive it.
363 *
364 * @param o the element to add.
365 * @return <tt>true</tt> if it was possible to add the element to
366 * this queue, else <tt>false</tt>
367 * @throws NullPointerException if the specified element is <tt>null</tt>
368 */
369 public boolean offer(E o) {
370 if (o == null) throw new NullPointerException();
371
372 for (;;) {
373 qlock.lock();
374 Node node;
375 try {
376 node = waitingTakes.deq();
377 } finally {
378 qlock.unlock();
379 }
380 if (node == null)
381 return false;
382
383 else if (node.set(o))
384 return true;
385 // else retry
386 }
387 }
388
389 /**
390 * Retrieves and removes the head of this queue, if another thread
391 * is currently making an element available.
392 *
393 * @return the head of this queue, or <tt>null</tt> if no
394 * element is available.
395 */
396 public E poll() {
397 for (;;) {
398 Node node;
399 qlock.lock();
400 try {
401 node = waitingPuts.deq();
402 } finally {
403 qlock.unlock();
404 }
405 if (node == null)
406 return null;
407
408 else {
409 Object x = node.get();
410 if (x != null)
411 return (E)x;
412 // else retry
413 }
414 }
415 }
416
417 /**
418 * Always returns <tt>true</tt>.
419 * A <tt>SynchronousQueue</tt> has no internal capacity.
420 * @return <tt>true</tt>
421 */
422 public boolean isEmpty() {
423 return true;
424 }
425
426 /**
427 * Always returns zero.
428 * A <tt>SynchronousQueue</tt> has no internal capacity.
429 * @return zero.
430 */
431 public int size() {
432 return 0;
433 }
434
435 /**
436 * Always returns zero.
437 * A <tt>SynchronousQueue</tt> has no internal capacity.
438 * @return zero.
439 */
440 public int remainingCapacity() {
441 return 0;
442 }
443
444 /**
445 * Does nothing.
446 * A <tt>SynchronousQueue</tt> has no internal capacity.
447 */
448 public void clear() {}
449
450 /**
451 * Always returns <tt>false</tt>.
452 * A <tt>SynchronousQueue</tt> has no internal capacity.
453 * @param o the element
454 * @return <tt>false</tt>
455 */
456 public boolean contains(Object o) {
457 return false;
458 }
459
460 /**
461 * Always returns <tt>false</tt>.
462 * A <tt>SynchronousQueue</tt> has no internal capacity.
463 *
464 * @param o the element to remove
465 * @return <tt>false</tt>
466 */
467 public boolean remove(Object o) {
468 return false;
469 }
470
471 /**
472 * Returns <tt>false</tt> unless given collection is empty.
473 * A <tt>SynchronousQueue</tt> has no internal capacity.
474 * @param c the collection
475 * @return <tt>false</tt> unless given collection is empty
476 */
477 public boolean containsAll(Collection<?> c) {
478 return c.isEmpty();
479 }
480
481 /**
482 * Always returns <tt>false</tt>.
483 * A <tt>SynchronousQueue</tt> has no internal capacity.
484 * @param c the collection
485 * @return <tt>false</tt>
486 */
487 public boolean removeAll(Collection<?> c) {
488 return false;
489 }
490
491 /**
492 * Always returns <tt>false</tt>.
493 * A <tt>SynchronousQueue</tt> has no internal capacity.
494 * @param c the collection
495 * @return <tt>false</tt>
496 */
497 public boolean retainAll(Collection<?> c) {
498 return false;
499 }
500
501 /**
502 * Always returns <tt>null</tt>.
503 * A <tt>SynchronousQueue</tt> does not return elements
504 * unless actively waited on.
505 * @return <tt>null</tt>
506 */
507 public E peek() {
508 return null;
509 }
510
511
512 static class EmptyIterator<E> implements Iterator<E> {
513 public boolean hasNext() {
514 return false;
515 }
516 public E next() {
517 throw new NoSuchElementException();
518 }
519 public void remove() {
520 throw new IllegalStateException();
521 }
522 }
523
524 /**
525 * Returns an empty iterator in which <tt>hasNext</tt> always returns
526 * <tt>false</tt>.
527 *
528 * @return an empty iterator
529 */
530 public Iterator<E> iterator() {
531 return new EmptyIterator<E>();
532 }
533
534
535 /**
536 * Returns a zero-length array.
537 * @return a zero-length array
538 */
539 public Object[] toArray() {
540 return (E[]) new Object[0];
541 }
542
543 /**
544 * Sets the zeroeth element of the specified array to <tt>null</tt>
545 * (if the array has non-zero length) and returns it.
546 * @return the specified array
547 */
548 public <T> T[] toArray(T[] a) {
549 if (a.length > 0)
550 a[0] = null;
551 return a;
552 }
553
554
555 public int drainTo(Collection<? super E> c) {
556 if (c == null)
557 throw new NullPointerException();
558 if (c == this)
559 throw new IllegalArgumentException();
560 int n = 0;
561 E e;
562 while ( (e = poll()) != null) {
563 c.add(e);
564 ++n;
565 }
566 return n;
567 }
568
569 public int drainTo(Collection<? super E> c, int maxElements) {
570 if (c == null)
571 throw new NullPointerException();
572 if (c == this)
573 throw new IllegalArgumentException();
574 int n = 0;
575 E e;
576 while (n < maxElements && (e = poll()) != null) {
577 c.add(e);
578 ++n;
579 }
580 return n;
581 }
582 }
583
584
585
586
587