ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.38
Committed: Fri Jan 9 14:45:17 2004 UTC (20 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.37: +2 -2 lines
Log Message:
Cosmetics

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity - in
15 * particular it does not have a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to take it; you cannot add an element (using
18 * any method) unless another thread is trying to remove it; you
19 * cannot iterate as there is nothing to iterate. The <em>head</em>
20 * of the queue is the element that the first queued thread is trying
21 * to add to the queue; if there are no queued threads then no element
22 * is being added and the head is <tt>null</tt>. For purposes of
23 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 * queue does not permit <tt>null</tt> elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must sync up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 * <p>This class implements all of the <em>optional</em> methods
33 * of the {@link Collection} and {@link Iterator} interfaces.
34 * @since 1.5
35 * @author Doug Lea
36 * @param <E> the type of elements held in this collection
37 */
38 public class SynchronousQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40 private static final long serialVersionUID = -3223113410248163686L;
41
42 /*
43 This implementation divides actions into two cases for puts:
44
45 * An arriving putter that does not already have a waiting taker
46 creates a node holding item, and then waits for a taker to take it.
47 * An arriving putter that does already have a waiting taker fills
48 the slot node created by the taker, and notifies it to continue.
49
50 And symmetrically, two for takes:
51
52 * An arriving taker that does not already have a waiting putter
53 creates an empty slot node, and then waits for a putter to fill it.
54 * An arriving taker that does already have a waiting putter takes
55 item from the node created by the putter, and notifies it to continue.
56
57 This requires keeping two simple queues: waitingPuts and waitingTakes.
58
59 When a put or take waiting for the actions of its counterpart
60 aborts due to interruption or timeout, it marks the node
61 it created as "CANCELLED", which causes its counterpart to retry
62 the entire put or take sequence.
63 */
64
65
66 /*
67 * Note that all fields are transient final, so there is
68 * no explicit serialization code.
69 */
70
71 private transient final WaitQueue waitingPuts = new WaitQueue();
72 private transient final WaitQueue waitingTakes = new WaitQueue();
73 private transient final ReentrantLock qlock = new ReentrantLock();
74
75 /**
76 * Nodes each maintain an item and handle waits and signals for
77 * getting and setting it. The class extends
78 * AbstractQueuedSynchronizer to manage blocking, using AQS state
79 * 0 for waiting, 1 for ack, -1 for cancelled.
80 */
81 private static final class Node extends AbstractQueuedSynchronizer {
82 /** Synchronization state value representing that node acked */
83 private static final int ACK = 1;
84 /** Synchronization state value representing that node cancelled */
85 private static final int CANCEL = -1;
86
87 /** The item being transferred */
88 Object item;
89 /** Next node in wait queue */
90 Node next;
91
92 /** Create node with initial item */
93 Node(Object x) { item = x; }
94
95 /**
96 * Implements AQS base acquire to succeed if not in WAITING state
97 */
98 protected boolean tryAcquireExclusive(int ignore) {
99 return getState() != 0;
100 }
101
102 /**
103 * Implements AQS base release to signal if state changed
104 */
105 protected boolean tryReleaseExclusive(int newState) {
106 return compareAndSetState(0, newState);
107 }
108
109 /**
110 * Take item and null out field (for sake of GC)
111 */
112 private Object extract() {
113 Object x = item;
114 item = null;
115 return x;
116 }
117
118 /**
119 * Try to cancel on interrupt; if so rethrowing,
120 * else setting interrupt state
121 */
122 private void checkCancellationOnInterrupt(InterruptedException ie)
123 throws InterruptedException {
124 if (releaseExclusive(CANCEL))
125 throw ie;
126 Thread.currentThread().interrupt();
127 }
128
129 /**
130 * Fill in the slot created by the taker and signal taker to
131 * continue.
132 */
133 boolean setItem(Object x) {
134 item = x; // can place in slot even if cancelled
135 return releaseExclusive(ACK);
136 }
137
138 /**
139 * Remove item from slot created by putter and signal putter
140 * to continue.
141 */
142 Object getItem() {
143 return (releaseExclusive(ACK))? extract() : null;
144 }
145
146 /**
147 * Wait for a taker to take item placed by putter.
148 */
149 void waitForTake() throws InterruptedException {
150 try {
151 acquireExclusiveInterruptibly(0);
152 } catch (InterruptedException ie) {
153 checkCancellationOnInterrupt(ie);
154 }
155 }
156
157 /**
158 * Wait for a putter to put item placed by taker.
159 */
160 Object waitForPut() throws InterruptedException {
161 try {
162 acquireExclusiveInterruptibly(0);
163 } catch (InterruptedException ie) {
164 checkCancellationOnInterrupt(ie);
165 }
166 return extract();
167 }
168
169 /**
170 * Wait for a taker to take item placed by putter or time out.
171 */
172 boolean waitForTake(long nanos) throws InterruptedException {
173 try {
174 if (!acquireExclusiveNanos(0, nanos) &&
175 releaseExclusive(CANCEL))
176 return false;
177 } catch (InterruptedException ie) {
178 checkCancellationOnInterrupt(ie);
179 }
180 return true;
181 }
182
183 /**
184 * Wait for a putter to put item placed by taker, or time out.
185 */
186 Object waitForPut(long nanos) throws InterruptedException {
187 try {
188 if (!acquireExclusiveNanos(0, nanos) &&
189 releaseExclusive(CANCEL))
190 return null;
191 } catch (InterruptedException ie) {
192 checkCancellationOnInterrupt(ie);
193 }
194 return extract();
195 }
196
197 }
198
199 /**
200 * Simple FIFO queue class to hold waiting puts/takes.
201 **/
202 private static class WaitQueue<E> {
203 Node head;
204 Node last;
205
206 Node enq(Object x) {
207 Node p = new Node(x);
208 if (last == null)
209 last = head = p;
210 else
211 last = last.next = p;
212 return p;
213 }
214
215 Node deq() {
216 Node p = head;
217 if (p != null) {
218 if ((head = p.next) == null)
219 last = null;
220 p.next = null;
221 }
222 return p;
223 }
224 }
225
226 /**
227 * Creates a <tt>SynchronousQueue</tt>.
228 */
229 public SynchronousQueue() {}
230
231
232 /**
233 * Adds the specified element to this queue, waiting if necessary for
234 * another thread to receive it.
235 * @param o the element to add
236 * @throws InterruptedException if interrupted while waiting.
237 * @throws NullPointerException if the specified element is <tt>null</tt>.
238 */
239 public void put(E o) throws InterruptedException {
240 if (o == null) throw new NullPointerException();
241 final ReentrantLock qlock = this.qlock;
242
243 for (;;) {
244 Node node;
245 boolean mustWait;
246 qlock.lockInterruptibly();
247 try {
248 node = waitingTakes.deq();
249 if ( (mustWait = (node == null)) )
250 node = waitingPuts.enq(o);
251 } finally {
252 qlock.unlock();
253 }
254
255 if (mustWait) {
256 node.waitForTake();
257 return;
258 }
259
260 else if (node.setItem(o))
261 return;
262
263 // else taker cancelled, so retry
264 }
265 }
266
267 /**
268 * Inserts the specified element into this queue, waiting if necessary
269 * up to the specified wait time for another thread to receive it.
270 * @param o the element to add
271 * @param timeout how long to wait before giving up, in units of
272 * <tt>unit</tt>
273 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
274 * <tt>timeout</tt> parameter
275 * @return <tt>true</tt> if successful, or <tt>false</tt> if
276 * the specified waiting time elapses before a taker appears.
277 * @throws InterruptedException if interrupted while waiting.
278 * @throws NullPointerException if the specified element is <tt>null</tt>.
279 */
280 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
281 if (o == null) throw new NullPointerException();
282 long nanos = unit.toNanos(timeout);
283 final ReentrantLock qlock = this.qlock;
284 for (;;) {
285 Node node;
286 boolean mustWait;
287 qlock.lockInterruptibly();
288 try {
289 node = waitingTakes.deq();
290 if ( (mustWait = (node == null)) )
291 node = waitingPuts.enq(o);
292 } finally {
293 qlock.unlock();
294 }
295
296 if (mustWait)
297 return node.waitForTake(nanos);
298
299 else if (node.setItem(o))
300 return true;
301
302 // else taker cancelled, so retry
303 }
304
305 }
306
307
308 /**
309 * Retrieves and removes the head of this queue, waiting if necessary
310 * for another thread to insert it.
311 * @return the head of this queue
312 */
313 public E take() throws InterruptedException {
314 final ReentrantLock qlock = this.qlock;
315 for (;;) {
316 Node node;
317 boolean mustWait;
318
319 qlock.lockInterruptibly();
320 try {
321 node = waitingPuts.deq();
322 if ( (mustWait = (node == null)) )
323 node = waitingTakes.enq(null);
324 } finally {
325 qlock.unlock();
326 }
327
328 if (mustWait) {
329 Object x = node.waitForPut();
330 return (E)x;
331 }
332 else {
333 Object x = node.getItem();
334 if (x != null)
335 return (E)x;
336 // else cancelled, so retry
337 }
338 }
339 }
340
341 /**
342 * Retrieves and removes the head of this queue, waiting
343 * if necessary up to the specified wait time, for another thread
344 * to insert it.
345 * @param timeout how long to wait before giving up, in units of
346 * <tt>unit</tt>
347 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
348 * <tt>timeout</tt> parameter
349 * @return the head of this queue, or <tt>null</tt> if the
350 * specified waiting time elapses before an element is present.
351 * @throws InterruptedException if interrupted while waiting.
352 */
353 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
354 long nanos = unit.toNanos(timeout);
355 final ReentrantLock qlock = this.qlock;
356
357 for (;;) {
358 Node node;
359 boolean mustWait;
360
361 qlock.lockInterruptibly();
362 try {
363 node = waitingPuts.deq();
364 if ( (mustWait = (node == null)) )
365 node = waitingTakes.enq(null);
366 } finally {
367 qlock.unlock();
368 }
369
370 if (mustWait) {
371 Object x = node.waitForPut(nanos);
372 return (E)x;
373 }
374 else {
375 Object x = node.getItem();
376 if (x != null)
377 return (E)x;
378 // else cancelled, so retry
379 }
380 }
381 }
382
383 // Untimed nonblocking versions
384
385 /**
386 * Inserts the specified element into this queue, if another thread is
387 * waiting to receive it.
388 *
389 * @param o the element to add.
390 * @return <tt>true</tt> if it was possible to add the element to
391 * this queue, else <tt>false</tt>
392 * @throws NullPointerException if the specified element is <tt>null</tt>
393 */
394 public boolean offer(E o) {
395 if (o == null) throw new NullPointerException();
396 final ReentrantLock qlock = this.qlock;
397
398 for (;;) {
399 Node node;
400 qlock.lock();
401 try {
402 node = waitingTakes.deq();
403 } finally {
404 qlock.unlock();
405 }
406 if (node == null)
407 return false;
408
409 else if (node.setItem(o))
410 return true;
411 // else retry
412 }
413 }
414
415 /**
416 * Retrieves and removes the head of this queue, if another thread
417 * is currently making an element available.
418 *
419 * @return the head of this queue, or <tt>null</tt> if no
420 * element is available.
421 */
422 public E poll() {
423 final ReentrantLock qlock = this.qlock;
424 for (;;) {
425 Node node;
426 qlock.lock();
427 try {
428 node = waitingPuts.deq();
429 } finally {
430 qlock.unlock();
431 }
432 if (node == null)
433 return null;
434
435 else {
436 Object x = node.getItem();
437 if (x != null)
438 return (E)x;
439 // else retry
440 }
441 }
442 }
443
444 /**
445 * Always returns <tt>true</tt>.
446 * A <tt>SynchronousQueue</tt> has no internal capacity.
447 * @return <tt>true</tt>
448 */
449 public boolean isEmpty() {
450 return true;
451 }
452
453 /**
454 * Always returns zero.
455 * A <tt>SynchronousQueue</tt> has no internal capacity.
456 * @return zero.
457 */
458 public int size() {
459 return 0;
460 }
461
462 /**
463 * Always returns zero.
464 * A <tt>SynchronousQueue</tt> has no internal capacity.
465 * @return zero.
466 */
467 public int remainingCapacity() {
468 return 0;
469 }
470
471 /**
472 * Does nothing.
473 * A <tt>SynchronousQueue</tt> has no internal capacity.
474 */
475 public void clear() {}
476
477 /**
478 * Always returns <tt>false</tt>.
479 * A <tt>SynchronousQueue</tt> has no internal capacity.
480 * @param o the element
481 * @return <tt>false</tt>
482 */
483 public boolean contains(Object o) {
484 return false;
485 }
486
487 /**
488 * Always returns <tt>false</tt>.
489 * A <tt>SynchronousQueue</tt> has no internal capacity.
490 *
491 * @param o the element to remove
492 * @return <tt>false</tt>
493 */
494 public boolean remove(Object o) {
495 return false;
496 }
497
498 /**
499 * Returns <tt>false</tt> unless given collection is empty.
500 * A <tt>SynchronousQueue</tt> has no internal capacity.
501 * @param c the collection
502 * @return <tt>false</tt> unless given collection is empty
503 */
504 public boolean containsAll(Collection<?> c) {
505 return c.isEmpty();
506 }
507
508 /**
509 * Always returns <tt>false</tt>.
510 * A <tt>SynchronousQueue</tt> has no internal capacity.
511 * @param c the collection
512 * @return <tt>false</tt>
513 */
514 public boolean removeAll(Collection<?> c) {
515 return false;
516 }
517
518 /**
519 * Always returns <tt>false</tt>.
520 * A <tt>SynchronousQueue</tt> has no internal capacity.
521 * @param c the collection
522 * @return <tt>false</tt>
523 */
524 public boolean retainAll(Collection<?> c) {
525 return false;
526 }
527
528 /**
529 * Always returns <tt>null</tt>.
530 * A <tt>SynchronousQueue</tt> does not return elements
531 * unless actively waited on.
532 * @return <tt>null</tt>
533 */
534 public E peek() {
535 return null;
536 }
537
538
539 static class EmptyIterator<E> implements Iterator<E> {
540 public boolean hasNext() {
541 return false;
542 }
543 public E next() {
544 throw new NoSuchElementException();
545 }
546 public void remove() {
547 throw new IllegalStateException();
548 }
549 }
550
551 /**
552 * Returns an empty iterator in which <tt>hasNext</tt> always returns
553 * <tt>false</tt>.
554 *
555 * @return an empty iterator
556 */
557 public Iterator<E> iterator() {
558 return new EmptyIterator<E>();
559 }
560
561
562 /**
563 * Returns a zero-length array.
564 * @return a zero-length array
565 */
566 public Object[] toArray() {
567 return new Object[0];
568 }
569
570 /**
571 * Sets the zeroeth element of the specified array to <tt>null</tt>
572 * (if the array has non-zero length) and returns it.
573 * @return the specified array
574 */
575 public <T> T[] toArray(T[] a) {
576 if (a.length > 0)
577 a[0] = null;
578 return a;
579 }
580
581
582 public int drainTo(Collection<? super E> c) {
583 if (c == null)
584 throw new NullPointerException();
585 if (c == this)
586 throw new IllegalArgumentException();
587 int n = 0;
588 E e;
589 while ( (e = poll()) != null) {
590 c.add(e);
591 ++n;
592 }
593 return n;
594 }
595
596 public int drainTo(Collection<? super E> c, int maxElements) {
597 if (c == null)
598 throw new NullPointerException();
599 if (c == this)
600 throw new IllegalArgumentException();
601 int n = 0;
602 E e;
603 while (n < maxElements && (e = poll()) != null) {
604 c.add(e);
605 ++n;
606 }
607 return n;
608 }
609 }
610
611
612
613
614