280 |
|
* 2. Await match or cancellation (method awaitMatch) |
281 |
|
* |
282 |
|
* Wait for another thread to match node; instead cancelling if |
283 |
< |
* the current thread was interrupted or the wait timed out. On |
284 |
< |
* multiprocessors, we use front-of-queue spinning: If a node |
285 |
< |
* appears to be the first unmatched node in the queue, it |
286 |
< |
* spins a bit before blocking. In either case, before blocking |
287 |
< |
* it tries to unsplice any nodes between the current "head" |
288 |
< |
* and the first unmatched node. |
289 |
< |
* |
290 |
< |
* Front-of-queue spinning vastly improves performance of |
291 |
< |
* heavily contended queues. And so long as it is relatively |
292 |
< |
* brief and "quiet", spinning does not much impact performance |
293 |
< |
* of less-contended queues. During spins threads check their |
294 |
< |
* interrupt status and generate a thread-local random number |
295 |
< |
* to decide to occasionally perform a Thread.yield. While |
296 |
< |
* yield has underdefined specs, we assume that it might help, |
297 |
< |
* and will not hurt, in limiting impact of spinning on busy |
298 |
< |
* systems. We also use smaller (1/2) spins for nodes that are |
299 |
< |
* not known to be front but whose predecessors have not |
300 |
< |
* blocked -- these "chained" spins avoid artifacts of |
301 |
< |
* front-of-queue rules which otherwise lead to alternating |
302 |
< |
* nodes spinning vs blocking. Further, front threads that |
303 |
< |
* represent phase changes (from data to request node or vice |
304 |
< |
* versa) compared to their predecessors receive additional |
305 |
< |
* chained spins, reflecting longer paths typically required to |
306 |
< |
* unblock threads during phase changes. |
307 |
< |
* |
283 |
> |
* the current thread was interrupted or the wait timed out. To |
284 |
> |
* improve performance in common single-source / single-sink |
285 |
> |
* usages when there are more tasks that cores, an initial |
286 |
> |
* Thread.yield is tried when there is apparently only one |
287 |
> |
* waiter. In other cases, waiters may help with some |
288 |
> |
* bookkeeping, then park/unpark. |
289 |
|
* |
290 |
|
* ** Unlinking removed interior nodes ** |
291 |
|
* |
321 |
|
* |
322 |
|
* When these cases arise, rather than always retraversing the |
323 |
|
* entire list to find an actual predecessor to unlink (which |
324 |
< |
* won't help for case (1) anyway), we record a conservative |
325 |
< |
* estimate of possible unsplice failures (in "sweepVotes"). |
326 |
< |
* We trigger a full sweep when the estimate exceeds a threshold |
346 |
< |
* ("SWEEP_THRESHOLD") indicating the maximum number of estimated |
347 |
< |
* removal failures to tolerate before sweeping through, unlinking |
348 |
< |
* cancelled nodes that were not unlinked upon initial removal. |
349 |
< |
* We perform sweeps by the thread hitting threshold (rather than |
350 |
< |
* background threads or by spreading work to other threads) |
351 |
< |
* because in the main contexts in which removal occurs, the |
352 |
< |
* caller is timed-out or cancelled, which are not time-critical |
353 |
< |
* enough to warrant the overhead that alternatives would impose |
354 |
< |
* on other threads. |
355 |
< |
* |
356 |
< |
* Because the sweepVotes estimate is conservative, and because |
357 |
< |
* nodes become unlinked "naturally" as they fall off the head of |
358 |
< |
* the queue, and because we allow votes to accumulate even while |
359 |
< |
* sweeps are in progress, there are typically significantly fewer |
360 |
< |
* such nodes than estimated. Choice of a threshold value |
361 |
< |
* balances the likelihood of wasted effort and contention, versus |
362 |
< |
* providing a worst-case bound on retention of interior nodes in |
363 |
< |
* quiescent queues. The value defined below was chosen |
364 |
< |
* empirically to balance these under various timeout scenarios. |
365 |
< |
* |
366 |
< |
* Because traversal operations on the linked list of nodes are a |
324 |
> |
* won't help for case (1) anyway), we record the need to sweep the |
325 |
> |
* next time any thread would otherwise block in awaitMatch. Also, |
326 |
> |
* because traversal operations on the linked list of nodes are a |
327 |
|
* natural opportunity to sweep dead nodes, we generally do so, |
328 |
|
* including all the operations that might remove elements as they |
329 |
|
* traverse, such as removeIf and Iterator.remove. This largely |
336 |
|
* self-linked. |
337 |
|
*/ |
338 |
|
|
379 |
– |
/** True if on multiprocessor */ |
380 |
– |
private static final boolean MP = |
381 |
– |
Runtime.getRuntime().availableProcessors() > 1; |
382 |
– |
|
339 |
|
/** |
340 |
< |
* The number of times to spin (with randomly interspersed calls |
341 |
< |
* to Thread.yield) on multiprocessor before blocking when a node |
342 |
< |
* is apparently the first waiter in the queue. See above for |
387 |
< |
* explanation. Must be a power of two. The value is empirically |
388 |
< |
* derived -- it works pretty well across a variety of processors, |
389 |
< |
* numbers of CPUs, and OSes. |
340 |
> |
* The number of nanoseconds for which it is faster to spin |
341 |
> |
* rather than to use timed park. A rough estimate suffices. |
342 |
> |
* Using a power of two minus one simplifies some comparisons. |
343 |
|
*/ |
344 |
< |
private static final int FRONT_SPINS = 1 << 7; |
392 |
< |
|
393 |
< |
/** |
394 |
< |
* The number of times to spin before blocking when a node is |
395 |
< |
* preceded by another node that is apparently spinning. Also |
396 |
< |
* serves as an increment to FRONT_SPINS on phase changes, and as |
397 |
< |
* base average frequency for yielding during spins. Must be a |
398 |
< |
* power of two. |
399 |
< |
*/ |
400 |
< |
private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; |
344 |
> |
static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1023L; |
345 |
|
|
346 |
|
/** |
347 |
|
* The maximum number of estimated removal failures (sweepVotes) |
357 |
|
* them after use. Writes that are intrinsically ordered wrt |
358 |
|
* other accesses or CASes use simple relaxed forms. |
359 |
|
*/ |
360 |
< |
static final class Node { |
360 |
> |
static final class Node implements ForkJoinPool.ManagedBlocker { |
361 |
|
final boolean isData; // false if this is a request node |
362 |
|
volatile Object item; // initially non-null if isData; CASed to match |
363 |
|
volatile Node next; |
402 |
|
final void appendRelaxed(Node next) { |
403 |
|
// assert next != null; |
404 |
|
// assert this.next == null; |
405 |
< |
NEXT.set(this, next); |
462 |
< |
} |
463 |
< |
|
464 |
< |
/** |
465 |
< |
* Sets item (of a request node) to self and waiter to null, |
466 |
< |
* to avoid garbage retention after matching or cancelling. |
467 |
< |
* Uses relaxed writes because order is already constrained in |
468 |
< |
* the only calling contexts: item is forgotten only after |
469 |
< |
* volatile/atomic mechanics that extract items, and visitors |
470 |
< |
* of request nodes only ever check whether item is null. |
471 |
< |
* Similarly, clearing waiter follows either CAS or return |
472 |
< |
* from park (if ever parked; else we don't care). |
473 |
< |
*/ |
474 |
< |
final void forgetContents() { |
475 |
< |
// assert isMatched(); |
476 |
< |
if (!isData) |
477 |
< |
ITEM.set(this, this); |
478 |
< |
WAITER.set(this, null); |
405 |
> |
NEXT.setOpaque(this, next); |
406 |
|
} |
407 |
|
|
408 |
|
/** |
432 |
|
return d != haveData && d != (item == null); |
433 |
|
} |
434 |
|
|
435 |
+ |
public final boolean isReleasable() { |
436 |
+ |
return (isData == (item == null)) || |
437 |
+ |
Thread.currentThread().isInterrupted(); |
438 |
+ |
} |
439 |
+ |
|
440 |
+ |
public final boolean block() { |
441 |
+ |
while (!isReleasable()) LockSupport.park(); |
442 |
+ |
return true; |
443 |
+ |
} |
444 |
+ |
|
445 |
|
private static final long serialVersionUID = -3375979862319811754L; |
446 |
|
} |
447 |
|
|
474 |
|
private transient volatile Node tail; |
475 |
|
|
476 |
|
/** The number of apparent failures to unsplice cancelled nodes */ |
477 |
< |
private transient volatile int sweepVotes; |
477 |
> |
private transient volatile boolean needSweep; |
478 |
|
|
479 |
|
private boolean casTail(Node cmp, Node val) { |
480 |
|
// assert cmp != null; |
486 |
|
return HEAD.compareAndSet(this, cmp, val); |
487 |
|
} |
488 |
|
|
552 |
– |
/** Atomic version of ++sweepVotes. */ |
553 |
– |
private int incSweepVotes() { |
554 |
– |
return (int) SWEEPVOTES.getAndAdd(this, 1) + 1; |
555 |
– |
} |
556 |
– |
|
489 |
|
/** |
490 |
|
* Tries to CAS pred.next (or head, if pred is null) from c to p. |
491 |
|
* Caller must ensure that we're not unlinking the trailing node. |
592 |
|
} |
593 |
|
|
594 |
|
/** |
595 |
< |
* Spins/yields/blocks until node s is matched or caller gives up. |
595 |
> |
* Possibly blocks until node s is matched or caller gives up. |
596 |
|
* |
597 |
|
* @param s the waiting node |
598 |
|
* @param pred the predecessor of s, or null if unknown (the null |
603 |
|
* @param nanos timeout in nanosecs, used only if timed is true |
604 |
|
* @return matched item, or e if unmatched on interrupt or timeout |
605 |
|
*/ |
606 |
+ |
@SuppressWarnings("unchecked") |
607 |
|
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { |
608 |
+ |
final boolean isData = s.isData; |
609 |
|
final long deadline = timed ? System.nanoTime() + nanos : 0L; |
610 |
< |
Thread w = Thread.currentThread(); |
611 |
< |
int spins = -1; // initialized after first item and cancel checks |
612 |
< |
ThreadLocalRandom randomYields = null; // bound if needed |
613 |
< |
|
614 |
< |
for (;;) { |
615 |
< |
final Object item; |
616 |
< |
if ((item = s.item) != e) { // matched |
617 |
< |
// assert item != s; |
618 |
< |
s.forgetContents(); // avoid garbage |
685 |
< |
@SuppressWarnings("unchecked") E itemE = (E) item; |
686 |
< |
return itemE; |
687 |
< |
} |
688 |
< |
else if (w.isInterrupted() || (timed && nanos <= 0L)) { |
689 |
< |
// try to cancel and unlink |
690 |
< |
if (s.casItem(e, s.isData ? null : s)) { |
691 |
< |
unsplice(pred, s); |
610 |
> |
final Thread w = Thread.currentThread(); |
611 |
> |
int stat = -1; // -1: may yield, +1: park, else 0 |
612 |
> |
Object item; |
613 |
> |
while ((item = s.item) == e) { |
614 |
> |
if (needSweep) // help clean |
615 |
> |
sweep(); |
616 |
> |
else if ((timed && nanos <= 0L) || w.isInterrupted()) { |
617 |
> |
if (s.casItem(e, (e == null) ? s : null)) { |
618 |
> |
unsplice(pred, s); // cancelled |
619 |
|
return e; |
620 |
|
} |
694 |
– |
// return normally if lost CAS |
695 |
– |
} |
696 |
– |
else if (spins < 0) { // establish spins at/near front |
697 |
– |
if ((spins = spinsFor(pred, s.isData)) > 0) |
698 |
– |
randomYields = ThreadLocalRandom.current(); |
621 |
|
} |
622 |
< |
else if (spins > 0) { // spin |
623 |
< |
--spins; |
624 |
< |
if (randomYields.nextInt(CHAINED_SPINS) == 0) |
625 |
< |
Thread.yield(); // occasionally yield |
622 |
> |
else if (stat <= 0) { |
623 |
> |
if (pred != null && pred.next == s) { |
624 |
> |
if (stat < 0 && |
625 |
> |
(pred.isData != isData || pred.isMatched())) { |
626 |
> |
stat = 0; // yield once if first |
627 |
> |
Thread.yield(); |
628 |
> |
} |
629 |
> |
else { |
630 |
> |
stat = 1; |
631 |
> |
s.waiter = w; // enable unpark |
632 |
> |
} |
633 |
> |
} // else signal in progress |
634 |
|
} |
635 |
< |
else if (s.waiter == null) { |
636 |
< |
s.waiter = w; // request unpark then recheck |
635 |
> |
else if ((item = s.item) != e) |
636 |
> |
break; // recheck |
637 |
> |
else if (!timed) { |
638 |
> |
LockSupport.setCurrentBlocker(this); |
639 |
> |
try { |
640 |
> |
ForkJoinPool.managedBlock(s); |
641 |
> |
} catch (InterruptedException cannotHappen) { } |
642 |
> |
LockSupport.setCurrentBlocker(null); |
643 |
|
} |
644 |
< |
else if (timed) { |
644 |
> |
else { |
645 |
|
nanos = deadline - System.nanoTime(); |
646 |
< |
if (nanos > 0L) |
646 |
> |
if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD) |
647 |
|
LockSupport.parkNanos(this, nanos); |
648 |
|
} |
713 |
– |
else { |
714 |
– |
LockSupport.park(this); |
715 |
– |
} |
716 |
– |
} |
717 |
– |
} |
718 |
– |
|
719 |
– |
/** |
720 |
– |
* Returns spin/yield value for a node with given predecessor and |
721 |
– |
* data mode. See above for explanation. |
722 |
– |
*/ |
723 |
– |
private static int spinsFor(Node pred, boolean haveData) { |
724 |
– |
if (MP && pred != null) { |
725 |
– |
if (pred.isData != haveData) // phase change |
726 |
– |
return FRONT_SPINS + CHAINED_SPINS; |
727 |
– |
if (pred.isMatched()) // probably at front |
728 |
– |
return FRONT_SPINS; |
729 |
– |
if (pred.waiter == null) // pred apparently spinning |
730 |
– |
return CHAINED_SPINS; |
649 |
|
} |
650 |
< |
return 0; |
650 |
> |
if (stat == 1) |
651 |
> |
WAITER.set(s, null); |
652 |
> |
if (!isData) |
653 |
> |
ITEM.set(s, s); // self-link to avoid garbage |
654 |
> |
return (E) item; |
655 |
|
} |
656 |
|
|
657 |
|
/* -------------- Traversal methods -------------- */ |
1074 |
|
* See above for rationale. Briefly: if pred still points to |
1075 |
|
* s, try to unlink s. If s cannot be unlinked, because it is |
1076 |
|
* trailing node or pred might be unlinked, and neither pred |
1077 |
< |
* nor s are head or offlist, add to sweepVotes, and if enough |
1156 |
< |
* votes have accumulated, sweep. |
1077 |
> |
* nor s are head or offlist, set needSweep; |
1078 |
|
*/ |
1079 |
|
if (pred != null && pred.next == s) { |
1080 |
|
Node n = s.next; |
1092 |
|
if (hn != h && casHead(h, hn)) |
1093 |
|
h.selfLink(); // advance head |
1094 |
|
} |
1095 |
< |
// sweep every SWEEP_THRESHOLD votes |
1096 |
< |
if (pred.next != pred && s.next != s // recheck if offlist |
1176 |
< |
&& (incSweepVotes() & (SWEEP_THRESHOLD - 1)) == 0) |
1177 |
< |
sweep(); |
1095 |
> |
if (pred.next != pred && s.next != s) |
1096 |
> |
needSweep = true; |
1097 |
|
} |
1098 |
|
} |
1099 |
|
} |
1103 |
|
* traversal from head. |
1104 |
|
*/ |
1105 |
|
private void sweep() { |
1106 |
+ |
needSweep = false; |
1107 |
|
for (Node p = head, s, n; p != null && (s = p.next) != null; ) { |
1108 |
|
if (!s.isMatched()) |
1109 |
|
// Unmatched nodes are never self-linked |
1156 |
|
* @throws NullPointerException if the specified element is null |
1157 |
|
*/ |
1158 |
|
public void put(E e) { |
1159 |
< |
xfer(e, true, ASYNC, 0); |
1159 |
> |
xfer(e, true, ASYNC, 0L); |
1160 |
|
} |
1161 |
|
|
1162 |
|
/** |
1169 |
|
* @throws NullPointerException if the specified element is null |
1170 |
|
*/ |
1171 |
|
public boolean offer(E e, long timeout, TimeUnit unit) { |
1172 |
< |
xfer(e, true, ASYNC, 0); |
1172 |
> |
xfer(e, true, ASYNC, 0L); |
1173 |
|
return true; |
1174 |
|
} |
1175 |
|
|
1181 |
|
* @throws NullPointerException if the specified element is null |
1182 |
|
*/ |
1183 |
|
public boolean offer(E e) { |
1184 |
< |
xfer(e, true, ASYNC, 0); |
1184 |
> |
xfer(e, true, ASYNC, 0L); |
1185 |
|
return true; |
1186 |
|
} |
1187 |
|
|
1194 |
|
* @throws NullPointerException if the specified element is null |
1195 |
|
*/ |
1196 |
|
public boolean add(E e) { |
1197 |
< |
xfer(e, true, ASYNC, 0); |
1197 |
> |
xfer(e, true, ASYNC, 0L); |
1198 |
|
return true; |
1199 |
|
} |
1200 |
|
|
1209 |
|
* @throws NullPointerException if the specified element is null |
1210 |
|
*/ |
1211 |
|
public boolean tryTransfer(E e) { |
1212 |
< |
return xfer(e, true, NOW, 0) == null; |
1212 |
> |
return xfer(e, true, NOW, 0L) == null; |
1213 |
|
} |
1214 |
|
|
1215 |
|
/** |
1224 |
|
* @throws NullPointerException if the specified element is null |
1225 |
|
*/ |
1226 |
|
public void transfer(E e) throws InterruptedException { |
1227 |
< |
if (xfer(e, true, SYNC, 0) != null) { |
1227 |
> |
if (xfer(e, true, SYNC, 0L) != null) { |
1228 |
|
Thread.interrupted(); // failure possible only due to interrupt |
1229 |
|
throw new InterruptedException(); |
1230 |
|
} |
1254 |
|
} |
1255 |
|
|
1256 |
|
public E take() throws InterruptedException { |
1257 |
< |
E e = xfer(null, false, SYNC, 0); |
1257 |
> |
E e = xfer(null, false, SYNC, 0L); |
1258 |
|
if (e != null) |
1259 |
|
return e; |
1260 |
|
Thread.interrupted(); |
1269 |
|
} |
1270 |
|
|
1271 |
|
public E poll() { |
1272 |
< |
return xfer(null, false, NOW, 0); |
1272 |
> |
return xfer(null, false, NOW, 0L); |
1273 |
|
} |
1274 |
|
|
1275 |
|
/** |
1613 |
|
// VarHandle mechanics |
1614 |
|
private static final VarHandle HEAD; |
1615 |
|
private static final VarHandle TAIL; |
1696 |
– |
private static final VarHandle SWEEPVOTES; |
1616 |
|
static final VarHandle ITEM; |
1617 |
|
static final VarHandle NEXT; |
1618 |
|
static final VarHandle WAITER; |
1623 |
|
Node.class); |
1624 |
|
TAIL = l.findVarHandle(LinkedTransferQueue.class, "tail", |
1625 |
|
Node.class); |
1707 |
– |
SWEEPVOTES = l.findVarHandle(LinkedTransferQueue.class, "sweepVotes", |
1708 |
– |
int.class); |
1626 |
|
ITEM = l.findVarHandle(Node.class, "item", Object.class); |
1627 |
|
NEXT = l.findVarHandle(Node.class, "next", Node.class); |
1628 |
|
WAITER = l.findVarHandle(Node.class, "waiter", Thread.class); |