ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.43
Committed: Sun Feb 8 15:35:10 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.42: +138 -73 lines
Log Message:
Wording cleanups; Improve SynchronousQueue serialization and fairness support

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity - in
15 * particular it does not have a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to take it; you cannot add an element (using
18 * any method) unless another thread is trying to remove it; you
19 * cannot iterate as there is nothing to iterate. The <em>head</em>
20 * of the queue is the element that the first queued thread is trying
21 * to add to the queue; if there are no queued threads then no element
22 * is being added and the head is <tt>null</tt>. For purposes of
23 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 * queue does not permit <tt>null</tt> elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must sync up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 *
33 * <p> This class supports an optional fairness policy for ordering
34 * waiting producer and consumer threads. By default, this ordering
35 * is not guaranteed. However, a queue constructed with fairness set
36 * to <tt>true</tt> grants threads access in FIFO order. Fairness
37 * generally decreases throughput but reduces variability and avoids
38 * starvation.
39 *
40 * <p>This class implements all of the <em>optional</em> methods
41 * of the {@link Collection} and {@link Iterator} interfaces.
42 *
43 * <p>This class is a member of the
44 * <a href="{@docRoot}/../guide/collections/index.html">
45 * Java Collections Framework</a>.
46 *
47 * @since 1.5
48 * @author Doug Lea
49 * @param <E> the type of elements held in this collection
50 */
51 public class SynchronousQueue<E> extends AbstractQueue<E>
52 implements BlockingQueue<E>, java.io.Serializable {
53 private static final long serialVersionUID = -3223113410248163686L;
54
/*
  This implementation divides actions into two cases for puts:

  * An arriving producer that does not already have a waiting consumer
    creates a node holding the item, and then waits for a consumer to
    take it.
  * An arriving producer that does already have a waiting consumer fills
    the slot node created by the consumer, and notifies it to continue.

  And symmetrically, two for takes:

  * An arriving consumer that does not already have a waiting producer
    creates an empty slot node, and then waits for a producer to fill it.
  * An arriving consumer that does already have a waiting producer takes
    the item from the node created by the producer, and notifies it to
    continue.

  When a put or take waiting for the actions of its counterpart
  aborts due to interruption or timeout, it marks the node
  it created as "CANCELLED", which causes its counterpart to retry
  the entire put or take sequence.

  This requires keeping two simple queues, waitingProducers and
  waitingConsumers. Each of these can be FIFO (preserves fairness)
  or LIFO (improves throughput).
*/
79
80 /** Lock protecting both wait queues */
81 private final ReentrantLock qlock;
82 /** Queue holding waiting puts */
83 private final WaitQueue waitingProducers;
84 /** Queue holding waiting takes */
85 private final WaitQueue waitingConsumers;
86
/**
 * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
 * Equivalent to constructing the queue with fairness set to
 * <tt>false</tt>.
 */
public SynchronousQueue() {
    this(false);
}
95
/**
 * Creates a <tt>SynchronousQueue</tt> with the specified fairness policy.
 * A fair queue uses a fair lock and FIFO wait queues; a nonfair queue
 * uses a nonfair lock and LIFO wait queues.
 *
 * @param fair if true, threads contend in FIFO order for access.
 */
public SynchronousQueue(boolean fair) {
    // ReentrantLock(false) is the same nonfair lock the no-arg constructor builds.
    qlock = new ReentrantLock(fair);
    waitingProducers = fair ? new FifoWaitQueue() : new LifoWaitQueue();
    waitingConsumers = fair ? new FifoWaitQueue() : new LifoWaitQueue();
}
112
113 /**
114 * Queue to hold waiting puts/takes; specialized to FiFo/Lifo below.
115 * These queues have all transient fields, but are serializable
116 * in order to retain fairness settings when deserialized.
117 */
118 static abstract class WaitQueue implements java.io.Serializable {
119 /** Create, add, and return node for x */
120 abstract Node enq(Object x);
121 /** Remove and return node, or null if empty */
122 abstract Node deq();
123 }
124
125 /**
126 * FIFO queue to hold waiting puts/takes.
127 */
128 static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
129 private static final long serialVersionUID = -3623113410248163686L;
130 private transient Node head;
131 private transient Node last;
132
133 Node enq(Object x) {
134 Node p = new Node(x);
135 if (last == null)
136 last = head = p;
137 else
138 last = last.next = p;
139 return p;
140 }
141
142 Node deq() {
143 Node p = head;
144 if (p != null) {
145 if ((head = p.next) == null)
146 last = null;
147 p.next = null;
148 }
149 return p;
150 }
151 }
152
153 /**
154 * LIFO queue to hold waiting puts/takes.
155 */
156 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
157 private static final long serialVersionUID = -3633113410248163686L;
158 private transient Node head;
159
160 Node enq(Object x) {
161 return head = new Node(x, head);
162 }
163
164 Node deq() {
165 Node p = head;
166 if (p != null)
167 head = p.next;
168 return p;
169 }
170 }
171
172 /**
173 * Nodes each maintain an item and handle waits and signals for
174 * getting and setting it. The class extends
175 * AbstractQueuedSynchronizer to manage blocking, using AQS state
176 * 0 for waiting, 1 for ack, -1 for cancelled.
177 */
178 static final class Node extends AbstractQueuedSynchronizer {
179 /** Synchronization state value representing that node acked */
180 private static final int ACK = 1;
181 /** Synchronization state value representing that node cancelled */
182 private static final int CANCEL = -1;
183
184 /** The item being transferred */
185 Object item;
186 /** Next node in wait queue */
187 Node next;
188
189 /** Create node with initial item */
190 Node(Object x) { item = x; }
191
192 /** Create node with initial item and next */
193 Node(Object x, Node n) { item = x; next = n; }
194
195 /**
196 * Implements AQS base acquire to succeed if not in WAITING state
197 */
198 protected boolean tryAcquire(int ignore) {
199 return getState() != 0;
200 }
201
202 /**
203 * Implements AQS base release to signal if state changed
204 */
205 protected boolean tryRelease(int newState) {
206 return compareAndSetState(0, newState);
207 }
208
209 /**
210 * Take item and null out field (for sake of GC)
211 */
212 private Object extract() {
213 Object x = item;
214 item = null;
215 return x;
216 }
217
218 /**
219 * Try to cancel on interrupt; if so rethrowing,
220 * else setting interrupt state
221 */
222 private void checkCancellationOnInterrupt(InterruptedException ie)
223 throws InterruptedException {
224 if (release(CANCEL))
225 throw ie;
226 Thread.currentThread().interrupt();
227 }
228
229 /**
230 * Fill in the slot created by the consumer and signal consumer to
231 * continue.
232 */
233 boolean setItem(Object x) {
234 item = x; // can place in slot even if cancelled
235 return release(ACK);
236 }
237
238 /**
239 * Remove item from slot created by producer and signal producer
240 * to continue.
241 */
242 Object getItem() {
243 return (release(ACK))? extract() : null;
244 }
245
246 /**
247 * Wait for a consumer to take item placed by producer.
248 */
249 void waitForTake() throws InterruptedException {
250 try {
251 acquireInterruptibly(0);
252 } catch (InterruptedException ie) {
253 checkCancellationOnInterrupt(ie);
254 }
255 }
256
257 /**
258 * Wait for a producer to put item placed by consumer.
259 */
260 Object waitForPut() throws InterruptedException {
261 try {
262 acquireInterruptibly(0);
263 } catch (InterruptedException ie) {
264 checkCancellationOnInterrupt(ie);
265 }
266 return extract();
267 }
268
269 /**
270 * Wait for a consumer to take item placed by producer or time out.
271 */
272 boolean waitForTake(long nanos) throws InterruptedException {
273 try {
274 if (!tryAcquireNanos(0, nanos) &&
275 release(CANCEL))
276 return false;
277 } catch (InterruptedException ie) {
278 checkCancellationOnInterrupt(ie);
279 }
280 return true;
281 }
282
283 /**
284 * Wait for a producer to put item placed by consumer, or time out.
285 */
286 Object waitForPut(long nanos) throws InterruptedException {
287 try {
288 if (!tryAcquireNanos(0, nanos) &&
289 release(CANCEL))
290 return null;
291 } catch (InterruptedException ie) {
292 checkCancellationOnInterrupt(ie);
293 }
294 return extract();
295 }
296 }
297
298
299
300 /**
301 * Adds the specified element to this queue, waiting if necessary for
302 * another thread to receive it.
303 * @param o the element to add
304 * @throws InterruptedException if interrupted while waiting.
305 * @throws NullPointerException if the specified element is <tt>null</tt>.
306 */
307 public void put(E o) throws InterruptedException {
308 if (o == null) throw new NullPointerException();
309 final ReentrantLock qlock = this.qlock;
310
311 for (;;) {
312 Node node;
313 boolean mustWait;
314 if (Thread.interrupted()) throw new InterruptedException();
315 qlock.lock();
316 try {
317 node = waitingConsumers.deq();
318 if ( (mustWait = (node == null)) )
319 node = waitingProducers.enq(o);
320 } finally {
321 qlock.unlock();
322 }
323
324 if (mustWait) {
325 node.waitForTake();
326 return;
327 }
328
329 else if (node.setItem(o))
330 return;
331
332 // else consumer cancelled, so retry
333 }
334 }
335
336 /**
337 * Inserts the specified element into this queue, waiting if necessary
338 * up to the specified wait time for another thread to receive it.
339 * @param o the element to add
340 * @param timeout how long to wait before giving up, in units of
341 * <tt>unit</tt>
342 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
343 * <tt>timeout</tt> parameter
344 * @return <tt>true</tt> if successful, or <tt>false</tt> if
345 * the specified waiting time elapses before a consumer appears.
346 * @throws InterruptedException if interrupted while waiting.
347 * @throws NullPointerException if the specified element is <tt>null</tt>.
348 */
349 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
350 if (o == null) throw new NullPointerException();
351 long nanos = unit.toNanos(timeout);
352 final ReentrantLock qlock = this.qlock;
353 for (;;) {
354 Node node;
355 boolean mustWait;
356 if (Thread.interrupted()) throw new InterruptedException();
357 qlock.lock();
358 try {
359 node = waitingConsumers.deq();
360 if ( (mustWait = (node == null)) )
361 node = waitingProducers.enq(o);
362 } finally {
363 qlock.unlock();
364 }
365
366 if (mustWait)
367 return node.waitForTake(nanos);
368
369 else if (node.setItem(o))
370 return true;
371
372 // else consumer cancelled, so retry
373 }
374 }
375
376 /**
377 * Retrieves and removes the head of this queue, waiting if necessary
378 * for another thread to insert it.
379 * @throws InterruptedException if interrupted while waiting.
380 * @return the head of this queue
381 */
382 public E take() throws InterruptedException {
383 final ReentrantLock qlock = this.qlock;
384 for (;;) {
385 Node node;
386 boolean mustWait;
387
388 if (Thread.interrupted()) throw new InterruptedException();
389 qlock.lock();
390 try {
391 node = waitingProducers.deq();
392 if ( (mustWait = (node == null)) )
393 node = waitingConsumers.enq(null);
394 } finally {
395 qlock.unlock();
396 }
397
398 if (mustWait) {
399 Object x = node.waitForPut();
400 return (E)x;
401 }
402 else {
403 Object x = node.getItem();
404 if (x != null)
405 return (E)x;
406 // else cancelled, so retry
407 }
408 }
409 }
410
411 /**
412 * Retrieves and removes the head of this queue, waiting
413 * if necessary up to the specified wait time, for another thread
414 * to insert it.
415 * @param timeout how long to wait before giving up, in units of
416 * <tt>unit</tt>
417 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
418 * <tt>timeout</tt> parameter
419 * @return the head of this queue, or <tt>null</tt> if the
420 * specified waiting time elapses before an element is present.
421 * @throws InterruptedException if interrupted while waiting.
422 */
423 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
424 long nanos = unit.toNanos(timeout);
425 final ReentrantLock qlock = this.qlock;
426
427 for (;;) {
428 Node node;
429 boolean mustWait;
430
431 if (Thread.interrupted()) throw new InterruptedException();
432 qlock.lock();
433 try {
434 node = waitingProducers.deq();
435 if ( (mustWait = (node == null)) )
436 node = waitingConsumers.enq(null);
437 } finally {
438 qlock.unlock();
439 }
440
441 if (mustWait) {
442 Object x = node.waitForPut(nanos);
443 return (E)x;
444 }
445 else {
446 Object x = node.getItem();
447 if (x != null)
448 return (E)x;
449 // else cancelled, so retry
450 }
451 }
452 }
453
454 // Untimed nonblocking versions
455
456 /**
457 * Inserts the specified element into this queue, if another thread is
458 * waiting to receive it.
459 *
460 * @param o the element to add.
461 * @return <tt>true</tt> if it was possible to add the element to
462 * this queue, else <tt>false</tt>
463 * @throws NullPointerException if the specified element is <tt>null</tt>
464 */
465 public boolean offer(E o) {
466 if (o == null) throw new NullPointerException();
467 final ReentrantLock qlock = this.qlock;
468
469 for (;;) {
470 Node node;
471 qlock.lock();
472 try {
473 node = waitingConsumers.deq();
474 } finally {
475 qlock.unlock();
476 }
477 if (node == null)
478 return false;
479
480 else if (node.setItem(o))
481 return true;
482 // else retry
483 }
484 }
485
486 /**
487 * Retrieves and removes the head of this queue, if another thread
488 * is currently making an element available.
489 *
490 * @return the head of this queue, or <tt>null</tt> if no
491 * element is available.
492 */
493 public E poll() {
494 final ReentrantLock qlock = this.qlock;
495 for (;;) {
496 Node node;
497 qlock.lock();
498 try {
499 node = waitingProducers.deq();
500 } finally {
501 qlock.unlock();
502 }
503 if (node == null)
504 return null;
505
506 else {
507 Object x = node.getItem();
508 if (x != null)
509 return (E)x;
510 // else retry
511 }
512 }
513 }
514
515 /**
516 * Always returns <tt>true</tt>.
517 * A <tt>SynchronousQueue</tt> has no internal capacity.
518 * @return <tt>true</tt>
519 */
520 public boolean isEmpty() {
521 return true;
522 }
523
524 /**
525 * Always returns zero.
526 * A <tt>SynchronousQueue</tt> has no internal capacity.
527 * @return zero.
528 */
529 public int size() {
530 return 0;
531 }
532
533 /**
534 * Always returns zero.
535 * A <tt>SynchronousQueue</tt> has no internal capacity.
536 * @return zero.
537 */
538 public int remainingCapacity() {
539 return 0;
540 }
541
542 /**
543 * Does nothing.
544 * A <tt>SynchronousQueue</tt> has no internal capacity.
545 */
546 public void clear() {}
547
548 /**
549 * Always returns <tt>false</tt>.
550 * A <tt>SynchronousQueue</tt> has no internal capacity.
551 * @param o the element
552 * @return <tt>false</tt>
553 */
554 public boolean contains(Object o) {
555 return false;
556 }
557
558 /**
559 * Always returns <tt>false</tt>.
560 * A <tt>SynchronousQueue</tt> has no internal capacity.
561 *
562 * @param o the element to remove
563 * @return <tt>false</tt>
564 */
565 public boolean remove(Object o) {
566 return false;
567 }
568
569 /**
570 * Returns <tt>false</tt> unless given collection is empty.
571 * A <tt>SynchronousQueue</tt> has no internal capacity.
572 * @param c the collection
573 * @return <tt>false</tt> unless given collection is empty
574 */
575 public boolean containsAll(Collection<?> c) {
576 return c.isEmpty();
577 }
578
579 /**
580 * Always returns <tt>false</tt>.
581 * A <tt>SynchronousQueue</tt> has no internal capacity.
582 * @param c the collection
583 * @return <tt>false</tt>
584 */
585 public boolean removeAll(Collection<?> c) {
586 return false;
587 }
588
589 /**
590 * Always returns <tt>false</tt>.
591 * A <tt>SynchronousQueue</tt> has no internal capacity.
592 * @param c the collection
593 * @return <tt>false</tt>
594 */
595 public boolean retainAll(Collection<?> c) {
596 return false;
597 }
598
599 /**
600 * Always returns <tt>null</tt>.
601 * A <tt>SynchronousQueue</tt> does not return elements
602 * unless actively waited on.
603 * @return <tt>null</tt>
604 */
605 public E peek() {
606 return null;
607 }
608
609
610 static class EmptyIterator<E> implements Iterator<E> {
611 public boolean hasNext() {
612 return false;
613 }
614 public E next() {
615 throw new NoSuchElementException();
616 }
617 public void remove() {
618 throw new IllegalStateException();
619 }
620 }
621
622 /**
623 * Returns an empty iterator in which <tt>hasNext</tt> always returns
624 * <tt>false</tt>.
625 *
626 * @return an empty iterator
627 */
628 public Iterator<E> iterator() {
629 return new EmptyIterator<E>();
630 }
631
632
633 /**
634 * Returns a zero-length array.
635 * @return a zero-length array
636 */
637 public Object[] toArray() {
638 return new Object[0];
639 }
640
641 /**
642 * Sets the zeroeth element of the specified array to <tt>null</tt>
643 * (if the array has non-zero length) and returns it.
644 * @param a the array
645 * @return the specified array
646 */
647 public <T> T[] toArray(T[] a) {
648 if (a.length > 0)
649 a[0] = null;
650 return a;
651 }
652
653
654 public int drainTo(Collection<? super E> c) {
655 if (c == null)
656 throw new NullPointerException();
657 if (c == this)
658 throw new IllegalArgumentException();
659 int n = 0;
660 E e;
661 while ( (e = poll()) != null) {
662 c.add(e);
663 ++n;
664 }
665 return n;
666 }
667
668 public int drainTo(Collection<? super E> c, int maxElements) {
669 if (c == null)
670 throw new NullPointerException();
671 if (c == this)
672 throw new IllegalArgumentException();
673 int n = 0;
674 E e;
675 while (n < maxElements && (e = poll()) != null) {
676 c.add(e);
677 ++n;
678 }
679 return n;
680 }
681 }
682
683
684
685
686