ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/SynchronousQueue.java
Revision: 1.4
Committed: Sat Jun 7 11:54:20 2003 UTC (21 years ago) by dl
Branch: MAIN
Changes since 1.3: +4 -0 lines
Log Message:
More javadoc fixes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9
/**
 * A {@link BlockingQueue} in which each {@code put} must wait for a
 * {@code take}, and vice versa. SynchronousQueues are similar to
 * rendezvous channels used in CSP and Ada. They are well suited for
 * handoff designs, in which an object running in one thread must
 * synch up with an object running in another thread in order to hand
 * it some information, event, or task.
 *
 * <p>A synchronous queue has no internal capacity, so it always
 * appears empty: {@link #size} is zero, {@link #peek} returns
 * {@code null}, and {@link #iterator} has no elements.
 *
 * <p>{@code null} elements are rejected with
 * {@link IllegalArgumentException}.
 *
 * @param <E> the type of elements handed off through this queue
 */
public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    /*
     * This implementation divides actions into two cases for puts:
     *
     * - An arriving putter that does not already have a waiting taker
     *   creates a node holding item, and then waits for a taker to
     *   take it.
     * - An arriving putter that does already have a waiting taker
     *   fills the slot node created by the taker, and notifies it to
     *   continue.
     *
     * And symmetrically, two for takes:
     *
     * - An arriving taker that does not already have a waiting putter
     *   creates an empty slot node, and then waits for a putter to
     *   fill it.
     * - An arriving taker that does already have a waiting putter
     *   takes item from the node created by the putter, and notifies
     *   it to continue.
     *
     * This requires keeping two simple queues: waitingPuts and
     * waitingTakes.
     *
     * When a put or take waiting for the actions of its counterpart
     * aborts due to interruption or timeout, it marks the node it
     * created as "CANCELLED", which causes a counterpart that already
     * dequeued the node to retry the entire put or take sequence.
     * The aborting thread then unlinks its node from the wait queue
     * (if it is still there) so that abandoned nodes do not
     * accumulate when no counterpart ever arrives.
     */

    /**
     * Special marker used in queue nodes to indicate that the thread
     * waiting for a change in the node has timed out or been
     * interrupted.
     */
    private static final Object CANCELLED = new Object();

    /*
     * All fields are transient final. Deserialization does not run
     * field initializers, so readResolve() below substitutes a freshly
     * constructed queue for every deserialized instance.
     */

    /** Putters waiting for a taker. Guarded by qlock. */
    private transient final WaitQueue waitingPuts = new WaitQueue();
    /** Takers waiting for a putter. Guarded by qlock. */
    private transient final WaitQueue waitingTakes = new WaitQueue();
    /** Lock protecting both wait queues. */
    private transient final ReentrantLock qlock = new ReentrantLock();

    /**
     * Nodes each maintain an item and handle waits and signals for
     * getting and setting it. The class opportunistically extends
     * ReentrantLock to save an extra object allocation per
     * rendezvous.
     */
    private static class Node extends ReentrantLock {
        /** Condition the creating thread waits on; created lazily. */
        Condition done;
        /** The item being handed off, null, or CANCELLED. */
        Object item;
        /** Next node in the wait queue; only touched under qlock while queued. */
        Node next;

        Node(Object x) {
            item = x;
        }

        /**
         * Fill in the slot created by the taker and signal taker to
         * continue.
         *
         * @return true if the item was handed over, false if the
         *         taker has cancelled
         */
        boolean set(Object x) {
            this.lock();
            try {
                if (item == CANCELLED)
                    return false; // taker has cancelled
                item = x;
                if (done != null)
                    done.signal();
                return true;
            } finally {
                this.unlock();
            }
        }

        /**
         * Remove item from slot created by putter and signal putter
         * to continue.
         *
         * @return the item, or null if the putter has cancelled
         */
        Object get() {
            this.lock();
            try {
                Object x = item;
                if (x == CANCELLED)
                    return null;
                item = null;
                next = null;
                if (done != null)
                    done.signal();
                return x;
            } finally {
                this.unlock();
            }
        }

        /**
         * Wait for a taker to take item placed by putter, or time out.
         *
         * @return true if the item was taken, false on timeout (in
         *         which case the node is marked CANCELLED)
         * @throws InterruptedException if interrupted before the item
         *         was taken (the node is marked CANCELLED)
         */
        boolean waitForTake(boolean timed, long nanos) throws InterruptedException {
            this.lock();
            try {
                for (;;) {
                    if (item == null)
                        return true;
                    if (done == null)
                        done = this.newCondition();
                    if (timed) {
                        if (nanos <= 0) {
                            item = CANCELLED;
                            return false;
                        }
                        nanos = done.awaitNanos(nanos);
                    } else {
                        done.await();
                    }
                }
            } catch (InterruptedException ie) {
                // If taken, return normally but set interrupt status
                if (item == null) {
                    Thread.currentThread().interrupt();
                    return true;
                }
                item = CANCELLED;
                done.signal(); // propagate signal
                throw ie;
            } finally {
                this.unlock();
            }
        }

        /**
         * Wait for a putter to put item placed by taker, or time out.
         *
         * @return the item, or null on timeout (in which case the
         *         node is marked CANCELLED)
         * @throws InterruptedException if interrupted before an item
         *         arrived (the node is marked CANCELLED)
         */
        Object waitForPut(boolean timed, long nanos) throws InterruptedException {
            this.lock();
            try {
                for (;;) {
                    Object x = item;
                    if (x != null) {
                        item = null;
                        next = null;
                        return x;
                    }
                    if (done == null)
                        done = this.newCondition();
                    if (timed) {
                        if (nanos <= 0) {
                            item = CANCELLED;
                            return null;
                        }
                        nanos = done.awaitNanos(nanos);
                    } else {
                        done.await();
                    }
                }
            } catch (InterruptedException ie) {
                // If an item arrived, return it but set interrupt status
                Object x = item;
                if (x != null) {
                    item = null;
                    next = null;
                    Thread.currentThread().interrupt();
                    return x;
                }
                item = CANCELLED;
                done.signal(); // propagate signal
                throw ie;
            } finally {
                this.unlock();
            }
        }
    }

    /**
     * Simple FIFO queue class to hold waiting puts/takes. Not
     * thread-safe on its own; callers hold qlock.
     */
    private static class WaitQueue {
        Node head;
        Node last;

        /** Appends a new node holding x and returns it. */
        Node enq(Object x) {
            Node p = new Node(x);
            if (last == null)
                last = head = p;
            else
                last = last.next = p;
            return p;
        }

        /** Removes and returns the first node, or null if empty. */
        Node deq() {
            Node p = head;
            if (p != null && (head = p.next) == null)
                last = null;
            return p;
        }

        /**
         * Unlinks the given node if it is still queued; does nothing
         * if a counterpart has already dequeued it.
         */
        void remove(Node target) {
            Node prev = null;
            for (Node p = head; p != null; prev = p, p = p.next) {
                if (p == target) {
                    if (prev == null)
                        head = p.next;
                    else
                        prev.next = p.next;
                    if (last == p)
                        last = prev;
                    return;
                }
            }
        }
    }

    /**
     * Removes a cancelled node from the given wait queue so that
     * abandoned waiters do not pile up when no counterpart arrives.
     */
    private void unlink(WaitQueue queue, Node node) {
        qlock.lock();
        try {
            queue.remove(node);
        } finally {
            qlock.unlock();
        }
    }

    /**
     * Main put algorithm, used by put, timed offer
     */
    private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException {
        if (x == null) throw new IllegalArgumentException();
        for (;;) {
            Node node;
            boolean mustWait;

            qlock.lockInterruptibly();
            try {
                node = waitingTakes.deq();
                mustWait = (node == null);
                if (mustWait)
                    node = waitingPuts.enq(x);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                boolean taken = false;
                try {
                    taken = node.waitForTake(timed, nanos);
                    return taken;
                } finally {
                    // timed out or interrupted: drop our node
                    if (!taken)
                        unlink(waitingPuts, node);
                }
            }

            if (node.set(x))
                return true;

            // else taker cancelled, so retry
        }
    }

    /**
     * Main take algorithm, used by take, timed poll
     */
    private E doTake(boolean timed, long nanos) throws InterruptedException {
        for (;;) {
            Node node;
            boolean mustWait;

            qlock.lockInterruptibly();
            try {
                node = waitingPuts.deq();
                mustWait = (node == null);
                if (mustWait)
                    node = waitingTakes.enq(null);
            } finally {
                qlock.unlock();
            }

            if (mustWait) {
                Object received = null;
                try {
                    received = node.waitForPut(timed, nanos);
                    return (E) received;
                } finally {
                    // timed out or interrupted: drop our node
                    if (received == null)
                        unlink(waitingTakes, node);
                }
            }

            E x = (E) node.get();
            if (x != null)
                return x;

            // else putter cancelled, so retry
        }
    }

    /**
     * Creates a SynchronousQueue.
     */
    public SynchronousQueue() {}

    /**
     * Always returns true. A SynchronousQueue has no internal capacity.
     * @return true
     */
    public boolean isEmpty() {
        return true;
    }

    /**
     * Always returns zero. A SynchronousQueue has no internal capacity.
     * @return zero.
     */
    public int size() {
        return 0;
    }

    /**
     * Always returns zero. SynchronousQueues have no internal capacity.
     * @return zero.
     */
    public int remainingCapacity() {
        return 0;
    }

    /**
     * Always returns null. An element is only present at the moment
     * it is handed off.
     * @return null
     */
    public E peek() {
        return null;
    }

    /**
     * Hands the element to a taker, waiting if necessary for one to
     * arrive.
     *
     * @param x the element to hand off
     * @throws InterruptedException if interrupted while waiting
     * @throws IllegalArgumentException if x is null
     */
    public void put(E x) throws InterruptedException {
        doPut(x, false, 0);
    }

    /**
     * Hands the element to a taker, waiting up to the given time for
     * one to arrive.
     *
     * @param x the element to hand off
     * @param timeout how long to wait, in units of {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return true if the element was taken, false on timeout
     * @throws InterruptedException if interrupted while waiting
     * @throws IllegalArgumentException if x is null
     */
    public boolean offer(E x, long timeout, TimeUnit unit) throws InterruptedException {
        return doPut(x, true, unit.toNanos(timeout));
    }

    /**
     * Receives an element from a putter, waiting if necessary for one
     * to arrive.
     *
     * @return the element handed off
     * @throws InterruptedException if interrupted while waiting
     */
    public E take() throws InterruptedException {
        return doTake(false, 0);
    }

    /**
     * Receives an element from a putter, waiting up to the given time
     * for one to arrive.
     *
     * @param timeout how long to wait, in units of {@code unit}
     * @param unit the time unit of {@code timeout}
     * @return the element handed off, or null on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return doTake(true, unit.toNanos(timeout));
    }

    // Untimed nonblocking versions

    /**
     * Hands the element to a taker only if one is already waiting.
     *
     * @param x the element to hand off
     * @return true if a waiting taker received the element
     * @throws IllegalArgumentException if x is null
     */
    public boolean offer(E x) {
        if (x == null) throw new IllegalArgumentException();

        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingTakes.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return false;
            if (node.set(x))
                return true;
            // else taker cancelled, so retry
        }
    }

    /**
     * Receives an element only if a putter is already waiting.
     *
     * @return the element, or null if no putter is waiting
     */
    public E poll() {
        for (;;) {
            Node node;
            qlock.lock();
            try {
                node = waitingPuts.deq();
            } finally {
                qlock.unlock();
            }
            if (node == null)
                return null;
            Object x = node.get();
            if (x != null)
                return (E) x;
            // else putter cancelled, so retry
        }
    }

    /**
     * Transfers every element that is immediately available from
     * waiting putters into the given collection.
     *
     * @param c the collection to add elements to
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * Transfers at most the given number of immediately available
     * elements from waiting putters into the given collection.
     *
     * @param c the collection to add elements to
     * @param maxElements the maximum number of elements to transfer
     * @return the number of elements transferred
     * @throws NullPointerException if c is null
     * @throws IllegalArgumentException if c is this queue
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null) throw new NullPointerException();
        if (c == this) throw new IllegalArgumentException();
        int n = 0;
        while (n < maxElements) {
            E e = poll();
            if (e == null)
                break;
            c.add(e);
            ++n;
        }
        return n;
    }

    /**
     * Always returns false. A SynchronousQueue never holds elements.
     * @return false
     */
    public boolean remove(Object x) {
        return false;
    }

    /** Iterator over nothing, since the queue never holds elements. */
    static class EmptyIterator<E> implements Iterator<E> {
        public boolean hasNext() {
            return false;
        }
        public E next() {
            throw new NoSuchElementException();
        }
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Returns an iterator with no elements.
     * @return an empty iterator
     */
    public Iterator<E> iterator() {
        return new EmptyIterator<E>();
    }

    /**
     * Returns a zero-length array.
     * @return a zero-length array
     */
    public Object[] toArray() {
        // "new E[0]" is not legal Java: generic array creation
        return new Object[0];
    }

    /**
     * Sets the zeroth element of the given array to null (if it has
     * one) and returns the array.
     *
     * @param a the array
     * @return the given array
     */
    public <T> T[] toArray(T[] a) {
        if (a.length > 0)
            a[0] = null;
        return a;
    }

    /**
     * Replaces a deserialized instance with a fresh queue.
     * Deserialization skips this class's field initializers, which
     * would otherwise leave the transient final lock and wait queues
     * null and make the queue unusable. The queue carries no
     * persistent state, so a new instance is equivalent.
     */
    private Object readResolve() {
        return new SynchronousQueue<E>();
    }
}