ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.12
Committed: Wed Aug 6 10:28:23 2003 UTC (20 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.11: +3 -3 lines
Log Message:
Fixed SQ signatures; Fix CLQ.size

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each <tt>put</tt>
13 * must wait for a <tt>take</tt>, and vice versa.
14 * A synchronous queue does not have any internal capacity - in particular
15 * it does not have a capacity of one. You cannot <tt>peek</tt> at a
16 * synchronous queue because an element is only present when you try to take
17 * it; you cannot add an element (using any method) unless another thread is
18 * trying to remove it; you cannot iterate as there is nothing to iterate.
19 * The <em>head</em> of the queue is the element that the first queued thread
20 * is trying to add to the queue; if there are no queued threads then no
21 * element is being added and the head is <tt>null</tt>.
22 * Many of the <tt>Collection</tt> methods make little or no sense for a
23 * synchronous queue.
24 * This queue does not permit <tt>null</tt> elements.
25 * <p>Synchronous queues are similar to rendezvous channels used
26 * in CSP and Ada. They are well suited for handoff designs, in which
27 * an object running in one thread must synch up with an object
28 * running in another thread in order to hand it some information,
29 * event, or task.
30 * @since 1.5
31 * @author Doug Lea
32 **/
33 public class SynchronousQueue<E> extends AbstractQueue<E>
34 implements BlockingQueue<E>, java.io.Serializable {
35
36 /*
37 This implementation divides actions into two cases for puts:
38
39 * An arriving putter that does not already have a waiting taker
40 creates a node holding item, and then waits for a taker to take it.
41 * An arriving putter that does already have a waiting taker fills
42 the slot node created by the taker, and notifies it to continue.
43
44 And symmetrically, two for takes:
45
46 * An arriving taker that does not already have a waiting putter
47 creates an empty slot node, and then waits for a putter to fill it.
48 * An arriving taker that does already have a waiting putter takes
49 item from the node created by the putter, and notifies it to continue.
50
51 This requires keeping two simple queues: waitingPuts and waitingTakes.
52
53 When a put or take waiting for the actions of its counterpart
54 aborts due to interruption or timeout, it marks the node
55 it created as "CANCELLED", which causes its counterpart to retry
56 the entire put or take sequence.
57 */
58
59 /**
60 * Special marker used in queue nodes to indicate that
61 * the thread waiting for a change in the node has timed out
62 * or been interrupted.
63 **/
64 private static final Object CANCELLED = new Object();
65
66 /*
67 * Note that all fields are transient final, so there is
68 * no explicit serialization code.
69 */
70
71 private transient final WaitQueue waitingPuts = new WaitQueue();
72 private transient final WaitQueue waitingTakes = new WaitQueue();
73 private transient final ReentrantLock qlock = new ReentrantLock();
74
75 /**
76 * Nodes each maintain an item and handle waits and signals for
77 * getting and setting it. The class opportunistically extends
78 * ReentrantLock to save an extra object allocation per
79 * rendezvous.
80 */
81 private static class Node extends ReentrantLock {
82 /** Condition to wait on for other party; lazily constructed */
83 Condition done;
84 /** The item being transferred */
85 Object item;
86 /** Next node in wait queue */
87 Node next;
88
89 Node(Object x) { item = x; }
90
91 /**
92 * Fill in the slot created by the taker and signal taker to
93 * continue.
94 */
95 boolean set(Object x) {
96 this.lock();
97 try {
98 if (item != CANCELLED) {
99 item = x;
100 if (done != null)
101 done.signal();
102 return true;
103 }
104 else // taker has cancelled
105 return false;
106 }
107 finally {
108 this.unlock();
109 }
110 }
111
112 /**
113 * Remove item from slot created by putter and signal putter
114 * to continue.
115 */
116 Object get() {
117 this.lock();
118 try {
119 Object x = item;
120 if (x != CANCELLED) {
121 item = null;
122 next = null;
123 if (done != null)
124 done.signal();
125 return x;
126 }
127 else
128 return null;
129 }
130 finally {
131 this.unlock();
132 }
133 }
134
135 /**
136 * Wait for a taker to take item placed by putter, or time out.
137 */
138 boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
139 this.lock();
140 try {
141 for (;;) {
142 if (item == null)
143 return true;
144 if (timed) {
145 if (nanos <= 0) {
146 item = CANCELLED;
147 return false;
148 }
149 }
150 if (done == null)
151 done = this.newCondition();
152 if (timed)
153 nanos = done.awaitNanos(nanos);
154 else
155 done.await();
156 }
157 }
158 catch (InterruptedException ie) {
159 // If taken, return normally but set interrupt status
160 if (item == null) {
161 Thread.currentThread().interrupt();
162 return true;
163 }
164 else {
165 item = CANCELLED;
166 done.signal(); // propagate signal
167 throw ie;
168 }
169 }
170 finally {
171 this.unlock();
172 }
173 }
174
175 /**
176 * Wait for a putter to put item placed by taker, or time out.
177 */
178 Object waitForPut(boolean timed, long nanos) throws InterruptedException {
179 this.lock();
180 try {
181 for (;;) {
182 Object x = item;
183 if (x != null) {
184 item = null;
185 next = null;
186 return x;
187 }
188 if (timed) {
189 if (nanos <= 0) {
190 item = CANCELLED;
191 return null;
192 }
193 }
194 if (done == null)
195 done = this.newCondition();
196 if (timed)
197 nanos = done.awaitNanos(nanos);
198 else
199 done.await();
200 }
201 }
202 catch (InterruptedException ie) {
203 Object y = item;
204 if (y != null) {
205 item = null;
206 next = null;
207 Thread.currentThread().interrupt();
208 return y;
209 }
210 else {
211 item = CANCELLED;
212 done.signal(); // propagate signal
213 throw ie;
214 }
215 }
216 finally {
217 this.unlock();
218 }
219 }
220 }
221
222 /**
223 * Simple FIFO queue class to hold waiting puts/takes.
224 **/
225 private static class WaitQueue<E> {
226 Node head;
227 Node last;
228
229 Node enq(Object x) {
230 Node p = new Node(x);
231 if (last == null)
232 last = head = p;
233 else
234 last = last.next = p;
235 return p;
236 }
237
238 Node deq() {
239 Node p = head;
240 if (p != null && (head = p.next) == null)
241 last = null;
242 return p;
243 }
244 }
245
246 /**
247 * Main put algorithm, used by put, timed offer
248 */
249 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
250 if (x == null) throw new NullPointerException();
251 for (;;) {
252 Node node;
253 boolean mustWait;
254
255 qlock.lockInterruptibly();
256 try {
257 node = waitingTakes.deq();
258 if ( (mustWait = (node == null)) )
259 node = waitingPuts.enq(x);
260 }
261 finally {
262 qlock.unlock();
263 }
264
265 if (mustWait)
266 return node.waitForTake(timed, nanos);
267
268 else if (node.set(x))
269 return true;
270
271 // else taker cancelled, so retry
272 }
273 }
274
275 /**
276 * Main take algorithm, used by take, timed poll
277 */
278 private E doTake(boolean timed, long nanos) throws InterruptedException {
279 for (;;) {
280 Node node;
281 boolean mustWait;
282
283 qlock.lockInterruptibly();
284 try {
285 node = waitingPuts.deq();
286 if ( (mustWait = (node == null)) )
287 node = waitingTakes.enq(null);
288 }
289 finally {
290 qlock.unlock();
291 }
292
293 if (mustWait)
294 return (E)node.waitForPut(timed, nanos);
295
296 else {
297 E x = (E)node.get();
298 if (x != null)
299 return x;
300 // else cancelled, so retry
301 }
302 }
303 }
304
305 /**
306 * Creates a <tt>SynchronousQueue</tt>
307 */
308 public SynchronousQueue() {}
309
310
311 /**
312 * Adds the specified element to this queue, waiting if necessary for
313 * another thread to receive it.
314 * @throws NullPointerException {@inheritDoc}
315 */
316 public void put(E o) throws InterruptedException {
317 doPut(o, false, 0);
318 }
319
320 /**
321 * Adds the specified element to this queue, waiting if necessary up to the
322 * specified wait time for another thread to receive it.
323 * @return <tt>true</tt> if successful, or <tt>false</tt> if
324 * the specified waiting time elapses before a taker appears.
325 * @throws NullPointerException {@inheritDoc}
326 */
327 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
328 return doPut(x, true, unit.toNanos(timeout));
329 }
330
331
332 /**
333 * Retrieves and removes the head of this queue, waiting if necessary
334 * for another thread to insert it.
335 * @return the head of this queue
336 */
337 public E take() throws InterruptedException {
338 return doTake(false, 0);
339 }
340
341 /**
342 * Retrieves and removes the head of this queue, waiting
343 * if necessary up to the specified wait time, for another thread
344 * to insert it.
345 */
346 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
347 return doTake(true, unit.toNanos(timeout));
348 }
349
350 // Untimed nonblocking versions
351
352 /**
353 * Adds the specified element to this queue, if another thread is
354 * waiting to receive it.
355 *
356 * @throws NullpointerException {@inheritDoc}
357 */
358 public boolean offer(E o) {
359 if (o == null) throw new NullPointerException();
360
361 for (;;) {
362 qlock.lock();
363 Node node;
364 try {
365 node = waitingTakes.deq();
366 }
367 finally {
368 qlock.unlock();
369 }
370 if (node == null)
371 return false;
372
373 else if (node.set(o))
374 return true;
375 // else retry
376 }
377 }
378
379
380 public E poll() {
381 for (;;) {
382 Node node;
383 qlock.lock();
384 try {
385 node = waitingPuts.deq();
386 }
387 finally {
388 qlock.unlock();
389 }
390 if (node == null)
391 return null;
392
393 else {
394 Object x = node.get();
395 if (x != null)
396 return (E)x;
397 // else retry
398 }
399 }
400 }
401
402
403 /**
404 * Adds the specified element to this queue.
405 * @return <tt>true</tt> (as per the general contract of
406 * <tt>Collection.add</tt>).
407 *
408 * @throws NullPointerException {@inheritDoc}
409 * @throws IllegalStateException if no thread is waiting to receive the
410 * element being added
411 */
412 public boolean add(E o) {
413 return super.add(o);
414 }
415
416
417 /**
418 * Adds all of the elements in the specified collection to this queue.
419 * The behavior of this operation is undefined if
420 * the specified collection is modified while the operation is in
421 * progress. (This implies that the behavior of this call is undefined if
422 * the specified collection is this queue, and this queue is nonempty.)
423 * <p>
424 * This implementation iterates over the specified collection, and adds
425 * each object returned by the iterator to this collection, in turn.
426 * @throws NullPointerException {@inheritDoc}
427 * @throws IllegalStateException if no thread is waiting to receive the
428 * element being added
429 */
430 public boolean addAll(Collection<? extends E> c) {
431 return super.addAll(c);
432 }
433
434 /**
435 * Always returns <tt>true</tt>.
436 * A <tt>SynchronousQueue</tt> has no internal capacity.
437 * @return <tt>true</tt>
438 */
439 public boolean isEmpty() {
440 return true;
441 }
442
443 /**
444 * Always returns zero.
445 * A <tt>SynchronousQueue</tt> has no internal capacity.
446 * @return zero.
447 */
448 public int size() {
449 return 0;
450 }
451
452 /**
453 * Always returns zero.
454 * A <tt>SynchronousQueue</tt> has no internal capacity.
455 * @return zero.
456 */
457 public int remainingCapacity() {
458 return 0;
459 }
460
461 /**
462 * Does nothing.
463 * A <tt>SynchronousQueue</tt> has no internal capacity.
464 */
465 public void clear() {}
466
467 /**
468 * Always returns <tt>false</tt>.
469 * A <tt>SynchronousQueue</tt> has no internal capacity.
470 * @return <tt>false</tt>
471 */
472 public boolean contains(Object o) {
473 return false;
474 }
475
476 /**
477 * Always returns <tt>false</tt>.
478 * A <tt>SynchronousQueue</tt> has no internal capacity.
479 * @return <tt>false</tt>
480 */
481 public boolean containsAll(Collection<?> c) {
482 return false;
483 }
484
485 /**
486 * Always returns <tt>false</tt>.
487 * A <tt>SynchronousQueue</tt> has no internal capacity.
488 * @return <tt>false</tt>
489 */
490 public boolean removeAll(Collection<?> c) {
491 return false;
492 }
493
494 /**
495 * Always returns <tt>false</tt>.
496 * A <tt>SynchronousQueue</tt> has no internal capacity.
497 * @return <tt>false</tt>
498 */
499 public boolean retainAll(Collection<?> c) {
500 return false;
501 }
502
503 /**
504 * Always returns <tt>null</tt>.
505 * A <tt>SynchronousQueue</tt> does not return elements
506 * unless actively waited on.
507 * @return <tt>null</tt>
508 */
509 public E peek() {
510 return null;
511 }
512
513
514 static class EmptyIterator<E> implements Iterator<E> {
515 public boolean hasNext() {
516 return false;
517 }
518 public E next() {
519 throw new NoSuchElementException();
520 }
521 public void remove() {
522 throw new UnsupportedOperationException();
523 }
524 }
525
526 /**
527 * Returns an empty iterator: <tt>hasNext</tt> always returns
528 * <tt>false</tt>
529 * @return an empty iterator
530 */
531 public Iterator<E> iterator() {
532 return new EmptyIterator<E>();
533 }
534
535
536 /**
537 * Returns a zero-length array.
538 * @return a zero-length array
539 */
540 public Object[] toArray() {
541 return (E[]) new Object[0];
542 }
543
544 /**
545 * Sets the zeroeth element of the specified array to <tt>null</tt>
546 * (if the array has non-zero length) and returns it.
547 * @return the specified array
548 */
549 public <T> T[] toArray(T[] a) {
550 if (a.length > 0)
551 a[0] = null;
552 return a;
553 }
554 }
555
556
557
558
559