ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.51
Committed: Tue May 17 16:16:01 2005 UTC (19 years ago) by jsr166
Branch: MAIN
Changes since 1.50: +2 -2 lines
Log Message:
doc fixes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * A {@linkplain BlockingQueue blocking queue} in which each
 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa.  A
 * synchronous queue does not have any internal capacity, not even a
 * capacity of one.  You cannot <tt>peek</tt> at a synchronous queue
 * because an element is only present when you try to take it; you
 * cannot add an element (using any method) unless another thread is
 * trying to remove it; you cannot iterate as there is nothing to
 * iterate.  The <em>head</em> of the queue is the element that the
 * first queued thread is trying to add to the queue; if there are no
 * queued threads then no element is being added and the head is
 * <tt>null</tt>.  For purposes of other <tt>Collection</tt> methods
 * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
 * as an empty collection.  This queue does not permit <tt>null</tt>
 * elements.
 *
 * <p>Synchronous queues are similar to rendezvous channels used in
 * CSP and Ada.  They are well suited for handoff designs, in which an
 * object running in one thread must sync up with an object running
 * in another thread in order to hand it some information, event, or
 * task.
 *
 * <p>This class supports an optional fairness policy for ordering
 * waiting producer and consumer threads.  By default, this ordering
 * is not guaranteed.  However, a queue constructed with fairness set
 * to <tt>true</tt> grants threads access in FIFO order.  Fairness
 * generally decreases throughput but reduces variability and avoids
 * starvation.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
      This implementation divides actions into two cases for puts:

      * An arriving producer that does not already have a waiting consumer
        creates a node holding item, and then waits for a consumer to take it.
      * An arriving producer that does already have a waiting consumer fills
        the slot node created by the consumer, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving consumer that does not already have a waiting producer
        creates an empty slot node, and then waits for a producer to fill it.
      * An arriving consumer that does already have a waiting producer takes
        item from the node created by the producer, and notifies it to continue.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as "CANCELLED", which causes its counterpart to retry
      the entire put or take sequence.

      This requires keeping two simple queues, waitingProducers and
      waitingConsumers.  Each of these can be FIFO (preserves fairness)
      or LIFO (improves throughput).
    */

    /** Lock protecting both wait queues */
    private final ReentrantLock qlock;
    /** Queue holding waiting puts */
    private final WaitQueue waitingProducers;
    /** Queue holding waiting takes */
    private final WaitQueue waitingConsumers;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
     *
     * @param fair if true, threads contend in FIFO order for access;
     *        otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        } else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
    }

    /**
     * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
     * These queues have all transient fields, but are serializable
     * in order to recover fairness settings when deserialized.
     */
    abstract static class WaitQueue implements java.io.Serializable {
        /** Creates, adds, and returns node for x. */
        abstract Node enq(Object x);
        /** Removes and returns node, or null if empty. */
        abstract Node deq();
        /** Removes a cancelled node to avoid garbage retention. */
        abstract void unlink(Node node);
        /** Returns true if a cancelled node might be on queue. */
        abstract boolean shouldUnlink(Node node);
    }

    /**
     * FIFO queue to hold waiting puts/takes: nodes are appended at
     * <tt>last</tt> and removed from <tt>head</tt>.
     */
    static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3623113410248163686L;
        private transient Node head;
        private transient Node last;

        @Override
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null) {
                head = p;
            } else {
                last.next = p;
            }
            last = p;
            return p;
        }

        @Override
        Node deq() {
            Node p = head;
            if (p != null) {
                head = p.next;
                if (head == null) {
                    last = null;
                }
                // A null next link marks the node as dequeued for shouldUnlink.
                p.next = null;
            }
            return p;
        }

        @Override
        boolean shouldUnlink(Node node) {
            // A node still on the queue is either the tail or has a successor.
            return (node == last || node.next != null);
        }

        @Override
        void unlink(Node node) {
            Node trail = null;
            for (Node p = head; p != null; trail = p, p = p.next) {
                if (p == node) {
                    Node next = p.next;
                    if (trail == null) {
                        head = next;
                    } else {
                        trail.next = next;
                    }
                    if (last == node) {
                        last = trail;
                    }
                    return;
                }
            }
        }
    }

    /**
     * LIFO queue to hold waiting puts/takes: nodes are pushed onto and
     * popped from <tt>head</tt>.
     */
    static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3633113410248163686L;
        private transient Node head;

        @Override
        Node enq(Object x) {
            Node p = new Node(x, head);
            head = p;
            return p;
        }

        @Override
        Node deq() {
            Node p = head;
            if (p != null) {
                head = p.next;
                p.next = null;
            }
            return p;
        }

        @Override
        boolean shouldUnlink(Node node) {
            // Return false if already dequeued or is bottom node (in which
            // case we might retain at most one garbage node)
            return (node == head || node.next != null);
        }

        @Override
        void unlink(Node node) {
            Node trail = null;
            for (Node p = head; p != null; trail = p, p = p.next) {
                if (p == node) {
                    Node next = p.next;
                    if (trail == null) {
                        head = next;
                    } else {
                        trail.next = next;
                    }
                    return;
                }
            }
        }
    }

    /**
     * Unlinks the given cancelled node from the given wait queue.
     * <tt>shouldUnlink</tt> is consulted once without the lock, to skip
     * locking and traversal when it reports false, and again while
     * holding <tt>qlock</tt> before the node is actually unlinked.
     *
     * <p>NOTE(review): the first check reads queue links without holding
     * the lock; it is intended to err only towards reporting true -
     * confirm that a stale read cannot make it report false.
     */
    private void unlinkCancelled(WaitQueue queue, Node node) {
        if (queue.shouldUnlink(node)) {
            qlock.lock();
            try {
                if (queue.shouldUnlink(node)) {
                    queue.unlink(node);
                }
            } finally {
                qlock.unlock();
            }
        }
    }

    /**
     * Unlinks the given node from consumer queue.  Called by cancelled
     * (timeout, interrupt) waiters to avoid garbage retention in the
     * absence of producers.
     */
    private void unlinkCancelledConsumer(Node node) {
        unlinkCancelled(waitingConsumers, node);
    }

    /**
     * Unlinks the given node from producer queue.  Symmetric
     * to unlinkCancelledConsumer.
     */
    private void unlinkCancelledProducer(Node node) {
        unlinkCancelled(waitingProducers, node);
    }

    /**
     * Narrows an item taken out of a node to the element type.
     */
    @SuppressWarnings("unchecked")
    private E asElement(Object item) {
        // Safe: the only non-null items stored in nodes are the E arguments
        // of put/offer; consumer slots start out null.
        return (E) item;
    }

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it.  The class extends
     * AbstractQueuedSynchronizer to manage blocking, using AQS state
     * 0 for waiting, 1 for ack, -1 for cancelled.
     */
    static final class Node extends AbstractQueuedSynchronizer {
        /** Synchronization state value representing that node acked */
        private static final int ACK = 1;
        /** Synchronization state value representing that node cancelled */
        private static final int CANCEL = -1;

        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        /** Creates a node with initial item */
        Node(Object x) {
            item = x;
        }

        /** Creates a node with initial item and next */
        Node(Object x, Node n) {
            item = x;
            next = n;
        }

        /**
         * Implements AQS base acquire to succeed if not in WAITING state
         * (that is, once the node has been acked or cancelled).
         */
        @Override
        protected boolean tryAcquire(int ignore) {
            return getState() != 0;
        }

        /**
         * Implements AQS base release to signal if state changed.  The
         * state only moves from 0 to <tt>newState</tt>, so of a competing
         * ack and cancel only the first one succeeds.
         */
        @Override
        protected boolean tryRelease(int newState) {
            return compareAndSetState(0, newState);
        }

        /**
         * Takes item and nulls out field (for sake of GC)
         */
        private Object extract() {
            Object x = item;
            item = null;
            return x;
        }

        /**
         * Tries to cancel on interrupt; if the cancel succeeds the
         * exception is rethrown, otherwise (the node was already acked)
         * the thread's interrupt status is set and the caller proceeds
         * as if the transfer completed normally.
         */
        private void checkCancellationOnInterrupt(InterruptedException ie)
                throws InterruptedException {
            if (release(CANCEL)) {
                throw ie;
            }
            Thread.currentThread().interrupt();
        }

        /**
         * Fills in the slot created by the consumer and signals consumer to
         * continue.
         *
         * @return true if the consumer was still waiting, false if it
         *         had already cancelled
         */
        boolean setItem(Object x) {
            item = x; // can place in slot even if cancelled
            return release(ACK);
        }

        /**
         * Removes item from slot created by producer and signals producer
         * to continue.
         *
         * @return the item, or null if the producer had already cancelled
         */
        Object getItem() {
            if (release(ACK)) {
                return extract();
            }
            return null;
        }

        /**
         * Waits for a consumer to take item placed by producer.
         */
        void waitForTake() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
        }

        /**
         * Waits for a producer to put item placed by consumer.
         *
         * @return the item supplied by the producer
         */
        Object waitForPut() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }

        /**
         * Waits for a consumer to take item placed by producer or time out.
         *
         * @return false if the wait timed out and the node was cancelled;
         *         true if a consumer acked the node
         */
        boolean waitForTake(long nanos) throws InterruptedException {
            try {
                // On timeout, a failed cancel means a consumer acked in
                // the meantime, so the transfer still counts as done.
                if (!tryAcquireNanos(0, nanos) && release(CANCEL)) {
                    return false;
                }
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return true;
        }

        /**
         * Waits for a producer to put item placed by consumer, or time out.
         *
         * @return the item supplied by the producer, or null if the wait
         *         timed out and the node was cancelled
         */
        Object waitForPut(long nanos) throws InterruptedException {
            try {
                // On timeout, a failed cancel means a producer acked in
                // the meantime, so its item is returned below.
                if (!tryAcquireNanos(0, nanos) && release(CANCEL)) {
                    return null;
                }
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     *
     * @param e the element to add
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            Node node;
            boolean mustWait;
            qlock.lock();
            try {
                // Pair with a waiting consumer if there is one; otherwise
                // queue up as a waiting producer.
                node = waitingConsumers.deq();
                mustWait = (node == null);
                if (mustWait) {
                    node = waitingProducers.enq(e);
                }
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    node.waitForTake();
                    return;
                } catch (InterruptedException ex) {
                    unlinkCancelledProducer(node);
                    throw ex;
                }
            }
            if (node.setItem(e)) {
                return;
            }
            // else consumer cancelled, so retry
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     *
     * @param e the element to add
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return <tt>true</tt> if successful, or <tt>false</tt> if the
     *         specified waiting time elapses before a consumer appears.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            Node node;
            boolean mustWait;
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                mustWait = (node == null);
                if (mustWait) {
                    node = waitingProducers.enq(e);
                }
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    boolean taken = node.waitForTake(nanos);
                    if (!taken) {
                        unlinkCancelledProducer(node);
                    }
                    return taken;
                } catch (InterruptedException ex) {
                    unlinkCancelledProducer(node);
                    throw ex;
                }
            }
            if (node.setItem(e)) {
                return true;
            }
            // else consumer cancelled, so retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     *
     * @return the head of this queue
     * @throws InterruptedException {@inheritDoc}
     */
    public E take() throws InterruptedException {
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            Node node;
            boolean mustWait;
            qlock.lock();
            try {
                // Pair with a waiting producer if there is one; otherwise
                // queue up an empty slot as a waiting consumer.
                node = waitingProducers.deq();
                mustWait = (node == null);
                if (mustWait) {
                    node = waitingConsumers.enq(null);
                }
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    return asElement(node.waitForPut());
                } catch (InterruptedException ex) {
                    unlinkCancelledConsumer(node);
                    throw ex;
                }
            }
            Object x = node.getItem();
            if (x != null) {
                return asElement(x);
            }
            // else cancelled, so retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     *
     * @param timeout how long to wait before giving up, in units of
     *        <tt>unit</tt>
     * @param unit the time unit of the <tt>timeout</tt> argument
     * @return the head of this queue, or <tt>null</tt> if the
     *         specified waiting time elapses before an element is present.
     * @throws InterruptedException {@inheritDoc}
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            Node node;
            boolean mustWait;
            qlock.lock();
            try {
                node = waitingProducers.deq();
                mustWait = (node == null);
                if (mustWait) {
                    node = waitingConsumers.enq(null);
                }
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    Object x = node.waitForPut(nanos);
                    if (x == null) {
                        unlinkCancelledConsumer(node);
                    }
                    return asElement(x);
                } catch (InterruptedException ex) {
                    unlinkCancelledConsumer(node);
                    throw ex;
                }
            }
            Object x = node.getItem();
            if (x != null) {
                return asElement(x);
            }
            // else cancelled, so retry
        }
    }

    // Untimed nonblocking versions

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param e the element to add
     * @return <tt>true</tt> if the element was added to this queue, else
     *         <tt>false</tt>
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingConsumers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null) {
                return false;
            }
            if (node.setItem(e)) {
                return true;
            }
            // else that consumer cancelled, so retry with the next one
        }
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     *         element is available.
     */
    public E poll() {
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingProducers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null) {
                return null;
            }
            Object x = node.getItem();
            if (x != null) {
                return asElement(x);
            }
            // else that producer cancelled, so retry with the next one
        }
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return <tt>true</tt>
     */
    @Override
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero
     */
    @Override
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @return zero
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    @Override
    public void clear() {
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element
     * @return <tt>false</tt>
     */
    @Override
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    @Override
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless the given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt> unless the given collection is empty
     * @throws NullPointerException if the specified collection is null
     */
    @Override
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    @Override
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param c the collection
     * @return <tt>false</tt>
     */
    @Override
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     *
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /**
     * Iterator over nothing: <tt>hasNext</tt> is always false,
     * <tt>next</tt> always throws <tt>NoSuchElementException</tt> and
     * <tt>remove</tt> always throws <tt>IllegalStateException</tt>.
     */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }

        public E next() {
            throw new NoSuchElementException();
        }

        public void remove() {
            throw new IllegalStateException();
        }
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    @Override
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns a zero-length array.
     *
     * @return a zero-length array
     */
    @Override
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroeth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     *
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is null
     */
    @Override
    public <T> T[] toArray(T[] a) {
        if (a.length > 0) {
            a[0] = null;
        }
        return a;
    }

    /**
     * Transfers to <tt>c</tt> every element that a currently waiting
     * producer makes available, by polling until <tt>poll()</tt>
     * returns <tt>null</tt>.
     *
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        int n = 0;
        for (E e = poll(); e != null; e = poll()) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Transfers to <tt>c</tt> at most <tt>maxElements</tt> elements that
     * currently waiting producers make available, stopping early once
     * <tt>poll()</tt> returns <tt>null</tt>.
     *
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) {
            throw new NullPointerException();
        }
        if (c == this) {
            throw new IllegalArgumentException();
        }
        int n = 0;
        while (n < maxElements) {
            // Poll only while below the limit so no element is removed
            // from a producer and then dropped.
            E e = poll();
            if (e == null) {
                break;
            }
            c.add(e);
            ++n;
        }
        return n;
    }
}