ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.29
Committed: Sat Dec 27 19:26:26 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.28: +2 -2 lines
Log Message:
Headers reference Creative Commons

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity - in
15 * particular it does not have a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to take it; you cannot add an element (using
18 * any method) unless another thread is trying to remove it; you
19 * cannot iterate as there is nothing to iterate. The <em>head</em>
20 * of the queue is the element that the first queued thread is trying
21 * to add to the queue; if there are no queued threads then no element
22 * is being added and the head is <tt>null</tt>. For purposes of
23 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 * queue does not permit <tt>null</tt> elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must synch up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 * <p>This class implements all of the <em>optional</em> methods
33 * of the {@link Collection} and {@link Iterator} interfaces.
34 * @since 1.5
35 * @author Doug Lea
36 * @param <E> the type of elements held in this collection
37 */
38 public class SynchronousQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40 private static final long serialVersionUID = -3223113410248163686L;
41
42 /*
43 This implementation divides actions into two cases for puts:
44
45 * An arriving putter that does not already have a waiting taker
46 creates a node holding item, and then waits for a taker to take it.
47 * An arriving putter that does already have a waiting taker fills
48 the slot node created by the taker, and notifies it to continue.
49
50 And symmetrically, two for takes:
51
52 * An arriving taker that does not already have a waiting putter
53 creates an empty slot node, and then waits for a putter to fill it.
54 * An arriving taker that does already have a waiting putter takes
55 item from the node created by the putter, and notifies it to continue.
56
57 This requires keeping two simple queues: waitingPuts and waitingTakes.
58
59 When a put or take waiting for the actions of its counterpart
60 aborts due to interruption or timeout, it marks the node
61 it created as "CANCELLED", which causes its counterpart to retry
62 the entire put or take sequence.
63 */
64
65 /**
66 * Special marker used in queue nodes to indicate that
67 * the thread waiting for a change in the node has timed out
68 * or been interrupted.
69 **/
70 private static final Object CANCELLED = new Object();
71
72 /*
73 * Note that all fields are transient final, so there is
74 * no explicit serialization code.
75 */
76
77 private transient final WaitQueue waitingPuts = new WaitQueue();
78 private transient final WaitQueue waitingTakes = new WaitQueue();
79 private transient final ReentrantLock qlock = new ReentrantLock();
80
81 /**
82 * Nodes each maintain an item and handle waits and signals for
83 * getting and setting it. The class opportunistically extends
84 * ReentrantLock to save an extra object allocation per
85 * rendezvous.
86 */
87 private static class Node extends ReentrantLock {
88 /** Condition to wait on for other party; lazily constructed */
89 Condition done;
90 /** The item being transferred */
91 Object item;
92 /** Next node in wait queue */
93 Node next;
94
95 Node(Object x) { item = x; }
96
97 /**
98 * Fill in the slot created by the taker and signal taker to
99 * continue.
100 */
101 boolean set(Object x) {
102 this.lock();
103 try {
104 if (item != CANCELLED) {
105 item = x;
106 if (done != null)
107 done.signal();
108 return true;
109 } else // taker has cancelled
110 return false;
111 } finally {
112 this.unlock();
113 }
114 }
115
116 /**
117 * Remove item from slot created by putter and signal putter
118 * to continue.
119 */
120 Object get() {
121 this.lock();
122 try {
123 Object x = item;
124 if (x != CANCELLED) {
125 item = null;
126 next = null;
127 if (done != null)
128 done.signal();
129 return x;
130 } else
131 return null;
132 } finally {
133 this.unlock();
134 }
135 }
136
137 /**
138 * Wait for a taker to take item placed by putter, or time out.
139 */
140 boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
141 this.lock();
142 try {
143 for (;;) {
144 if (item == null)
145 return true;
146 if (timed) {
147 if (nanos <= 0) {
148 item = CANCELLED;
149 return false;
150 }
151 }
152 if (done == null)
153 done = this.newCondition();
154 if (timed)
155 nanos = done.awaitNanos(nanos);
156 else
157 done.await();
158 }
159 } catch (InterruptedException ie) {
160 // If taken, return normally but set interrupt status
161 if (item == null) {
162 Thread.currentThread().interrupt();
163 return true;
164 } else {
165 item = CANCELLED;
166 done.signal(); // propagate signal
167 throw ie;
168 }
169 } finally {
170 this.unlock();
171 }
172 }
173
174 /**
175 * Wait for a putter to put item placed by taker, or time out.
176 */
177 Object waitForPut(boolean timed, long nanos) throws InterruptedException {
178 this.lock();
179 try {
180 for (;;) {
181 Object x = item;
182 if (x != null) {
183 item = null;
184 next = null;
185 return x;
186 }
187 if (timed) {
188 if (nanos <= 0) {
189 item = CANCELLED;
190 return null;
191 }
192 }
193 if (done == null)
194 done = this.newCondition();
195 if (timed)
196 nanos = done.awaitNanos(nanos);
197 else
198 done.await();
199 }
200 } catch (InterruptedException ie) {
201 Object y = item;
202 if (y != null) {
203 item = null;
204 next = null;
205 Thread.currentThread().interrupt();
206 return y;
207 } else {
208 item = CANCELLED;
209 done.signal(); // propagate signal
210 throw ie;
211 }
212 } finally {
213 this.unlock();
214 }
215 }
216 }
217
218 /**
219 * Simple FIFO queue class to hold waiting puts/takes.
220 **/
221 private static class WaitQueue<E> {
222 Node head;
223 Node last;
224
225 Node enq(Object x) {
226 Node p = new Node(x);
227 if (last == null)
228 last = head = p;
229 else
230 last = last.next = p;
231 return p;
232 }
233
234 Node deq() {
235 Node p = head;
236 if (p != null && (head = p.next) == null)
237 last = null;
238 return p;
239 }
240 }
241
242 /**
243 * Main put algorithm, used by put, timed offer
244 */
245 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
246 if (x == null) throw new NullPointerException();
247 for (;;) {
248 Node node;
249 boolean mustWait;
250 final ReentrantLock qlock = this.qlock;
251 qlock.lockInterruptibly();
252 try {
253 node = waitingTakes.deq();
254 if ( (mustWait = (node == null)) )
255 node = waitingPuts.enq(x);
256 } finally {
257 qlock.unlock();
258 }
259
260 if (mustWait)
261 return node.waitForTake(timed, nanos);
262
263 else if (node.set(x))
264 return true;
265
266 // else taker cancelled, so retry
267 }
268 }
269
270 /**
271 * Main take algorithm, used by take, timed poll
272 */
273 private E doTake(boolean timed, long nanos) throws InterruptedException {
274 for (;;) {
275 Node node;
276 boolean mustWait;
277
278 final ReentrantLock qlock = this.qlock;
279 qlock.lockInterruptibly();
280 try {
281 node = waitingPuts.deq();
282 if ( (mustWait = (node == null)) )
283 node = waitingTakes.enq(null);
284 } finally {
285 qlock.unlock();
286 }
287
288 if (mustWait)
289 return (E)node.waitForPut(timed, nanos);
290
291 else {
292 E x = (E)node.get();
293 if (x != null)
294 return x;
295 // else cancelled, so retry
296 }
297 }
298 }
299
300 /**
301 * Creates a <tt>SynchronousQueue</tt>.
302 */
303 public SynchronousQueue() {}
304
305
306 /**
307 * Adds the specified element to this queue, waiting if necessary for
308 * another thread to receive it.
309 * @param o the element to add
310 * @throws InterruptedException if interrupted while waiting.
311 * @throws NullPointerException if the specified element is <tt>null</tt>.
312 */
313 public void put(E o) throws InterruptedException {
314 doPut(o, false, 0);
315 }
316
317 /**
318 * Inserts the specified element into this queue, waiting if necessary
319 * up to the specified wait time for another thread to receive it.
320 * @param o the element to add
321 * @param timeout how long to wait before giving up, in units of
322 * <tt>unit</tt>
323 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
324 * <tt>timeout</tt> parameter
325 * @return <tt>true</tt> if successful, or <tt>false</tt> if
326 * the specified waiting time elapses before a taker appears.
327 * @throws InterruptedException if interrupted while waiting.
328 * @throws NullPointerException if the specified element is <tt>null</tt>.
329 */
330 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
331 return doPut(o, true, unit.toNanos(timeout));
332 }
333
334
335 /**
336 * Retrieves and removes the head of this queue, waiting if necessary
337 * for another thread to insert it.
338 * @return the head of this queue
339 */
340 public E take() throws InterruptedException {
341 return doTake(false, 0);
342 }
343
344 /**
345 * Retrieves and removes the head of this queue, waiting
346 * if necessary up to the specified wait time, for another thread
347 * to insert it.
348 * @param timeout how long to wait before giving up, in units of
349 * <tt>unit</tt>
350 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
351 * <tt>timeout</tt> parameter
352 * @return the head of this queue, or <tt>null</tt> if the
353 * specified waiting time elapses before an element is present.
354 * @throws InterruptedException if interrupted while waiting.
355 */
356 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
357 return doTake(true, unit.toNanos(timeout));
358 }
359
360 // Untimed nonblocking versions
361
362 /**
363 * Inserts the specified element into this queue, if another thread is
364 * waiting to receive it.
365 *
366 * @param o the element to add.
367 * @return <tt>true</tt> if it was possible to add the element to
368 * this queue, else <tt>false</tt>
369 * @throws NullPointerException if the specified element is <tt>null</tt>
370 */
371 public boolean offer(E o) {
372 if (o == null) throw new NullPointerException();
373 final ReentrantLock qlock = this.qlock;
374
375 for (;;) {
376 Node node;
377 qlock.lock();
378 try {
379 node = waitingTakes.deq();
380 } finally {
381 qlock.unlock();
382 }
383 if (node == null)
384 return false;
385
386 else if (node.set(o))
387 return true;
388 // else retry
389 }
390 }
391
392 /**
393 * Retrieves and removes the head of this queue, if another thread
394 * is currently making an element available.
395 *
396 * @return the head of this queue, or <tt>null</tt> if no
397 * element is available.
398 */
399 public E poll() {
400 final ReentrantLock qlock = this.qlock;
401 for (;;) {
402 Node node;
403 qlock.lock();
404 try {
405 node = waitingPuts.deq();
406 } finally {
407 qlock.unlock();
408 }
409 if (node == null)
410 return null;
411
412 else {
413 Object x = node.get();
414 if (x != null)
415 return (E)x;
416 // else retry
417 }
418 }
419 }
420
421 /**
422 * Always returns <tt>true</tt>.
423 * A <tt>SynchronousQueue</tt> has no internal capacity.
424 * @return <tt>true</tt>
425 */
426 public boolean isEmpty() {
427 return true;
428 }
429
430 /**
431 * Always returns zero.
432 * A <tt>SynchronousQueue</tt> has no internal capacity.
433 * @return zero.
434 */
435 public int size() {
436 return 0;
437 }
438
439 /**
440 * Always returns zero.
441 * A <tt>SynchronousQueue</tt> has no internal capacity.
442 * @return zero.
443 */
444 public int remainingCapacity() {
445 return 0;
446 }
447
448 /**
449 * Does nothing.
450 * A <tt>SynchronousQueue</tt> has no internal capacity.
451 */
452 public void clear() {}
453
454 /**
455 * Always returns <tt>false</tt>.
456 * A <tt>SynchronousQueue</tt> has no internal capacity.
457 * @param o the element
458 * @return <tt>false</tt>
459 */
460 public boolean contains(Object o) {
461 return false;
462 }
463
464 /**
465 * Always returns <tt>false</tt>.
466 * A <tt>SynchronousQueue</tt> has no internal capacity.
467 *
468 * @param o the element to remove
469 * @return <tt>false</tt>
470 */
471 public boolean remove(Object o) {
472 return false;
473 }
474
475 /**
476 * Returns <tt>false</tt> unless given collection is empty.
477 * A <tt>SynchronousQueue</tt> has no internal capacity.
478 * @param c the collection
479 * @return <tt>false</tt> unless given collection is empty
480 */
481 public boolean containsAll(Collection<?> c) {
482 return c.isEmpty();
483 }
484
485 /**
486 * Always returns <tt>false</tt>.
487 * A <tt>SynchronousQueue</tt> has no internal capacity.
488 * @param c the collection
489 * @return <tt>false</tt>
490 */
491 public boolean removeAll(Collection<?> c) {
492 return false;
493 }
494
495 /**
496 * Always returns <tt>false</tt>.
497 * A <tt>SynchronousQueue</tt> has no internal capacity.
498 * @param c the collection
499 * @return <tt>false</tt>
500 */
501 public boolean retainAll(Collection<?> c) {
502 return false;
503 }
504
505 /**
506 * Always returns <tt>null</tt>.
507 * A <tt>SynchronousQueue</tt> does not return elements
508 * unless actively waited on.
509 * @return <tt>null</tt>
510 */
511 public E peek() {
512 return null;
513 }
514
515
516 static class EmptyIterator<E> implements Iterator<E> {
517 public boolean hasNext() {
518 return false;
519 }
520 public E next() {
521 throw new NoSuchElementException();
522 }
523 public void remove() {
524 throw new IllegalStateException();
525 }
526 }
527
528 /**
529 * Returns an empty iterator in which <tt>hasNext</tt> always returns
530 * <tt>false</tt>.
531 *
532 * @return an empty iterator
533 */
534 public Iterator<E> iterator() {
535 return new EmptyIterator<E>();
536 }
537
538
539 /**
540 * Returns a zero-length array.
541 * @return a zero-length array
542 */
543 public Object[] toArray() {
544 return new Object[0];
545 }
546
547 /**
548 * Sets the zeroeth element of the specified array to <tt>null</tt>
549 * (if the array has non-zero length) and returns it.
550 * @return the specified array
551 */
552 public <T> T[] toArray(T[] a) {
553 if (a.length > 0)
554 a[0] = null;
555 return a;
556 }
557
558
559 public int drainTo(Collection<? super E> c) {
560 if (c == null)
561 throw new NullPointerException();
562 if (c == this)
563 throw new IllegalArgumentException();
564 int n = 0;
565 E e;
566 while ( (e = poll()) != null) {
567 c.add(e);
568 ++n;
569 }
570 return n;
571 }
572
573 public int drainTo(Collection<? super E> c, int maxElements) {
574 if (c == null)
575 throw new NullPointerException();
576 if (c == this)
577 throw new IllegalArgumentException();
578 int n = 0;
579 E e;
580 while (n < maxElements && (e = poll()) != null) {
581 c.add(e);
582 ++n;
583 }
584 return n;
585 }
586 }
587
588
589
590
591