ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.31
Committed: Tue Dec 30 23:55:43 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.30: +112 -111 lines
Log Message:
More responsive cancellation

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity - in
15 * particular it does not have a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to take it; you cannot add an element (using
18 * any method) unless another thread is trying to remove it; you
19 * cannot iterate as there is nothing to iterate. The <em>head</em>
20 * of the queue is the element that the first queued thread is trying
21 * to add to the queue; if there are no queued threads then no element
22 * is being added and the head is <tt>null</tt>. For purposes of
23 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 * queue does not permit <tt>null</tt> elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must sync up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 * <p>This class implements all of the <em>optional</em> methods
33 * of the {@link Collection} and {@link Iterator} interfaces.
34 * @since 1.5
35 * @author Doug Lea
36 * @param <E> the type of elements held in this collection
37 */
38 public class SynchronousQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40 private static final long serialVersionUID = -3223113410248163686L;
41
42 /*
43 This implementation divides actions into two cases for puts:
44
45 * An arriving putter that does not already have a waiting taker
46 creates a node holding item, and then waits for a taker to take it.
47 * An arriving putter that does already have a waiting taker fills
48 the slot node created by the taker, and notifies it to continue.
49
50 And symmetrically, two for takes:
51
52 * An arriving taker that does not already have a waiting putter
53 creates an empty slot node, and then waits for a putter to fill it.
54 * An arriving taker that does already have a waiting putter takes
55 item from the node created by the putter, and notifies it to continue.
56
57 This requires keeping two simple queues: waitingPuts and waitingTakes.
58
59 When a put or take waiting for the actions of its counterpart
60 aborts due to interruption or timeout, it marks the node
61 it created as "CANCELLED", which causes its counterpart to retry
62 the entire put or take sequence.
63 */
64
65
66 /*
67 * Note that all fields are transient final, so there is
68 * no explicit serialization code.
69 */
70
71 private transient final WaitQueue waitingPuts = new WaitQueue();
72 private transient final WaitQueue waitingTakes = new WaitQueue();
73 private transient final ReentrantLock qlock = new ReentrantLock();
74
75 /**
76 * Nodes each maintain an item and handle waits and signals for
77 * getting and setting it. The class extends
78 * AbstractQueuedSynchronizer to manage blocking, using AQS state
79 * 0 for waiting, 1 for ack, -1 for cancelled.
80 */
81 private static final class Node extends AbstractQueuedSynchronizer {
82 /** The item being transferred */
83 Object item;
84 /** Next node in wait queue */
85 Node next;
86 Node(Object x) { item = x; }
87
88 private static final int WAITING = 0;
89 private static final int ACKED = 1;
90 private static final int CANCELLED = -1;
91
92 /**
93 * Implements AQS base acquire to succeed if not in WAITING state
94 */
95 public int acquireExclusiveState(boolean b, int ignore) {
96 return get() == WAITING ? -1 : 0;
97 }
98
99 /**
100 * Implements AQS base release to always signal.
101 * Status is changed in ack or cancel methods before calling,
102 * which is needed to ensure we win cancel race.
103 */
104 public boolean releaseExclusiveState(int ignore) {
105 return true;
106 }
107
108 /**
109 * Try to acknowledge; fail if not waiting
110 */
111 private boolean ack() {
112 if (!compareAndSet(WAITING, ACKED))
113 return false;
114 releaseExclusive(0);
115 return true;
116 }
117
118 /**
119 * Try to cancel; fail if not waiting
120 */
121 private boolean cancel() {
122 if (!compareAndSet(WAITING, CANCELLED))
123 return false;
124 releaseExclusive(0);
125 return true;
126 }
127
128 /**
129 * Take item and null out fields (for sake of GC)
130 */
131 private Object extract() {
132 Object x = item;
133 item = null;
134 next = null;
135 return x;
136 }
137
138 /**
139 * Fill in the slot created by the taker and signal taker to
140 * continue.
141 */
142 boolean setItem(Object x) {
143 item = x;
144 return ack();
145 }
146
147 /**
148 * Remove item from slot created by putter and signal putter
149 * to continue.
150 */
151 Object getItem() {
152 if (!ack())
153 return null;
154 return extract();
155 }
156
157 /**
158 * Wait for a taker to take item placed by putter.
159 */
160 boolean waitForTake() throws InterruptedException {
161 try {
162 acquireExclusiveInterruptibly(0);
163 return true;
164 } catch (InterruptedException ie) {
165 if (cancel())
166 throw ie;
167 Thread.currentThread().interrupt();
168 return true;
169 }
170 }
171
172 /**
173 * Wait for a taker to take item placed by putter, or time out.
174 */
175 boolean waitForTake(long nanos) throws InterruptedException {
176 try {
177 return acquireExclusiveTimed(0, nanos) || !cancel();
178 } catch (InterruptedException ie) {
179 if (cancel())
180 throw ie;
181 Thread.currentThread().interrupt();
182 return true;
183 }
184 }
185
186 /**
187 * Wait for a putter to put item placed by taker.
188 */
189 Object waitForPut() throws InterruptedException {
190 try {
191 acquireExclusiveInterruptibly(0);
192 return extract();
193 } catch (InterruptedException ie) {
194 if (cancel())
195 throw ie;
196 Thread.currentThread().interrupt();
197 return extract();
198 }
199 }
200
201 /**
202 * Wait for a putter to put item placed by taker, or time out.
203 */
204 Object waitForPut(long nanos) throws InterruptedException {
205 try {
206 if (acquireExclusiveTimed(0, nanos) || !cancel())
207 return extract();
208 return null;
209 } catch (InterruptedException ie) {
210 if (cancel())
211 throw ie;
212 Thread.currentThread().interrupt();
213 return extract();
214 }
215 }
216 }
217
218 /**
219 * Simple FIFO queue class to hold waiting puts/takes.
220 **/
221 private static class WaitQueue<E> {
222 Node head;
223 Node last;
224
225 Node enq(Object x) {
226 Node p = new Node(x);
227 if (last == null)
228 last = head = p;
229 else
230 last = last.next = p;
231 return p;
232 }
233
234 Node deq() {
235 Node p = head;
236 if (p != null && (head = p.next) == null)
237 last = null;
238 return p;
239 }
240 }
241
242 /**
243 * Main put algorithm, used by put, timed offer
244 */
245 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
246 if (x == null) throw new NullPointerException();
247 for (;;) {
248 Node node;
249 boolean mustWait;
250 final ReentrantLock qlock = this.qlock;
251 qlock.lockInterruptibly();
252 try {
253 node = waitingTakes.deq();
254 if ( (mustWait = (node == null)) )
255 node = waitingPuts.enq(x);
256 } finally {
257 qlock.unlock();
258 }
259
260 if (mustWait)
261 return timed? node.waitForTake(nanos) : node.waitForTake();
262
263 else if (node.setItem(x))
264 return true;
265
266 // else taker cancelled, so retry
267 }
268 }
269
270 /**
271 * Main take algorithm, used by take, timed poll
272 */
273 private E doTake(boolean timed, long nanos) throws InterruptedException {
274 for (;;) {
275 Node node;
276 boolean mustWait;
277
278 final ReentrantLock qlock = this.qlock;
279 qlock.lockInterruptibly();
280 try {
281 node = waitingPuts.deq();
282 if ( (mustWait = (node == null)) )
283 node = waitingTakes.enq(null);
284 } finally {
285 qlock.unlock();
286 }
287
288 if (mustWait) {
289 Object x = timed? node.waitForPut(nanos) : node.waitForPut();
290 return (E)x;
291 }
292 else {
293 Object x = node.getItem();
294 if (x != null)
295 return (E)x;
296 // else cancelled, so retry
297 }
298 }
299 }
300
301 /**
302 * Creates a <tt>SynchronousQueue</tt>.
303 */
304 public SynchronousQueue() {}
305
306
307 /**
308 * Adds the specified element to this queue, waiting if necessary for
309 * another thread to receive it.
310 * @param o the element to add
311 * @throws InterruptedException if interrupted while waiting.
312 * @throws NullPointerException if the specified element is <tt>null</tt>.
313 */
314 public void put(E o) throws InterruptedException {
315 doPut(o, false, 0);
316 }
317
318 /**
319 * Inserts the specified element into this queue, waiting if necessary
320 * up to the specified wait time for another thread to receive it.
321 * @param o the element to add
322 * @param timeout how long to wait before giving up, in units of
323 * <tt>unit</tt>
324 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
325 * <tt>timeout</tt> parameter
326 * @return <tt>true</tt> if successful, or <tt>false</tt> if
327 * the specified waiting time elapses before a taker appears.
328 * @throws InterruptedException if interrupted while waiting.
329 * @throws NullPointerException if the specified element is <tt>null</tt>.
330 */
331 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
332 return doPut(o, true, unit.toNanos(timeout));
333 }
334
335
336 /**
337 * Retrieves and removes the head of this queue, waiting if necessary
338 * for another thread to insert it.
339 * @return the head of this queue
340 */
341 public E take() throws InterruptedException {
342 return doTake(false, 0);
343 }
344
345 /**
346 * Retrieves and removes the head of this queue, waiting
347 * if necessary up to the specified wait time, for another thread
348 * to insert it.
349 * @param timeout how long to wait before giving up, in units of
350 * <tt>unit</tt>
351 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
352 * <tt>timeout</tt> parameter
353 * @return the head of this queue, or <tt>null</tt> if the
354 * specified waiting time elapses before an element is present.
355 * @throws InterruptedException if interrupted while waiting.
356 */
357 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
358 return doTake(true, unit.toNanos(timeout));
359 }
360
361 // Untimed nonblocking versions
362
363 /**
364 * Inserts the specified element into this queue, if another thread is
365 * waiting to receive it.
366 *
367 * @param o the element to add.
368 * @return <tt>true</tt> if it was possible to add the element to
369 * this queue, else <tt>false</tt>
370 * @throws NullPointerException if the specified element is <tt>null</tt>
371 */
372 public boolean offer(E o) {
373 if (o == null) throw new NullPointerException();
374 final ReentrantLock qlock = this.qlock;
375
376 for (;;) {
377 Node node;
378 qlock.lock();
379 try {
380 node = waitingTakes.deq();
381 } finally {
382 qlock.unlock();
383 }
384 if (node == null)
385 return false;
386
387 else if (node.setItem(o))
388 return true;
389 // else retry
390 }
391 }
392
393 /**
394 * Retrieves and removes the head of this queue, if another thread
395 * is currently making an element available.
396 *
397 * @return the head of this queue, or <tt>null</tt> if no
398 * element is available.
399 */
400 public E poll() {
401 final ReentrantLock qlock = this.qlock;
402 for (;;) {
403 Node node;
404 qlock.lock();
405 try {
406 node = waitingPuts.deq();
407 } finally {
408 qlock.unlock();
409 }
410 if (node == null)
411 return null;
412
413 else {
414 Object x = node.getItem();
415 if (x != null)
416 return (E)x;
417 // else retry
418 }
419 }
420 }
421
422 /**
423 * Always returns <tt>true</tt>.
424 * A <tt>SynchronousQueue</tt> has no internal capacity.
425 * @return <tt>true</tt>
426 */
427 public boolean isEmpty() {
428 return true;
429 }
430
431 /**
432 * Always returns zero.
433 * A <tt>SynchronousQueue</tt> has no internal capacity.
434 * @return zero.
435 */
436 public int size() {
437 return 0;
438 }
439
440 /**
441 * Always returns zero.
442 * A <tt>SynchronousQueue</tt> has no internal capacity.
443 * @return zero.
444 */
445 public int remainingCapacity() {
446 return 0;
447 }
448
449 /**
450 * Does nothing.
451 * A <tt>SynchronousQueue</tt> has no internal capacity.
452 */
453 public void clear() {}
454
455 /**
456 * Always returns <tt>false</tt>.
457 * A <tt>SynchronousQueue</tt> has no internal capacity.
458 * @param o the element
459 * @return <tt>false</tt>
460 */
461 public boolean contains(Object o) {
462 return false;
463 }
464
465 /**
466 * Always returns <tt>false</tt>.
467 * A <tt>SynchronousQueue</tt> has no internal capacity.
468 *
469 * @param o the element to remove
470 * @return <tt>false</tt>
471 */
472 public boolean remove(Object o) {
473 return false;
474 }
475
476 /**
477 * Returns <tt>false</tt> unless given collection is empty.
478 * A <tt>SynchronousQueue</tt> has no internal capacity.
479 * @param c the collection
480 * @return <tt>false</tt> unless given collection is empty
481 */
482 public boolean containsAll(Collection<?> c) {
483 return c.isEmpty();
484 }
485
486 /**
487 * Always returns <tt>false</tt>.
488 * A <tt>SynchronousQueue</tt> has no internal capacity.
489 * @param c the collection
490 * @return <tt>false</tt>
491 */
492 public boolean removeAll(Collection<?> c) {
493 return false;
494 }
495
496 /**
497 * Always returns <tt>false</tt>.
498 * A <tt>SynchronousQueue</tt> has no internal capacity.
499 * @param c the collection
500 * @return <tt>false</tt>
501 */
502 public boolean retainAll(Collection<?> c) {
503 return false;
504 }
505
506 /**
507 * Always returns <tt>null</tt>.
508 * A <tt>SynchronousQueue</tt> does not return elements
509 * unless actively waited on.
510 * @return <tt>null</tt>
511 */
512 public E peek() {
513 return null;
514 }
515
516
517 static class EmptyIterator<E> implements Iterator<E> {
518 public boolean hasNext() {
519 return false;
520 }
521 public E next() {
522 throw new NoSuchElementException();
523 }
524 public void remove() {
525 throw new IllegalStateException();
526 }
527 }
528
529 /**
530 * Returns an empty iterator in which <tt>hasNext</tt> always returns
531 * <tt>false</tt>.
532 *
533 * @return an empty iterator
534 */
535 public Iterator<E> iterator() {
536 return new EmptyIterator<E>();
537 }
538
539
540 /**
541 * Returns a zero-length array.
542 * @return a zero-length array
543 */
544 public Object[] toArray() {
545 return new Object[0];
546 }
547
548 /**
549 * Sets the zeroeth element of the specified array to <tt>null</tt>
550 * (if the array has non-zero length) and returns it.
551 * @return the specified array
552 */
553 public <T> T[] toArray(T[] a) {
554 if (a.length > 0)
555 a[0] = null;
556 return a;
557 }
558
559
560 public int drainTo(Collection<? super E> c) {
561 if (c == null)
562 throw new NullPointerException();
563 if (c == this)
564 throw new IllegalArgumentException();
565 int n = 0;
566 E e;
567 while ( (e = poll()) != null) {
568 c.add(e);
569 ++n;
570 }
571 return n;
572 }
573
574 public int drainTo(Collection<? super E> c, int maxElements) {
575 if (c == null)
576 throw new NullPointerException();
577 if (c == this)
578 throw new IllegalArgumentException();
579 int n = 0;
580 E e;
581 while (n < maxElements && (e = poll()) != null) {
582 c.add(e);
583 ++n;
584 }
585 return n;
586 }
587 }
588
589
590
591
592