ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.24
Committed: Sun Oct 19 13:38:34 2003 UTC (20 years, 7 months ago) by dl
Branch: MAIN
Changes since 1.23: +1 -1 lines
Log Message:
Changed doc strings for generic params

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity - in
15 * particular it does not have a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to take it; you cannot add an element (using
18 * any method) unless another thread is trying to remove it; you
19 * cannot iterate as there is nothing to iterate. The <em>head</em>
20 * of the queue is the element that the first queued thread is trying
21 * to add to the queue; if there are no queued threads then no element
22 * is being added and the head is <tt>null</tt>. For purposes of
23 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 * queue does not permit <tt>null</tt> elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must synch up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 * <p>This class implements all of the <em>optional</em> methods
33 * of the {@link Collection} and {@link Iterator} interfaces.
34 * @since 1.5
35 * @author Doug Lea
36 * @param <E> the type of elements held in this collection
37 */
38 public class SynchronousQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40 private static final long serialVersionUID = -3223113410248163686L;
41
42 /*
43 This implementation divides actions into two cases for puts:
44
45 * An arriving putter that does not already have a waiting taker
46 creates a node holding item, and then waits for a taker to take it.
47 * An arriving putter that does already have a waiting taker fills
48 the slot node created by the taker, and notifies it to continue.
49
50 And symmetrically, two for takes:
51
52 * An arriving taker that does not already have a waiting putter
53 creates an empty slot node, and then waits for a putter to fill it.
54 * An arriving taker that does already have a waiting putter takes
55 item from the node created by the putter, and notifies it to continue.
56
57 This requires keeping two simple queues: waitingPuts and waitingTakes.
58
59 When a put or take waiting for the actions of its counterpart
60 aborts due to interruption or timeout, it marks the node
61 it created as "CANCELLED", which causes its counterpart to retry
62 the entire put or take sequence.
63 */
64
65 /**
66 * Special marker used in queue nodes to indicate that
67 * the thread waiting for a change in the node has timed out
68 * or been interrupted.
69 **/
70 private static final Object CANCELLED = new Object();
71
72 /*
73 * Note that all fields are transient final, so there is
74 * no explicit serialization code.
75 */
76
77 private transient final WaitQueue waitingPuts = new WaitQueue();
78 private transient final WaitQueue waitingTakes = new WaitQueue();
79 private transient final ReentrantLock qlock = new ReentrantLock();
80
81 /**
82 * Nodes each maintain an item and handle waits and signals for
83 * getting and setting it. The class opportunistically extends
84 * ReentrantLock to save an extra object allocation per
85 * rendezvous.
86 */
87 private static class Node extends ReentrantLock {
88 /** Condition to wait on for other party; lazily constructed */
89 ReentrantLock.ConditionObject done;
90 /** The item being transferred */
91 Object item;
92 /** Next node in wait queue */
93 Node next;
94
95 Node(Object x) { item = x; }
96
97 /**
98 * Fill in the slot created by the taker and signal taker to
99 * continue.
100 */
101 boolean set(Object x) {
102 this.lock();
103 try {
104 if (item != CANCELLED) {
105 item = x;
106 if (done != null)
107 done.signal();
108 return true;
109 } else // taker has cancelled
110 return false;
111 } finally {
112 this.unlock();
113 }
114 }
115
116 /**
117 * Remove item from slot created by putter and signal putter
118 * to continue.
119 */
120 Object get() {
121 this.lock();
122 try {
123 Object x = item;
124 if (x != CANCELLED) {
125 item = null;
126 next = null;
127 if (done != null)
128 done.signal();
129 return x;
130 } else
131 return null;
132 } finally {
133 this.unlock();
134 }
135 }
136
137 /**
138 * Wait for a taker to take item placed by putter, or time out.
139 */
140 boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
141 this.lock();
142 try {
143 for (;;) {
144 if (item == null)
145 return true;
146 if (timed) {
147 if (nanos <= 0) {
148 item = CANCELLED;
149 return false;
150 }
151 }
152 if (done == null)
153 done = this.newCondition();
154 if (timed)
155 nanos = done.awaitNanos(nanos);
156 else
157 done.await();
158 }
159 } catch (InterruptedException ie) {
160 // If taken, return normally but set interrupt status
161 if (item == null) {
162 Thread.currentThread().interrupt();
163 return true;
164 } else {
165 item = CANCELLED;
166 done.signal(); // propagate signal
167 throw ie;
168 }
169 } finally {
170 this.unlock();
171 }
172 }
173
174 /**
175 * Wait for a putter to put item placed by taker, or time out.
176 */
177 Object waitForPut(boolean timed, long nanos) throws InterruptedException {
178 this.lock();
179 try {
180 for (;;) {
181 Object x = item;
182 if (x != null) {
183 item = null;
184 next = null;
185 return x;
186 }
187 if (timed) {
188 if (nanos <= 0) {
189 item = CANCELLED;
190 return null;
191 }
192 }
193 if (done == null)
194 done = this.newCondition();
195 if (timed)
196 nanos = done.awaitNanos(nanos);
197 else
198 done.await();
199 }
200 } catch (InterruptedException ie) {
201 Object y = item;
202 if (y != null) {
203 item = null;
204 next = null;
205 Thread.currentThread().interrupt();
206 return y;
207 } else {
208 item = CANCELLED;
209 done.signal(); // propagate signal
210 throw ie;
211 }
212 } finally {
213 this.unlock();
214 }
215 }
216 }
217
218 /**
219 * Simple FIFO queue class to hold waiting puts/takes.
220 **/
221 private static class WaitQueue<E> {
222 Node head;
223 Node last;
224
225 Node enq(Object x) {
226 Node p = new Node(x);
227 if (last == null)
228 last = head = p;
229 else
230 last = last.next = p;
231 return p;
232 }
233
234 Node deq() {
235 Node p = head;
236 if (p != null && (head = p.next) == null)
237 last = null;
238 return p;
239 }
240 }
241
242 /**
243 * Main put algorithm, used by put, timed offer
244 */
245 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
246 if (x == null) throw new NullPointerException();
247 for (;;) {
248 Node node;
249 boolean mustWait;
250
251 qlock.lockInterruptibly();
252 try {
253 node = waitingTakes.deq();
254 if ( (mustWait = (node == null)) )
255 node = waitingPuts.enq(x);
256 } finally {
257 qlock.unlock();
258 }
259
260 if (mustWait)
261 return node.waitForTake(timed, nanos);
262
263 else if (node.set(x))
264 return true;
265
266 // else taker cancelled, so retry
267 }
268 }
269
270 /**
271 * Main take algorithm, used by take, timed poll
272 */
273 private E doTake(boolean timed, long nanos) throws InterruptedException {
274 for (;;) {
275 Node node;
276 boolean mustWait;
277
278 qlock.lockInterruptibly();
279 try {
280 node = waitingPuts.deq();
281 if ( (mustWait = (node == null)) )
282 node = waitingTakes.enq(null);
283 } finally {
284 qlock.unlock();
285 }
286
287 if (mustWait)
288 return (E)node.waitForPut(timed, nanos);
289
290 else {
291 E x = (E)node.get();
292 if (x != null)
293 return x;
294 // else cancelled, so retry
295 }
296 }
297 }
298
299 /**
300 * Creates a <tt>SynchronousQueue</tt>.
301 */
302 public SynchronousQueue() {}
303
304
305 /**
306 * Adds the specified element to this queue, waiting if necessary for
307 * another thread to receive it.
308 * @param o the element to add
309 * @throws InterruptedException if interrupted while waiting.
310 * @throws NullPointerException if the specified element is <tt>null</tt>.
311 */
312 public void put(E o) throws InterruptedException {
313 doPut(o, false, 0);
314 }
315
316 /**
317 * Inserts the specified element into this queue, waiting if necessary
318 * up to the specified wait time for another thread to receive it.
319 * @param o the element to add
320 * @param timeout how long to wait before giving up, in units of
321 * <tt>unit</tt>
322 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
323 * <tt>timeout</tt> parameter
324 * @return <tt>true</tt> if successful, or <tt>false</tt> if
325 * the specified waiting time elapses before a taker appears.
326 * @throws InterruptedException if interrupted while waiting.
327 * @throws NullPointerException if the specified element is <tt>null</tt>.
328 */
329 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
330 return doPut(o, true, unit.toNanos(timeout));
331 }
332
333
334 /**
335 * Retrieves and removes the head of this queue, waiting if necessary
336 * for another thread to insert it.
337 * @return the head of this queue
338 */
339 public E take() throws InterruptedException {
340 return doTake(false, 0);
341 }
342
343 /**
344 * Retrieves and removes the head of this queue, waiting
345 * if necessary up to the specified wait time, for another thread
346 * to insert it.
347 * @param timeout how long to wait before giving up, in units of
348 * <tt>unit</tt>
349 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
350 * <tt>timeout</tt> parameter
351 * @return the head of this queue, or <tt>null</tt> if the
352 * specified waiting time elapses before an element is present.
353 * @throws InterruptedException if interrupted while waiting.
354 */
355 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
356 return doTake(true, unit.toNanos(timeout));
357 }
358
359 // Untimed nonblocking versions
360
361 /**
362 * Inserts the specified element into this queue, if another thread is
363 * waiting to receive it.
364 *
365 * @param o the element to add.
366 * @return <tt>true</tt> if it was possible to add the element to
367 * this queue, else <tt>false</tt>
368 * @throws NullPointerException if the specified element is <tt>null</tt>
369 */
370 public boolean offer(E o) {
371 if (o == null) throw new NullPointerException();
372
373 for (;;) {
374 qlock.lock();
375 Node node;
376 try {
377 node = waitingTakes.deq();
378 } finally {
379 qlock.unlock();
380 }
381 if (node == null)
382 return false;
383
384 else if (node.set(o))
385 return true;
386 // else retry
387 }
388 }
389
390 /**
391 * Retrieves and removes the head of this queue, if another thread
392 * is currently making an element available.
393 *
394 * @return the head of this queue, or <tt>null</tt> if no
395 * element is available.
396 */
397 public E poll() {
398 for (;;) {
399 Node node;
400 qlock.lock();
401 try {
402 node = waitingPuts.deq();
403 } finally {
404 qlock.unlock();
405 }
406 if (node == null)
407 return null;
408
409 else {
410 Object x = node.get();
411 if (x != null)
412 return (E)x;
413 // else retry
414 }
415 }
416 }
417
418 /**
419 * Always returns <tt>true</tt>.
420 * A <tt>SynchronousQueue</tt> has no internal capacity.
421 * @return <tt>true</tt>
422 */
423 public boolean isEmpty() {
424 return true;
425 }
426
427 /**
428 * Always returns zero.
429 * A <tt>SynchronousQueue</tt> has no internal capacity.
430 * @return zero.
431 */
432 public int size() {
433 return 0;
434 }
435
436 /**
437 * Always returns zero.
438 * A <tt>SynchronousQueue</tt> has no internal capacity.
439 * @return zero.
440 */
441 public int remainingCapacity() {
442 return 0;
443 }
444
445 /**
446 * Does nothing.
447 * A <tt>SynchronousQueue</tt> has no internal capacity.
448 */
449 public void clear() {}
450
451 /**
452 * Always returns <tt>false</tt>.
453 * A <tt>SynchronousQueue</tt> has no internal capacity.
454 * @param o the element
455 * @return <tt>false</tt>
456 */
457 public boolean contains(Object o) {
458 return false;
459 }
460
461 /**
462 * Always returns <tt>false</tt>.
463 * A <tt>SynchronousQueue</tt> has no internal capacity.
464 *
465 * @param o the element to remove
466 * @return <tt>false</tt>
467 */
468 public boolean remove(Object o) {
469 return false;
470 }
471
472 /**
473 * Returns <tt>false</tt> unless given collection is empty.
474 * A <tt>SynchronousQueue</tt> has no internal capacity.
475 * @param c the collection
476 * @return <tt>false</tt> unless given collection is empty
477 */
478 public boolean containsAll(Collection<?> c) {
479 return c.isEmpty();
480 }
481
482 /**
483 * Always returns <tt>false</tt>.
484 * A <tt>SynchronousQueue</tt> has no internal capacity.
485 * @param c the collection
486 * @return <tt>false</tt>
487 */
488 public boolean removeAll(Collection<?> c) {
489 return false;
490 }
491
492 /**
493 * Always returns <tt>false</tt>.
494 * A <tt>SynchronousQueue</tt> has no internal capacity.
495 * @param c the collection
496 * @return <tt>false</tt>
497 */
498 public boolean retainAll(Collection<?> c) {
499 return false;
500 }
501
502 /**
503 * Always returns <tt>null</tt>.
504 * A <tt>SynchronousQueue</tt> does not return elements
505 * unless actively waited on.
506 * @return <tt>null</tt>
507 */
508 public E peek() {
509 return null;
510 }
511
512
513 static class EmptyIterator<E> implements Iterator<E> {
514 public boolean hasNext() {
515 return false;
516 }
517 public E next() {
518 throw new NoSuchElementException();
519 }
520 public void remove() {
521 throw new IllegalStateException();
522 }
523 }
524
525 /**
526 * Returns an empty iterator in which <tt>hasNext</tt> always returns
527 * <tt>false</tt>.
528 *
529 * @return an empty iterator
530 */
531 public Iterator<E> iterator() {
532 return new EmptyIterator<E>();
533 }
534
535
536 /**
537 * Returns a zero-length array.
538 * @return a zero-length array
539 */
540 public Object[] toArray() {
541 return (E[]) new Object[0];
542 }
543
544 /**
545 * Sets the zeroeth element of the specified array to <tt>null</tt>
546 * (if the array has non-zero length) and returns it.
547 * @return the specified array
548 */
549 public <T> T[] toArray(T[] a) {
550 if (a.length > 0)
551 a[0] = null;
552 return a;
553 }
554
555
556 public int drainTo(Collection<? super E> c) {
557 if (c == null)
558 throw new NullPointerException();
559 if (c == this)
560 throw new IllegalArgumentException();
561 int n = 0;
562 E e;
563 while ( (e = poll()) != null) {
564 c.add(e);
565 ++n;
566 }
567 return n;
568 }
569
570 public int drainTo(Collection<? super E> c, int maxElements) {
571 if (c == null)
572 throw new NullPointerException();
573 if (c == this)
574 throw new IllegalArgumentException();
575 int n = 0;
576 E e;
577 while (n < maxElements && (e = poll()) != null) {
578 c.add(e);
579 ++n;
580 }
581 return n;
582 }
583 }
584
585
586
587
588