ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.47
Committed: Wed Mar 2 17:15:26 2005 UTC (19 years, 3 months ago) by dl
Branch: MAIN
Changes since 1.46: +121 -8 lines
Log Message:
Avoid garbage retention with timeouts

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity, not even a
15 * capacity of one. You cannot <tt>peek</tt> at a synchronous queue
16 * because an element is only present when you try to take it; you
17 * cannot add an element (using any method) unless another thread is
18 * trying to remove it; you cannot iterate as there is nothing to
19 * iterate. The <em>head</em> of the queue is the element that the
20 * first queued thread is trying to add to the queue; if there are no
21 * queued threads then no element is being added and the head is
22 * <tt>null</tt>. For purposes of other <tt>Collection</tt> methods
23 * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
24 * as an empty collection. This queue does not permit <tt>null</tt>
25 * elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must sync up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 *
33 * <p> This class supports an optional fairness policy for ordering
34 * waiting producer and consumer threads. By default, this ordering
35 * is not guaranteed. However, a queue constructed with fairness set
36 * to <tt>true</tt> grants threads access in FIFO order. Fairness
37 * generally decreases throughput but reduces variability and avoids
38 * starvation.
39 *
40 * <p>This class and its iterator implement all of the
41 * <em>optional</em> methods of the {@link Collection} and {@link
42 * Iterator} interfaces.
43 *
44 * <p>This class is a member of the
45 * <a href="{@docRoot}/../guide/collections/index.html">
46 * Java Collections Framework</a>.
47 *
48 * @since 1.5
49 * @author Doug Lea
50 * @param <E> the type of elements held in this collection
51 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -3223113410248163686L;

    /*
      This implementation divides actions into two cases for puts:

      * An arriving producer that does not already have a waiting consumer
        creates a node holding item, and then waits for a consumer to take it.
      * An arriving producer that does already have a waiting consumer fills
        the slot node created by the consumer, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving consumer that does not already have a waiting producer
        creates an empty slot node, and then waits for a producer to fill it.
      * An arriving consumer that does already have a waiting producer takes
        item from the node created by the producer, and notifies it to continue.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as "CANCELLED", which causes its counterpart to retry
      the entire put or take sequence.

      This requires keeping two simple queues, waitingProducers and
      waitingConsumers. Each of these can be FIFO (preserves fairness)
      or LIFO (improves throughput).
    */

    /** Lock protecting both wait queues */
    private final ReentrantLock qlock;
    /** Queue holding waiting puts */
    private final WaitQueue waitingProducers;
    /** Queue holding waiting takes */
    private final WaitQueue waitingConsumers;

    /**
     * Creates a <tt>SynchronousQueue</tt> with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a <tt>SynchronousQueue</tt> with specified fairness policy.
     * @param fair if true, threads contend in FIFO order for access;
     * otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        if (fair) {
            qlock = new ReentrantLock(true);
            waitingProducers = new FifoWaitQueue();
            waitingConsumers = new FifoWaitQueue();
        }
        else {
            qlock = new ReentrantLock();
            waitingProducers = new LifoWaitQueue();
            waitingConsumers = new LifoWaitQueue();
        }
    }

    /**
     * Queue to hold waiting puts/takes; specialized to Fifo/Lifo below.
     * These queues have all transient fields, but are serializable
     * in order to recover fairness settings when deserialized.
     */
    static abstract class WaitQueue implements java.io.Serializable {
        /** Create, add, and return node for x */
        abstract Node enq(Object x);
        /** Remove and return node, or null if empty */
        abstract Node deq();
        /** Remove a cancelled node to avoid garbage retention. */
        abstract void unlink(Node node);
        /** Return true if a cancelled node might be on queue */
        abstract boolean shouldUnlink(Node node);
    }

    /**
     * FIFO queue to hold waiting puts/takes.
     */
    static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3623113410248163686L;
        private transient Node head;
        private transient Node last;

        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        Node deq() {
            Node p = head;
            if (p != null) {
                if ((head = p.next) == null)
                    last = null;
                p.next = null; // a dequeued node never has a successor
            }
            return p;
        }

        boolean shouldUnlink(Node node) {
            // A node still on the queue is either the last one or has a
            // successor; deq() clears next, so a dequeued node fails both.
            return (node == last || node.next != null);
        }

        void unlink(Node node) {
            Node p = head;
            Node trail = null;
            while (p != null) {
                if (p == node) {
                    Node next = p.next;
                    if (trail == null)
                        head = next;
                    else
                        trail.next = next;
                    if (last == node)
                        last = trail;
                    break;
                }
                trail = p;
                p = p.next;
            }
        }
    }

    /**
     * LIFO queue to hold waiting puts/takes.
     */
    static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
        private static final long serialVersionUID = -3633113410248163686L;
        private transient Node head;

        Node enq(Object x) {
            return head = new Node(x, head);
        }

        Node deq() {
            Node p = head;
            if (p != null) {
                head = p.next;
                p.next = null;
            }
            return p;
        }

        boolean shouldUnlink(Node node) {
            // Return false if already dequeued or is bottom node (in which
            // case we might retain at most one garbage node)
            return (node == head || node.next != null);
        }

        void unlink(Node node) {
            Node p = head;
            Node trail = null;
            while (p != null) {
                if (p == node) {
                    Node next = p.next;
                    if (trail == null)
                        head = next;
                    else
                        trail.next = next;
                    break;
                }
                trail = p;
                p = p.next;
            }
        }
    }

    /**
     * Unlinks the given cancelled node from the given wait queue.
     * Shared implementation for the consumer and producer variants.
     *
     * Uses a form of double-check to avoid unnecessary locking and
     * traversal. The first check outside lock might conservatively
     * report true.
     *
     * NOTE(review): the first shouldUnlink call reads queue fields
     * without holding qlock; confirm that a stale read can only err
     * towards true and never skips a node that is still linked.
     *
     * @param queue the wait queue the node was enqueued on
     * @param node the cancelled node
     */
    private void unlinkCancelled(WaitQueue queue, Node node) {
        if (queue.shouldUnlink(node)) {
            qlock.lock();
            try {
                // Re-check under the lock: the node may have been
                // dequeued by a counterpart in the meantime.
                if (queue.shouldUnlink(node))
                    queue.unlink(node);
            } finally {
                qlock.unlock();
            }
        }
    }

    /**
     * Unlink the given node from consumer queue. Called by cancelled
     * (timeout, interrupt) waiters to avoid garbage retention in the
     * absence of producers.
     */
    private void unlinkCancelledConsumer(Node node) {
        unlinkCancelled(waitingConsumers, node);
    }

    /**
     * Unlink the given node from producer queue. Symmetric
     * to unlinkCancelledConsumer.
     */
    private void unlinkCancelledProducer(Node node) {
        unlinkCancelled(waitingProducers, node);
    }

    /**
     * Converts a transferred item back to the caller's element type.
     * Nodes store items as Object; the only non-null items ever placed
     * in a node come from put/offer, which accept E, so the cast is
     * safe. Keeping it here confines the unchecked warning to one place.
     */
    @SuppressWarnings("unchecked")
    private static <T> T castItem(Object item) {
        return (T) item;
    }

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it. The class extends
     * AbstractQueuedSynchronizer to manage blocking, using AQS state
     * 0 for waiting, 1 for ack, -1 for cancelled.
     */
    static final class Node extends AbstractQueuedSynchronizer {
        /** Synchronization state value representing that node acked */
        private static final int ACK = 1;
        /** Synchronization state value representing that node cancelled */
        private static final int CANCEL = -1;

        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        /** Creates a node with initial item */
        Node(Object x) { item = x; }

        /** Creates a node with initial item and next */
        Node(Object x, Node n) { item = x; next = n; }

        /**
         * Implements AQS base acquire to succeed if not in WAITING state
         */
        protected boolean tryAcquire(int ignore) {
            return getState() != 0;
        }

        /**
         * Implements AQS base release to signal if state changed
         */
        protected boolean tryRelease(int newState) {
            // Only the first transition out of WAITING (0) wins, so ACK
            // and CANCEL are mutually exclusive.
            return compareAndSetState(0, newState);
        }

        /**
         * Takes item and nulls out field (for sake of GC)
         */
        private Object extract() {
            Object x = item;
            item = null;
            return x;
        }

        /**
         * Tries to cancel on interrupt; if so rethrowing,
         * else setting interrupt state
         */
        private void checkCancellationOnInterrupt(InterruptedException ie)
            throws InterruptedException {
            if (release(CANCEL))
                throw ie;
            // Counterpart already acked: the transfer stands, so just
            // restore the interrupt status for the caller to observe.
            Thread.currentThread().interrupt();
        }

        /**
         * Fills in the slot created by the consumer and signal consumer to
         * continue.
         * @return true if the consumer was still waiting (not cancelled)
         */
        boolean setItem(Object x) {
            item = x; // can place in slot even if cancelled
            return release(ACK);
        }

        /**
         * Removes item from slot created by producer and signal producer
         * to continue.
         * @return the item, or null if the producer had cancelled
         */
        Object getItem() {
            return (release(ACK)) ? extract() : null;
        }

        /**
         * Waits for a consumer to take item placed by producer.
         */
        void waitForTake() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
        }

        /**
         * Waits for a producer to put item placed by consumer.
         */
        Object waitForPut() throws InterruptedException {
            try {
                acquireInterruptibly(0);
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }

        /**
         * Waits for a consumer to take item placed by producer or time out.
         * @return false if the wait timed out and the node was cancelled
         */
        boolean waitForTake(long nanos) throws InterruptedException {
            try {
                // A timeout only counts if the cancel wins the race
                // against a concurrent ack.
                if (!tryAcquireNanos(0, nanos) &&
                    release(CANCEL))
                    return false;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return true;
        }

        /**
         * Waits for a producer to put item placed by consumer, or time out.
         * @return the item, or null if the wait timed out and the node
         * was cancelled
         */
        Object waitForPut(long nanos) throws InterruptedException {
            try {
                if (!tryAcquireNanos(0, nanos) &&
                    release(CANCEL))
                    return null;
            } catch (InterruptedException ie) {
                checkCancellationOnInterrupt(ie);
            }
            return extract();
        }
    }

    /**
     * Adds the specified element to this queue, waiting if necessary for
     * another thread to receive it.
     * @param o the element to add
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public void put(E o) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(o);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    node.waitForTake();
                    return;
                } catch (InterruptedException ex) {
                    unlinkCancelledProducer(node);
                    throw ex;
                }
            }

            else if (node.setItem(o))
                return;

            // else consumer cancelled, so retry
        }
    }

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * up to the specified wait time for another thread to receive it.
     * @param o the element to add
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return <tt>true</tt> if successful, or <tt>false</tt> if
     * the specified waiting time elapses before a consumer appears.
     * @throws InterruptedException if interrupted while waiting.
     * @throws NullPointerException if the specified element is <tt>null</tt>.
     */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        if (o == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;
            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingConsumers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingProducers.enq(o);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    boolean x = node.waitForTake(nanos);
                    if (!x)
                        unlinkCancelledProducer(node);
                    return x;
                } catch (InterruptedException ex) {
                    unlinkCancelledProducer(node);
                    throw ex;
                }
            }

            else if (node.setItem(o))
                return true;

            // else consumer cancelled, so retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting if necessary
     * for another thread to insert it.
     * @throws InterruptedException if interrupted while waiting.
     * @return the head of this queue
     */
    public E take() throws InterruptedException {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            boolean mustWait;

            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingProducers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingConsumers.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    Object x = node.waitForPut();
                    return SynchronousQueue.<E>castItem(x);
                } catch (InterruptedException ex) {
                    unlinkCancelledConsumer(node);
                    throw ex;
                }
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return SynchronousQueue.<E>castItem(x);
                // else cancelled, so retry
            }
        }
    }

    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time, for another thread
     * to insert it.
     * @param timeout how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            boolean mustWait;

            if (Thread.interrupted()) throw new InterruptedException();
            qlock.lock();
            try {
                node = waitingProducers.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingConsumers.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                try {
                    Object x = node.waitForPut(nanos);
                    if (x == null)
                        unlinkCancelledConsumer(node);
                    return SynchronousQueue.<E>castItem(x);
                } catch (InterruptedException ex) {
                    unlinkCancelledConsumer(node);
                    throw ex;
                }
            }
            else {
                Object x = node.getItem();
                if (x != null)
                    return SynchronousQueue.<E>castItem(x);
                // else cancelled, so retry
            }
        }
    }

    // Untimed nonblocking versions

    /**
     * Inserts the specified element into this queue, if another thread is
     * waiting to receive it.
     *
     * @param o the element to add.
     * @return <tt>true</tt> if it was possible to add the element to
     * this queue, else <tt>false</tt>
     * @throws NullPointerException if the specified element is <tt>null</tt>
     */
    public boolean offer(E o) {
        if (o == null) throw new NullPointerException();
        final ReentrantLock qlock = this.qlock;

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingConsumers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return false;

            else if (node.setItem(o))
                return true;
            // else retry
        }
    }

    /**
     * Retrieves and removes the head of this queue, if another thread
     * is currently making an element available.
     *
     * @return the head of this queue, or <tt>null</tt> if no
     * element is available.
     */
    public E poll() {
        final ReentrantLock qlock = this.qlock;
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingProducers.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return null;

            else {
                Object x = node.getItem();
                if (x != null)
                    return SynchronousQueue.<E>castItem(x);
                // else retry
            }
        }
    }

    /**
     * Always returns <tt>true</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return <tt>true</tt>
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Does nothing.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     */
    public void clear() {}

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param o the element
     * @return <tt>false</tt>
     */
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     *
     * @param o the element to remove
     * @return <tt>false</tt>
     */
    public boolean remove(Object o) {
        return false;
    }

    /**
     * Returns <tt>false</tt> unless given collection is empty.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt> unless given collection is empty
     */
    public boolean containsAll(Collection<?> c) {
        return c.isEmpty();
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean removeAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>false</tt>.
     * A <tt>SynchronousQueue</tt> has no internal capacity.
     * @param c the collection
     * @return <tt>false</tt>
     */
    public boolean retainAll(Collection<?> c) {
        return false;
    }

    /**
     * Always returns <tt>null</tt>.
     * A <tt>SynchronousQueue</tt> does not return elements
     * unless actively waited on.
     * @return <tt>null</tt>
     */
    public E peek() {
        return null;
    }

    /**
     * Iterator over nothing: <tt>hasNext</tt> is always false,
     * <tt>next</tt> always throws <tt>NoSuchElementException</tt>, and
     * <tt>remove</tt> always throws <tt>IllegalStateException</tt>.
     */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new IllegalStateException();
        }
    }

    /**
     * Returns an empty iterator in which <tt>hasNext</tt> always returns
     * <tt>false</tt>.
     *
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the zeroth element of the specified array to <tt>null</tt>
     * (if the array has non-zero length) and returns it.
     * @param a the array
     * @return the specified array
     * @throws NullPointerException if the specified array is <tt>null</tt>
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * Transfers to the given collection every element obtainable by
     * repeatedly calling {@link #poll()}, stopping at the first
     * <tt>null</tt> result.
     * @param c the collection to transfer elements into
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ( (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Transfers to the given collection at most <tt>maxElements</tt>
     * elements obtained by repeatedly calling {@link #poll()}, stopping
     * early at the first <tt>null</tt> result.
     * @param c the collection to transfer elements into
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
     * @throws IllegalArgumentException if <tt>c</tt> is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            ++n;
        }
        return n;
    }
}
795
796
797
798
799