ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.33
Committed: Fri Jan 2 00:38:33 2004 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.32: +16 -41 lines
Log Message:
Use ACS in FutureTask; doc improvements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each
13 * <tt>put</tt> must wait for a <tt>take</tt>, and vice versa. A
14 * synchronous queue does not have any internal capacity - in
15 * particular it does not have a capacity of one. You cannot
16 * <tt>peek</tt> at a synchronous queue because an element is only
17 * present when you try to take it; you cannot add an element (using
18 * any method) unless another thread is trying to remove it; you
19 * cannot iterate as there is nothing to iterate. The <em>head</em>
20 * of the queue is the element that the first queued thread is trying
21 * to add to the queue; if there are no queued threads then no element
22 * is being added and the head is <tt>null</tt>. For purposes of
23 * other <tt>Collection</tt> methods (for example <tt>contains</tt>),
24 * a <tt>SynchronousQueue</tt> acts as an empty collection. This
25 * queue does not permit <tt>null</tt> elements.
26 *
27 * <p>Synchronous queues are similar to rendezvous channels used in
28 * CSP and Ada. They are well suited for handoff designs, in which an
29 * object running in one thread must sync up with an object running
30 * in another thread in order to hand it some information, event, or
31 * task.
32 * <p>This class implements all of the <em>optional</em> methods
33 * of the {@link Collection} and {@link Iterator} interfaces.
34 * @since 1.5
35 * @author Doug Lea
36 * @param <E> the type of elements held in this collection
37 */
38 public class SynchronousQueue<E> extends AbstractQueue<E>
39 implements BlockingQueue<E>, java.io.Serializable {
40 private static final long serialVersionUID = -3223113410248163686L;
41
42 /*
43 This implementation divides actions into two cases for puts:
44
45 * An arriving putter that does not already have a waiting taker
46 creates a node holding item, and then waits for a taker to take it.
47 * An arriving putter that does already have a waiting taker fills
48 the slot node created by the taker, and notifies it to continue.
49
50 And symmetrically, two for takes:
51
52 * An arriving taker that does not already have a waiting putter
53 creates an empty slot node, and then waits for a putter to fill it.
54 * An arriving taker that does already have a waiting putter takes
55 item from the node created by the putter, and notifies it to continue.
56
57 This requires keeping two simple queues: waitingPuts and waitingTakes.
58
59 When a put or take waiting for the actions of its counterpart
60 aborts due to interruption or timeout, it marks the node
61 it created as "CANCELLED", which causes its counterpart to retry
62 the entire put or take sequence.
63 */
64
65
66 /*
67 * Note that all fields are transient final, so there is
68 * no explicit serialization code.
69 */
70
71 private transient final WaitQueue waitingPuts = new WaitQueue();
72 private transient final WaitQueue waitingTakes = new WaitQueue();
73 private transient final ReentrantLock qlock = new ReentrantLock();
74
75 /**
76 * Nodes each maintain an item and handle waits and signals for
77 * getting and setting it. The class extends
78 * AbstractQueuedSynchronizer to manage blocking, using AQS state
79 * 0 for waiting, 1 for ack, -1 for cancelled.
80 */
81 private static final class Node extends AbstractQueuedSynchronizer {
82 /** The item being transferred */
83 Object item;
84 /** Next node in wait queue */
85 Node next;
86 Node(Object x) { item = x; }
87
88 private static final int WAITING = 0;
89 private static final int ACKED = 1;
90 private static final int CANCELLED = -1;
91
92 /**
93 * Implements AQS base acquire to succeed if not in WAITING state
94 */
95 public boolean tryAcquireExclusiveState(boolean b, int ignore) {
96 return getState() != WAITING;
97 }
98
99 /**
100 * Implements AQS base release to always signal.
101 * Status is changed in ack or cancel methods before calling,
102 * Which we need to do because we need their return values.
103 */
104 public boolean releaseExclusiveState(int ignore) {
105 return true;
106 }
107
108 /**
109 * Try to acknowledge; fail if not waiting
110 */
111 private boolean ack() {
112 if (!compareAndSetState(WAITING, ACKED))
113 return false;
114 releaseExclusive(0);
115 return true;
116 }
117
118 /**
119 * Try to cancel; fail if not waiting
120 */
121 private boolean cancel() {
122 if (!compareAndSetState(WAITING, CANCELLED))
123 return false;
124 releaseExclusive(0);
125 return true;
126 }
127
128 /**
129 * Take item and null out fields (for sake of GC)
130 */
131 private Object extract() {
132 Object x = item;
133 item = null;
134 next = null;
135 return x;
136 }
137
138 /**
139 * Fill in the slot created by the taker and signal taker to
140 * continue.
141 */
142 boolean setItem(Object x) {
143 item = x;
144 return ack();
145 }
146
147 /**
148 * Remove item from slot created by putter and signal putter
149 * to continue.
150 */
151 Object getItem() {
152 return (ack())? extract() : null;
153 }
154
155 /**
156 * Wait for a taker to take item placed by putter or time out.
157 */
158 boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
159 try {
160 if (!timed)
161 acquireExclusiveInterruptibly(0);
162 else if (!acquireExclusiveTimed(0, nanos) && cancel())
163 return false;
164 return true;
165 } catch (InterruptedException ie) {
166 if (cancel())
167 throw ie;
168 Thread.currentThread().interrupt();
169 return true;
170 }
171 }
172
173 /**
174 * Wait for a putter to put item placed by taker, or time out.
175 */
176 Object waitForPut(boolean timed, long nanos) throws InterruptedException {
177 try {
178 if (!timed)
179 acquireExclusiveInterruptibly(0);
180 else if (!acquireExclusiveTimed(0, nanos) && cancel())
181 return null;
182 return extract();
183 } catch (InterruptedException ie) {
184 if (cancel())
185 throw ie;
186 Thread.currentThread().interrupt();
187 return extract();
188 }
189 }
190
191 }
192
193 /**
194 * Simple FIFO queue class to hold waiting puts/takes.
195 **/
196 private static class WaitQueue<E> {
197 Node head;
198 Node last;
199
200 Node enq(Object x) {
201 Node p = new Node(x);
202 if (last == null)
203 last = head = p;
204 else
205 last = last.next = p;
206 return p;
207 }
208
209 Node deq() {
210 Node p = head;
211 if (p != null && (head = p.next) == null)
212 last = null;
213 return p;
214 }
215 }
216
217 /**
218 * Main put algorithm, used by put, timed offer
219 */
220 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
221 if (x == null) throw new NullPointerException();
222 for (;;) {
223 Node node;
224 boolean mustWait;
225 final ReentrantLock qlock = this.qlock;
226 qlock.lockInterruptibly();
227 try {
228 node = waitingTakes.deq();
229 if ( (mustWait = (node == null)) )
230 node = waitingPuts.enq(x);
231 } finally {
232 qlock.unlock();
233 }
234
235 if (mustWait)
236 return node.waitForTake(timed, nanos);
237
238 else if (node.setItem(x))
239 return true;
240
241 // else taker cancelled, so retry
242 }
243 }
244
245 /**
246 * Main take algorithm, used by take, timed poll
247 */
248 private E doTake(boolean timed, long nanos) throws InterruptedException {
249 for (;;) {
250 Node node;
251 boolean mustWait;
252
253 final ReentrantLock qlock = this.qlock;
254 qlock.lockInterruptibly();
255 try {
256 node = waitingPuts.deq();
257 if ( (mustWait = (node == null)) )
258 node = waitingTakes.enq(null);
259 } finally {
260 qlock.unlock();
261 }
262
263 if (mustWait) {
264 Object x = node.waitForPut(timed, nanos);
265 return (E)x;
266 }
267 else {
268 Object x = node.getItem();
269 if (x != null)
270 return (E)x;
271 // else cancelled, so retry
272 }
273 }
274 }
275
/**
 * Constructs an empty <tt>SynchronousQueue</tt>; there is nothing to
 * configure.
 */
