79 |
|
* 0 for waiting, 1 for ack, -1 for cancelled. |
80 |
|
*/ |
81 |
|
private static final class Node extends AbstractQueuedSynchronizer { |
82 |
+ |
/** Synchronization state value representing that node acked */ |
83 |
+ |
private static final int ACK = 1; |
84 |
+ |
/** Synchronization state value representing that node cancelled */ |
85 |
+ |
private static final int CANCEL = -1; |
86 |
+ |
|
87 |
|
/** The item being transferred */ |
88 |
|
Object item; |
89 |
|
/** Next node in wait queue */ |
90 |
|
Node next; |
86 |
– |
Node(Object x) { item = x; } |
91 |
|
|
92 |
< |
private static final int WAITING = 0; |
93 |
< |
private static final int ACKED = 1; |
90 |
< |
private static final int CANCELLED = -1; |
92 |
> |
/** Create node with initial item */ |
93 |
> |
Node(Object x) { item = x; } |
94 |
|
|
95 |
|
/** |
96 |
|
* Implements AQS base acquire to succeed if not in WAITING state |
97 |
|
*/ |
98 |
< |
protected boolean tryAcquireExclusiveState(boolean b, int ignore) { |
99 |
< |
return getState() != WAITING; |
98 |
> |
protected boolean tryAcquireExclusive(boolean b, int ignore) { |
99 |
> |
return getState() != 0; |
100 |
|
} |
101 |
|
|
102 |
|
/** |
103 |
|
* Implements AQS base release to signal if state changed |
104 |
|
*/ |
105 |
< |
protected boolean releaseExclusiveState(int newState) { |
106 |
< |
return compareAndSetState(WAITING, newState); |
104 |
< |
} |
105 |
< |
|
106 |
< |
/** |
107 |
< |
* Try to acknowledge; fail if not waiting |
108 |
< |
*/ |
109 |
< |
private boolean ack() { |
110 |
< |
return releaseExclusive(ACKED); |
105 |
> |
protected boolean tryReleaseExclusive(int newState) { |
106 |
> |
return compareAndSetState(0, newState); |
107 |
|
} |
108 |
|
|
109 |
|
/** |
110 |
< |
* Try to cancel; fail if not waiting |
115 |
< |
*/ |
116 |
< |
private boolean cancel() { |
117 |
< |
return releaseExclusive(CANCELLED); |
118 |
< |
} |
119 |
< |
|
120 |
< |
/** |
121 |
< |
* Take item and null out fields (for sake of GC) |
110 |
> |
* Take item and null out field (for sake of GC) |
111 |
|
*/ |
112 |
|
private Object extract() { |
113 |
|
Object x = item; |
114 |
|
item = null; |
126 |
– |
next = null; |
115 |
|
return x; |
116 |
|
} |
117 |
|
|
118 |
|
/** |
119 |
+ |
* Try to cancel on interrupt; if so rethrowing, |
120 |
+ |
* else setting interrupt state |
121 |
+ |
*/ |
122 |
+ |
private void checkCancellationOnInterrupt(InterruptedException ie) |
123 |
+ |
throws InterruptedException { |
124 |
+ |
if (releaseExclusive(CANCEL)) |
125 |
+ |
throw ie; |
126 |
+ |
Thread.currentThread().interrupt(); |
127 |
+ |
} |
128 |
+ |
|
129 |
+ |
/** |
130 |
|
* Fill in the slot created by the taker and signal taker to |
131 |
|
* continue. |
132 |
|
*/ |
133 |
|
boolean setItem(Object x) { |
134 |
< |
item = x; |
135 |
< |
return ack(); |
134 |
> |
item = x; // can place in slot even if cancelled |
135 |
> |
return releaseExclusive(ACK); |
136 |
|
} |
137 |
|
|
138 |
|
/** |
140 |
|
* to continue. |
141 |
|
*/ |
142 |
|
Object getItem() { |
143 |
< |
return (ack())? extract() : null; |
143 |
> |
return (releaseExclusive(ACK))? extract() : null; |
144 |
> |
} |
145 |
> |
|
146 |
> |
/** |
147 |
> |
* Wait for a taker to take item placed by putter. |
148 |
> |
*/ |
149 |
> |
void waitForTake() throws InterruptedException { |
150 |
> |
try { |
151 |
> |
acquireExclusiveInterruptibly(0); |
152 |
> |
} catch (InterruptedException ie) { |
153 |
> |
checkCancellationOnInterrupt(ie); |
154 |
> |
} |
155 |
> |
} |
156 |
> |
|
157 |
> |
/** |
158 |
> |
* Wait for a putter to put item placed by taker. |
159 |
> |
*/ |
160 |
> |
Object waitForPut() throws InterruptedException { |
161 |
> |
try { |
162 |
> |
acquireExclusiveInterruptibly(0); |
163 |
> |
} catch (InterruptedException ie) { |
164 |
> |
checkCancellationOnInterrupt(ie); |
165 |
> |
} |
166 |
> |
return extract(); |
167 |
|
} |
168 |
|
|
169 |
|
/** |
170 |
|
* Wait for a taker to take item placed by putter or time out. |
171 |
|
*/ |
172 |
< |
boolean waitForTake(boolean timed, long nanos) throws InterruptedException { |
172 |
> |
boolean waitForTake(long nanos) throws InterruptedException { |
173 |
|
try { |
174 |
< |
if (!timed) |
175 |
< |
acquireExclusiveInterruptibly(0); |
154 |
< |
else if (!acquireExclusiveTimed(0, nanos) && cancel()) |
174 |
> |
if (!acquireExclusiveTimed(0, nanos) && |
175 |
> |
releaseExclusive(CANCEL)) |
176 |
|
return false; |
156 |
– |
return true; |
177 |
|
} catch (InterruptedException ie) { |
178 |
< |
if (cancel()) |
159 |
< |
throw ie; |
160 |
< |
Thread.currentThread().interrupt(); |
161 |
< |
return true; |
178 |
> |
checkCancellationOnInterrupt(ie); |
179 |
|
} |
180 |
+ |
return true; |
181 |
|
} |
182 |
|
|
183 |
|
/** |
184 |
|
* Wait for a putter to put item placed by taker, or time out. |
185 |
|
*/ |
186 |
< |
Object waitForPut(boolean timed, long nanos) throws InterruptedException { |
186 |
> |
Object waitForPut(long nanos) throws InterruptedException { |
187 |
|
try { |
188 |
< |
if (!timed) |
189 |
< |
acquireExclusiveInterruptibly(0); |
172 |
< |
else if (!acquireExclusiveTimed(0, nanos) && cancel()) |
188 |
> |
if (!acquireExclusiveTimed(0, nanos) && |
189 |
> |
releaseExclusive(CANCEL)) |
190 |
|
return null; |
174 |
– |
return extract(); |
191 |
|
} catch (InterruptedException ie) { |
192 |
< |
if (cancel()) |
177 |
< |
throw ie; |
178 |
< |
Thread.currentThread().interrupt(); |
179 |
< |
return extract(); |
192 |
> |
checkCancellationOnInterrupt(ie); |
193 |
|
} |
194 |
+ |
return extract(); |
195 |
|
} |
196 |
|
|
197 |
|
} |
214 |
|
|
215 |
|
Node deq() { |
216 |
|
Node p = head; |
217 |
< |
if (p != null && (head = p.next) == null) |
218 |
< |
last = null; |
217 |
> |
if (p != null) { |
218 |
> |
if ((head = p.next) == null) |
219 |
> |
last = null; |
220 |
> |
p.next = null; |
221 |
> |
} |
222 |
|
return p; |
223 |
|
} |
224 |
|
} |
225 |
|
|
226 |
|
/** |
227 |
< |
* Main put algorithm, used by put, timed offer |
227 |
> |
* Creates a <tt>SynchronousQueue</tt>. |
228 |
|
*/ |
229 |
< |
private boolean doPut(E x, boolean timed, long nanos) throws InterruptedException { |
213 |
< |
if (x == null) throw new NullPointerException(); |
214 |
< |
for (;;) { |
215 |
< |
Node node; |
216 |
< |
boolean mustWait; |
217 |
< |
final ReentrantLock qlock = this.qlock; |
218 |
< |
qlock.lockInterruptibly(); |
219 |
< |
try { |
220 |
< |
node = waitingTakes.deq(); |
221 |
< |
if ( (mustWait = (node == null)) ) |
222 |
< |
node = waitingPuts.enq(x); |
223 |
< |
} finally { |
224 |
< |
qlock.unlock(); |
225 |
< |
} |
226 |
< |
|
227 |
< |
if (mustWait) |
228 |
< |
return node.waitForTake(timed, nanos); |
229 |
< |
|
230 |
< |
else if (node.setItem(x)) |
231 |
< |
return true; |
229 |
> |
public SynchronousQueue() {} |
230 |
|
|
233 |
– |
// else taker cancelled, so retry |
234 |
– |
} |
235 |
– |
} |
231 |
|
|
232 |
|
/** |
233 |
< |
* Main take algorithm, used by take, timed poll |
233 |
> |
* Adds the specified element to this queue, waiting if necessary for |
234 |
> |
* another thread to receive it. |
235 |
> |
* @param o the element to add |
236 |
> |
* @throws InterruptedException if interrupted while waiting. |
237 |
> |
* @throws NullPointerException if the specified element is <tt>null</tt>. |
238 |
|
*/ |
239 |
< |
private E doTake(boolean timed, long nanos) throws InterruptedException { |
239 |
> |
public void put(E o) throws InterruptedException { |
240 |
> |
if (o == null) throw new NullPointerException(); |
241 |
> |
final ReentrantLock qlock = this.qlock; |
242 |
> |
|
243 |
|
for (;;) { |
244 |
|
Node node; |
245 |
|
boolean mustWait; |
244 |
– |
|
245 |
– |
final ReentrantLock qlock = this.qlock; |
246 |
|
qlock.lockInterruptibly(); |
247 |
|
try { |
248 |
< |
node = waitingPuts.deq(); |
248 |
> |
node = waitingTakes.deq(); |
249 |
|
if ( (mustWait = (node == null)) ) |
250 |
< |
node = waitingTakes.enq(null); |
250 |
> |
node = waitingPuts.enq(o); |
251 |
|
} finally { |
252 |
|
qlock.unlock(); |
253 |
|
} |
254 |
|
|
255 |
|
if (mustWait) { |
256 |
< |
Object x = node.waitForPut(timed, nanos); |
257 |
< |
return (E)x; |
258 |
< |
} |
259 |
< |
else { |
260 |
< |
Object x = node.getItem(); |
261 |
< |
if (x != null) |
262 |
< |
return (E)x; |
263 |
< |
// else cancelled, so retry |
256 |
> |
node.waitForTake(); |
257 |
> |
return; |
258 |
|
} |
265 |
– |
} |
266 |
– |
} |
267 |
– |
|
268 |
– |
/** |
269 |
– |
* Creates a <tt>SynchronousQueue</tt>. |
270 |
– |
*/ |
271 |
– |
public SynchronousQueue() {} |
259 |
|
|
260 |
+ |
else if (node.setItem(o)) |
261 |
+ |
return; |
262 |
|
|
263 |
< |
/** |
264 |
< |
* Adds the specified element to this queue, waiting if necessary for |
276 |
< |
* another thread to receive it. |
277 |
< |
* @param o the element to add |
278 |
< |
* @throws InterruptedException if interrupted while waiting. |
279 |
< |
* @throws NullPointerException if the specified element is <tt>null</tt>. |
280 |
< |
*/ |
281 |
< |
public void put(E o) throws InterruptedException { |
282 |
< |
doPut(o, false, 0); |
263 |
> |
// else taker cancelled, so retry |
264 |
> |
} |
265 |
|
} |
266 |
|
|
267 |
|
/** |
278 |
|
* @throws NullPointerException if the specified element is <tt>null</tt>. |
279 |
|
*/ |
280 |
|
public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { |
281 |
< |
return doPut(o, true, unit.toNanos(timeout)); |
281 |
> |
if (o == null) throw new NullPointerException(); |
282 |
> |
long nanos = unit.toNanos(timeout); |
283 |
> |
final ReentrantLock qlock = this.qlock; |
284 |
> |
for (;;) { |
285 |
> |
Node node; |
286 |
> |
boolean mustWait; |
287 |
> |
qlock.lockInterruptibly(); |
288 |
> |
try { |
289 |
> |
node = waitingTakes.deq(); |
290 |
> |
if ( (mustWait = (node == null)) ) |
291 |
> |
node = waitingPuts.enq(o); |
292 |
> |
} finally { |
293 |
> |
qlock.unlock(); |
294 |
> |
} |
295 |
> |
|
296 |
> |
if (mustWait) |
297 |
> |
return node.waitForTake(nanos); |
298 |
> |
|
299 |
> |
else if (node.setItem(o)) |
300 |
> |
return true; |
301 |
> |
|
302 |
> |
// else taker cancelled, so retry |
303 |
> |
} |
304 |
> |
|
305 |
|
} |
306 |
|
|
307 |
|
|
311 |
|
* @return the head of this queue |
312 |
|
*/ |
313 |
|
public E take() throws InterruptedException { |
314 |
< |
return doTake(false, 0); |
314 |
> |
final ReentrantLock qlock = this.qlock; |
315 |
> |
for (;;) { |
316 |
> |
Node node; |
317 |
> |
boolean mustWait; |
318 |
> |
|
319 |
> |
qlock.lockInterruptibly(); |
320 |
> |
try { |
321 |
> |
node = waitingPuts.deq(); |
322 |
> |
if ( (mustWait = (node == null)) ) |
323 |
> |
node = waitingTakes.enq(null); |
324 |
> |
} finally { |
325 |
> |
qlock.unlock(); |
326 |
> |
} |
327 |
> |
|
328 |
> |
if (mustWait) |
329 |
> |
return (E)node.waitForPut(); |
330 |
> |
|
331 |
> |
else { |
332 |
> |
Object x = node.getItem(); |
333 |
> |
if (x != null) |
334 |
> |
return (E)x; |
335 |
> |
// else cancelled, so retry |
336 |
> |
} |
337 |
> |
} |
338 |
|
} |
339 |
|
|
340 |
|
/** |
350 |
|
* @throws InterruptedException if interrupted while waiting. |
351 |
|
*/ |
352 |
|
public E poll(long timeout, TimeUnit unit) throws InterruptedException { |
353 |
< |
return doTake(true, unit.toNanos(timeout)); |
353 |
> |
long nanos = unit.toNanos(timeout); |
354 |
> |
final ReentrantLock qlock = this.qlock; |
355 |
> |
|
356 |
> |
for (;;) { |
357 |
> |
Node node; |
358 |
> |
boolean mustWait; |
359 |
> |
|
360 |
> |
qlock.lockInterruptibly(); |
361 |
> |
try { |
362 |
> |
node = waitingPuts.deq(); |
363 |
> |
if ( (mustWait = (node == null)) ) |
364 |
> |
node = waitingTakes.enq(null); |
365 |
> |
} finally { |
366 |
> |
qlock.unlock(); |
367 |
> |
} |
368 |
> |
|
369 |
> |
if (mustWait) |
370 |
> |
return (E) node.waitForPut(nanos); |
371 |
> |
|
372 |
> |
else { |
373 |
> |
Object x = node.getItem(); |
374 |
> |
if (x != null) |
375 |
> |
return (E)x; |
376 |
> |
// else cancelled, so retry |
377 |
> |
} |
378 |
> |
} |
379 |
|
} |
380 |
|
|
381 |
|
// Untimed nonblocking versions |