ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.54
Committed: Sun Jun 19 23:13:42 2005 UTC (18 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.53: +1 -1 lines
Log Message:
doc fixes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each insert
13 * operation must wait for a corresponding remove operation by another
14 * thread, and vice versa. A synchronous queue does not have any
15 * internal capacity, not even a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to remove it; you cannot insert an element
18 * (using any method) unless another thread is trying to remove it;
19 * you cannot iterate as there is nothing to iterate. The
20 * <em>head</em> of the queue is the element that the first queued
21 * inserting thread is trying to add to the queue; if there is no such
22 * queued thread then no element is available for removal and
23 * <tt>poll()</tt> will return <tt>null</tt>. For purposes of other
24 * <tt>Collection</tt> methods (for example <tt>contains</tt>), a
25 * <tt>SynchronousQueue</tt> acts as an empty collection. This queue
26 * does not permit <tt>null</tt> elements.
27 *
28 * <p>Synchronous queues are similar to rendezvous channels used in
29 * CSP and Ada. They are well suited for handoff designs, in which an
30 * object running in one thread must sync up with an object running
31 * in another thread in order to hand it some information, event, or
32 * task.
33 *
34 * <p> This class supports an optional fairness policy for ordering
35 * waiting producer and consumer threads. By default, this ordering
36 * is not guaranteed. However, a queue constructed with fairness set
37 * to <tt>true</tt> grants threads access in FIFO order. Fairness
38 * generally decreases throughput but reduces variability and avoids
39 * starvation.
40 *
41 * <p>This class and its iterator implement all of the
42 * <em>optional</em> methods of the {@link Collection} and {@link
43 * Iterator} interfaces.
44 *
45 * <p>This class is a member of the
46 * <a href="{@docRoot}/../guide/collections/index.html">
47 * Java Collections Framework</a>.
48 *
49 * @since 1.5
50 * @author Doug Lea
51 * @param <E> the type of elements held in this collection
52 */
53 public class SynchronousQueue<E> extends AbstractQueue<E>
54 implements BlockingQueue<E>, java.io.Serializable {
55 private static final long serialVersionUID = -3223113410248163686L;
56
57 /*
58 This implementation divides actions into two cases for puts:
59
60 * An arriving producer that does not already have a waiting consumer
61 creates a node holding item, and then waits for a consumer to take it.
62 * An arriving producer that does already have a waiting consumer fills
63 the slot node created by the consumer, and notifies it to continue.
64
65 And symmetrically, two for takes:
66
67 * An arriving consumer that does not already have a waiting producer
68 creates an empty slot node, and then waits for a producer to fill it.
69 * An arriving consumer that does already have a waiting producer takes
70 item from the node created by the producer, and notifies it to continue.
71
72 When a put or take waiting for the actions of its counterpart
73 aborts due to interruption or timeout, it marks the node
74 it created as "CANCELLED", which causes its counterpart to retry
75 the entire put or take sequence.
76
77 This requires keeping two simple queues, waitingProducers and
78 waitingConsumers. Each of these can be FIFO (preserves fairness)
79 or LIFO (improves throughput).
80 */
81
82 /** Lock protecting both wait queues */
83 private final ReentrantLock qlock;
84 /** Queue holding waiting puts */
85 private final WaitQueue waitingProducers;
86 /** Queue holding waiting takes */
87 private final WaitQueue waitingConsumers;
88
89 /**
90 * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
91 */
92 public SynchronousQueue() {
93 this(false);
94 }
95
96 /**
97 * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
98 * @param fair if true, threads contend in FIFO order for access;
99 * otherwise the order is unspecified.
100 */
101 public SynchronousQueue(boolean fair) {
102 if (fair) {
103 qlock = new ReentrantLock(true);
104 waitingProducers = new FifoWaitQueue();
105 waitingConsumers = new FifoWaitQueue();
106 }
107 else {
108 qlock = new ReentrantLock();
109 waitingProducers = new LifoWaitQueue();
110 waitingConsumers = new LifoWaitQueue();
111 }
112 }
113
114 /**
115 * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
116 * These queues have all transient fields, but are serializable
117 * in order to recover fairness settings when deserialized.
118 */
119 static abstract class WaitQueue implements java.io.Serializable {
120 /** Creates, adds, and returns node for x. */
121 abstract Node enq(Object x);
122 /** Removes and returns node, or null if empty. */
123 abstract Node deq();
124 /** Removes a cancelled node to avoid garbage retention. */
125 abstract void unlink(Node node);
126 /** Returns true if a cancelled node might be on queue. */
127 abstract boolean shouldUnlink(Node node);
128 }
129
130 /**
131 * FIFO queue to hold waiting puts/takes.
132 */
133 static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
134 private static final long serialVersionUID = -3623113410248163686L;
135 private transient Node head;
136 private transient Node last;
137
138 Node enq(Object x) {
139 Node p = new Node(x);
140 if (last == null)
141 last = head = p;
142 else
143 last = last.next = p;
144 return p;
145 }
146
147 Node deq() {
148 Node p = head;
149 if (p != null) {
150 if ((head = p.next) == null)
151 last = null;
152 p.next = null;
153 }
154 return p;
155 }
156
157 boolean shouldUnlink(Node node) {
158 return (node == last || node.next != null);
159 }
160
161 void unlink(Node node) {
162 Node p = head;
163 Node trail = null;
164 while (p != null) {
165 if (p == node) {
166 Node next = p.next;
167 if (trail == null)
168 head = next;
169 else
170 trail.next = next;
171 if (last == node)
172 last = trail;
173 break;
174 }
175 trail = p;
176 p = p.next;
177 }
178 }
179 }
180
181 /**
182 * LIFO queue to hold waiting puts/takes.
183 */
184 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
185 private static final long serialVersionUID = -3633113410248163686L;
186 private transient Node head;
187
188 Node enq(Object x) {
189 return head = new Node(x, head);
190 }
191
192 Node deq() {
193 Node p = head;
194 if (p != null) {
195 head = p.next;
196 p.next = null;
197 }
198 return p;
199 }
200
201 boolean shouldUnlink(Node node) {
202 // Return false if already dequeued or is bottom node (in which
203 // case we might retain at most one garbage node)
204 return (node == head || node.next != null);
205 }
206
207 void unlink(Node node) {
208 Node p = head;
209 Node trail = null;
210 while (p != null) {
211 if (p == node) {
212 Node next = p.next;
213 if (trail == null)
214 head = next;
215 else
216 trail.next = next;
217 break;
218 }
219 trail = p;
220 p = p.next;
221 }
222 }
223 }
224
225 /**
226 * Unlinks the given node from consumer queue. Called by cancelled
227 * (timeout, interrupt) waiters to avoid garbage retention in the
228 * absence of producers.
229 */
230 private void unlinkCancelledConsumer(Node node) {
231 // Use a form of double-check to avoid unnecessary locking and
232 // traversal. The first check outside lock might
233 // conservatively report true.
234 if (waitingConsumers.shouldUnlink(node)) {
235 qlock.lock();
236 try {
237 if (waitingConsumers.shouldUnlink(node))
238 waitingConsumers.unlink(node);
239 } finally {
240 qlock.unlock();
241 }
242 }
243 }
244
245 /**
246 * Unlinks the given node from producer queue. Symmetric
247 * to unlinkCancelledConsumer.
248 */
249 private void unlinkCancelledProducer(Node node) {
250 if (waitingProducers.shouldUnlink(node)) {
251 qlock.lock();
252 try {
253 if (waitingProducers.shouldUnlink(node))
254 waitingProducers.unlink(node);
255 } finally {
256 qlock.unlock();
257 }
258 }
259 }
260
261 /**
262 * Nodes each maintain an item and handle waits and signals for
263 * getting and setting it. The class extends
264 * AbstractQueuedSynchronizer to manage blocking, using AQS state
265 * 0 for waiting, 1 for ack, -1 for cancelled.
266 */
267 static final class Node extends AbstractQueuedSynchronizer {
268 private static final long serialVersionUID = -2631493897867746127L;
269
270 /** Synchronization state value representing that node acked */
271 private static final int ACK = 1;
272 /** Synchronization state value representing that node cancelled */
273 private static final int CANCEL = -1;
274
275 /** The item being transferred */
276 Object item;
277 /** Next node in wait queue */
278 Node next;
279
280 /** Creates a node with initial item */
281 Node(Object x) { item = x; }
282
283 /** Creates a node with initial item and next */
284 Node(Object x, Node n) { item = x; next = n; }
285
286 /**
287 * Implements AQS base acquire to succeed if not in WAITING state
288 */
289 protected boolean tryAcquire(int ignore) {
290 return getState() != 0;
291 }
292
293 /**
294 * Implements AQS base release to signal if state changed
295 */
296 protected boolean tryRelease(int newState) {
297 return compareAndSetState(0, newState);
298 }
299
300 /**
301 * Takes item and nulls out field (for sake of GC)
302 */
303 private Object extract() {
304 Object x = item;
305 item = null;
306 return x;
307 }
308
309 /**
310 * Tries to cancel on interrupt; if so rethrowing,
311 * else setting interrupt state
312 */
313 private void checkCancellationOnInterrupt(InterruptedException ie)
314 throws InterruptedException {
315 if (release(CANCEL))
316 throw ie;
317 Thread.currentThread().interrupt();
318 }
319
320 /**
321 * Fills in the slot created by the consumer and signal consumer to
322 * continue.
323 */
324 boolean setItem(Object x) {
325 item = x; // can place in slot even if cancelled
326 return release(ACK);
327 }
328
329 /**
330 * Removes item from slot created by producer and signal producer
331 * to continue.
332 */
333 Object getItem() {
334 return (release(ACK))? extract() : null;
335 }
336
337 /**
338 * Waits for a consumer to take item placed by producer.
339 */
340 void waitForTake() throws InterruptedException {
341 try {
342 acquireInterruptibly(0);
343 } catch (InterruptedException ie) {
344 checkCancellationOnInterrupt(ie);
345 }
346 }
347
348 /**
349 * Waits for a producer to put item placed by consumer.
350 */
351 Object waitForPut() throws InterruptedException {
352 try {
353 acquireInterruptibly(0);
354 } catch (InterruptedException ie) {
355 checkCancellationOnInterrupt(ie);
356 }
357 return extract();
358 }
359
360 /**
361 * Waits for a consumer to take item placed by producer or time out.
362 */
363 boolean waitForTake(long nanos) throws InterruptedException {
364 try {
365 if (!tryAcquireNanos(0, nanos) &&
366 release(CANCEL))
367 return false;
368 } catch (InterruptedException ie) {
369 checkCancellationOnInterrupt(ie);
370 }
371 return true;
372 }
373
374 /**
375 * Waits for a producer to put item placed by consumer, or time out.
376 */
377 Object waitForPut(long nanos) throws InterruptedException {
378 try {
379 if (!tryAcquireNanos(0, nanos) &&
380 release(CANCEL))
381 return null;
382 } catch (InterruptedException ie) {
383 checkCancellationOnInterrupt(ie);
384 }
385 return extract();
386 }
387 }
388
389 /**
390 * Adds the specified element to this queue, waiting if necessary for
391 * another thread to receive it.
392 *
393 * @throws InterruptedException {@inheritDoc}
394 * @throws NullPointerException {@inheritDoc}
395 */
396 public void put(E e) throws InterruptedException {
397 if (e == null) throw new NullPointerException();
398 final ReentrantLock qlock = this.qlock;
399
400 for (;;) {
401 Node node;
402 boolean mustWait;
403 if (Thread.interrupted()) throw new InterruptedException();
404 qlock.lock();
405 try {
406 node = waitingConsumers.deq();
407 if ( (mustWait = (node == null)) )
408 node = waitingProducers.enq(e);
409 } finally {
410 qlock.unlock();
411 }
412
413 if (mustWait) {
414 try {
415 node.waitForTake();
416 return;
417 } catch (InterruptedException ex) {
418 unlinkCancelledProducer(node);
419 throw ex;
420 }
421 }
422
423 else if (node.setItem(e))
424 return;
425
426 // else consumer cancelled, so retry
427 }
428 }
429
430 /**
431 * Inserts the specified element into this queue, waiting if necessary
432 * up to the specified wait time for another thread to receive it.
433 *
434 * @return <tt>true</tt> if successful, or <tt>false</tt> if the
435 * specified waiting time elapses before a consumer appears.
436 * @throws InterruptedException {@inheritDoc}
437 * @throws NullPointerException {@inheritDoc}
438 */
439 public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
440 if (e == null) throw new NullPointerException();
441 long nanos = unit.toNanos(timeout);
442 final ReentrantLock qlock = this.qlock;
443 for (;;) {
444 Node node;
445 boolean mustWait;
446 if (Thread.interrupted()) throw new InterruptedException();
447 qlock.lock();
448 try {
449 node = waitingConsumers.deq();
450 if ( (mustWait = (node == null)) )
451 node = waitingProducers.enq(e);
452 } finally {
453 qlock.unlock();
454 }
455
456 if (mustWait) {
457 try {
458 boolean x = node.waitForTake(nanos);
459 if (!x)
460 unlinkCancelledProducer(node);
461 return x;
462 } catch (InterruptedException ex) {
463 unlinkCancelledProducer(node);
464 throw ex;
465 }
466 }
467
468 else if (node.setItem(e))
469 return true;
470
471 // else consumer cancelled, so retry
472 }
473 }
474
475 /**
476 * Retrieves and removes the head of this queue, waiting if necessary
477 * for another thread to insert it.
478 *
479 * @return the head of this queue
480 * @throws InterruptedException {@inheritDoc}
481 */
482 public E take() throws InterruptedException {
483 final ReentrantLock qlock = this.qlock;
484 for (;;) {
485 Node node;
486 boolean mustWait;
487
488 if (Thread.interrupted()) throw new InterruptedException();
489 qlock.lock();
490 try {
491 node = waitingProducers.deq();
492 if ( (mustWait = (node == null)) )
493 node = waitingConsumers.enq(null);
494 } finally {
495 qlock.unlock();
496 }
497
498 if (mustWait) {
499 try {
500 Object x = node.waitForPut();
501 return (E)x;
502 } catch (InterruptedException ex) {
503 unlinkCancelledConsumer(node);
504 throw ex;
505 }
506 }
507 else {
508 Object x = node.getItem();
509 if (x != null)
510 return (E)x;
511 // else cancelled, so retry
512 }
513 }
514 }
515
516 /**
517 * Retrieves and removes the head of this queue, waiting
518 * if necessary up to the specified wait time, for another thread
519 * to insert it.
520 *
521 * @return the head of this queue, or <tt>null</tt> if the
522 * specified waiting time elapses before an element is present.
523 * @throws InterruptedException {@inheritDoc}
524 */
525 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
526 long nanos = unit.toNanos(timeout);
527 final ReentrantLock qlock = this.qlock;
528
529 for (;;) {
530 Node node;
531 boolean mustWait;
532
533 if (Thread.interrupted()) throw new InterruptedException();
534 qlock.lock();
535 try {
536 node = waitingProducers.deq();
537 if ( (mustWait = (node == null)) )
538 node = waitingConsumers.enq(null);
539 } finally {
540 qlock.unlock();
541 }
542
543 if (mustWait) {
544 try {
545 Object x = node.waitForPut(nanos);
546 if (x == null)
547 unlinkCancelledConsumer(node);
548 return (E)x;
549 } catch (InterruptedException ex) {
550 unlinkCancelledConsumer(node);
551 throw ex;
552 }
553 }
554 else {
555 Object x = node.getItem();
556 if (x != null)
557 return (E)x;
558 // else cancelled, so retry
559 }
560 }
561 }
562
563 // Untimed nonblocking versions
564
565 /**
566 * Inserts the specified element into this queue, if another thread is
567 * waiting to receive it.
568 *
569 * @param e the element to add
570 * @return <tt>true</tt> if the element was added to this queue, else
571 * <tt>false</tt>
572 * @throws NullPointerException if the specified element is null
573 */
574 public boolean offer(E e) {
575 if (e == null) throw new NullPointerException();
576 final ReentrantLock qlock = this.qlock;
577
578 for (;;) {
579 Node node;
580 qlock.lock();
581 try {
582 node = waitingConsumers.deq();
583 } finally {
584 qlock.unlock();
585 }
586 if (node == null)
587 return false;
588
589 else if (node.setItem(e))
590 return true;
591 // else retry
592 }
593 }
594
595 /**
596 * Retrieves and removes the head of this queue, if another thread
597 * is currently making an element available.
598 *
599 * @return the head of this queue, or <tt>null</tt> if no
600 * element is available.
601 */
602 public E poll() {
603 final ReentrantLock qlock = this.qlock;
604 for (;;) {
605 Node node;
606 qlock.lock();
607 try {
608 node = waitingProducers.deq();
609 } finally {
610 qlock.unlock();
611 }
612 if (node == null)
613 return null;
614
615 else {
616 Object x = node.getItem();
617 if (x != null)
618 return (E)x;
619 // else retry
620 }
621 }
622 }
623
624 /**
625 * Always returns <tt>true</tt>.
626 * A <tt>SynchronousQueue</tt> has no internal capacity.
627 *
628 * @return <tt>true</tt>
629 */
630 public boolean isEmpty() {
631 return true;
632 }
633
634 /**
635 * Always returns zero.
636 * A <tt>SynchronousQueue</tt> has no internal capacity.
637 *
638 * @return zero
639 */
640 public int size() {
641 return 0;
642 }
643
644 /**
645 * Always returns zero.
646 * A <tt>SynchronousQueue</tt> has no internal capacity.
647 *
648 * @return zero
649 */
650 public int remainingCapacity() {
651 return 0;
652 }
653
654 /**
655 * Does nothing.
656 * A <tt>SynchronousQueue</tt> has no internal capacity.
657 */
658 public void clear() {}
659
660 /**
661 * Always returns <tt>false</tt>.
662 * A <tt>SynchronousQueue</tt> has no internal capacity.
663 *
664 * @param o object to be checked for containment in this queue
665 * @return <tt>false</tt>
666 */
667 public boolean contains(Object o) {
668 return false;
669 }
670
671 /**
672 * Always returns <tt>false</tt>.
673 * A <tt>SynchronousQueue</tt> has no internal capacity.
674 *
675 * @param o the element to remove
676 * @return <tt>false</tt>
677 */
678 public boolean remove(Object o) {
679 return false;
680 }
681
682 /**
683 * Returns <tt>false</tt> unless the given collection is empty.
684 * A <tt>SynchronousQueue</tt> has no internal capacity.
685 *
686 * @param c the collection
687 * @return <tt>false</tt> unless the given collection is empty
688 * @throws NullPointerException if the specified collection is null
689 */
690 public boolean containsAll(Collection<?> c) {
691 return c.isEmpty();
692 }
693
694 /**
695 * Always returns <tt>false</tt>.
696 * A <tt>SynchronousQueue</tt> has no internal capacity.
697 *
698 * @param c the collection
699 * @return <tt>false</tt>
700 */
701 public boolean removeAll(Collection<?> c) {
702 return false;
703 }
704
705 /**
706 * Always returns <tt>false</tt>.
707 * A <tt>SynchronousQueue</tt> has no internal capacity.
708 *
709 * @param c the collection
710 * @return <tt>false</tt>
711 */
712 public boolean retainAll(Collection<?> c) {
713 return false;
714 }
715
716 /**
717 * Always returns <tt>null</tt>.
718 * A <tt>SynchronousQueue</tt> does not return elements
719 * unless actively waited on.
720 *
721 * @return <tt>null</tt>
722 */
723 public E peek() {
724 return null;
725 }
726
727
728 static class EmptyIterator<E> implements Iterator<E> {
729 public boolean hasNext() {
730 return false;
731 }
732 public E next() {
733 throw new NoSuchElementException();
734 }
735 public void remove() {
736 throw new IllegalStateException();
737 }
738 }
739
740 /**
741 * Returns an empty iterator in which <tt>hasNext</tt> always returns
742 * <tt>false</tt>.
743 *
744 * @return an empty iterator
745 */
746 public Iterator<E> iterator() {
747 return new EmptyIterator<E>();
748 }
749
750
751 /**
752 * Returns a zero-length array.
753 * @return a zero-length array
754 */
755 public Object[] toArray() {
756 return new Object[0];
757 }
758
759 /**
760 * Sets the zeroeth element of the specified array to <tt>null</tt>
761 * (if the array has non-zero length) and returns it.
762 *
763 * @param a the array
764 * @return the specified array
765 * @throws NullPointerException if the specified array is null
766 */
767 public <T> T[] toArray(T[] a) {
768 if (a.length > 0)
769 a[0] = null;
770 return a;
771 }
772
773 /**
774 * @throws UnsupportedOperationException {@inheritDoc}
775 * @throws ClassCastException {@inheritDoc}
776 * @throws NullPointerException {@inheritDoc}
777 * @throws IllegalArgumentException {@inheritDoc}
778 */
779 public int drainTo(Collection<? super E> c) {
780 if (c == null)
781 throw new NullPointerException();
782 if (c == this)
783 throw new IllegalArgumentException();
784 int n = 0;
785 E e;
786 while ( (e = poll()) != null) {
787 c.add(e);
788 ++n;
789 }
790 return n;
791 }
792
793 /**
794 * @throws UnsupportedOperationException {@inheritDoc}
795 * @throws ClassCastException {@inheritDoc}
796 * @throws NullPointerException {@inheritDoc}
797 * @throws IllegalArgumentException {@inheritDoc}
798 */
799 public int drainTo(Collection<? super E> c, int maxElements) {
800 if (c == null)
801 throw new NullPointerException();
802 if (c == this)
803 throw new IllegalArgumentException();
804 int n = 0;
805 E e;
806 while (n < maxElements && (e = poll()) != null) {
807 c.add(e);
808 ++n;
809 }
810 return n;
811 }
812 }