ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.7
Committed: Sat Jun 28 15:33:31 2003 UTC (20 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.6: +42 -36 lines
Log Message:
Fixed emulation mode

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9
/**
 * A {@link Queue} in which each put must wait for a take, and vice
 * versa. SynchronousQueues are similar to rendezvous channels used
 * in CSP and Ada. They are well suited for handoff designs, in which
 * an object running in one thread must synch up with an object
 * running in another thread in order to hand it some information,
 * event, or task.
 *
 * <p>The queue never holds elements: {@link #size} is always zero,
 * {@link #peek} always returns {@code null}, and iteration yields
 * nothing. Null elements are rejected with
 * {@code NullPointerException}.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * This implementation divides actions into two cases for puts:
     *
     *  * An arriving putter that does not already have a waiting taker
     *    creates a node holding item, and then waits for a taker to
     *    take it.
     *  * An arriving putter that does already have a waiting taker
     *    fills the slot node created by the taker, and notifies it to
     *    continue.
     *
     * And symmetrically, two for takes:
     *
     *  * An arriving taker that does not already have a waiting putter
     *    creates an empty slot node, and then waits for a putter to
     *    fill it.
     *  * An arriving taker that does already have a waiting putter
     *    takes item from the node created by the putter, and notifies
     *    it to continue.
     *
     * This requires keeping two simple queues: waitingPuts and
     * waitingTakes, both guarded by qlock. Each node is additionally
     * its own lock, guarding the node's item and condition.
     *
     * When a put or take waiting for the actions of its counterpart
     * aborts due to interruption or timeout, it marks the node it
     * created as "CANCELLED", which causes a counterpart that has
     * already dequeued the node to retry the entire put or take
     * sequence. The aborting thread then unlinks its node from the
     * wait queue (a no-op if a counterpart dequeued it first) so that
     * abandoned nodes do not accumulate.
     */

    /**
     * Special marker used in queue nodes to indicate that
     * the thread waiting for a change in the node has timed out
     * or been interrupted.
     */
    private static final Object CANCELLED = new Object();

    /*
     * All fields are transient: the queue has no logical contents to
     * serialize. Because initializers of transient final fields are
     * not re-run during deserialization, readResolve() substitutes a
     * freshly constructed queue for the deserialized shell.
     */

    /** Nodes of putters waiting for a taker; guarded by qlock. */
    private transient final WaitQueue waitingPuts = new WaitQueue();
    /** Nodes of takers waiting for a putter; guarded by qlock. */
    private transient final WaitQueue waitingTakes = new WaitQueue();
    /** Guards both wait queues. Never held while waiting on a node. */
    private transient final ReentrantLock qlock = new ReentrantLock();

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it. The class opportunistically extends
     * ReentrantLock to save an extra object allocation per
     * rendezvous.
     */
    private static class Node extends ReentrantLock {
        /** Condition to wait on for other party; lazily constructed */
        Condition done;
        /** The item being transferred, null for an empty slot, or CANCELLED */
        Object item;
        /** Next node in wait queue */
        Node next;

        Node(Object x) { item = x; }

        /**
         * Fill in the slot created by the taker and signal taker to
         * continue.
         *
         * @param x the item to hand to the taker
         * @return true if the item was deposited, false if the taker
         *         had already cancelled
         */
        boolean set(Object x) {
            this.lock();
            try {
                if (item != CANCELLED) {
                    item = x;
                    // done is null if the taker has not started waiting
                    // yet; it will then see the item without blocking.
                    if (done != null)
                        done.signal();
                    return true;
                }
                else // taker has cancelled
                    return false;
            }
            finally {
                this.unlock();
            }
        }

        /**
         * Remove item from slot created by putter and signal putter
         * to continue.
         *
         * @return the putter's item, or null if the putter had
         *         already cancelled
         */
        Object get() {
            this.lock();
            try {
                Object x = item;
                if (x != CANCELLED) {
                    item = null; // null item tells the putter it was taken
                    next = null;
                    if (done != null)
                        done.signal();
                    return x;
                }
                else
                    return null;
            }
            finally {
                this.unlock();
            }
        }

        /**
         * Wait for a taker to take item placed by putter, or time out.
         *
         * @param timed whether the wait is bounded by {@code nanos}
         * @param nanos remaining wait time in nanoseconds, if timed
         * @return true if the item was taken, false if the wait timed
         *         out (in which case the node is marked CANCELLED)
         * @throws InterruptedException if interrupted before the item
         *         was taken (the node is marked CANCELLED)
         */
        boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
            this.lock();
            try {
                for (;;) {
                    if (item == null)
                        return true;
                    if (timed) {
                        if (nanos <= 0) {
                            item = CANCELLED;
                            return false;
                        }
                    }
                    try {
                        if (done == null)
                            done = this.newCondition();
                        if (timed)
                            nanos = done.awaitNanos(nanos);
                        else
                            done.await();
                    }
                    catch (InterruptedException ie) {
                        // If taken, return normally but set interrupt status
                        if (item == null) {
                            Thread.currentThread().interrupt();
                            return true;
                        }
                        else {
                            item = CANCELLED;
                            done.signal(); // propagate signal
                            throw ie;
                        }
                    }
                }
            }
            finally {
                this.unlock();
            }
        }

        /**
         * Wait for a putter to put item placed by taker, or time out.
         *
         * @param timed whether the wait is bounded by {@code nanos}
         * @param nanos remaining wait time in nanoseconds, if timed
         * @return the item supplied by a putter, or null if the wait
         *         timed out (in which case the node is marked CANCELLED)
         * @throws InterruptedException if interrupted before an item
         *         arrived (the node is marked CANCELLED)
         */
        Object waitForPut(boolean timed, long nanos) throws InterruptedException {
            this.lock();
            try {
                for (;;) {
                    Object x = item;
                    if (x != null) {
                        item = null;
                        next = null;
                        return x;
                    }
                    if (timed) {
                        if (nanos <= 0) {
                            item = CANCELLED;
                            return null;
                        }
                    }
                    try {
                        if (done == null)
                            done = this.newCondition();
                        if (timed)
                            nanos = done.awaitNanos(nanos);
                        else
                            done.await();
                    }
                    catch (InterruptedException ie) {
                        // If an item arrived, return it normally but
                        // set interrupt status
                        x = item;
                        if (x != null) {
                            item = null;
                            next = null;
                            Thread.currentThread().interrupt();
                            return x;
                        }
                        else {
                            item = CANCELLED;
                            done.signal(); // propagate signal
                            throw ie;
                        }
                    }
                }
            }
            finally {
                this.unlock();
            }
        }
    }

    /**
     * Simple FIFO queue class to hold waiting puts/takes. Not
     * thread-safe by itself; callers must hold qlock.
     */
    private static class WaitQueue {
        Node head;
        Node last;

        /** Appends a new node holding x and returns it. */
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        /** Removes and returns the oldest node, or null if empty. */
        Node deq() {
            Node p = head;
            if (p != null && (head = p.next) == null)
                last = null;
            return p;
        }

        /**
         * Unlinks the given node if it is still queued; does nothing
         * if it has already been dequeued.
         */
        void remove(Node target) {
            Node prev = null;
            for (Node p = head; p != null; prev = p, p = p.next) {
                if (p == target) {
                    Node after = p.next;
                    if (prev == null)
                        head = after;
                    else
                        prev.next = after;
                    if (last == p)
                        last = prev;
                    p.next = null;
                    return;
                }
            }
        }
    }

    /**
     * Removes a node abandoned by its own waiter (timeout or
     * interrupt) from the given wait queue, so that cancelled nodes
     * do not pile up when no counterpart ever arrives.
     */
    private void unlink(WaitQueue queue, Node node) {
        qlock.lock();
        try {
            queue.remove(node);
        }
        finally {
            qlock.unlock();
        }
    }

    /**
     * Main put algorithm, used by put, timed offer
     *
     * @param x the element to hand off; must not be null
     * @param timed whether to give up after {@code nanos}
     * @param nanos maximum wait in nanoseconds, if timed
     * @return true if a taker received the element, false on timeout
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if x is null
     */
    private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        for (;;) {
            Node node;
            boolean mustWait;

            qlock.lockInterruptibly();
            try {
                node = waitingTakes.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingPuts.enq(x);
            }
            finally {
                qlock.unlock();
            }

            if (mustWait) {
                boolean taken = false;
                try {
                    taken = node.waitForTake(timed, nanos);
                    return taken;
                }
                finally {
                    // Timed out or interrupted: drop our cancelled node.
                    if (!taken)
                        unlink(waitingPuts, node);
                }
            }

            else if (node.set(x))
                return true;

            // else taker cancelled, so retry
        }
    }

    /**
     * Main take algorithm, used by take, timed poll
     *
     * @param timed whether to give up after {@code nanos}
     * @param nanos maximum wait in nanoseconds, if timed
     * @return the element received from a putter, or null on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    private E doTake(boolean timed, long nanos) throws InterruptedException {
        for (;;) {
            Node node;
            boolean mustWait;

            qlock.lockInterruptibly();
            try {
                node = waitingPuts.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingTakes.enq(null);
            }
            finally {
                qlock.unlock();
            }

            if (mustWait) {
                Object received = null;
                try {
                    received = node.waitForPut(timed, nanos);
                    return (E)received;
                }
                finally {
                    // Timed out or interrupted: drop our cancelled node.
                    if (received == null)
                        unlink(waitingTakes, node);
                }
            }

            else {
                E x = (E)node.get();
                if (x != null)
                    return x;
                // else cancelled, so retry
            }
        }
    }

    /**
     * Replaces a deserialized instance, whose transient final fields
     * were never initialized, with a properly constructed queue.
     */
    private Object readResolve() {
        return new SynchronousQueue<E>();
    }

    /**
     * Creates a new SynchronousQueue.
     */
    public SynchronousQueue() {}

    /**
     * Hands the element to a taker, waiting as long as necessary for
     * one to arrive.
     *
     * @param x the element to hand off
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if x is null
     */
    public void put(E x) throws InterruptedException {
        doPut(x, false, 0);
    }

    /**
     * Hands the element to a taker, waiting up to the given time for
     * one to arrive.
     *
     * @param x the element to hand off
     * @param timeout how long to wait, in units of {@code unit}
     * @param unit the unit of {@code timeout}
     * @return true if a taker received the element, false on timeout
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if x is null
     */
    public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
        return doPut(x, true, unit.toNanos(timeout));
    }

    /**
     * Receives an element from a putter, waiting as long as necessary
     * for one to arrive.
     *
     * @return the element received
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        return doTake(false, 0);
    }

    /**
     * Receives an element from a putter, waiting up to the given time
     * for one to arrive.
     *
     * @param timeout how long to wait, in units of {@code unit}
     * @param unit the unit of {@code timeout}
     * @return the element received, or null on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return doTake(true, unit.toNanos(timeout));
    }

    // Untimed nonblocking versions

    /**
     * Hands the element to a taker only if one is already waiting.
     *
     * @param x the element to hand off
     * @return true if a waiting taker received the element
     * @throws NullPointerException if x is null
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();

        for (;;) {
            qlock.lock();
            Node node;
            try {
                node = waitingTakes.deq();
            }
            finally {
                qlock.unlock();
            }
            if (node == null)
                return false;

            else if (node.set(x))
                return true;
            // else retry
        }
    }

    /**
     * Receives an element only if a putter is already waiting.
     *
     * @return the element received, or null if no putter was waiting
     */
    public E poll() {
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingPuts.deq();
            }
            finally {
                qlock.unlock();
            }
            if (node == null)
                return null;

            else {
                Object x = node.get();
                if (x != null)
                    return (E)x;
                // else retry
            }
        }
    }

    /**
     * Transfers to the given collection every element currently
     * offered by a waiting putter.
     *
     * @param c the collection to receive the elements
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        int n = 0;
        for (E e; (e = poll()) != null; ) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Transfers to the given collection at most {@code maxElements}
     * elements currently offered by waiting putters.
     *
     * @param c the collection to receive the elements
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        int n = 0;
        for (E e; n < maxElements && (e = poll()) != null; ) {
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Always returns true. SynchronousQueues have no internal capacity.
     * @return true.
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns 0. SynchronousQueues have no internal capacity.
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero. SynchronousQueues have no internal capacity.
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Always returns null. SynchronousQueues do not return elements
     * unless actively waited on.
     * @return null.
     */
    public E peek() {
        return null;
    }

    /**
     * Iterator over nothing: never has a next element and does not
     * support removal.
     */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Returns an empty iterator.
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns an empty array.
     */
    public Object[] toArray() {
        // Generic array creation (new E[0]) is not legal Java.
        return new Object[0];
    }

    /**
     * Returns the given array unchanged except that, if it has any
     * room, its first element is set to null to mark the end of the
     * (empty) contents.
     *
     * @param a the array to return
     * @return {@code a}
     * @throws NullPointerException if a is null
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }
}