ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.44
Committed: Mon Feb 9 13:28:48 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.43: +29 -30 lines
Log Message:
Wording fixes and improvements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity, not even a
15 * capacity of one. You cannot <tt>peek</tt> at a synchronous queue
16 * because an element is only present when you try to take it; you
17 * cannot add an element (using any method) unless another thread is
18 * trying to remove it; you cannot iterate as there is nothing to
19 * iterate. The <em>head</em> of the queue is the element that the
20 * first queued thread is trying to add to the queue; if there are no
21 * queued threads then no element is being added and the head is
22 * <tt>null</tt>. For purposes of other <tt>Collection</tt> methods
23 * (for example <tt>contains</tt>), a <tt>SynchronousQueue</tt> acts
24 * as an empty collection. This queue does not permit <tt>null</tt>
25 * elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must sync up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 *
33 * <p> This class supports an optional fairness policy for ordering
34 * waiting producer and consumer threads. By default, this ordering
35 * is not guaranteed. However, a queue constructed with fairness set
36 * to <tt>true</tt> grants threads access in FIFO order. Fairness
37 * generally decreases throughput but reduces variability and avoids
38 * starvation.
39 *
40 * <p>This class implements all of the <em>optional</em> methods
41 * of the {@link Collection} and {@link Iterator} interfaces.
42 *
43 * <p>This class is a member of the
44 * <a href="{@docRoot}/../guide/collections/index.html">
45 * Java Collections Framework</a>.
46 *
47 * @since 1.5
48 * @author Doug Lea
49 * @param <E> the type of elements held in this collection
50 */
51 public class SynchronousQueue<E> extends AbstractQueue<E>
52 implements BlockingQueue<E>, java.io.Serializable {
53 private static final long serialVersionUID = -3223113410248163686L;
54
55 /*
56 This implementation divides actions into two cases for puts:
57
58 * An arriving producer that does not already have a waiting consumer
59 creates a node holding item, and then waits for a consumer to take it.
60 * An arriving producer that does already have a waiting consumer fills
61 the slot node created by the consumer, and notifies it to continue.
62
63 And symmetrically, two for takes:
64
65 * An arriving consumer that does not already have a waiting producer
66 creates an empty slot node, and then waits for a producer to fill it.
67 * An arriving consumer that does already have a waiting producer takes
68 item from the node created by the producer, and notifies it to continue.
69
70 When a put or take waiting for the actions of its counterpart
71 aborts due to interruption or timeout, it marks the node
72 it created as "CANCELLED", which causes its counterpart to retry
73 the entire put or take sequence.
74
75 This requires keeping two simple queues, waitingProducers and
76 waitingConsumers. Each of these can be FIFO (preserves fairness)
77 or LIFO (improves throughput).
78 */
79
80 /** Lock protecting both wait queues; held only while a node is dequeued or enqueued, never while a thread waits on a node */
81 private final ReentrantLock qlock;
82 /** Queue holding the nodes of waiting puts (producers); accessed only while holding <tt>qlock</tt> */
83 private final WaitQueue waitingProducers;
84 /** Queue holding the nodes of waiting takes (consumers); accessed only while holding <tt>qlock</tt> */
85 private final WaitQueue waitingConsumers;
86
/**
 * Constructs a <tt>SynchronousQueue</tt> that uses the nonfair access
 * policy. Equivalent to calling the one-argument constructor with
 * <tt>fair</tt> set to <tt>false</tt>.
 */
public SynchronousQueue() {
    this(false);
}
93
/**
 * Constructs a <tt>SynchronousQueue</tt> with the given fairness policy.
 * A fair queue uses a fair lock and FIFO wait queues for both producers
 * and consumers; a nonfair queue uses a nonfair lock and LIFO wait queues.
 *
 * @param fair if true, threads contend in FIFO order for access;
 * otherwise the order is unspecified.
 */
public SynchronousQueue(boolean fair) {
    // ReentrantLock() is documented as equivalent to ReentrantLock(false),
    // so a single call covers both policies.
    qlock = new ReentrantLock(fair);
    waitingProducers = fair ? new FifoWaitQueue() : new LifoWaitQueue();
    waitingConsumers = fair ? new FifoWaitQueue() : new LifoWaitQueue();
}
111
112 /**
113 * Queue to hold waiting puts/takes; specialized to FiFo/Lifo below.
114 * These queues have all transient fields, but are serializable
115 * in order to recover fairness settings when deserialized.
116 */
117 static abstract class WaitQueue implements java.io.Serializable {
118 /** Create, add, and return node for x */
119 abstract Node enq(Object x);
120 /** Remove and return node, or null if empty */
121 abstract Node deq();
122 }
123
124 /**
125 * FIFO queue to hold waiting puts/takes.
126 */
127 static final class FifoWaitQueue extends WaitQueue implements java.io.Serializable {
128 private static final long serialVersionUID = -3623113410248163686L;
129 private transient Node head;
130 private transient Node last;
131
132 Node enq(Object x) {
133 Node p = new Node(x);
134 if (last == null)
135 last = head = p;
136 else
137 last = last.next = p;
138 return p;
139 }
140
141 Node deq() {
142 Node p = head;
143 if (p != null) {
144 if ((head = p.next) == null)
145 last = null;
146 p.next = null;
147 }
148 return p;
149 }
150 }
151
152 /**
153 * LIFO queue to hold waiting puts/takes.
154 */
155 static final class LifoWaitQueue extends WaitQueue implements java.io.Serializable {
156 private static final long serialVersionUID = -3633113410248163686L;
157 private transient Node head;
158
159 Node enq(Object x) {
160 return head = new Node(x, head);
161 }
162
163 Node deq() {
164 Node p = head;
165 if (p != null) {
166 head = p.next;
167 p.next = null;
168 }
169 return p;
170 }
171 }
172
173 /**
174 * Nodes each maintain an item and handle waits and signals for
175 * getting and setting it. The class extends
176 * AbstractQueuedSynchronizer to manage blocking, using AQS state
177 * 0 for waiting, 1 for ack, -1 for cancelled.
178 */
179 static final class Node extends AbstractQueuedSynchronizer {
180 /** Synchronization state value representing that node acked */
181 private static final int ACK = 1;
182 /** Synchronization state value representing that node cancelled */
183 private static final int CANCEL = -1;
184
185 /** The item being transferred */
186 Object item;
187 /** Next node in wait queue */
188 Node next;
189
190 /** Creates a node with initial item */
191 Node(Object x) { item = x; }
192
193 /** Creates a node with initial item and next */
194 Node(Object x, Node n) { item = x; next = n; }
195
196 /**
197 * Implements AQS base acquire to succeed if not in WAITING state
198 */
199 protected boolean tryAcquire(int ignore) {
200 return getState() != 0;
201 }
202
203 /**
204 * Implements AQS base release to signal if state changed
205 */
206 protected boolean tryRelease(int newState) {
207 return compareAndSetState(0, newState);
208 }
209
210 /**
211 * Takes item and nulls out field (for sake of GC)
212 */
213 private Object extract() {
214 Object x = item;
215 item = null;
216 return x;
217 }
218
219 /**
220 * Tries to cancel on interrupt; if so rethrowing,
221 * else setting interrupt state
222 */
223 private void checkCancellationOnInterrupt(InterruptedException ie)
224 throws InterruptedException {
225 if (release(CANCEL))
226 throw ie;
227 Thread.currentThread().interrupt();
228 }
229
230 /**
231 * Fills in the slot created by the consumer and signal consumer to
232 * continue.
233 */
234 boolean setItem(Object x) {
235 item = x; // can place in slot even if cancelled
236 return release(ACK);
237 }
238
239 /**
240 * Removes item from slot created by producer and signal producer
241 * to continue.
242 */
243 Object getItem() {
244 return (release(ACK))? extract() : null;
245 }
246
247 /**
248 * Waits for a consumer to take item placed by producer.
249 */
250 void waitForTake() throws InterruptedException {
251 try {
252 acquireInterruptibly(0);
253 } catch (InterruptedException ie) {
254 checkCancellationOnInterrupt(ie);
255 }
256 }
257
258 /**
259 * Waits for a producer to put item placed by consumer.
260 */
261 Object waitForPut() throws InterruptedException {
262 try {
263 acquireInterruptibly(0);
264 } catch (InterruptedException ie) {
265 checkCancellationOnInterrupt(ie);
266 }
267 return extract();
268 }
269
270 /**
271 * Waits for a consumer to take item placed by producer or time out.
272 */
273 boolean waitForTake(long nanos) throws InterruptedException {
274 try {
275 if (!tryAcquireNanos(0, nanos) &&
276 release(CANCEL))
277 return false;
278 } catch (InterruptedException ie) {
279 checkCancellationOnInterrupt(ie);
280 }
281 return true;
282 }
283
284 /**
285 * Waits for a producer to put item placed by consumer, or time out.
286 */
287 Object waitForPut(long nanos) throws InterruptedException {
288 try {
289 if (!tryAcquireNanos(0, nanos) &&
290 release(CANCEL))
291 return null;
292 } catch (InterruptedException ie) {
293 checkCancellationOnInterrupt(ie);
294 }
295 return extract();
296 }
297 }
298
299 /**
300 * Adds the specified element to this queue, waiting if necessary for
301 * another thread to receive it.
302 * @param o the element to add
303 * @throws InterruptedException if interrupted while waiting.
304 * @throws NullPointerException if the specified element is <tt>null</tt>.
305 */
306 public void put(E o) throws InterruptedException {
307 if (o == null) throw new NullPointerException();
308 final ReentrantLock qlock = this.qlock;
309
310 for (;;) {
311 Node node;
312 boolean mustWait;
313 if (Thread.interrupted()) throw new InterruptedException();
314 qlock.lock();
315 try {
316 node = waitingConsumers.deq();
317 if ( (mustWait = (node == null)) )
318 node = waitingProducers.enq(o);
319 } finally {
320 qlock.unlock();
321 }
322
323 if (mustWait) {
324 node.waitForTake();
325 return;
326 }
327
328 else if (node.setItem(o))
329 return;
330
331 // else consumer cancelled, so retry
332 }
333 }
334
335 /**
336 * Inserts the specified element into this queue, waiting if necessary
337 * up to the specified wait time for another thread to receive it.
338 * @param o the element to add
339 * @param timeout how long to wait before giving up, in units of
340 * <tt>unit</tt>
341 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
342 * <tt>timeout</tt> parameter
343 * @return <tt>true</tt> if successful, or <tt>false</tt> if
344 * the specified waiting time elapses before a consumer appears.
345 * @throws InterruptedException if interrupted while waiting.
346 * @throws NullPointerException if the specified element is <tt>null</tt>.
347 */
348 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
349 if (o == null) throw new NullPointerException();
350 long nanos = unit.toNanos(timeout);
351 final ReentrantLock qlock = this.qlock;
352 for (;;) {
353 Node node;
354 boolean mustWait;
355 if (Thread.interrupted()) throw new InterruptedException();
356 qlock.lock();
357 try {
358 node = waitingConsumers.deq();
359 if ( (mustWait = (node == null)) )
360 node = waitingProducers.enq(o);
361 } finally {
362 qlock.unlock();
363 }
364
365 if (mustWait)
366 return node.waitForTake(nanos);
367
368 else if (node.setItem(o))
369 return true;
370
371 // else consumer cancelled, so retry
372 }
373 }
374
375 /**
376 * Retrieves and removes the head of this queue, waiting if necessary
377 * for another thread to insert it.
378 * @throws InterruptedException if interrupted while waiting.
379 * @return the head of this queue
380 */
381 public E take() throws InterruptedException {
382 final ReentrantLock qlock = this.qlock;
383 for (;;) {
384 Node node;
385 boolean mustWait;
386
387 if (Thread.interrupted()) throw new InterruptedException();
388 qlock.lock();
389 try {
390 node = waitingProducers.deq();
391 if ( (mustWait = (node == null)) )
392 node = waitingConsumers.enq(null);
393 } finally {
394 qlock.unlock();
395 }
396
397 if (mustWait) {
398 Object x = node.waitForPut();
399 return (E)x;
400 }
401 else {
402 Object x = node.getItem();
403 if (x != null)
404 return (E)x;
405 // else cancelled, so retry
406 }
407 }
408 }
409
410 /**
411 * Retrieves and removes the head of this queue, waiting
412 * if necessary up to the specified wait time, for another thread
413 * to insert it.
414 * @param timeout how long to wait before giving up, in units of
415 * <tt>unit</tt>
416 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
417 * <tt>timeout</tt> parameter
418 * @return the head of this queue, or <tt>null</tt> if the
419 * specified waiting time elapses before an element is present.
420 * @throws InterruptedException if interrupted while waiting.
421 */
422 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
423 long nanos = unit.toNanos(timeout);
424 final ReentrantLock qlock = this.qlock;
425
426 for (;;) {
427 Node node;
428 boolean mustWait;
429
430 if (Thread.interrupted()) throw new InterruptedException();
431 qlock.lock();
432 try {
433 node = waitingProducers.deq();
434 if ( (mustWait = (node == null)) )
435 node = waitingConsumers.enq(null);
436 } finally {
437 qlock.unlock();
438 }
439
440 if (mustWait) {
441 Object x = node.waitForPut(nanos);
442 return (E)x;
443 }
444 else {
445 Object x = node.getItem();
446 if (x != null)
447 return (E)x;
448 // else cancelled, so retry
449 }
450 }
451 }
452
453 // Untimed nonblocking versions
454
455 /**
456 * Inserts the specified element into this queue, if another thread is
457 * waiting to receive it.
458 *
459 * @param o the element to add.
460 * @return <tt>true</tt> if it was possible to add the element to
461 * this queue, else <tt>false</tt>
462 * @throws NullPointerException if the specified element is <tt>null</tt>
463 */
464 public boolean offer(E o) {
465 if (o == null) throw new NullPointerException();
466 final ReentrantLock qlock = this.qlock;
467
468 for (;;) {
469 Node node;
470 qlock.lock();
471 try {
472 node = waitingConsumers.deq();
473 } finally {
474 qlock.unlock();
475 }
476 if (node == null)
477 return false;
478
479 else if (node.setItem(o))
480 return true;
481 // else retry
482 }
483 }
484
485 /**
486 * Retrieves and removes the head of this queue, if another thread
487 * is currently making an element available.
488 *
489 * @return the head of this queue, or <tt>null</tt> if no
490 * element is available.
491 */
492 public E poll() {
493 final ReentrantLock qlock = this.qlock;
494 for (;;) {
495 Node node;
496 qlock.lock();
497 try {
498 node = waitingProducers.deq();
499 } finally {
500 qlock.unlock();
501 }
502 if (node == null)
503 return null;
504
505 else {
506 Object x = node.getItem();
507 if (x != null)
508 return (E)x;
509 // else retry
510 }
511 }
512 }
513
514 /**
515 * Always returns <tt>true</tt>.
516 * A <tt>SynchronousQueue</tt> has no internal capacity.
517 * @return <tt>true</tt>
518 */
519 public boolean isEmpty() {
520 return true;
521 }
522
523 /**
524 * Always returns zero.
525 * A <tt>SynchronousQueue</tt> has no internal capacity.
526 * @return zero.
527 */
528 public int size() {
529 return 0;
530 }
531
532 /**
533 * Always returns zero.
534 * A <tt>SynchronousQueue</tt> has no internal capacity.
535 * @return zero.
536 */
537 public int remainingCapacity() {
538 return 0;
539 }
540
541 /**
542 * Does nothing.
543 * A <tt>SynchronousQueue</tt> has no internal capacity.
544 */
545 public void clear() {}
546
547 /**
548 * Always returns <tt>false</tt>.
549 * A <tt>SynchronousQueue</tt> has no internal capacity.
550 * @param o the element
551 * @return <tt>false</tt>
552 */
553 public boolean contains(Object o) {
554 return false;
555 }
556
557 /**
558 * Always returns <tt>false</tt>.
559 * A <tt>SynchronousQueue</tt> has no internal capacity.
560 *
561 * @param o the element to remove
562 * @return <tt>false</tt>
563 */
564 public boolean remove(Object o) {
565 return false;
566 }
567
568 /**
569 * Returns <tt>false</tt> unless given collection is empty.
570 * A <tt>SynchronousQueue</tt> has no internal capacity.
571 * @param c the collection
572 * @return <tt>false</tt> unless given collection is empty
573 */
574 public boolean containsAll(Collection<?> c) {
575 return c.isEmpty();
576 }
577
578 /**
579 * Always returns <tt>false</tt>.
580 * A <tt>SynchronousQueue</tt> has no internal capacity.
581 * @param c the collection
582 * @return <tt>false</tt>
583 */
584 public boolean removeAll(Collection<?> c) {
585 return false;
586 }
587
588 /**
589 * Always returns <tt>false</tt>.
590 * A <tt>SynchronousQueue</tt> has no internal capacity.
591 * @param c the collection
592 * @return <tt>false</tt>
593 */
594 public boolean retainAll(Collection<?> c) {
595 return false;
596 }
597
598 /**
599 * Always returns <tt>null</tt>.
600 * A <tt>SynchronousQueue</tt> does not return elements
601 * unless actively waited on.
602 * @return <tt>null</tt>
603 */
604 public E peek() {
605 return null;
606 }
607
608
609 static class EmptyIterator<E> implements Iterator<E> {
610 public boolean hasNext() {
611 return false;
612 }
613 public E next() {
614 throw new NoSuchElementException();
615 }
616 public void remove() {
617 throw new IllegalStateException();
618 }
619 }
620
621 /**
622 * Returns an empty iterator in which <tt>hasNext</tt> always returns
623 * <tt>false</tt>.
624 *
625 * @return an empty iterator
626 */
627 public Iterator<E> iterator() {
628 return new EmptyIterator<E>();
629 }
630
631
632 /**
633 * Returns a zero-length array.
634 * @return a zero-length array
635 */
636 public Object[] toArray() {
637 return new Object[0];
638 }
639
640 /**
641 * Sets the zeroeth element of the specified array to <tt>null</tt>
642 * (if the array has non-zero length) and returns it.
643 * @param a the array
644 * @return the specified array
645 */
646 public <T> T[] toArray(T[] a) {
647 if (a.length > 0)
648 a[0] = null;
649 return a;
650 }
651
652
653 public int drainTo(Collection<? super E> c) {
654 if (c == null)
655 throw new NullPointerException();
656 if (c == this)
657 throw new IllegalArgumentException();
658 int n = 0;
659 E e;
660 while ( (e = poll()) != null) {
661 c.add(e);
662 ++n;
663 }
664 return n;
665 }
666
667 public int drainTo(Collection<? super E> c, int maxElements) {
668 if (c == null)
669 throw new NullPointerException();
670 if (c == this)
671 throw new IllegalArgumentException();
672 int n = 0;
673 E e;
674 while (n < maxElements && (e = poll()) != null) {
675 c.add(e);
676 ++n;
677 }
678 return n;
679 }
680 }
681
682
683
684
685