ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.16
Committed: Fri Aug 29 14:12:41 2003 UTC (20 years, 9 months ago) by dl
Branch: MAIN
Changes since 1.15: +3 -3 lines
Log Message:
Fix containsAll spec/code to return true if arg empty

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each <tt>put</tt>
13 * must wait for a <tt>take</tt>, and vice versa.
14 * A synchronous queue does not have any internal capacity - in particular
15 * it does not have a capacity of one. You cannot <tt>peek</tt> at a
16 * synchronous queue because an element is only present when you try to take
17 * it; you cannot add an element (using any method) unless another thread is
18 * trying to remove it; you cannot iterate as there is nothing to iterate.
19 * The <em>head</em> of the queue is the element that the first queued thread
20 * is trying to add to the queue; if there are no queued threads then no
21 * element is being added and the head is <tt>null</tt>.
22 * Many of the <tt>Collection</tt> methods make little or no sense for a
23 * synchronous queue.
24 * This queue does not permit <tt>null</tt> elements.
25 * <p>Synchronous queues are similar to rendezvous channels used
26 * in CSP and Ada. They are well suited for handoff designs, in which
27 * an object running in one thread must synch up with an object
28 * running in another thread in order to hand it some information,
29 * event, or task.
30 * @since 1.5
31 * @author Doug Lea
32 **/
33 public class SynchronousQueue<E> extends AbstractQueue<E>
34 implements BlockingQueue<E>, java.io.Serializable {
35 private static final long serialVersionUID = -3223113410248163686L;
36
37 /*
38 This implementation divides actions into two cases for puts:
39
40 * An arriving putter that does not already have a waiting taker
41 creates a node holding item, and then waits for a taker to take it.
42 * An arriving putter that does already have a waiting taker fills
43 the slot node created by the taker, and notifies it to continue.
44
45 And symmetrically, two for takes:
46
47 * An arriving taker that does not already have a waiting putter
48 creates an empty slot node, and then waits for a putter to fill it.
49 * An arriving taker that does already have a waiting putter takes
50 item from the node created by the putter, and notifies it to continue.
51
52 This requires keeping two simple queues: waitingPuts and waitingTakes.
53
54 When a put or take waiting for the actions of its counterpart
55 aborts due to interruption or timeout, it marks the node
56 it created as "CANCELLED", which causes its counterpart to retry
57 the entire put or take sequence.
58 */
59
60 /**
61 * Special marker used in queue nodes to indicate that
62 * the thread waiting for a change in the node has timed out
63 * or been interrupted.
64 **/
65 private static final Object CANCELLED = new Object();
66
67 /*
68 * Note that all fields are transient final, so there is
69 * no explicit serialization code.
70 */
71
72 private transient final WaitQueue waitingPuts = new WaitQueue();
73 private transient final WaitQueue waitingTakes = new WaitQueue();
74 private transient final ReentrantLock qlock = new ReentrantLock();
75
76 /**
77 * Nodes each maintain an item and handle waits and signals for
78 * getting and setting it. The class opportunistically extends
79 * ReentrantLock to save an extra object allocation per
80 * rendezvous.
81 */
82 private static class Node extends ReentrantLock {
83 /** Condition to wait on for other party; lazily constructed */
84 Condition done;
85 /** The item being transferred */
86 Object item;
87 /** Next node in wait queue */
88 Node next;
89
90 Node(Object x) { item = x; }
91
92 /**
93 * Fill in the slot created by the taker and signal taker to
94 * continue.
95 */
96 boolean set(Object x) {
97 this.lock();
98 try {
99 if (item != CANCELLED) {
100 item = x;
101 if (done != null)
102 done.signal();
103 return true;
104 } else // taker has cancelled
105 return false;
106 } finally {
107 this.unlock();
108 }
109 }
110
111 /**
112 * Remove item from slot created by putter and signal putter
113 * to continue.
114 */
115 Object get() {
116 this.lock();
117 try {
118 Object x = item;
119 if (x != CANCELLED) {
120 item = null;
121 next = null;
122 if (done != null)
123 done.signal();
124 return x;
125 } else
126 return null;
127 } finally {
128 this.unlock();
129 }
130 }
131
132 /**
133 * Wait for a taker to take item placed by putter, or time out.
134 */
135 boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
136 this.lock();
137 try {
138 for (;;) {
139 if (item == null)
140 return true;
141 if (timed) {
142 if (nanos <= 0) {
143 item = CANCELLED;
144 return false;
145 }
146 }
147 if (done == null)
148 done = this.newCondition();
149 if (timed)
150 nanos = done.awaitNanos(nanos);
151 else
152 done.await();
153 }
154 } catch (InterruptedException ie) {
155 // If taken, return normally but set interrupt status
156 if (item == null) {
157 Thread.currentThread().interrupt();
158 return true;
159 } else {
160 item = CANCELLED;
161 done.signal(); // propagate signal
162 throw ie;
163 }
164 } finally {
165 this.unlock();
166 }
167 }
168
169 /**
170 * Wait for a putter to put item placed by taker, or time out.
171 */
172 Object waitForPut(boolean timed, long nanos) throws InterruptedException {
173 this.lock();
174 try {
175 for (;;) {
176 Object x = item;
177 if (x != null) {
178 item = null;
179 next = null;
180 return x;
181 }
182 if (timed) {
183 if (nanos <= 0) {
184 item = CANCELLED;
185 return null;
186 }
187 }
188 if (done == null)
189 done = this.newCondition();
190 if (timed)
191 nanos = done.awaitNanos(nanos);
192 else
193 done.await();
194 }
195 } catch (InterruptedException ie) {
196 Object y = item;
197 if (y != null) {
198 item = null;
199 next = null;
200 Thread.currentThread().interrupt();
201 return y;
202 } else {
203 item = CANCELLED;
204 done.signal(); // propagate signal
205 throw ie;
206 }
207 } finally {
208 this.unlock();
209 }
210 }
211 }
212
213 /**
214 * Simple FIFO queue class to hold waiting puts/takes.
215 **/
216 private static class WaitQueue<E> {
217 Node head;
218 Node last;
219
220 Node enq(Object x) {
221 Node p = new Node(x);
222 if (last == null)
223 last = head = p;
224 else
225 last = last.next = p;
226 return p;
227 }
228
229 Node deq() {
230 Node p = head;
231 if (p != null && (head = p.next) == null)
232 last = null;
233 return p;
234 }
235 }
236
237 /**
238 * Main put algorithm, used by put, timed offer
239 */
240 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
241 if (x == null) throw new NullPointerException();
242 for (;;) {
243 Node node;
244 boolean mustWait;
245
246 qlock.lockInterruptibly();
247 try {
248 node = waitingTakes.deq();
249 if ( (mustWait = (node == null)) )
250 node = waitingPuts.enq(x);
251 } finally {
252 qlock.unlock();
253 }
254
255 if (mustWait)
256 return node.waitForTake(timed, nanos);
257
258 else if (node.set(x))
259 return true;
260
261 // else taker cancelled, so retry
262 }
263 }
264
265 /**
266 * Main take algorithm, used by take, timed poll
267 */
268 private E doTake(boolean timed, long nanos) throws InterruptedException {
269 for (;;) {
270 Node node;
271 boolean mustWait;
272
273 qlock.lockInterruptibly();
274 try {
275 node = waitingPuts.deq();
276 if ( (mustWait = (node == null)) )
277 node = waitingTakes.enq(null);
278 } finally {
279 qlock.unlock();
280 }
281
282 if (mustWait)
283 return (E)node.waitForPut(timed, nanos);
284
285 else {
286 E x = (E)node.get();
287 if (x != null)
288 return x;
289 // else cancelled, so retry
290 }
291 }
292 }
293
294 /**
295 * Creates a <tt>SynchronousQueue</tt>.
296 */
297 public SynchronousQueue() {}
298
299
300 /**
301 * Adds the specified element to this queue, waiting if necessary for
302 * another thread to receive it.
303 * @throws NullPointerException {@inheritDoc}
304 */
305 public void put(E o) throws InterruptedException {
306 doPut(o, false, 0);
307 }
308
309 /**
310 * Adds the specified element to this queue, waiting if necessary up to the
311 * specified wait time for another thread to receive it.
312 * @return <tt>true</tt> if successful, or <tt>false</tt> if
313 * the specified waiting time elapses before a taker appears.
314 * @throws NullPointerException {@inheritDoc}
315 */
316 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
317 return doPut(x, true, unit.toNanos(timeout));
318 }
319
320
321 /**
322 * Retrieves and removes the head of this queue, waiting if necessary
323 * for another thread to insert it.
324 * @return the head of this queue
325 */
326 public E take() throws InterruptedException {
327 return doTake(false, 0);
328 }
329
330 /**
331 * Retrieves and removes the head of this queue, waiting
332 * if necessary up to the specified wait time, for another thread
333 * to insert it.
334 */
335 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
336 return doTake(true, unit.toNanos(timeout));
337 }
338
339 // Untimed nonblocking versions
340
341 /**
342 * Adds the specified element to this queue, if another thread is
343 * waiting to receive it.
344 *
345 * @throws NullpointerException {@inheritDoc}
346 */
347 public boolean offer(E o) {
348 if (o == null) throw new NullPointerException();
349
350 for (;;) {
351 qlock.lock();
352 Node node;
353 try {
354 node = waitingTakes.deq();
355 } finally {
356 qlock.unlock();
357 }
358 if (node == null)
359 return false;
360
361 else if (node.set(o))
362 return true;
363 // else retry
364 }
365 }
366
367
368 public E poll() {
369 for (;;) {
370 Node node;
371 qlock.lock();
372 try {
373 node = waitingPuts.deq();
374 } finally {
375 qlock.unlock();
376 }
377 if (node == null)
378 return null;
379
380 else {
381 Object x = node.get();
382 if (x != null)
383 return (E)x;
384 // else retry
385 }
386 }
387 }
388
389
390 /**
391 * Adds the specified element to this queue.
392 * @return <tt>true</tt> (as per the general contract of
393 * <tt>Collection.add</tt>).
394 *
395 * @throws NullPointerException {@inheritDoc}
396 * @throws IllegalStateException if no thread is waiting to receive the
397 * element being added
398 */
399 public boolean add(E o) {
400 return super.add(o);
401 }
402
403
404 /**
405 * Adds all of the elements in the specified collection to this queue.
406 * The behavior of this operation is undefined if
407 * the specified collection is modified while the operation is in
408 * progress. (This implies that the behavior of this call is undefined if
409 * the specified collection is this queue, and this queue is nonempty.)
410 * <p>
411 * This implementation iterates over the specified collection, and adds
412 * each object returned by the iterator to this collection, in turn.
413 * @throws NullPointerException {@inheritDoc}
414 * @throws IllegalStateException if no thread is waiting to receive the
415 * element being added
416 */
417 public boolean addAll(Collection<? extends E> c) {
418 return super.addAll(c);
419 }
420
421 /**
422 * Always returns <tt>true</tt>.
423 * A <tt>SynchronousQueue</tt> has no internal capacity.
424 * @return <tt>true</tt>
425 */
426 public boolean isEmpty() {
427 return true;
428 }
429
430 /**
431 * Always returns zero.
432 * A <tt>SynchronousQueue</tt> has no internal capacity.
433 * @return zero.
434 */
435 public int size() {
436 return 0;
437 }
438
439 /**
440 * Always returns zero.
441 * A <tt>SynchronousQueue</tt> has no internal capacity.
442 * @return zero.
443 */
444 public int remainingCapacity() {
445 return 0;
446 }
447
448 /**
449 * Does nothing.
450 * A <tt>SynchronousQueue</tt> has no internal capacity.
451 */
452 public void clear() {}
453
454 /**
455 * Always returns <tt>false</tt>.
456 * A <tt>SynchronousQueue</tt> has no internal capacity.
457 * @return <tt>false</tt>
458 */
459 public boolean contains(Object o) {
460 return false;
461 }
462
463 /**
464 * Returns <tt>false</tt> unless given collection is empty.
465 * A <tt>SynchronousQueue</tt> has no internal capacity.
466 * @return <tt>false</tt> unless given collection is empty
467 */
468 public boolean containsAll(Collection<?> c) {
469 return c.isEmpty();
470 }
471
472 /**
473 * Always returns <tt>false</tt>.
474 * A <tt>SynchronousQueue</tt> has no internal capacity.
475 * @return <tt>false</tt>
476 */
477 public boolean removeAll(Collection<?> c) {
478 return false;
479 }
480
481 /**
482 * Always returns <tt>false</tt>.
483 * A <tt>SynchronousQueue</tt> has no internal capacity.
484 * @return <tt>false</tt>
485 */
486 public boolean retainAll(Collection<?> c) {
487 return false;
488 }
489
490 /**
491 * Always returns <tt>null</tt>.
492 * A <tt>SynchronousQueue</tt> does not return elements
493 * unless actively waited on.
494 * @return <tt>null</tt>
495 */
496 public E peek() {
497 return null;
498 }
499
500
501 static class EmptyIterator<E> implements Iterator<E> {
502 public boolean hasNext() {
503 return false;
504 }
505 public E next() {
506 throw new NoSuchElementException();
507 }
508 public void remove() {
509 throw new UnsupportedOperationException();
510 }
511 }
512
513 /**
514 * Returns an empty iterator: <tt>hasNext</tt> always returns
515 * <tt>false</tt>.
516 *
517 * @return an empty iterator
518 */
519 public Iterator<E> iterator() {
520 return new EmptyIterator<E>();
521 }
522
523
524 /**
525 * Returns a zero-length array.
526 * @return a zero-length array
527 */
528 public Object[] toArray() {
529 return (E[]) new Object[0];
530 }
531
532 /**
533 * Sets the zeroeth element of the specified array to <tt>null</tt>
534 * (if the array has non-zero length) and returns it.
535 * @return the specified array
536 */
537 public <T> T[] toArray(T[] a) {
538 if (a.length > 0)
539 a[0] = null;
540 return a;
541 }
542 }
543
544
545
546
547