ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.10
Committed: Sat Jul 26 13:17:51 2003 UTC (20 years, 10 months ago) by tim
Branch: MAIN
Changes since 1.9: +15 -15 lines
Log Message:
Default compiler is now 2.2-ea. Some sources are not compatible with 2.0-ea.

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.*;
9 import java.util.*;
10
/**
 * A {@link Queue} in which each put must wait for a take, and vice
 * versa. SynchronousQueues are similar to rendezvous channels used
 * in CSP and Ada. They are well suited for handoff designs, in which
 * an object running in one thread must synch up with an object
 * running in another thread in order to hand it some information,
 * event, or task.
 *
 * <p>The queue never holds elements itself: {@link #size} is always
 * zero, {@link #peek} always returns <tt>null</tt>, and the iterator
 * is always empty. Null elements are rejected with a
 * <tt>NullPointerException</tt>.
 *
 * @since 1.5
 * @author Doug Lea
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
      This implementation divides actions into two cases for puts:

      * An arriving putter that does not already have a waiting taker
        creates a node holding item, and then waits for a taker to take it.
      * An arriving putter that does already have a waiting taker fills
        the slot node created by the taker, and notifies it to continue.

      And symmetrically, two for takes:

      * An arriving taker that does not already have a waiting putter
        creates an empty slot node, and then waits for a putter to fill it.
      * An arriving taker that does already have a waiting putter takes
        item from the node created by the putter, and notifies it to continue.

      This requires keeping two simple queues: waitingPuts and waitingTakes.

      When a put or take waiting for the actions of its counterpart
      aborts due to interruption or timeout, it marks the node
      it created as "CANCELLED", which causes its counterpart to retry
      the entire put or take sequence. The aborting thread then unlinks
      its node from the wait queue (if no counterpart has dequeued it
      in the meantime), so that cancelled nodes do not pile up while no
      counterpart arrives.
    */

    /**
     * Special marker used in queue nodes to indicate that
     * the thread waiting for a change in the node has timed out
     * or been interrupted.
     */
    private static final Object CANCELLED = new Object();

    /*
     * All fields are transient, so nothing is written to the stream.
     * Field initializers do not run during deserialization, so
     * readResolve() substitutes a freshly constructed queue; without
     * it a deserialized instance would have null locks and queues.
     */

    /** Nodes of putters waiting for a taker; guarded by qlock. */
    private transient final WaitQueue waitingPuts = new WaitQueue();
    /** Nodes of takers waiting for a putter; guarded by qlock. */
    private transient final WaitQueue waitingTakes = new WaitQueue();
    /** Lock held around every access to the two wait queues. */
    private transient final ReentrantLock qlock = new ReentrantLock();

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it. The class opportunistically extends
     * ReentrantLock to save an extra object allocation per
     * rendezvous.
     */
    private static class Node extends ReentrantLock {
        /** Condition to wait on for other party; lazily constructed */
        Condition done;
        /** The item being transferred */
        Object item;
        /** Next node in wait queue */
        Node next;

        Node(Object x) { item = x; }

        /**
         * Fill in the slot created by the taker and signal taker to
         * continue.
         *
         * @param x the item to hand to the taker
         * @return true if the item was stored, false if the taker
         *         had already cancelled
         */
        boolean set(Object x) {
            this.lock();
            try {
                if (item != CANCELLED) {
                    item = x;
                    // done is null if the taker has not started waiting yet
                    if (done != null)
                        done.signal();
                    return true;
                }
                else // taker has cancelled
                    return false;
            }
            finally {
                this.unlock();
            }
        }

        /**
         * Remove item from slot created by putter and signal putter
         * to continue.
         *
         * @return the item, or null if the putter had already cancelled
         */
        Object get() {
            this.lock();
            try {
                Object x = item;
                if (x != CANCELLED) {
                    item = null;
                    next = null;
                    // done is null if the putter has not started waiting yet
                    if (done != null)
                        done.signal();
                    return x;
                }
                else
                    return null;
            }
            finally {
                this.unlock();
            }
        }

        /**
         * Wait for a taker to take item placed by putter, or time out.
         *
         * @param timed whether the wait is bounded by nanos
         * @param nanos remaining wait time in nanoseconds; read only if timed
         * @return true if the item was taken, false if the wait timed
         *         out (in which case the node is marked CANCELLED)
         * @throws InterruptedException if interrupted before the item
         *         was taken (the node is marked CANCELLED)
         */
        boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
            this.lock();
            try {
                for (;;) {
                    // A taker clears item to null once it has the value
                    if (item == null)
                        return true;
                    if (timed) {
                        if (nanos <= 0) {
                            item = CANCELLED;
                            return false;
                        }
                    }
                    if (done == null)
                        done = this.newCondition();
                    if (timed)
                        nanos = done.awaitNanos(nanos);
                    else
                        done.await();
                }
            }
            catch (InterruptedException ie) {
                // If taken, return normally but set interrupt status
                if (item == null) {
                    Thread.currentThread().interrupt();
                    return true;
                }
                else {
                    item = CANCELLED;
                    done.signal(); // propagate signal
                    throw ie;
                }
            }
            finally {
                this.unlock();
            }
        }

        /**
         * Wait for a putter to put item placed by taker, or time out.
         *
         * @param timed whether the wait is bounded by nanos
         * @param nanos remaining wait time in nanoseconds; read only if timed
         * @return the item supplied by a putter, or null if the wait
         *         timed out (in which case the node is marked CANCELLED)
         * @throws InterruptedException if interrupted before an item
         *         arrived (the node is marked CANCELLED)
         */
        Object waitForPut(boolean timed, long nanos) throws InterruptedException {
            this.lock();
            try {
                for (;;) {
                    Object x = item;
                    if (x != null) {
                        item = null;
                        next = null;
                        return x;
                    }
                    if (timed) {
                        if (nanos <= 0) {
                            item = CANCELLED;
                            return null;
                        }
                    }
                    if (done == null)
                        done = this.newCondition();
                    if (timed)
                        nanos = done.awaitNanos(nanos);
                    else
                        done.await();
                }
            }
            catch (InterruptedException ie) {
                // If an item already arrived, return it normally but
                // set interrupt status
                Object y = item;
                if (y != null) {
                    item = null;
                    next = null;
                    Thread.currentThread().interrupt();
                    return y;
                }
                else {
                    item = CANCELLED;
                    done.signal(); // propagate signal
                    throw ie;
                }
            }
            finally {
                this.unlock();
            }
        }
    }

    /**
     * Simple FIFO queue class to hold waiting puts/takes. It does no
     * locking of its own; every caller in this file holds qlock.
     */
    private static class WaitQueue {
        Node head;
        Node last;

        /** Appends a new node holding x and returns it. */
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        /** Removes and returns the first node, or null if empty. */
        Node deq() {
            Node p = head;
            if (p != null && (head = p.next) == null)
                last = null;
            return p;
        }

        /**
         * Unlinks the given node if it is still in this queue; does
         * nothing if it has already been dequeued.
         */
        void remove(Node target) {
            Node prev = null;
            for (Node p = head; p != null; prev = p, p = p.next) {
                if (p == target) {
                    Node succ = p.next;
                    if (prev == null)
                        head = succ;
                    else
                        prev.next = succ;
                    if (last == p)
                        last = prev;
                    p.next = null;
                    return;
                }
            }
        }
    }

    /**
     * Unlinks a node whose wait ended without a handoff, so that
     * cancelled nodes are not retained until a counterpart arrives.
     * Must be called without the node's own lock held.
     */
    private void unlinkCancelled(WaitQueue queue, Node node) {
        qlock.lock();
        try {
            queue.remove(node);
        }
        finally {
            qlock.unlock();
        }
    }

    /**
     * Main put algorithm, used by put, timed offer
     *
     * @param x the item to hand off; must not be null
     * @param timed whether waiting is bounded by nanos
     * @param nanos maximum wait in nanoseconds; read only if timed
     * @return true if a taker received x, false if the wait timed out
     * @throws NullPointerException if x is null
     * @throws InterruptedException if interrupted while waiting
     */
    private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
        if (x == null) throw new NullPointerException();
        for (;;) {
            Node node;
            boolean mustWait;

            qlock.lockInterruptibly();
            try {
                node = waitingTakes.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingPuts.enq(x);
            }
            finally {
                qlock.unlock();
            }

            if (mustWait) {
                boolean taken = false;
                try {
                    taken = node.waitForTake(timed, nanos);
                    return taken;
                }
                finally {
                    // timed out or interrupted: drop our cancelled node
                    if (!taken)
                        unlinkCancelled(waitingPuts, node);
                }
            }

            else if (node.set(x))
                return true;

            // else taker cancelled, so retry
        }
    }

    /**
     * Main take algorithm, used by take, timed poll
     *
     * @param timed whether waiting is bounded by nanos
     * @param nanos maximum wait in nanoseconds; read only if timed
     * @return the item received, or null if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    @SuppressWarnings("unchecked") // nodes only ever carry items of type E
    private E doTake(boolean timed, long nanos) throws InterruptedException {
        for (;;) {
            Node node;
            boolean mustWait;

            qlock.lockInterruptibly();
            try {
                node = waitingPuts.deq();
                if ( (mustWait = (node == null)) )
                    node = waitingTakes.enq(null);
            }
            finally {
                qlock.unlock();
            }

            if (mustWait) {
                Object received = null;
                try {
                    received = node.waitForPut(timed, nanos);
                    return (E)received;
                }
                finally {
                    // timed out or interrupted: drop our cancelled node
                    if (received == null)
                        unlinkCancelled(waitingTakes, node);
                }
            }

            else {
                E x = (E)node.get();
                if (x != null)
                    return x;
                // else cancelled, so retry
            }
        }
    }

    /**
     * Creates a <tt>SynchronousQueue</tt>.
     */
    public SynchronousQueue() {}

    /**
     * Replaces a deserialized instance with a freshly constructed
     * queue, because the transient lock and wait-queue fields are not
     * initialized by deserialization.
     */
    private Object readResolve() {
        return new SynchronousQueue<E>();
    }

    /**
     * Hands the given element to a taker, waiting if necessary for
     * one to arrive.
     *
     * @param x the element to hand off
     * @throws NullPointerException if x is null
     * @throws InterruptedException if interrupted while waiting
     */
    public void put(E x) throws InterruptedException {
        doPut(x, false, 0);
    }

    /**
     * Hands the given element to a taker, waiting up to the given
     * time for one to arrive.
     *
     * @param x the element to hand off
     * @param timeout how long to wait, in units of <tt>unit</tt>
     * @param unit the time unit of <tt>timeout</tt>
     * @return true if a taker received the element, false if the
     *         wait timed out
     * @throws NullPointerException if x is null
     * @throws InterruptedException if interrupted while waiting
     */
    public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
        return doPut(x, true, unit.toNanos(timeout));
    }

    /**
     * Receives an element from a putter, waiting if necessary for
     * one to arrive.
     *
     * @return the element received
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        return doTake(false, 0);
    }

    /**
     * Receives an element from a putter, waiting up to the given
     * time for one to arrive.
     *
     * @param timeout how long to wait, in units of <tt>unit</tt>
     * @param unit the time unit of <tt>timeout</tt>
     * @return the element received, or null if the wait timed out
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return doTake(true, unit.toNanos(timeout));
    }

    // Untimed nonblocking versions

    /**
     * Hands the given element to a taker only if one is already
     * waiting.
     *
     * @param x the element to hand off
     * @return true if a waiting taker received the element, else false
     * @throws NullPointerException if x is null
     */
    public boolean offer(E x) {
        if (x == null) throw new NullPointerException();

        for (;;) {
            qlock.lock();
            Node node;
            try {
                node = waitingTakes.deq();
            }
            finally {
                qlock.unlock();
            }
            if (node == null)
                return false;

            else if (node.set(x))
                return true;
            // else retry
        }
    }

    /**
     * Receives an element only if a putter is already waiting.
     *
     * @return the element received, or null if no putter is waiting
     */
    @SuppressWarnings("unchecked") // nodes only ever carry items of type E
    public E poll() {
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingPuts.deq();
            }
            finally {
                qlock.unlock();
            }
            if (node == null)
                return null;

            else {
                Object x = node.get();
                if (x != null)
                    return (E)x;
                // else retry
            }
        }
    }

    /**
     * Transfers to the given collection every element that can be
     * received without waiting, by polling repeatedly.
     *
     * @param c the collection to add elements to
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        int n = 0;
        E e;
        while ((e = poll()) != null) {
            c.add(e);
            n++;
        }
        return n;
    }

    /**
     * Transfers to the given collection at most the given number of
     * elements that can be received without waiting, by polling
     * repeatedly.
     *
     * @param c the collection to add elements to
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        int n = 0;
        E e;
        while (n < maxElements && (e = poll()) != null) {
            c.add(e);
            n++;
        }
        return n;
    }

    /**
     * Always returns true. SynchronousQueues have no internal capacity.
     * @return true.
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns 0. SynchronousQueues have no internal capacity.
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero. SynchronousQueues have no internal capacity.
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Always returns null. SynchronousQueues do not return elements
     * unless actively waited on.
     * @return null.
     */
    public E peek() {
        return null;
    }

    /**
     * Iterator over nothing: never has a next element and does not
     * support removal.
     */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Returns an empty iterator.
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns an empty array.
     */
    public Object[] toArray() {
        return new Object[0];
    }

    /**
     * Sets the first element of the given array to null (if the
     * array has nonzero length) and returns the array.
     *
     * @param a the array
     * @return the given array
     * @throws NullPointerException if a is null
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }
}