ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.5
Committed: Sat Jun 7 18:20:21 2003 UTC (21 years ago) by dl
Branch: MAIN
CVS Tags: JSR166_PRELIMINARY_TEST_RELEASE_1
Changes since 1.4: +46 -30 lines
Log Message:
Misc documentation updates

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9
10 /**
11 * A {@link Queue} in which each put must wait for a take, and vice
12 * versa. SynchronousQueues are similar to rendezvous channels used
13 * in CSP and Ada. They are well suited for handoff designs, in which
14 * an object running in one thread must synch up with an object
15 * running in another thread in order to hand it some information,
16 * event, or task.
17 **/
18 public class SynchronousQueue<E> extends AbstractQueue<E>
19 implements BlockingQueue<E>, java.io.Serializable {
20
21 /*
22 This implementation divides actions into two cases for puts:
23
24 * An arriving putter that does not already have a waiting taker
25 creates a node holding item, and then waits for a taker to take it.
26 * An arriving putter that does already have a waiting taker fills
27 the slot node created by the taker, and notifies it to continue.
28
29 And symmetrically, two for takes:
30
31 * An arriving taker that does not already have a waiting putter
32 creates an empty slot node, and then waits for a putter to fill it.
33 * An arriving taker that does already have a waiting putter takes
34 item from the node created by the putter, and notifies it to continue.
35
36 This requires keeping two simple queues: waitingPuts and waitingTakes.
37
38 When a put or take waiting for the actions of its counterpart
39 aborts due to interruption or timeout, it marks the node
40 it created as "CANCELLED", which causes its counterpart to retry
41 the entire put or take sequence.
42 */
43
44 /**
45 * Special marker used in queue nodes to indicate that
46 * the thread waiting for a change in the node has timed out
47 * or been interrupted.
48 **/
49 private static final Object CANCELLED = new Object();
50
51 /*
52 * Note that all fields are transient final, so there is
53 * no explicit serialization code.
54 */
55
56 private transient final WaitQueue waitingPuts = new WaitQueue();
57 private transient final WaitQueue waitingTakes = new WaitQueue();
58 private transient final ReentrantLock qlock = new ReentrantLock();
59
60 /**
61 * Nodes each maintain an item and handle waits and signals for
62 * getting and setting it. The class opportunistically extends
63 * ReentrantLock to save an extra object allocation per
64 * rendezvous.
65 */
66 private static class Node extends ReentrantLock {
67 Condition done;
68 Object item;
69 Node next;
70 Node(Object x) { item = x; }
71
72 /**
73 * Fill in the slot created by the taker and signal taker to
74 * continue.
75 */
76 boolean set(Object x) {
77 this.lock();
78 try {
79 if (item != CANCELLED) {
80 item = x;
81 if (done != null)
82 done.signal();
83 return true;
84 }
85 else // taker has cancelled
86 return false;
87 }
88 finally {
89 this.unlock();
90 }
91 }
92
93 /**
94 * Remove item from slot created by putter and signal putter
95 * to continue.
96 */
97 Object get() {
98 this.lock();
99 try {
100 Object x = item;
101 if (x != CANCELLED) {
102 item = null;
103 next = null;
104 if (done != null)
105 done.signal();
106 return x;
107 }
108 else
109 return null;
110 }
111 finally {
112 this.unlock();
113 }
114 }
115
116
117 /**
118 * Wait for a taker to take item placed by putter, or time out.
119 */
120 boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
121 this.lock();
122 try {
123 for (;;) {
124 if (item == null)
125 return true;
126 if (done == null)
127 done = this.newCondition();
128 if (timed) {
129 if (nanos <= 0) {
130 item = CANCELLED;
131 return false;
132 }
133 nanos = done.awaitNanos(nanos);
134 }
135 else
136 done.await();
137 }
138 }
139 catch (InterruptedException ie) {
140 // If taken, return normally but set interrupt status
141 if (item == null) {
142 Thread.currentThread().interrupt();
143 return true;
144 }
145 else {
146 item = CANCELLED;
147 done.signal(); // propagate signal
148 throw ie;
149 }
150 }
151 finally {
152 this.unlock();
153 }
154 }
155
156
157 /**
158 * Wait for a putter to put item placed by taker, or time out.
159 */
160 Object waitForPut(boolean timed, long nanos) throws InterruptedException {
161 this.lock();
162 try {
163 for (;;) {
164 Object x = item;
165 if (x != null) {
166 item = null;
167 next = null;
168 return x;
169 }
170 if (done == null)
171 done = this.newCondition();
172 if (timed) {
173 if (nanos <= 0) {
174 item = CANCELLED;
175 return null;
176 }
177 nanos = done.awaitNanos(nanos);
178 }
179 else
180 done.await();
181 }
182 }
183 catch(InterruptedException ie) {
184 Object x = item;
185 if (x != null) {
186 item = null;
187 next = null;
188 Thread.currentThread().interrupt();
189 return x;
190 }
191 else {
192 item = CANCELLED;
193 done.signal(); // propagate signal
194 throw ie;
195 }
196 }
197 finally {
198 this.unlock();
199 }
200 }
201 }
202
203 /**
204 * Simple FIFO queue class to hold waiting puts/takes.
205 **/
206 private static class WaitQueue<E> {
207 Node head;
208 Node last;
209
210 Node enq(Object x) {
211 Node p = new Node(x);
212 if (last == null)
213 last = head = p;
214 else
215 last = last.next = p;
216 return p;
217 }
218
219 Node deq() {
220 Node p = head;
221 if (p != null && (head = p.next) == null)
222 last = null;
223 return p;
224 }
225 }
226
227 /**
228 * Main put algorithm, used by put, timed offer
229 */
230 private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
231 if (x == null) throw new IllegalArgumentException();
232 for (;;) {
233 Node node;
234 boolean mustWait;
235
236 qlock.lockInterruptibly();
237 try {
238 node = waitingTakes.deq();
239 if ( (mustWait = (node == null)) )
240 node = waitingPuts.enq(x);
241 }
242 finally {
243 qlock.unlock();
244 }
245
246 if (mustWait)
247 return node.waitForTake(timed, nanos);
248
249 else if (node.set(x))
250 return true;
251
252 // else taker cancelled, so retry
253 }
254 }
255
256 /**
257 * Main take algorithm, used by take, timed poll
258 */
259 private E doTake(boolean timed, long nanos) throws InterruptedException {
260 for (;;) {
261 Node node;
262 boolean mustWait;
263
264 qlock.lockInterruptibly();
265 try {
266 node = waitingPuts.deq();
267 if ( (mustWait = (node == null)) )
268 node = waitingTakes.enq(null);
269 }
270 finally {
271 qlock.unlock();
272 }
273
274 if (mustWait)
275 return (E)node.waitForPut(timed, nanos);
276
277 else {
278 E x = (E)node.get();
279 if (x != null)
280 return x;
281 // else cancelled, so retry
282 }
283 }
284 }
285
286 public SynchronousQueue() {}
287
288
289 public void put(E x) throws InterruptedException {
290 doPut(x, false, 0);
291 }
292
293 public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
294 return doPut(x, true, unit.toNanos(timeout));
295 }
296
297
298
299 public E take() throws InterruptedException {
300 return doTake(false, 0);
301 }
302
303 public E poll(long timeout, TimeUnit unit) throws InterruptedException {
304 return doTake(true, unit.toNanos(timeout));
305 }
306
307 // Untimed nonblocking versions
308
309 public boolean offer(E x) {
310 if (x == null) throw new IllegalArgumentException();
311
312 for (;;) {
313 qlock.lock();
314 Node node;
315 try {
316 node = waitingTakes.deq();
317 }
318 finally {
319 qlock.unlock();
320 }
321 if (node == null)
322 return false;
323
324 else if (node.set(x))
325 return true;
326 // else retry
327 }
328 }
329
330 public E poll() {
331 for (;;) {
332 Node node;
333 qlock.lock();
334 try {
335 node = waitingPuts.deq();
336 }
337 finally {
338 qlock.unlock();
339 }
340 if (node == null)
341 return null;
342
343 else {
344 Object x = node.get();
345 if (x != null)
346 return (E)x;
347 // else retry
348 }
349 }
350 }
351
352 /**
353 * Always returns true. SynchronousQueues have no internal capacity.
354 * @return true.
355 */
356 public boolean isEmpty() {
357 return true;
358 }
359
360 /**
361 * Always returns 0. SynchronousQueues have no internal capacity.
362 * @return zero.
363 */
364 public int size() {
365 return 0;
366 }
367
368 /**
369 * Always returns zero. SynchronousQueues have no internal capacity.
370 * @return zero.
371 */
372 public int remainingCapacity() {
373 return 0;
374 }
375
376 /**
377 * Always returns null. SynchronousQueues do not return elements
378 * unless actively waited on.
379 * @return null.
380 */
381 public E peek() {
382 return null;
383 }
384
385
386 static class EmptyIterator<E> implements Iterator<E> {
387 public boolean hasNext() {
388 return false;
389 }
390 public E next() {
391 throw new NoSuchElementException();
392 }
393 public void remove() {
394 throw new UnsupportedOperationException();
395 }
396 }
397
398 /**
399 * Returns an empty iterator.
400 */
401 public Iterator<E> iterator() {
402 return new EmptyIterator<E>();
403 }
404
405
406 /**
407 * Returns an empty array.
408 */
409 public Object[] toArray() {
410 return new E[0];
411 }
412
413 public <T> T[] toArray(T[] a) {
414 if (a.length > 0)
415 a[0] = null;
416 return a;
417 }
418 }