ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.41
Committed: Wed Jan 21 15:20:35 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.40: +2 -2 lines
Log Message:
doc improvements; consistent conventions for nested classes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity - in
15 * particular it does not have a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to take it; you cannot add an element (using
18 * any method) unless another thread is trying to remove it; you
19 * cannot iterate as there is nothing to iterate. The <em>head</em>
20 * of the queue is the element that the first queued thread is trying
21 * to add to the queue; if there are no queued threads then no element
22 * is being added and the head is <tt>null</tt>. For purposes of
23 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 * queue does not permit <tt>null</tt> elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must sync up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 * <p>This class implements all of the <em>optional</em> methods
33 * of the {@link Collection} and {@link Iterator} interfaces.
34 * @since 1.5
35 * @author Doug Lea
36 * @param <E> the type of elements held in this collection
37 */
38 public class SynchronousQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40 private static final long serialVersionUID = -3223113410248163686L;
41
42 /*
43 This implementation divides actions into two cases for puts:
44
45 * An arriving putter that does not already have a waiting taker
46 creates a node holding item, and then waits for a taker to take it.
47 * An arriving putter that does already have a waiting taker fills
48 the slot node created by the taker, and notifies it to continue.
49
50 And symmetrically, two for takes:
51
52 * An arriving taker that does not already have a waiting putter
53 creates an empty slot node, and then waits for a putter to fill it.
54 * An arriving taker that does already have a waiting putter takes
55 item from the node created by the putter, and notifies it to continue.
56
57 This requires keeping two simple queues: waitingPuts and waitingTakes.
58
59 When a put or take waiting for the actions of its counterpart
60 aborts due to interruption or timeout, it marks the node
61 it created as "CANCELLED", which causes its counterpart to retry
62 the entire put or take sequence.
63 */
64
65
66 /*
67 * Note that all fields are transient final, so there is
68 * no explicit serialization code.
69 */
70
71 private transient final WaitQueue waitingPuts = new WaitQueue();
72 private transient final WaitQueue waitingTakes = new WaitQueue();
73 private transient final ReentrantLock qlock = new ReentrantLock();
74
75 /**
76 * Nodes each maintain an item and handle waits and signals for
77 * getting and setting it. The class extends
78 * AbstractQueuedSynchronizer to manage blocking, using AQS state
79 * 0 for waiting, 1 for ack, -1 for cancelled.
80 */
81 static final class Node extends AbstractQueuedSynchronizer {
82 /** Synchronization state value representing that node acked */
83 private static final int ACK = 1;
84 /** Synchronization state value representing that node cancelled */
85 private static final int CANCEL = -1;
86
87 /** The item being transferred */
88 Object item;
89 /** Next node in wait queue */
90 Node next;
91
92 /** Create node with initial item */
93 Node(Object x) { item = x; }
94
95 /**
96 * Implements AQS base acquire to succeed if not in WAITING state
97 */
98 protected boolean tryAcquire(int ignore) {
99 return getState() != 0;
100 }
101
102 /**
103 * Implements AQS base release to signal if state changed
104 */
105 protected boolean tryRelease(int newState) {
106 return compareAndSetState(0, newState);
107 }
108
109 /**
110 * Take item and null out field (for sake of GC)
111 */
112 private Object extract() {
113 Object x = item;
114 item = null;
115 return x;
116 }
117
118 /**
119 * Try to cancel on interrupt; if so rethrowing,
120 * else setting interrupt state
121 */
122 private void checkCancellationOnInterrupt(InterruptedException ie)
123 throws InterruptedException {
124 if (release(CANCEL))
125 throw ie;
126 Thread.currentThread().interrupt();
127 }
128
129 /**
130 * Fill in the slot created by the taker and signal taker to
131 * continue.
132 */
133 boolean setItem(Object x) {
134 item = x; // can place in slot even if cancelled
135 return release(ACK);
136 }
137
138 /**
139 * Remove item from slot created by putter and signal putter
140 * to continue.
141 */
142 Object getItem() {
143 return (release(ACK))? extract() : null;
144 }
145
146 /**
147 * Wait for a taker to take item placed by putter.
148 */
149 void waitForTake() throws InterruptedException {
150 try {
151 acquireInterruptibly(0);
152 } catch (InterruptedException ie) {
153 checkCancellationOnInterrupt(ie);
154 }
155 }
156
157 /**
158 * Wait for a putter to put item placed by taker.
159 */
160 Object waitForPut() throws InterruptedException {
161 try {
162 acquireInterruptibly(0);
163 } catch (InterruptedException ie) {
164 checkCancellationOnInterrupt(ie);
165 }
166 return extract();
167 }
168
169 /**
170 * Wait for a taker to take item placed by putter or time out.
171 */
172 boolean waitForTake(long nanos) throws InterruptedException {
173 try {
174 if (!tryAcquireNanos(0, nanos) &&
175 release(CANCEL))
176 return false;
177 } catch (InterruptedException ie) {
178 checkCancellationOnInterrupt(ie);
179 }
180 return true;
181 }
182
183 /**
184 * Wait for a putter to put item placed by taker, or time out.
185 */
186 Object waitForPut(long nanos) throws InterruptedException {
187 try {
188 if (!tryAcquireNanos(0, nanos) &&
189 release(CANCEL))
190 return null;
191 } catch (InterruptedException ie) {
192 checkCancellationOnInterrupt(ie);
193 }
194 return extract();
195 }
196
197 }
198
199 /**
200 * Simple FIFO queue class to hold waiting puts/takes.
201 **/
202 static final class WaitQueue<E> {
203 Node head;
204 Node last;
205
206 Node enq(Object x) {
207 Node p = new Node(x);
208 if (last == null)
209 last = head = p;
210 else
211 last = last.next = p;
212 return p;
213 }
214
215 Node deq() {
216 Node p = head;
217 if (p != null) {
218 if ((head = p.next) == null)
219 last = null;
220 p.next = null;
221 }
222 return p;
223 }
224 }
225
226 /**
227 * Creates a <tt>SynchronousQueue</tt>.
228 */
229 public SynchronousQueue() {}
230
231
232 /**
233 * Adds the specified element to this queue, waiting if necessary for
234 * another thread to receive it.
235 * @param o the element to add
236 * @throws InterruptedException if interrupted while waiting.
237 * @throws NullPointerException if the specified element is <tt>null</tt>.
238 */
239 public void put(E o) throws InterruptedException {
240 if (o == null) throw new NullPointerException();
241 final ReentrantLock qlock = this.qlock;
242
243 for (;;) {
244 Node node;
245 boolean mustWait;
246 qlock.lockInterruptibly();
247 try {
248 node = waitingTakes.deq();
249 if ( (mustWait = (node == null)) )
250 node = waitingPuts.enq(o);
251 } finally {
252 qlock.unlock();
253 }
254
255 if (mustWait) {
256 node.waitForTake();
257 return;
258 }
259
260 else if (node.setItem(o))
261 return;
262
263 // else taker cancelled, so retry
264 }
265 }
266
267 /**
268 * Inserts the specified element into this queue, waiting if necessary
269 * up to the specified wait time for another thread to receive it.
270 * @param o the element to add
271 * @param timeout how long to wait before giving up, in units of
272 * <tt>unit</tt>
273 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
274 * <tt>timeout</tt> parameter
275 * @return <tt>true</tt> if successful, or <tt>false</tt> if
276 * the specified waiting time elapses before a taker appears.
277 * @throws InterruptedException if interrupted while waiting.
278 * @throws NullPointerException if the specified element is <tt>null</tt>.
279 */
280 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
281 if (o == null) throw new NullPointerException();
282 long nanos = unit.toNanos(timeout);
283 final ReentrantLock qlock = this.qlock;
284 for (;;) {
285 Node node;
286 boolean mustWait;
287 qlock.lockInterruptibly();
288 try {
289 node = waitingTakes.deq();
290 if ( (mustWait = (node == null)) )
291 node = waitingPuts.enq(o);
292 } finally {
293 qlock.unlock();
294 }
295
296 if (mustWait)
297 return node.waitForTake(nanos);
298
299 else if (node.setItem(o))
300 return true;
301
302 // else taker cancelled, so retry
303 }
304
305 }
306
307
308 /**
309 * Retrieves and removes the head of this queue, waiting if necessary
310 * for another thread to insert it.
311 * @throws InterruptedException if interrupted while waiting.
312 * @return the head of this queue
313 */
314 public E take() throws InterruptedException {
315 final ReentrantLock qlock = this.qlock;
316 for (;;) {
317 Node node;
318 boolean mustWait;
319
320 qlock.lockInterruptibly();
321 try {
322 node = waitingPuts.deq();
323 if ( (mustWait = (node == null)) )
324 node = waitingTakes.enq(null);
325 } finally {
326 qlock.unlock();
327 }
328
329 if (mustWait) {
330 Object x = node.waitForPut();
331 return (E)x;
332 }
333 else {
334 Object x = node.getItem();
335 if (x != null)
336 return (E)x;
337 // else cancelled, so retry
338 }
339 }
340 }
341
342 /**
343 * Retrieves and removes the head of this queue, waiting
344 * if necessary up to the specified wait time, for another thread
345 * to insert it.
346 * @param timeout how long to wait before giving up, in units of
347 * <tt>unit</tt>
348 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
349 * <tt>timeout</tt> parameter
350 * @return the head of this queue, or <tt>null</tt> if the
351 * specified waiting time elapses before an element is present.
352 * @throws InterruptedException if interrupted while waiting.
353 */
354 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
355 long nanos = unit.toNanos(timeout);
356 final ReentrantLock qlock = this.qlock;
357
358 for (;;) {
359 Node node;
360 boolean mustWait;
361
362 qlock.lockInterruptibly();
363 try {
364 node = waitingPuts.deq();
365 if ( (mustWait = (node == null)) )
366 node = waitingTakes.enq(null);
367 } finally {
368 qlock.unlock();
369 }
370
371 if (mustWait) {
372 Object x = node.waitForPut(nanos);
373 return (E)x;
374 }
375 else {
376 Object x = node.getItem();
377 if (x != null)
378 return (E)x;
379 // else cancelled, so retry
380 }
381 }
382 }
383
384 // Untimed nonblocking versions
385
386 /**
387 * Inserts the specified element into this queue, if another thread is
388 * waiting to receive it.
389 *
390 * @param o the element to add.
391 * @return <tt>true</tt> if it was possible to add the element to
392 * this queue, else <tt>false</tt>
393 * @throws NullPointerException if the specified element is <tt>null</tt>
394 */
395 public boolean offer(E o) {
396 if (o == null) throw new NullPointerException();
397 final ReentrantLock qlock = this.qlock;
398
399 for (;;) {
400 Node node;
401 qlock.lock();
402 try {
403 node = waitingTakes.deq();
404 } finally {
405 qlock.unlock();
406 }
407 if (node == null)
408 return false;
409
410 else if (node.setItem(o))
411 return true;
412 // else retry
413 }
414 }
415
416 /**
417 * Retrieves and removes the head of this queue, if another thread
418 * is currently making an element available.
419 *
420 * @return the head of this queue, or <tt>null</tt> if no
421 * element is available.
422 */
423 public E poll() {
424 final ReentrantLock qlock = this.qlock;
425 for (;;) {
426 Node node;
427 qlock.lock();
428 try {
429 node = waitingPuts.deq();
430 } finally {
431 qlock.unlock();
432 }
433 if (node == null)
434 return null;
435
436 else {
437 Object x = node.getItem();
438 if (x != null)
439 return (E)x;
440 // else retry
441 }
442 }
443 }
444
445 /**
446 * Always returns <tt>true</tt>.
447 * A <tt>SynchronousQueue</tt> has no internal capacity.
448 * @return <tt>true</tt>
449 */
450 public boolean isEmpty() {
451 return true;
452 }
453
454 /**
455 * Always returns zero.
456 * A <tt>SynchronousQueue</tt> has no internal capacity.
457 * @return zero.
458 */
459 public int size() {
460 return 0;
461 }
462
463 /**
464 * Always returns zero.
465 * A <tt>SynchronousQueue</tt> has no internal capacity.
466 * @return zero.
467 */
468 public int remainingCapacity() {
469 return 0;
470 }
471
472 /**
473 * Does nothing.
474 * A <tt>SynchronousQueue</tt> has no internal capacity.
475 */
476 public void clear() {}
477
478 /**
479 * Always returns <tt>false</tt>.
480 * A <tt>SynchronousQueue</tt> has no internal capacity.
481 * @param o the element
482 * @return <tt>false</tt>
483 */
484 public boolean contains(Object o) {
485 return false;
486 }
487
488 /**
489 * Always returns <tt>false</tt>.
490 * A <tt>SynchronousQueue</tt> has no internal capacity.
491 *
492 * @param o the element to remove
493 * @return <tt>false</tt>
494 */
495 public boolean remove(Object o) {
496 return false;
497 }
498
499 /**
500 * Returns <tt>false</tt> unless given collection is empty.
501 * A <tt>SynchronousQueue</tt> has no internal capacity.
502 * @param c the collection
503 * @return <tt>false</tt> unless given collection is empty
504 */
505 public boolean containsAll(Collection<?> c) {
506 return c.isEmpty();
507 }
508
509 /**
510 * Always returns <tt>false</tt>.
511 * A <tt>SynchronousQueue</tt> has no internal capacity.
512 * @param c the collection
513 * @return <tt>false</tt>
514 */
515 public boolean removeAll(Collection<?> c) {
516 return false;
517 }
518
519 /**
520 * Always returns <tt>false</tt>.
521 * A <tt>SynchronousQueue</tt> has no internal capacity.
522 * @param c the collection
523 * @return <tt>false</tt>
524 */
525 public boolean retainAll(Collection<?> c) {
526 return false;
527 }
528
529 /**
530 * Always returns <tt>null</tt>.
531 * A <tt>SynchronousQueue</tt> does not return elements
532 * unless actively waited on.
533 * @return <tt>null</tt>
534 */
535 public E peek() {
536 return null;
537 }
538
539
540 static class EmptyIterator<E> implements Iterator<E> {
541 public boolean hasNext() {
542 return false;
543 }
544 public E next() {
545 throw new NoSuchElementException();
546 }
547 public void remove() {
548 throw new IllegalStateException();
549 }
550 }
551
552 /**
553 * Returns an empty iterator in which <tt>hasNext</tt> always returns
554 * <tt>false</tt>.
555 *
556 * @return an empty iterator
557 */
558 public Iterator<E> iterator() {
559 return new EmptyIterator<E>();
560 }
561
562
563 /**
564 * Returns a zero-length array.
565 * @return a zero-length array
566 */
567 public Object[] toArray() {
568 return new Object[0];
569 }
570
571 /**
572 * Sets the zeroeth element of the specified array to <tt>null</tt>
573 * (if the array has non-zero length) and returns it.
574 * @param a the array
575 * @return the specified array
576 */
577 public <T> T[] toArray(T[] a) {
578 if (a.length > 0)
579 a[0] = null;
580 return a;
581 }
582
583
584 public int drainTo(Collection<? super E> c) {
585 if (c == null)
586 throw new NullPointerException();
587 if (c == this)
588 throw new IllegalArgumentException();
589 int n = 0;
590 E e;
591 while ( (e = poll()) != null) {
592 c.add(e);
593 ++n;
594 }
595 return n;
596 }
597
598 public int drainTo(Collection<? super E> c, int maxElements) {
599 if (c == null)
600 throw new NullPointerException();
601 if (c == this)
602 throw new IllegalArgumentException();
603 int n = 0;
604 E e;
605 while (n < maxElements && (e = poll()) != null) {
606 c.add(e);
607 ++n;
608 }
609 return n;
610 }
611 }
612
613
614
615
616