public SynchronousQueue() {
    super();
}
280
281
282 /**
283 * Adds the specified element to this queue, waiting if necessary for
284 * another thread to receive it.
285 * @param o the element to add
286 * @throws InterruptedException if interrupted while waiting.
287 * @throws NullPointerException if the specified element is <tt>null</tt>.
288 */
289 public void put(E o) throws InterruptedException {
290 doPut(o, false, 0);
291 }
292
293 /**
294 * Inserts the specified element into this queue, waiting if necessary
295 * up to the specified wait time for another thread to receive it.
296 * @param o the element to add
297 * @param timeout how long to wait before giving up, in units of
298 * <tt>unit</tt>
299 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
300 * <tt>timeout</tt> parameter
301 * @return <tt>true</tt> if successful, or <tt>false</tt> if
302 * the specified waiting time elapses before a taker appears.
303 * @throws InterruptedException if interrupted while waiting.
304 * @throws NullPointerException if the specified element is <tt>null</tt>.
305 */
306 public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
307 return doPut(o, true, unit.toNanos(timeout));
308 }
309
310
311 /**
312 * Retrieves and removes the head of this queue, waiting if necessary
313 * for another thread to insert it.
314 * @return the head of this queue
315 */
316 public E take() throws InterruptedException {
317 return doTake(false, 0);
318 }
319
320 /**
321 * Retrieves and removes the head of this queue, waiting
322 * if necessary up to the specified wait time, for another thread
323 * to insert it.
324 * @param timeout how long to wait before giving up, in units of
325 * <tt>unit</tt>
326 * @param unit a <tt>TimeUnit</tt> determining how to interpret the
327 * <tt>timeout</tt> parameter
328 * @return the head of this queue, or <tt>null</tt> if the
329 * specified waiting time elapses before an element is present.
330 * @throws InterruptedException if interrupted while waiting.
331 */
332 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
333 return doTake(true, unit.toNanos(timeout));
334 }
335
336 // Untimed nonblocking versions
337
338 /**
339 * Inserts the specified element into this queue, if another thread is
340 * waiting to receive it.
341 *
342 * @param o the element to add.
343 * @return <tt>true</tt> if it was possible to add the element to
344 * this queue, else <tt>false</tt>
345 * @throws NullPointerException if the specified element is <tt>null</tt>
346 */
347 public boolean offer(E o) {
348 if (o == null) throw new NullPointerException();
349 final ReentrantLock qlock = this.qlock;
350
351 for (;;) {
352 Node node;
353 qlock.lock();
354 try {
355 node = waitingTakes.deq();
356 } finally {
357 qlock.unlock();
358 }
359 if (node == null)
360 return false;
361
362 else if (node.setItem(o))
363 return true;
364 // else retry
365 }
366 }
367
368 /**
369 * Retrieves and removes the head of this queue, if another thread
370 * is currently making an element available.
371 *
372 * @return the head of this queue, or <tt>null</tt> if no
373 * element is available.
374 */
375 public E poll() {
376 final ReentrantLock qlock = this.qlock;
377 for (;;) {
378 Node node;
379 qlock.lock();
380 try {
381 node = waitingPuts.deq();
382 } finally {
383 qlock.unlock();
384 }
385 if (node == null)
386 return null;
387
388 else {
389 Object x = node.getItem();
390 if (x != null)
391 return (E)x;
392 // else retry
393 }
394 }
395 }
396
397 /**
398 * Always returns <tt>true</tt>.
399 * A <tt>SynchronousQueue</tt> has no internal capacity.
400 * @return <tt>true</tt>
401 */
402 public boolean isEmpty() {
403 return true;
404 }
405
406 /**
407 * Always returns zero.
408 * A <tt>SynchronousQueue</tt> has no internal capacity.
409 * @return zero.
410 */
411 public int size() {
412 return 0;
413 }
414
415 /**
416 * Always returns zero.
417 * A <tt>SynchronousQueue</tt> has no internal capacity.
418 * @return zero.
419 */
420 public int remainingCapacity() {
421 return 0;
422 }
423
424 /**
425 * Does nothing.
426 * A <tt>SynchronousQueue</tt> has no internal capacity.
427 */
428 public void clear() {}
429
430 /**
431 * Always returns <tt>false</tt>.
432 * A <tt>SynchronousQueue</tt> has no internal capacity.
433 * @param o the element
434 * @return <tt>false</tt>
435 */
436 public boolean contains(Object o) {
437 return false;
438 }
439
440 /**
441 * Always returns <tt>false</tt>.
442 * A <tt>SynchronousQueue</tt> has no internal capacity.
443 *
444 * @param o the element to remove
445 * @return <tt>false</tt>
446 */
447 public boolean remove(Object o) {
448 return false;
449 }
450
451 /**
452 * Returns <tt>false</tt> unless given collection is empty.
453 * A <tt>SynchronousQueue</tt> has no internal capacity.
454 * @param c the collection
455 * @return <tt>false</tt> unless given collection is empty
456 */
457 public boolean containsAll(Collection<?> c) {
458 return c.isEmpty();
459 }
460
461 /**
462 * Always returns <tt>false</tt>.
463 * A <tt>SynchronousQueue</tt> has no internal capacity.
464 * @param c the collection
465 * @return <tt>false</tt>
466 */
467 public boolean removeAll(Collection<?> c) {
468 return false;
469 }
470
471 /**
472 * Always returns <tt>false</tt>.
473 * A <tt>SynchronousQueue</tt> has no internal capacity.
474 * @param c the collection
475 * @return <tt>false</tt>
476 */
477 public boolean retainAll(Collection<?> c) {
478 return false;
479 }
480
481 /**
482 * Always returns <tt>null</tt>.
483 * A <tt>SynchronousQueue</tt> does not return elements
484 * unless actively waited on.
485 * @return <tt>null</tt>
486 */
487 public E peek() {
488 return null;
489 }
490
491
492 static class EmptyIterator<E> implements Iterator<E> {
493 public boolean hasNext() {
494 return false;
495 }
496 public E next() {
497 throw new NoSuchElementException();
498 }
499 public void remove() {
500 throw new IllegalStateException();
501 }
502 }
503
504 /**
505 * Returns an empty iterator in which <tt>hasNext</tt> always returns
506 * <tt>false</tt>.
507 *
508 * @return an empty iterator
509 */
510 public Iterator<E> iterator() {
511 return new EmptyIterator<E>();
512 }
513
514
515 /**
516 * Returns a zero-length array.
517 * @return a zero-length array
518 */
519 public Object[] toArray() {
520 return new Object[0];
521 }
522
523 /**
524 * Sets the zeroeth element of the specified array to <tt>null</tt>
525 * (if the array has non-zero length) and returns it.
526 * @return the specified array
527 */
528 public <T> T[] toArray(T[] a) {
529 if (a.length > 0)
530 a[0] = null;
531 return a;
532 }
533
534
535 public int drainTo(Collection<? super E> c) {
536 if (c == null)
537 throw new NullPointerException();
538 if (c == this)
539 throw new IllegalArgumentException();
540 int n = 0;
541 E e;
542 while ( (e = poll()) != null) {
543 c.add(e);
544 ++n;
545 }
546 return n;
547 }
548
549 public int drainTo(Collection<? super E> c, int maxElements) {
550 if (c == null)
551 throw new NullPointerException();
552 if (c == this)
553 throw new IllegalArgumentException();
554 int n = 0;
555 E e;
556 while (n < maxElements && (e = poll()) != null) {
557 c.add(e);
558 ++n;
559 }
560 return n;
561 }
562 }
563
564
565
566
567