ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.35
Committed: Fri Jan 2 21:02:32 2004 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.34: +157 -104 lines
Log Message:
Avoid timeout problems in fair modes; improve AQS method names

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity - in
15 * particular it does not have a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to take it; you cannot add an element (using
18 * any method) unless another thread is trying to remove it; you
19 * cannot iterate as there is nothing to iterate. The <em>head</em>
20 * of the queue is the element that the first queued thread is trying
21 * to add to the queue; if there are no queued threads then no element
22 * is being added and the head is <tt>null</tt>. For purposes of
23 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 * queue does not permit <tt>null</tt> elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must sync up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 * <p>This class implements all of the <em>optional</em> methods
33 * of the {@link Collection} and {@link Iterator} interfaces.
34 * @since 1.5
35 * @author Doug Lea
36 * @param <E> the type of elements held in this collection
37 */
38 public class SynchronousQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40 private static final long serialVersionUID = -3223113410248163686L;
41
42 /*
43 This implementation divides actions into two cases for puts:
44
45 * An arriving putter that does not already have a waiting taker
46 creates a node holding item, and then waits for a taker to take it.
47 * An arriving putter that does already have a waiting taker fills
48 the slot node created by the taker, and notifies it to continue.
49
50 And symmetrically, two for takes:
51
52 * An arriving taker that does not already have a waiting putter
53 creates an empty slot node, and then waits for a putter to fill it.
54 * An arriving taker that does already have a waiting putter takes
55 item from the node created by the putter, and notifies it to continue.
56
57 This requires keeping two simple queues: waitingPuts and waitingTakes.
58
59 When a put or take waiting for the actions of its counterpart
60 aborts due to interruption or timeout, it marks the node
61 it created as "CANCELLED", which causes its counterpart to retry
62 the entire put or take sequence.
63 */
64
65
66 /*
67 * Note that all fields are transient final, so there is
68 * no explicit serialization code.
69 */
70
71 private transient final WaitQueue waitingPuts = new WaitQueue();
72 private transient final WaitQueue waitingTakes = new WaitQueue();
73 private transient final ReentrantLock qlock = new ReentrantLock();
74
75 /**
76 * Nodes each maintain an item and handle waits and signals for
77 * getting and setting it. The class extends
78 * AbstractQueuedSynchronizer to manage blocking, using AQS state
79 * 0 for waiting, 1 for ack, -1 for cancelled.
80 */
81 private static final class Node extends AbstractQueuedSynchronizer {
82 /** Synchronization state value representing that node acked */
83 private static final int ACK = 1;
84 /** Synchronization state value representing that node cancelled */
85 private static final int CANCEL = -1;
86
87 /** The item being transferred */
88 Object item;
89 /** Next node in wait queue */
90 Node next;
91
92 /** Create node with initial item */
93 Node(Object x) { item = x; }
94
95 /**
96 * Implements AQS base acquire to succeed if not in WAITING state
97 */
98 protected boolean tryAcquireExclusive(boolean b, int ignore) {
99 return getState() != 0;
100 }
101
102 /**
103 * Implements AQS base release to signal if state changed
104 */
105 protected boolean tryReleaseExclusive(int newState) {
106 return compareAndSetState(0, newState);
107 }
108
109 /**
110 * Take item and null out field (for sake of GC)
111 */
112 private Object extract() {
113 Object x = item;
114 item = null;
115 return x;
116 }
117
118 /**
119 * Try to cancel on interrupt; if so rethrowing,
120 * else setting interrupt state
121 */
122 private void checkCancellationOnInterrupt(InterruptedException ie)
123 throws InterruptedException {
124 if (releaseExclusive(CANCEL))
125 throw ie;
126 Thread.currentThread().interrupt();
127 }
128
129 /**
130 * Fill in the slot created by the taker and signal taker to
131 * continue.
132 */
133 boolean setItem(Object x) {
134 item = x; // can place in slot even if cancelled
135 return releaseExclusive(ACK);
136 }
137
138 /**
139 * Remove item from slot created by putter and signal putter
140 * to continue.
141 */
142 Object getItem() {
143 return (releaseExclusive(ACK))? extract() : null;
144 }
145
146 /**
147 * Wait for a taker to take item placed by putter.
148 */
149 void waitForTake() throws InterruptedException {
150 try {
151 acquireExclusiveInterruptibly(0);
152 } catch (InterruptedException ie) {
153 checkCancellationOnInterrupt(ie);
154 }
155 }
156
157 /**
158 * Wait for a putter to put item placed by taker.
159 */
160 Object waitForPut() throws InterruptedException {
161 try {
162 acquireExclusiveInterruptibly(0);
163 } catch (InterruptedException ie) {
164 checkCancellationOnInterrupt(ie);
165 }
166 return extract();
167 }
168
169 /**
170 * Wait for a taker to take item placed by putter or time out.
171 */
172 boolean waitForTake(long nanos) throws InterruptedException {
173 try {
174 if (!acquireExclusiveTimed(0, nanos) &&
175 releaseExclusive(CANCEL))
176 return false;
177 } catch (InterruptedException ie) {
178 checkCancellationOnInterrupt(ie);
179 }
180 return true;
181 }
182
183 /**
184 * Wait for a putter to put item placed by taker, or time out.
185 */
186 Object waitForPut(long nanos) throws InterruptedException {
187 try {
188 if (!acquireExclusiveTimed(0, nanos) &&
189 releaseExclusive(CANCEL))
190 return null;
191 } catch (InterruptedException ie) {
192 checkCancellationOnInterrupt(ie);
193 }
194 return extract();
195 }
196
197 }
198
199 /**
200 * Simple FIFO queue class to hold waiting puts/takes.
201 **/
202 private static class WaitQueue<E> {
203 Node head;
204 Node last;
205
206 Node enq(Object x) {
207 Node p = new Node(x);
208 if (last == null)
209 last = head = p;
210 else
211 last = last.next = p;
212 return p;
213 }
214
215 Node deq() {
216 Node p = head;
217 if (p != null) {
218 if ((head = p.next) == null)
219 last = null;
220 p.next = null;
221 }
222 return p;
223 }
224 }
225
226 /**
227 * Creates a <tt>SynchronousQueue</tt>.
228 */
229 public SynchronousQueue() {}
230
231
232 /**
233 * Adds the specified element to this queue, waiting if necessary for
234 * another thread to receive it.
235 * @param o the element to add
236 * @throws InterruptedException if interrupted while waiting.
237 * @throws NullPointerException if the specified element is <tt>null</tt>.
238 */
239 public void put(E o) throws InterruptedException {
240 if (o == null) throw new NullPointerException();
241 final ReentrantLock qlock = this.qlock;
242
243 for (;;) {
244 Node node;
245 boolean mustWait;
246 qlock.lockInterruptibly();
247 try {
248 node = waitingTakes.deq();
249 if ( (mustWait = (node == null)) )
250 node = waitingPuts.enq(o);
251 } finally {
252 qlock.unlock();
253 }
254
255 if (mustWait) {
256 node.waitForTake();
257 return;
258 }
259
260 else if (node.setItem(o))
261 return;
262
263 // else taker cancelled, so retry
264 }
265 }
266
267 /**
268 * Inserts the specified element into this queue, waiting if necessary
269 * up to the specified wait time for another thread to receive it.
270 * @param o the element to add
271 * @param timeout how long to wait before giving up, in units of
272 * <tt>unit</tt>
273 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
274 * <tt>timeout</tt> parameter
275 * @return <tt>true</tt> if successful, or <tt>false</tt> if
276 * the specified waiting time elapses before a taker appears.
277 * @throws InterruptedException if interrupted while waiting.
278 * @throws NullPointerException if the specified element is <tt>null</tt>.
279 */
280 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
281 if (o == null) throw new NullPointerException();
282 long nanos = unit.toNanos(timeout);
283 final ReentrantLock qlock = this.qlock;
284 for (;;) {
285 Node node;
286 boolean mustWait;
287 qlock.lockInterruptibly();
288 try {
289 node = waitingTakes.deq();
290 if ( (mustWait = (node == null)) )
291 node = waitingPuts.enq(o);
292 } finally {
293 qlock.unlock();
294 }
295
296 if (mustWait)
297 return node.waitForTake(nanos);
298
299 else if (node.setItem(o))
300 return true;
301
302 // else taker cancelled, so retry
303 }
304
305 }
306
307
308 /**
309 * Retrieves and removes the head of this queue, waiting if necessary
310 * for another thread to insert it.
311 * @return the head of this queue
312 */
313 public E take() throws InterruptedException {
314 final ReentrantLock qlock = this.qlock;
315 for (;;) {
316 Node node;
317 boolean mustWait;
318
319 qlock.lockInterruptibly();
320 try {
321 node = waitingPuts.deq();
322 if ( (mustWait = (node == null)) )
323 node = waitingTakes.enq(null);
324 } finally {
325 qlock.unlock();
326 }
327
328 if (mustWait)
329 return (E)node.waitForPut();
330
331 else {
332 Object x = node.getItem();
333 if (x != null)
334 return (E)x;
335 // else cancelled, so retry
336 }
337 }
338 }
339
340 /**
341 * Retrieves and removes the head of this queue, waiting
342 * if necessary up to the specified wait time, for another thread
343 * to insert it.
344 * @param timeout how long to wait before giving up, in units of
345 * <tt>unit</tt>
346 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
347 * <tt>timeout</tt> parameter
348 * @return the head of this queue, or <tt>null</tt> if the
349 * specified waiting time elapses before an element is present.
350 * @throws InterruptedException if interrupted while waiting.
351 */
352 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
353 long nanos = unit.toNanos(timeout);
354 final ReentrantLock qlock = this.qlock;
355
356 for (;;) {
357 Node node;
358 boolean mustWait;
359
360 qlock.lockInterruptibly();
361 try {
362 node = waitingPuts.deq();
363 if ( (mustWait = (node == null)) )
364 node = waitingTakes.enq(null);
365 } finally {
366 qlock.unlock();
367 }
368
369 if (mustWait)
370 return (E) node.waitForPut(nanos);
371
372 else {
373 Object x = node.getItem();
374 if (x != null)
375 return (E)x;
376 // else cancelled, so retry
377 }
378 }
379 }
380
381 // Untimed nonblocking versions
382
383 /**
384 * Inserts the specified element into this queue, if another thread is
385 * waiting to receive it.
386 *
387 * @param o the element to add.
388 * @return <tt>true</tt> if it was possible to add the element to
389 * this queue, else <tt>false</tt>
390 * @throws NullPointerException if the specified element is <tt>null</tt>
391 */
392 public boolean offer(E o) {
393 if (o == null) throw new NullPointerException();
394 final ReentrantLock qlock = this.qlock;
395
396 for (;;) {
397 Node node;
398 qlock.lock();
399 try {
400 node = waitingTakes.deq();
401 } finally {
402 qlock.unlock();
403 }
404 if (node == null)
405 return false;
406
407 else if (node.setItem(o))
408 return true;
409 // else retry
410 }
411 }
412
413 /**
414 * Retrieves and removes the head of this queue, if another thread
415 * is currently making an element available.
416 *
417 * @return the head of this queue, or <tt>null</tt> if no
418 * element is available.
419 */
420 public E poll() {
421 final ReentrantLock qlock = this.qlock;
422 for (;;) {
423 Node node;
424 qlock.lock();
425 try {
426 node = waitingPuts.deq();
427 } finally {
428 qlock.unlock();
429 }
430 if (node == null)
431 return null;
432
433 else {
434 Object x = node.getItem();
435 if (x != null)
436 return (E)x;
437 // else retry
438 }
439 }
440 }
441
442 /**
443 * Always returns <tt>true</tt>.
444 * A <tt>SynchronousQueue</tt> has no internal capacity.
445 * @return <tt>true</tt>
446 */
447 public boolean isEmpty() {
448 return true;
449 }
450
451 /**
452 * Always returns zero.
453 * A <tt>SynchronousQueue</tt> has no internal capacity.
454 * @return zero.
455 */
456 public int size() {
457 return 0;
458 }
459
460 /**
461 * Always returns zero.
462 * A <tt>SynchronousQueue</tt> has no internal capacity.
463 * @return zero.
464 */
465 public int remainingCapacity() {
466 return 0;
467 }
468
469 /**
470 * Does nothing.
471 * A <tt>SynchronousQueue</tt> has no internal capacity.
472 */
473 public void clear() {}
474
475 /**
476 * Always returns <tt>false</tt>.
477 * A <tt>SynchronousQueue</tt> has no internal capacity.
478 * @param o the element
479 * @return <tt>false</tt>
480 */
481 public boolean contains(Object o) {
482 return false;
483 }
484
485 /**
486 * Always returns <tt>false</tt>.
487 * A <tt>SynchronousQueue</tt> has no internal capacity.
488 *
489 * @param o the element to remove
490 * @return <tt>false</tt>
491 */
492 public boolean remove(Object o) {
493 return false;
494 }
495
496 /**
497 * Returns <tt>false</tt> unless given collection is empty.
498 * A <tt>SynchronousQueue</tt> has no internal capacity.
499 * @param c the collection
500 * @return <tt>false</tt> unless given collection is empty
501 */
502 public boolean containsAll(Collection<?> c) {
503 return c.isEmpty();
504 }
505
506 /**
507 * Always returns <tt>false</tt>.
508 * A <tt>SynchronousQueue</tt> has no internal capacity.
509 * @param c the collection
510 * @return <tt>false</tt>
511 */
512 public boolean removeAll(Collection<?> c) {
513 return false;
514 }
515
516 /**
517 * Always returns <tt>false</tt>.
518 * A <tt>SynchronousQueue</tt> has no internal capacity.
519 * @param c the collection
520 * @return <tt>false</tt>
521 */
522 public boolean retainAll(Collection<?> c) {
523 return false;
524 }
525
526 /**
527 * Always returns <tt>null</tt>.
528 * A <tt>SynchronousQueue</tt> does not return elements
529 * unless actively waited on.
530 * @return <tt>null</tt>
531 */
532 public E peek() {
533 return null;
534 }
535
536
537 static class EmptyIterator<E> implements Iterator<E> {
538 public boolean hasNext() {
539 return false;
540 }
541 public E next() {
542 throw new NoSuchElementException();
543 }
544 public void remove() {
545 throw new IllegalStateException();
546 }
547 }
548
549 /**
550 * Returns an empty iterator in which <tt>hasNext</tt> always returns
551 * <tt>false</tt>.
552 *
553 * @return an empty iterator
554 */
555 public Iterator<E> iterator() {
556 return new EmptyIterator<E>();
557 }
558
559
560 /**
561 * Returns a zero-length array.
562 * @return a zero-length array
563 */
564 public Object[] toArray() {
565 return new Object[0];
566 }
567
568 /**
569 * Sets the zeroeth element of the specified array to <tt>null</tt>
570 * (if the array has non-zero length) and returns it.
571 * @return the specified array
572 */
573 public <T> T[] toArray(T[] a) {
574 if (a.length > 0)
575 a[0] = null;
576 return a;
577 }
578
579
580 public int drainTo(Collection<? super E> c) {
581 if (c == null)
582 throw new NullPointerException();
583 if (c == this)
584 throw new IllegalArgumentException();
585 int n = 0;
586 E e;
587 while ( (e = poll()) != null) {
588 c.add(e);
589 ++n;
590 }
591 return n;
592 }
593
594 public int drainTo(Collection<? super E> c, int maxElements) {
595 if (c == null)
596 throw new NullPointerException();
597 if (c == this)
598 throw new IllegalArgumentException();
599 int n = 0;
600 E e;
601 while (n < maxElements && (e = poll()) != null) {
602 c.add(e);
603 ++n;
604 }
605 return n;
606 }
607 }
608
609
610
611
612