ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.52
Committed: Wed May 18 19:05:07 2005 UTC (19 years ago) by jsr166
Branch: MAIN
Changes since 1.51: +15 -14 lines
Log Message:
doc clarifications

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each insert
13 * operation must wait for a corresponding remove operation by another
14 * thread, and vice versa. A synchronous queue does not have any
15 * internal capacity, not even a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to remove it; you cannot insert an element
18 * (using any method) unless another thread is trying to remove it;
19 * you cannot iterate as there is nothing to iterate. The
20 * <em>head</em> of the queue is the element that the first queued
21 * inserting thread is trying to add to the queue; if there is no such
22 * queued thread then no element is available for removal and
23 * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other
24 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
25 * <tt>SynchronousQueue</tt> acts as an empty collection. This queue
26 * does not permit <tt>null</tt> elements.
27 *
28 * <p>Synchronous queues are similar to rendezvous channels used in
29 * CSP and Ada. They are well suited for handoff designs, in which an
30 * object running in one thread must sync up with an object running
31 * in another thread in order to hand it some information, event, or
32 * task.
33 *
34 * <p> This class supports an optional fairness policy for ordering
35 * waiting producer and consumer threads. By default, this ordering
36 * is not guaranteed. However, a queue constructed with fairness set
37 * to <tt>true</tt> grants threads access in FIFO order. Fairness
38 * generally decreases throughput but reduces variability and avoids
39 * starvation.
40 *
41 * <p>This class and its iterator implement all of the
42 * <em>optional</em> methods of the {@link Collection} and {@link
43 * Iterator} interfaces.
44 *
45 * <p>This class is a member of the
46 * <a href="{@docRoot}/../guide/collections/index.html">
47 * Java Collections Framework</a>.
48 *
49 * @since 1.5
50 * @author Doug Lea
51 * @param <E> the type of elements held in this collection
52 */
53 public class SynchronousQueue<E> extends AbstractQueue<E>
54 implements BlockingQueue<E>, java.io.Serializable {
55 private static final long serialVersionUID = -3223113410248163686L;
56
57 /*
58 This implementation divides actions into two cases for puts:
59
60 * An arriving producer that does not already have a waiting consumer
61 creates a node holding item, and then waits for a consumer to take it.
62 * An arriving producer that does already have a waiting consumer fills
63 the slot node created by the consumer, and notifies it to continue.
64
65 And symmetrically, two for takes:
66
67 * An arriving consumer that does not already have a waiting producer
68 creates an empty slot node, and then waits for a producer to fill it.
69 * An arriving consumer that does already have a waiting producer takes
70 item from the node created by the producer, and notifies it to continue.
71
72 When a put or take waiting for the actions of its counterpart
73 aborts due to interruption or timeout, it marks the node
74 it created as "CANCELLED", which causes its counterpart to retry
75 the entire put or take sequence.
76
77 This requires keeping two simple queues, waitingProducers and
78 waitingConsumers. Each of these can be FIFO (preserves fairness)
79 or LIFO (improves throughput).
80 */
81
82 /** Lock protecting both wait queues */
83 private final ReentrantLock qlock;
84 /** Queue holding waiting puts */
85 private final WaitQueue waitingProducers;
86 /** Queue holding waiting takes */
87 private final WaitQueue waitingConsumers;
88
89 /**
90 * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
91 */
92 public SynchronousQueue() {
93 this(false);
94 }
95
96 /**
97 * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
98 * @param fair if true, threads contend in FIFO order for access;
99 * otherwise the order is unspecified.
100 */
101 public SynchronousQueue(boolean fair) {
102 if (fair) {
103 qlock = new ReentrantLock(true);
104 waitingProducers = new FifoWaitQueue();
105 waitingConsumers = new FifoWaitQueue();
106 }
107 else {
108 qlock = new ReentrantLock();
109 waitingProducers = new LifoWaitQueue();
110 waitingConsumers = new LifoWaitQueue();
111 }
112 }
113
114 /**
115 * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
116 * These queues have all transient fields, but are serializable
117 * in order to recover fairness settings when deserialized.
118 */
119 static abstract class WaitQueue implements java.io.Serializable {
120 /** Creates, adds, and returns node for x. */
121 abstract Node enq(Object x);
122 /** Removes and returns node, or null if empty. */
123 abstract Node deq();
124 /** Removes a cancelled node to avoid garbage retention. */
125 abstract void unlink(Node node);
126 /** Returns true if a cancelled node might be on queue. */
127 abstract boolean shouldUnlink(Node node);
128 }
129
130 /**
131 * FIFO queue to hold waiting puts/takes.
132 */
133 static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
134 private static final long serialVersionUID = -3623113410248163686L;
135 private transient Node head;
136 private transient Node last;
137
138 Node enq(Object x) {
139 Node p = new Node(x);
140 if (last == null)
141 last = head = p;
142 else
143 last = last.next = p;
144 return p;
145 }
146
147 Node deq() {
148 Node p = head;
149 if (p != null) {
150 if ((head = p.next) == null)
151 last = null;
152 p.next = null;
153 }
154 return p;
155 }
156
157 boolean shouldUnlink(Node node) {
158 return (node == last || node.next != null);
159 }
160
161 void unlink(Node node) {
162 Node p = head;
163 Node trail = null;
164 while (p != null) {
165 if (p == node) {
166 Node next = p.next;
167 if (trail == null)
168 head = next;
169 else
170 trail.next = next;
171 if (last == node)
172 last = trail;
173 break;
174 }
175 trail = p;
176 p = p.next;
177 }
178 }
179 }
180
181 /**
182 * LIFO queue to hold waiting puts/takes.
183 */
184 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
185 private static final long serialVersionUID = -3633113410248163686L;
186 private transient Node head;
187
188 Node enq(Object x) {
189 return head = new Node(x, head);
190 }
191
192 Node deq() {
193 Node p = head;
194 if (p != null) {
195 head = p.next;
196 p.next = null;
197 }
198 return p;
199 }
200
201 boolean shouldUnlink(Node node) {
202 // Return false if already dequeued or is bottom node (in which
203 // case we might retain at most one garbage node)
204 return (node == head || node.next != null);
205 }
206
207 void unlink(Node node) {
208 Node p = head;
209 Node trail = null;
210 while (p != null) {
211 if (p == node) {
212 Node next = p.next;
213 if (trail == null)
214 head = next;
215 else
216 trail.next = next;
217 break;
218 }
219 trail = p;
220 p = p.next;
221 }
222 }
223 }
224
225 /**
226 * Unlinks the given node from consumer queue. Called by cancelled
227 * (timeout, interrupt) waiters to avoid garbage retention in the
228 * absence of producers.
229 */
230 private void unlinkCancelledConsumer(Node node) {
231 // Use a form of double-check to avoid unnecessary locking and
232 // traversal. The first check outside lock might
233 // conservatively report true.
234 if (waitingConsumers.shouldUnlink(node)) {
235 qlock.lock();
236 try {
237 if (waitingConsumers.shouldUnlink(node))
238 waitingConsumers.unlink(node);
239 } finally {
240 qlock.unlock();
241 }
242 }
243 }
244
245 /**
246 * Unlinks the given node from producer queue. Symmetric
247 * to unlinkCancelledConsumer.
248 */
249 private void unlinkCancelledProducer(Node node) {
250 if (waitingProducers.shouldUnlink(node)) {
251 qlock.lock();
252 try {
253 if (waitingProducers.shouldUnlink(node))
254 waitingProducers.unlink(node);
255 } finally {
256 qlock.unlock();
257 }
258 }
259 }
260
261 /**
262 * Nodes each maintain an item and handle waits and signals for
263 * getting and setting it. The class extends
264 * AbstractQueuedSynchronizer to manage blocking, using AQS state
265 * 0 for waiting, 1 for ack, -1 for cancelled.
266 */
267 static final class Node extends AbstractQueuedSynchronizer {
268 /** Synchronization state value representing that node acked */
269 private static final int ACK = 1;
270 /** Synchronization state value representing that node cancelled */
271 private static final int CANCEL = -1;
272
273 /** The item being transferred */
274 Object item;
275 /** Next node in wait queue */
276 Node next;
277
278 /** Creates a node with initial item */
279 Node(Object x) { item = x; }
280
281 /** Creates a node with initial item and next */
282 Node(Object x, Node n) { item = x; next = n; }
283
284 /**
285 * Implements AQS base acquire to succeed if not in WAITING state
286 */
287 protected boolean tryAcquire(int ignore) {
288 return getState() != 0;
289 }
290
291 /**
292 * Implements AQS base release to signal if state changed
293 */
294 protected boolean tryRelease(int newState) {
295 return compareAndSetState(0, newState);
296 }
297
298 /**
299 * Takes item and nulls out field (for sake of GC)
300 */
301 private Object extract() {
302 Object x = item;
303 item = null;
304 return x;
305 }
306
307 /**
308 * Tries to cancel on interrupt; if so rethrowing,
309 * else setting interrupt state
310 */
311 private void checkCancellationOnInterrupt(InterruptedException ie)
312 throws InterruptedException {
313 if (release(CANCEL))
314 throw ie;
315 Thread.currentThread().interrupt();
316 }
317
318 /**
319 * Fills in the slot created by the consumer and signal consumer to
320 * continue.
321 */
322 boolean setItem(Object x) {
323 item = x; // can place in slot even if cancelled
324 return release(ACK);
325 }
326
327 /**
328 * Removes item from slot created by producer and signal producer
329 * to continue.
330 */
331 Object getItem() {
332 return (release(ACK))? extract() : null;
333 }
334
335 /**
336 * Waits for a consumer to take item placed by producer.
337 */
338 void waitForTake() throws InterruptedException {
339 try {
340 acquireInterruptibly(0);
341 } catch (InterruptedException ie) {
342 checkCancellationOnInterrupt(ie);
343 }
344 }
345
346 /**
347 * Waits for a producer to put item placed by consumer.
348 */
349 Object waitForPut() throws InterruptedException {
350 try {
351 acquireInterruptibly(0);
352 } catch (InterruptedException ie) {
353 checkCancellationOnInterrupt(ie);
354 }
355 return extract();
356 }
357
358 /**
359 * Waits for a consumer to take item placed by producer or time out.
360 */
361 boolean waitForTake(long nanos) throws InterruptedException {
362 try {
363 if (!tryAcquireNanos(0, nanos) &&
364 release(CANCEL))
365 return false;
366 } catch (InterruptedException ie) {
367 checkCancellationOnInterrupt(ie);
368 }
369 return true;
370 }
371
372 /**
373 * Waits for a producer to put item placed by consumer, or time out.
374 */
375 Object waitForPut(long nanos) throws InterruptedException {
376 try {
377 if (!tryAcquireNanos(0, nanos) &&
378 release(CANCEL))
379 return null;
380 } catch (InterruptedException ie) {
381 checkCancellationOnInterrupt(ie);
382 }
383 return extract();
384 }
385 }
386
387 /**
388 * Adds the specified element to this queue, waiting if necessary for
389 * another thread to receive it.
390 *
391 * @throws InterruptedException {@inheritDoc}
392 * @throws NullPointerException {@inheritDoc}
393 */
394 public void put(E e) throws InterruptedException {
395 if (e == null) throw new NullPointerException();
396 final ReentrantLock qlock = this.qlock;
397
398 for (;;) {
399 Node node;
400 boolean mustWait;
401 if (Thread.interrupted()) throw new InterruptedException();
402 qlock.lock();
403 try {
404 node = waitingConsumers.deq();
405 if ( (mustWait = (node == null)) )
406 node = waitingProducers.enq(e);
407 } finally {
408 qlock.unlock();
409 }
410
411 if (mustWait) {
412 try {
413 node.waitForTake();
414 return;
415 } catch (InterruptedException ex) {
416 unlinkCancelledProducer(node);
417 throw ex;
418 }
419 }
420
421 else if (node.setItem(e))
422 return;
423
424 // else consumer cancelled, so retry
425 }
426 }
427
428 /**
429 * Inserts the specified element into this queue, waiting if necessary
430 * up to the specified wait time for another thread to receive it.
431 *
432 * @return <tt>true</tt> if successful, or <tt>false</tt> if the
433 * specified waiting time elapses before a consumer appears.
434 * @throws InterruptedException {@inheritDoc}
435 * @throws NullPointerException {@inheritDoc}
436 */
437 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
438 if (e == null) throw new NullPointerException();
439 long nanos = unit.toNanos(timeout);
440 final ReentrantLock qlock = this.qlock;
441 for (;;) {
442 Node node;
443 boolean mustWait;
444 if (Thread.interrupted()) throw new InterruptedException();
445 qlock.lock();
446 try {
447 node = waitingConsumers.deq();
448 if ( (mustWait = (node == null)) )
449 node = waitingProducers.enq(e);
450 } finally {
451 qlock.unlock();
452 }
453
454 if (mustWait) {
455 try {
456 boolean x = node.waitForTake(nanos);
457 if (!x)
458 unlinkCancelledProducer(node);
459 return x;
460 } catch (InterruptedException ex) {
461 unlinkCancelledProducer(node);
462 throw ex;
463 }
464 }
465
466 else if (node.setItem(e))
467 return true;
468
469 // else consumer cancelled, so retry
470 }
471 }
472
473 /**
474 * Retrieves and removes the head of this queue, waiting if necessary
475 * for another thread to insert it.
476 *
477 * @return the head of this queue
478 * @throws InterruptedException {@inheritDoc}
479 */
480 public E take() throws InterruptedException {
481 final ReentrantLock qlock = this.qlock;
482 for (;;) {
483 Node node;
484 boolean mustWait;
485
486 if (Thread.interrupted()) throw new InterruptedException();
487 qlock.lock();
488 try {
489 node = waitingProducers.deq();
490 if ( (mustWait = (node == null)) )
491 node = waitingConsumers.enq(null);
492 } finally {
493 qlock.unlock();
494 }
495
496 if (mustWait) {
497 try {
498 Object x = node.waitForPut();
499 return (E)x;
500 } catch (InterruptedException ex) {
501 unlinkCancelledConsumer(node);
502 throw ex;
503 }
504 }
505 else {
506 Object x = node.getItem();
507 if (x != null)
508 return (E)x;
509 // else cancelled, so retry
510 }
511 }
512 }
513
514 /**
515 * Retrieves and removes the head of this queue, waiting
516 * if necessary up to the specified wait time, for another thread
517 * to insert it.
518 *
519 * @return the head of this queue, or <tt>null</tt> if the
520 * specified waiting time elapses before an element is present.
521 * @throws InterruptedException {@inheritDoc}
522 */
523 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
524 long nanos = unit.toNanos(timeout);
525 final ReentrantLock qlock = this.qlock;
526
527 for (;;) {
528 Node node;
529 boolean mustWait;
530
531 if (Thread.interrupted()) throw new InterruptedException();
532 qlock.lock();
533 try {
534 node = waitingProducers.deq();
535 if ( (mustWait = (node == null)) )
536 node = waitingConsumers.enq(null);
537 } finally {
538 qlock.unlock();
539 }
540
541 if (mustWait) {
542 try {
543 Object x = node.waitForPut(nanos);
544 if (x == null)
545 unlinkCancelledConsumer(node);
546 return (E)x;
547 } catch (InterruptedException ex) {
548 unlinkCancelledConsumer(node);
549 throw ex;
550 }
551 }
552 else {
553 Object x = node.getItem();
554 if (x != null)
555 return (E)x;
556 // else cancelled, so retry
557 }
558 }
559 }
560
561 // Untimed nonblocking versions
562
563 /**
564 * Inserts the specified element into this queue, if another thread is
565 * waiting to receive it.
566 *
567 * @param e the element to add
568 * @return <tt>true</tt> if the element was added to this queue, else
569 * <tt>false</tt>
570 * @throws NullPointerException if the specified element is null
571 */
572 public boolean offer(E e) {
573 if (e == null) throw new NullPointerException();
574 final ReentrantLock qlock = this.qlock;
575
576 for (;;) {
577 Node node;
578 qlock.lock();
579 try {
580 node = waitingConsumers.deq();
581 } finally {
582 qlock.unlock();
583 }
584 if (node == null)
585 return false;
586
587 else if (node.setItem(e))
588 return true;
589 // else retry
590 }
591 }
592
593 /**
594 * Retrieves and removes the head of this queue, if another thread
595 * is currently making an element available.
596 *
597 * @return the head of this queue, or <tt>null</tt> if no
598 * element is available.
599 */
600 public E poll() {
601 final ReentrantLock qlock = this.qlock;
602 for (;;) {
603 Node node;
604 qlock.lock();
605 try {
606 node = waitingProducers.deq();
607 } finally {
608 qlock.unlock();
609 }
610 if (node == null)
611 return null;
612
613 else {
614 Object x = node.getItem();
615 if (x != null)
616 return (E)x;
617 // else retry
618 }
619 }
620 }
621
622 /**
623 * Always returns <tt>true</tt>.
624 * A <tt>SynchronousQueue</tt> has no internal capacity.
625 *
626 * @return <tt>true</tt>
627 */
628 public boolean isEmpty() {
629 return true;
630 }
631
632 /**
633 * Always returns zero.
634 * A <tt>SynchronousQueue</tt> has no internal capacity.
635 *
636 * @return zero
637 */
638 public int size() {
639 return 0;
640 }
641
642 /**
643 * Always returns zero.
644 * A <tt>SynchronousQueue</tt> has no internal capacity.
645 *
646 * @return zero
647 */
648 public int remainingCapacity() {
649 return 0;
650 }
651
652 /**
653 * Does nothing.
654 * A <tt>SynchronousQueue</tt> has no internal capacity.
655 */
656 public void clear() {}
657
658 /**
659 * Always returns <tt>false</tt>.
660 * A <tt>SynchronousQueue</tt> has no internal capacity.
661 *
662 * @param o the element
663 * @return <tt>false</tt>
664 */
665 public boolean contains(Object o) {
666 return false;
667 }
668
669 /**
670 * Always returns <tt>false</tt>.
671 * A <tt>SynchronousQueue</tt> has no internal capacity.
672 *
673 * @param o the element to remove
674 * @return <tt>false</tt>
675 */
676 public boolean remove(Object o) {
677 return false;
678 }
679
680 /**
681 * Returns <tt>false</tt> unless the given collection is empty.
682 * A <tt>SynchronousQueue</tt> has no internal capacity.
683 *
684 * @param c the collection
685 * @return <tt>false</tt> unless the given collection is empty
686 * @throws NullPointerException if the specified collection is null
687 */
688 public boolean containsAll(Collection<?> c) {
689 return c.isEmpty();
690 }
691
692 /**
693 * Always returns <tt>false</tt>.
694 * A <tt>SynchronousQueue</tt> has no internal capacity.
695 *
696 * @param c the collection
697 * @return <tt>false</tt>
698 */
699 public boolean removeAll(Collection<?> c) {
700 return false;
701 }
702
703 /**
704 * Always returns <tt>false</tt>.
705 * A <tt>SynchronousQueue</tt> has no internal capacity.
706 *
707 * @param c the collection
708 * @return <tt>false</tt>
709 */
710 public boolean retainAll(Collection<?> c) {
711 return false;
712 }
713
714 /**
715 * Always returns <tt>null</tt>.
716 * A <tt>SynchronousQueue</tt> does not return elements
717 * unless actively waited on.
718 *
719 * @return <tt>null</tt>
720 */
721 public E peek() {
722 return null;
723 }
724
725
726 static class EmptyIterator<E> implements Iterator<E> {
727 public boolean hasNext() {
728 return false;
729 }
730 public E next() {
731 throw new NoSuchElementException();
732 }
733 public void remove() {
734 throw new IllegalStateException();
735 }
736 }
737
738 /**
739 * Returns an empty iterator in which <tt>hasNext</tt> always returns
740 * <tt>false</tt>.
741 *
742 * @return an empty iterator
743 */
744 public Iterator<E> iterator() {
745 return new EmptyIterator<E>();
746 }
747
748
749 /**
750 * Returns a zero-length array.
751 * @return a zero-length array
752 */
753 public Object[] toArray() {
754 return new Object[0];
755 }
756
757 /**
758 * Sets the zeroeth element of the specified array to <tt>null</tt>
759 * (if the array has non-zero length) and returns it.
760 *
761 * @param a the array
762 * @return the specified array
763 * @throws NullPointerException if the specified array is null
764 */
765 public <T> T[] toArray(T[] a) {
766 if (a.length > 0)
767 a[0] = null;
768 return a;
769 }
770
771 /**
772 * @throws UnsupportedOperationException {@inheritDoc}
773 * @throws ClassCastException {@inheritDoc}
774 * @throws NullPointerException {@inheritDoc}
775 * @throws IllegalArgumentException {@inheritDoc}
776 */
777 public int drainTo(Collection<? super E> c) {
778 if (c == null)
779 throw new NullPointerException();
780 if (c == this)
781 throw new IllegalArgumentException();
782 int n = 0;
783 E e;
784 while ( (e = poll()) != null) {
785 c.add(e);
786 ++n;
787 }
788 return n;
789 }
790
791 /**
792 * @throws UnsupportedOperationException {@inheritDoc}
793 * @throws ClassCastException {@inheritDoc}
794 * @throws NullPointerException {@inheritDoc}
795 * @throws IllegalArgumentException {@inheritDoc}
796 */
797 public int drainTo(Collection<? super E> c, int maxElements) {
798 if (c == null)
799 throw new NullPointerException();
800 if (c == this)
801 throw new IllegalArgumentException();
802 int n = 0;
803 E e;
804 while (n < maxElements && (e = poll()) != null) {
805 c.add(e);
806 ++n;
807 }
808 return n;
809 }
810 }