416 |
|
* if to its current value). This would be extremely costly. So |
417 |
|
* we relax it in several ways: (1) Producers only signal when |
418 |
|
* their queue is possibly empty at some point during a push |
419 |
< |
* operation. (2) Other workers propagate this signal when they |
420 |
< |
* find tasks. (3) Workers only enqueue after scanning (see below) |
421 |
< |
* and not finding any tasks. (4) Rather than CASing ctl to its |
422 |
< |
* current value in the common case where no action is required, |
423 |
< |
* we reduce write contention by equivalently prefacing signalWork |
424 |
< |
* when called by an external task producer using a memory access |
425 |
< |
* with full-volatile semantics or a "fullFence". |
426 |
< |
* |
427 |
< |
* Almost always, too many signals are issued. A task producer |
428 |
< |
* cannot in general tell if some existing worker is in the midst |
429 |
< |
* of finishing one task (or already scanning) and ready to take |
430 |
< |
* another without being signalled. So the producer might instead |
431 |
< |
* activate a different worker that does not find any work, and |
432 |
< |
* then inactivates. This scarcely matters in steady-state |
433 |
< |
* computations involving all workers, but can create contention |
434 |
< |
* and bookkeeping bottlenecks during ramp-up, ramp-down, and small |
435 |
< |
* computations involving only a few workers. |
419 |
> |
* operation (which requires conservatively checking size zero or |
420 |
> |
* one to cover races). (2) Other workers propagate this signal |
421 |
> |
* when they find tasks in a queue with size greater than one. (3) |
422 |
> |
* Workers only enqueue after scanning (see below) and not finding |
423 |
> |
* any tasks. (4) Rather than CASing ctl to its current value in |
424 |
> |
* the common case where no action is required, we reduce write |
425 |
> |
* contention by equivalently prefacing signalWork when called by |
426 |
> |
* an external task producer using a memory access with |
427 |
> |
* full-volatile semantics or a "fullFence". |
428 |
> |
* |
429 |
> |
* Almost always, too many signals are issued, in part because a |
430 |
> |
* task producer cannot tell if some existing worker is in the |
431 |
> |
* midst of finishing one task (or already scanning) and ready to |
432 |
> |
* take another without being signalled. So the producer might |
433 |
> |
* instead activate a different worker that does not find any |
434 |
> |
* work, and then inactivates. This scarcely matters in |
435 |
> |
* steady-state computations involving all workers, but can create |
436 |
> |
* contention and bookkeeping bottlenecks during ramp-up, |
437 |
> |
* ramp-down, and small computations involving only a few workers. |
438 |
|
* |
439 |
|
* Scanning. Method scan (from runWorker) performs top-level |
440 |
|
* scanning for tasks. (Similar scans appear in helpQuiesce and |
812 |
|
*/ |
813 |
|
final void push(ForkJoinTask<?> task) { |
814 |
|
ForkJoinTask<?>[] a; |
815 |
< |
int s = top, d, cap; |
815 |
> |
int s = top, d, cap, m; |
816 |
|
ForkJoinPool p = pool; |
817 |
|
if ((a = array) != null && (cap = a.length) > 0) { |
818 |
< |
QA.setRelease(a, (cap - 1) & s, task); |
818 |
> |
QA.setRelease(a, (m = cap - 1) & s, task); |
819 |
|
top = s + 1; |
820 |
< |
if ((d = (int)BASE.getAcquire(this) - s) == 0 && p != null) { |
820 |
> |
if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 && |
821 |
> |
p != null) { // size 0 or 1 |
822 |
|
VarHandle.fullFence(); |
823 |
|
p.signalWork(); |
824 |
|
} |
825 |
< |
else if (d + cap - 1 == 0) |
825 |
> |
else if (d == m) |
826 |
|
growArray(false); |
827 |
|
} |
828 |
|
} |
834 |
|
final boolean lockedPush(ForkJoinTask<?> task) { |
835 |
|
ForkJoinTask<?>[] a; |
836 |
|
boolean signal = false; |
837 |
< |
int s = top, b = base, cap; |
837 |
> |
int s = top, b = base, cap, d; |
838 |
|
if ((a = array) != null && (cap = a.length) > 0) { |
839 |
|
a[(cap - 1) & s] = task; |
840 |
|
top = s + 1; |
842 |
|
growArray(true); |
843 |
|
else { |
844 |
|
phase = 0; // full volatile unlock |
845 |
< |
if (base == s) |
845 |
> |
if (((s - base) & ~1) == 0) // size 0 or 1 |
846 |
|
signal = true; |
847 |
|
} |
848 |
|
} |
893 |
|
final ForkJoinTask<?> poll() { |
894 |
|
int b, k, cap; ForkJoinTask<?>[] a; |
895 |
|
while ((a = array) != null && (cap = a.length) > 0 && |
896 |
< |
(b = base) != top) { |
896 |
> |
top - (b = base) > 0) { |
897 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>) |
898 |
|
QA.getAcquire(a, k = (cap - 1) & b); |
899 |
|
if (base == b++) { |
985 |
|
* queue, up to bound n (to avoid infinite unfairness). |
986 |
|
*/ |
987 |
|
final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) { |
988 |
< |
if (t != null) { |
988 |
> |
if (t != null && q != null) { // hoist checks |
989 |
|
int nstolen = 1; |
990 |
|
for (;;) { |
991 |
|
t.doExec(); |
992 |
|
if (n-- < 0) |
993 |
|
break; |
994 |
|
else if ((t = nextLocalTask()) == null) { |
995 |
< |
if (q != null && (t = q.poll()) != null) |
993 |
< |
++nstolen; |
994 |
< |
else |
995 |
> |
if ((t = q.poll()) == null) |
996 |
|
break; |
997 |
+ |
else |
998 |
+ |
++nstolen; |
999 |
|
} |
1000 |
|
} |
1001 |
|
ForkJoinWorkerThread thread = owner; |
1012 |
|
final void tryRemoveAndExec(ForkJoinTask<?> task) { |
1013 |
|
ForkJoinTask<?>[] a; int s, cap; |
1014 |
|
if ((a = array) != null && (cap = a.length) > 0 && |
1015 |
< |
base - (s = top) < 0) { // traverse from top |
1015 |
> |
(s = top) - base > 0) { // traverse from top |
1016 |
|
for (int m = cap - 1, ns = s - 1, i = ns; ; --i) { |
1017 |
|
int index = i & m; |
1018 |
|
ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index); |
1052 |
|
if (task != null && (status = task.status) >= 0) { |
1053 |
|
int s, k, cap; ForkJoinTask<?>[] a; |
1054 |
|
while ((a = array) != null && (cap = a.length) > 0 && |
1055 |
< |
(s = top) != base) { |
1055 |
> |
(s = top) - base > 0) { |
1056 |
|
CountedCompleter<?> v = null; |
1057 |
|
ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)]; |
1058 |
|
if (o instanceof CountedCompleter) { |
1102 |
|
if (blocker != null) { |
1103 |
|
int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t; |
1104 |
|
while ((a = array) != null && (cap = a.length) > 0 && |
1105 |
< |
(b = base) != top) { |
1105 |
> |
top - (b = base) > 0) { |
1106 |
|
t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b); |
1107 |
|
if (blocker.isReleasable()) |
1108 |
|
break; |
1612 |
|
WorkQueue[] ws; int n; |
1613 |
|
if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) { |
1614 |
|
for (int m = n - 1, j = r & m;;) { |
1615 |
< |
WorkQueue q; int b, d; |
1616 |
< |
if ((q = ws[j]) != null && (d = (b = q.base) - q.top) != 0) { |
1615 |
> |
WorkQueue q; int b; |
1616 |
> |
if ((q = ws[j]) != null && q.top != (b = q.base)) { |
1617 |
|
int qid = q.id; |
1618 |
|
ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t; |
1619 |
|
if ((a = q.array) != null && (cap = a.length) > 0) { |
1622 |
|
QA.compareAndSet(a, k, t, null)) { |
1623 |
|
q.base = b; |
1624 |
|
w.source = qid; |
1625 |
< |
if (d != -1 || b != q.top) |
1625 |
> |
if (q.top - b > 0) |
1626 |
|
signalWork(); |
1627 |
|
w.topLevelExec(t, q, // random fairness bound |
1628 |
|
r & ((n << TOP_BOUND_SHIFT) - 1)); |
1666 |
|
while (n > 0) { |
1667 |
|
WorkQueue q; int b; |
1668 |
|
if ((q = ws[r & m]) != null && q.source == id && |
1669 |
< |
(b = q.base) != q.top) { |
1669 |
> |
q.top != (b = q.base)) { |
1670 |
|
ForkJoinTask<?>[] a; int cap, k; |
1671 |
|
int qid = q.id; |
1672 |
|
if ((a = q.array) != null && (cap = a.length) > 0) { |
1729 |
|
WorkQueue q; int b; |
1730 |
|
if ((q = ws[r & m]) != null) { |
1731 |
|
int qs = q.source; |
1732 |
< |
if ((b = q.base) != q.top) { |
1732 |
> |
if (q.top != (b = q.base)) { |
1733 |
|
quiet = empty = false; |
1734 |
|
ForkJoinTask<?>[] a; int cap, k; |
1735 |
|
int qid = q.id; |
1798 |
|
WorkQueue q; |
1799 |
|
if ((q = ws[i]) != null) { |
1800 |
|
int b; ForkJoinTask<?> t; |
1801 |
< |
if ((b = q.base) != q.top) { |
1801 |
> |
if (q.top - (b = q.base) > 0) { |
1802 |
|
nonempty = true; |
1803 |
|
if ((t = q.poll()) != null) |
1804 |
|
return t; |