ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.14
Committed: Fri Aug 8 20:05:07 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.13: +14 -28 lines
Log Message:
Scrunched catch, finally, else clauses.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
11 /**
12 * A {@linkplain BlockingQueue blocking queue} in which each <tt>put</tt>
13 * must wait for a <tt>take</tt>, and vice versa.
14 * A synchronous queue does not have any internal capacity - in particular
15 * it does not have a capacity of one. You cannot <tt>peek</tt> at a
16 * synchronous queue because an element is only present when you try to take
17 * it; you cannot add an element (using any method) unless another thread is
18 * trying to remove it; you cannot iterate as there is nothing to iterate.
19 * The <em>head</em> of the queue is the element that the first queued thread
20 * is trying to add to the queue; if there are no queued threads then no
21 * element is being added and the head is <tt>null</tt>.
22 * Many of the <tt>Collection</tt> methods make little or no sense for a
23 * synchronous queue.
24 * This queue does not permit <tt>null</tt> elements.
25 * <p>Synchronous queues are similar to rendezvous channels used
26 * in CSP and Ada. They are well suited for handoff designs, in which
27 * an object running in one thread must synch up with an object
28 * running in another thread in order to hand it some information,
29 * event, or task.
30 * @since 1.5
31 * @author Doug Lea
32 **/
33 public class SynchronousQueue<E> extends AbstractQueue<E>
34 implements BlockingQueue<E>, java.io.Serializable {
35
36 /*
37 This implementation divides actions into two cases for puts:
38
39 * An arriving putter that does not already have a waiting taker
40 creates a node holding item, and then waits for a taker to take it.
41 * An arriving putter that does already have a waiting taker fills
42 the slot node created by the taker, and notifies it to continue.
43
44 And symmetrically, two for takes:
45
46 * An arriving taker that does not already have a waiting putter
47 creates an empty slot node, and then waits for a putter to fill it.
48 * An arriving taker that does already have a waiting putter takes
49 item from the node created by the putter, and notifies it to continue.
50
51 This requires keeping two simple queues: waitingPuts and waitingTakes.
52
53 When a put or take waiting for the actions of its counterpart
54 aborts due to interruption or timeout, it marks the node
55 it created as "CANCELLED", which causes its counterpart to retry
56 the entire put or take sequence.
57 */
58
59 /**
60 * Special marker used in queue nodes to indicate that
61 * the thread waiting for a change in the node has timed out
62 * or been interrupted.
63 **/
64 private static final Object CANCELLED = new Object();
65
66 /*
67 * Note that all fields are transient final, so there is
68 * no explicit serialization code.
69 */
70
71 private transient final WaitQueue waitingPuts = new WaitQueue();
72 private transient final WaitQueue waitingTakes = new WaitQueue();
73 private transient final ReentrantLock qlock = new ReentrantLock();
74
75 /**
76 * Nodes each maintain an item and handle waits and signals for
77 * getting and setting it. The class opportunistically extends
78 * ReentrantLock to save an extra object allocation per
79 * rendezvous.
80 */
81 private static class Node extends ReentrantLock {
82 /** Condition to wait on for other party; lazily constructed */
83 Condition done;
84 /** The item being transferred */
85 Object item;
86 /** Next node in wait queue */
87 Node next;
88
89 Node(Object x) { item = x; }
90
91 /**
92 * Fill in the slot created by the taker and signal taker to
93 * continue.
94 */
95 boolean set(Object x) {
96 this.lock();
97 try {
98 if (item != CANCELLED) {
99 item = x;
100 if (done != null)
101 done.signal();
102 return true;
103 } else // taker has cancelled
104 return false;
105 } finally {
106 this.unlock();
107 }
108 }
109
110 /**
111 * Remove item from slot created by putter and signal putter
112 * to continue.
113 */
114 Object get() {
115 this.lock();
116 try {
117 Object x = item;
118 if (x != CANCELLED) {
119 item = null;
120 next = null;
121 if (done != null)
122 done.signal();
123 return x;
124 } else
125 return null;
126 } finally {
127 this.unlock();
128 }
129 }
130
131 /**
132 * Wait for a taker to take item placed by putter, or time out.
133 */
134 boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
135 this.lock();
136 try {
137 for (;;) {
138 if (item == null)
139 return true;
140 if (timed) {
141 if (nanos <= 0) {
142 item = CANCELLED;
143 return false;
144 }
145 }
146 if (done == null)
147 done = this.newCondition();
148 if (timed)
149 nanos = done.awaitNanos(nanos);
150 else
151 done.await();
152 }
153 } catch (InterruptedException ie) {
154 // If taken, return normally but set interrupt status
155 if (item == null) {
156 Thread.currentThread().interrupt();
157 return true;
158 } else {
159 item = CANCELLED;
160 done.signal(); // propagate signal
161 throw ie;
162 }
163 } finally {
164 this.unlock();
165 }
166 }
167
168 /**
169 * Wait for a putter to put item placed by taker, or time out.
170 */
171 Object waitForPut(boolean timed, long nanos) throws InterruptedException {
172 this.lock();
173 try {
174 for (;;) {
175 Object x = item;
176 if (x != null) {
177 item = null;
178 next = null;
179 return x;
180 }
181 if (timed) {
182 if (nanos <= 0) {
183 item = CANCELLED;
184 return null;
185 }
186 }
187 if (done == null)
188 done = this.newCondition();
189 if (timed)
190 nanos = done.awaitNanos(nanos);
191 else
192 done.await();
193 }
194 } catch (InterruptedException ie) {
195 Object y = item;
196 if (y != null) {
197 item = null;
198 next = null;
199 Thread.currentThread().interrupt();
200 return y;
201 } else {
202 item = CANCELLED;
203 done.signal(); // propagate signal
204 throw ie;
205 }
206 } finally {
207 this.unlock();
208 }
209 }
210 }
211
212 /**
213 * Simple FIFO queue class to hold waiting puts/takes.
214 **/
215 private static class WaitQueue<E> {
216 Node head;
217 Node last;
218
219 Node enq(Object x) {
220 Node p = new Node(x);
221 if (last == null)
222 last = head = p;
223 else
224 last = last.next = p;
225 return p;
226 }
227
228 Node deq() {
229 Node p = head;
230 if (p != null && (head = p.next) == null)
231 last = null;
232 return p;
233 }
234 }
235
236 /**
237 * Main put algorithm, used by put, timed offer
238 */
239 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
240 if (x == null) throw new NullPointerException();
241 for (;;) {
242 Node node;
243 boolean mustWait;
244
245 qlock.lockInterruptibly();
246 try {
247 node = waitingTakes.deq();
248 if ( (mustWait = (node == null)) )
249 node = waitingPuts.enq(x);
250 } finally {
251 qlock.unlock();
252 }
253
254 if (mustWait)
255 return node.waitForTake(timed, nanos);
256
257 else if (node.set(x))
258 return true;
259
260 // else taker cancelled, so retry
261 }
262 }
263
264 /**
265 * Main take algorithm, used by take, timed poll
266 */
267 private E doTake(boolean timed, long nanos) throws InterruptedException {
268 for (;;) {
269 Node node;
270 boolean mustWait;
271
272 qlock.lockInterruptibly();
273 try {
274 node = waitingPuts.deq();
275 if ( (mustWait = (node == null)) )
276 node = waitingTakes.enq(null);
277 } finally {
278 qlock.unlock();
279 }
280
281 if (mustWait)
282 return (E)node.waitForPut(timed, nanos);
283
284 else {
285 E x = (E)node.get();
286 if (x != null)
287 return x;
288 // else cancelled, so retry
289 }
290 }
291 }
292
293 /**
294 * Creates a <tt>SynchronousQueue</tt>.
295 */
296 public SynchronousQueue() {}
297
298
299 /**
300 * Adds the specified element to this queue, waiting if necessary for
301 * another thread to receive it.
302 * @throws NullPointerException {@inheritDoc}
303 */
304 public void put(E o) throws InterruptedException {
305 doPut(o, false, 0);
306 }
307
308 /**
309 * Adds the specified element to this queue, waiting if necessary up to the
310 * specified wait time for another thread to receive it.
311 * @return <tt>true</tt> if successful, or <tt>false</tt> if
312 * the specified waiting time elapses before a taker appears.
313 * @throws NullPointerException {@inheritDoc}
314 */
315 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
316 return doPut(x, true, unit.toNanos(timeout));
317 }
318
319
320 /**
321 * Retrieves and removes the head of this queue, waiting if necessary
322 * for another thread to insert it.
323 * @return the head of this queue
324 */
325 public E take() throws InterruptedException {
326 return doTake(false, 0);
327 }
328
329 /**
330 * Retrieves and removes the head of this queue, waiting
331 * if necessary up to the specified wait time, for another thread
332 * to insert it.
333 */
334 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
335 return doTake(true, unit.toNanos(timeout));
336 }
337
338 // Untimed nonblocking versions
339
340 /**
341 * Adds the specified element to this queue, if another thread is
342 * waiting to receive it.
343 *
344 * @throws NullpointerException {@inheritDoc}
345 */
346 public boolean offer(E o) {
347 if (o == null) throw new NullPointerException();
348
349 for (;;) {
350 qlock.lock();
351 Node node;
352 try {
353 node = waitingTakes.deq();
354 } finally {
355 qlock.unlock();
356 }
357 if (node == null)
358 return false;
359
360 else if (node.set(o))
361 return true;
362 // else retry
363 }
364 }
365
366
367 public E poll() {
368 for (;;) {
369 Node node;
370 qlock.lock();
371 try {
372 node = waitingPuts.deq();
373 } finally {
374 qlock.unlock();
375 }
376 if (node == null)
377 return null;
378
379 else {
380 Object x = node.get();
381 if (x != null)
382 return (E)x;
383 // else retry
384 }
385 }
386 }
387
388
389 /**
390 * Adds the specified element to this queue.
391 * @return <tt>true</tt> (as per the general contract of
392 * <tt>Collection.add</tt>).
393 *
394 * @throws NullPointerException {@inheritDoc}
395 * @throws IllegalStateException if no thread is waiting to receive the
396 * element being added
397 */
398 public boolean add(E o) {
399 return super.add(o);
400 }
401
402
403 /**
404 * Adds all of the elements in the specified collection to this queue.
405 * The behavior of this operation is undefined if
406 * the specified collection is modified while the operation is in
407 * progress. (This implies that the behavior of this call is undefined if
408 * the specified collection is this queue, and this queue is nonempty.)
409 * <p>
410 * This implementation iterates over the specified collection, and adds
411 * each object returned by the iterator to this collection, in turn.
412 * @throws NullPointerException {@inheritDoc}
413 * @throws IllegalStateException if no thread is waiting to receive the
414 * element being added
415 */
416 public boolean addAll(Collection<? extends E> c) {
417 return super.addAll(c);
418 }
419
420 /**
421 * Always returns <tt>true</tt>.
422 * A <tt>SynchronousQueue</tt> has no internal capacity.
423 * @return <tt>true</tt>
424 */
425 public boolean isEmpty() {
426 return true;
427 }
428
429 /**
430 * Always returns zero.
431 * A <tt>SynchronousQueue</tt> has no internal capacity.
432 * @return zero.
433 */
434 public int size() {
435 return 0;
436 }
437
438 /**
439 * Always returns zero.
440 * A <tt>SynchronousQueue</tt> has no internal capacity.
441 * @return zero.
442 */
443 public int remainingCapacity() {
444 return 0;
445 }
446
447 /**
448 * Does nothing.
449 * A <tt>SynchronousQueue</tt> has no internal capacity.
450 */
451 public void clear() {}
452
453 /**
454 * Always returns <tt>false</tt>.
455 * A <tt>SynchronousQueue</tt> has no internal capacity.
456 * @return <tt>false</tt>
457 */
458 public boolean contains(Object o) {
459 return false;
460 }
461
462 /**
463 * Always returns <tt>false</tt>.
464 * A <tt>SynchronousQueue</tt> has no internal capacity.
465 * @return <tt>false</tt>
466 */
467 public boolean containsAll(Collection<?> c) {
468 return false;
469 }
470
471 /**
472 * Always returns <tt>false</tt>.
473 * A <tt>SynchronousQueue</tt> has no internal capacity.
474 * @return <tt>false</tt>
475 */
476 public boolean removeAll(Collection<?> c) {
477 return false;
478 }
479
480 /**
481 * Always returns <tt>false</tt>.
482 * A <tt>SynchronousQueue</tt> has no internal capacity.
483 * @return <tt>false</tt>
484 */
485 public boolean retainAll(Collection<?> c) {
486 return false;
487 }
488
489 /**
490 * Always returns <tt>null</tt>.
491 * A <tt>SynchronousQueue</tt> does not return elements
492 * unless actively waited on.
493 * @return <tt>null</tt>
494 */
495 public E peek() {
496 return null;
497 }
498
499
500 static class EmptyIterator<E> implements Iterator<E> {
501 public boolean hasNext() {
502 return false;
503 }
504 public E next() {
505 throw new NoSuchElementException();
506 }
507 public void remove() {
508 throw new UnsupportedOperationException();
509 }
510 }
511
512 /**
513 * Returns an empty iterator: <tt>hasNext</tt> always returns
514 * <tt>false</tt>.
515 *
516 * @return an empty iterator
517 */
518 public Iterator<E> iterator() {
519 return new EmptyIterator<E>();
520 }
521
522
523 /**
524 * Returns a zero-length array.
525 * @return a zero-length array
526 */
527 public Object[] toArray() {
528 return (E[]) new Object[0];
529 }
530
531 /**
532 * Sets the zeroeth element of the specified array to <tt>null</tt>
533 * (if the array has non-zero length) and returns it.
534 * @return the specified array
535 */
536 public <T> T[] toArray(T[] a) {
537 if (a.length > 0)
538 a[0] = null;
539 return a;
540 }
541 }
542
543
544
545
546