120 |
|
abstract Node enq(Object x); |
121 |
|
/** Remove and return node, or null if empty */ |
122 |
|
abstract Node deq(); |
123 |
+ |
/** Remove a cancelled node to avoid garbage retention. */ |
124 |
+ |
abstract void unlink(Node node); |
125 |
+ |
/** Return true if a cancelled node might be on queue */ |
126 |
+ |
abstract boolean shouldUnlink(Node node); |
127 |
|
} |
128 |
|
|
129 |
|
/** |
152 |
|
} |
153 |
|
return p; |
154 |
|
} |
155 |
+ |
|
156 |
+ |
boolean shouldUnlink(Node node) { |
157 |
+ |
return (node == last || node.next != null); |
158 |
+ |
} |
159 |
+ |
|
160 |
+ |
|
161 |
+ |
void unlink(Node node) { |
162 |
+ |
Node p = head; |
163 |
+ |
Node trail = null; |
164 |
+ |
while (p != null) { |
165 |
+ |
if (p == node) { |
166 |
+ |
Node next = p.next; |
167 |
+ |
if (trail == null) |
168 |
+ |
head = next; |
169 |
+ |
else |
170 |
+ |
trail.next = next; |
171 |
+ |
if (last == node) |
172 |
+ |
last = trail; |
173 |
+ |
break; |
174 |
+ |
} |
175 |
+ |
trail = p; |
176 |
+ |
p = p.next; |
177 |
+ |
} |
178 |
+ |
} |
179 |
|
} |
180 |
|
|
181 |
|
/** |
197 |
|
} |
198 |
|
return p; |
199 |
|
} |
200 |
+ |
|
201 |
+ |
boolean shouldUnlink(Node node) { |
202 |
+ |
// Return false if already dequeued or is bottom node (in which |
203 |
+ |
// case we might retain at most one garbage node) |
204 |
+ |
return (node == head || node.next != null); |
205 |
+ |
} |
206 |
+ |
|
207 |
+ |
void unlink(Node node) { |
208 |
+ |
Node p = head; |
209 |
+ |
Node trail = null; |
210 |
+ |
while (p != null) { |
211 |
+ |
if (p == node) { |
212 |
+ |
Node next = p.next; |
213 |
+ |
if (trail == null) |
214 |
+ |
head = next; |
215 |
+ |
else |
216 |
+ |
trail.next = next; |
217 |
+ |
break; |
218 |
+ |
} |
219 |
+ |
trail = p; |
220 |
+ |
p = p.next; |
221 |
+ |
} |
222 |
+ |
} |
223 |
|
} |
224 |
|
|
225 |
+ |
/* |
226 |
+ |
* Unlink the given node from consumer queue. Called by cancelled |
227 |
+ |
* (timeout, interrupt) waiters to avoid garbage retention in the |
228 |
+ |
* absence of producers. |
229 |
+ |
*/ |
230 |
+ |
private void unlinkCancelledConsumer(Node node) { |
231 |
+ |
// Use a form of double-check to avoid unnecessary locking and |
232 |
+ |
// traversal. The first check outside lock might |
233 |
+ |
// conservatively report true. |
234 |
+ |
if (waitingConsumers.shouldUnlink(node)) { |
235 |
+ |
qlock.lock(); |
236 |
+ |
try { |
237 |
+ |
if (waitingConsumers.shouldUnlink(node)) |
238 |
+ |
waitingConsumers.unlink(node); |
239 |
+ |
} finally { |
240 |
+ |
qlock.unlock(); |
241 |
+ |
} |
242 |
+ |
} |
243 |
+ |
} |
244 |
+ |
|
245 |
+ |
/* |
246 |
+ |
* Unlink the given node from producer queue. Symmetric |
247 |
+ |
* to unlinkCancelledConsumer. |
248 |
+ |
*/ |
249 |
+ |
private void unlinkCancelledProducer(Node node) { |
250 |
+ |
if (waitingProducers.shouldUnlink(node)) { |
251 |
+ |
qlock.lock(); |
252 |
+ |
try { |
253 |
+ |
if (waitingProducers.shouldUnlink(node)) |
254 |
+ |
waitingProducers.unlink(node); |
255 |
+ |
} finally { |
256 |
+ |
qlock.unlock(); |
257 |
+ |
} |
258 |
+ |
} |
259 |
+ |
} |
260 |
+ |
|
261 |
|
/** |
262 |
|
* Nodes each maintain an item and handle waits and signals for |
263 |
|
* getting and setting it. The class extends |
409 |
|
} |
410 |
|
|
411 |
|
if (mustWait) { |
412 |
< |
node.waitForTake(); |
413 |
< |
return; |
412 |
> |
try { |
413 |
> |
node.waitForTake(); |
414 |
> |
return; |
415 |
> |
} catch (InterruptedException ex) { |
416 |
> |
unlinkCancelledProducer(node); |
417 |
> |
throw ex; |
418 |
> |
} |
419 |
|
} |
420 |
|
|
421 |
|
else if (node.setItem(o)) |
455 |
|
qlock.unlock(); |
456 |
|
} |
457 |
|
|
458 |
< |
if (mustWait) |
459 |
< |
return node.waitForTake(nanos); |
458 |
> |
if (mustWait) { |
459 |
> |
try { |
460 |
> |
boolean x = node.waitForTake(nanos); |
461 |
> |
if (!x) |
462 |
> |
unlinkCancelledProducer(node); |
463 |
> |
return x; |
464 |
> |
} catch (InterruptedException ex) { |
465 |
> |
unlinkCancelledProducer(node); |
466 |
> |
throw ex; |
467 |
> |
} |
468 |
> |
} |
469 |
|
|
470 |
|
else if (node.setItem(o)) |
471 |
|
return true; |
497 |
|
} |
498 |
|
|
499 |
|
if (mustWait) { |
500 |
< |
Object x = node.waitForPut(); |
501 |
< |
return (E)x; |
500 |
> |
try { |
501 |
> |
Object x = node.waitForPut(); |
502 |
> |
return (E)x; |
503 |
> |
} catch (InterruptedException ex) { |
504 |
> |
unlinkCancelledConsumer(node); |
505 |
> |
throw ex; |
506 |
> |
} |
507 |
|
} |
508 |
|
else { |
509 |
|
Object x = node.getItem(); |
545 |
|
} |
546 |
|
|
547 |
|
if (mustWait) { |
548 |
< |
Object x = node.waitForPut(nanos); |
549 |
< |
return (E)x; |
548 |
> |
try { |
549 |
> |
Object x = node.waitForPut(nanos); |
550 |
> |
if (x == null) |
551 |
> |
unlinkCancelledConsumer(node); |
552 |
> |
return (E)x; |
553 |
> |
} catch (InterruptedException ex) { |
554 |
> |
unlinkCancelledConsumer(node); |
555 |
> |
throw ex; |
556 |
> |
} |
557 |
|
} |
558 |
|
else { |
559 |
|
Object x = node.getItem(); |