416 |
|
* if to its current value). This would be extremely costly. So |
417 |
|
* we relax it in several ways: (1) Producers only signal when |
418 |
|
* their queue is possibly empty at some point during a push |
419 |
< |
* operation. (2) Other workers propagate this signal |
419 |
> |
* operation (which requires conservatively checking size zero or |
420 |
> |
* one to cover races). (2) Other workers propagate this signal |
421 |
|
* when they find tasks in a queue with size greater than one. (3) |
422 |
|
* Workers only enqueue after scanning (see below) and not finding |
423 |
|
* any tasks. (4) Rather than CASing ctl to its current value in |
733 |
|
|
734 |
|
/** |
735 |
|
* The maximum number of top-level polls per worker before |
736 |
< |
* checking other queues, expressed as a bit shift. See above for |
737 |
< |
* rationale. |
736 |
> |
* checking other queues, expressed as a bit shift to, in effect, |
737 |
> |
* multiply by pool size, and then use as random value mask, so |
738 |
> |
* average bound is about poolSize*(1<<TOP_BOUND_SHIFT). See |
739 |
> |
* above for rationale. |
740 |
|
*/ |
741 |
|
static final int TOP_BOUND_SHIFT = 10; |
742 |
|
|
812 |
|
*/ |
813 |
|
final void push(ForkJoinTask<?> task) { |
814 |
|
ForkJoinTask<?>[] a; |
815 |
< |
int s = top, d = s - base, cap, m; |
815 |
> |
int s = top, d, cap, m; |
816 |
|
ForkJoinPool p = pool; |
817 |
|
if ((a = array) != null && (cap = a.length) > 0) { |
818 |
|
QA.setRelease(a, (m = cap - 1) & s, task); |
819 |
|
top = s + 1; |
820 |
< |
if (d == m) |
821 |
< |
growArray(false); |
822 |
< |
else if (QA.getAcquire(a, m & (s - 1)) == null && p != null) { |
823 |
< |
VarHandle.fullFence(); // was empty |
821 |
< |
p.signalWork(null); |
820 |
> |
if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 && |
821 |
> |
p != null) { // size 0 or 1 |
822 |
> |
VarHandle.fullFence(); |
823 |
> |
p.signalWork(); |
824 |
|
} |
825 |
+ |
else if (d == m) |
826 |
+ |
growArray(false); |
827 |
|
} |
828 |
|
} |
829 |
|
|
834 |
|
final boolean lockedPush(ForkJoinTask<?> task) { |
835 |
|
ForkJoinTask<?>[] a; |
836 |
|
boolean signal = false; |
837 |
< |
int s = top, d = s - base, cap, m; |
837 |
> |
int s = top, b = base, cap, d; |
838 |
|
if ((a = array) != null && (cap = a.length) > 0) { |
839 |
< |
a[(m = (cap - 1)) & s] = task; |
839 |
> |
a[(cap - 1) & s] = task; |
840 |
|
top = s + 1; |
841 |
< |
if (d == m) |
841 |
> |
if (b - s + cap - 1 == 0) |
842 |
|
growArray(true); |
843 |
|
else { |
844 |
|
phase = 0; // full volatile unlock |
845 |
< |
if (a[m & (s - 1)] == null) |
846 |
< |
signal = true; // was empty |
845 |
> |
if (((s - base) & ~1) == 0) // size 0 or 1 |
846 |
> |
signal = true; |
847 |
|
} |
848 |
|
} |
849 |
|
return signal; |
985 |
|
* queue, up to bound n (to avoid infinite unfairness). |
986 |
|
*/ |
987 |
|
final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) { |
988 |
< |
int nstolen = 1; |
989 |
< |
for (int j = 0;;) { |
990 |
< |
if (t != null) |
988 |
> |
if (t != null && q != null) { // hoist checks |
989 |
> |
int nstolen = 1; |
990 |
> |
for (;;) { |
991 |
|
t.doExec(); |
992 |
< |
if (j++ <= n) |
989 |
< |
t = nextLocalTask(); |
990 |
< |
else { |
991 |
< |
j = 0; |
992 |
< |
t = null; |
993 |
< |
} |
994 |
< |
if (t == null) { |
995 |
< |
if (q != null && (t = q.poll()) != null) { |
996 |
< |
++nstolen; |
997 |
< |
j = 0; |
998 |
< |
} |
999 |
< |
else if (j != 0) |
992 |
> |
if (n-- < 0) |
993 |
|
break; |
994 |
+ |
else if ((t = nextLocalTask()) == null) { |
995 |
+ |
if ((t = q.poll()) == null) |
996 |
+ |
break; |
997 |
+ |
else |
998 |
+ |
++nstolen; |
999 |
+ |
} |
1000 |
|
} |
1001 |
+ |
ForkJoinWorkerThread thread = owner; |
1002 |
+ |
nsteals += nstolen; |
1003 |
+ |
source = 0; |
1004 |
+ |
if (thread != null) |
1005 |
+ |
thread.afterTopLevelExec(); |
1006 |
|
} |
1003 |
- |
ForkJoinWorkerThread thread = owner; |
1004 |
- |
nsteals += nstolen; |
1005 |
- |
source = 0; |
1006 |
- |
if (thread != null) |
1007 |
- |
thread.afterTopLevelExec(); |
1007 |
|
} |
1008 |
|
|
1009 |
|
/** |
1426 |
|
|
1427 |
|
if (!tryTerminate(false, false) && // possibly replace worker |
1428 |
|
w != null && w.array != null) // avoid repeated failures |
1429 |
< |
signalWork(null); |
1429 |
> |
signalWork(); |
1430 |
|
|
1431 |
|
if (ex == null) // help clean on way out |
1432 |
|
ForkJoinTask.helpExpungeStaleExceptions(); |
1436 |
|
|
1437 |
|
/** |
1438 |
|
* Tries to create or release a worker if too few are running. |
1440 |
- |
* @param q if non-null recheck if empty on CAS failure |
1439 |
|
*/ |
1440 |
< |
final void signalWork(WorkQueue q) { |
1440 |
> |
final void signalWork() { |
1441 |
|
for (;;) { |
1442 |
|
long c; int sp; WorkQueue[] ws; int i; WorkQueue v; |
1443 |
|
if ((c = ctl) >= 0L) // enough workers |
1464 |
|
LockSupport.unpark(vt); |
1465 |
|
break; |
1466 |
|
} |
1469 |
- |
else if (q != null && q.isEmpty()) // no need to retry |
1470 |
- |
break; |
1467 |
|
} |
1468 |
|
} |
1469 |
|
} |
1584 |
|
else if (rc <= 0 && (md & SHUTDOWN) != 0 && |
1585 |
|
tryTerminate(false, false)) |
1586 |
|
break; // quiescent shutdown |
1587 |
< |
else if (w.phase < 0) { |
1588 |
< |
if (rc <= 0 && pred != 0 && phase == (int)c) { |
1589 |
< |
long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred); |
1590 |
< |
long d = keepAlive + System.currentTimeMillis(); |
1591 |
< |
LockSupport.parkUntil(this, d); |
1592 |
< |
if (ctl == c && // drop on timeout if all idle |
1593 |
< |
d - System.currentTimeMillis() <= TIMEOUT_SLOP && |
1594 |
< |
CTL.compareAndSet(this, c, nc)) { |
1595 |
< |
w.phase = QUIET; |
1600 |
< |
break; |
1601 |
< |
} |
1602 |
< |
} |
1603 |
< |
else { |
1604 |
< |
LockSupport.park(this); |
1605 |
< |
if (w.phase < 0) // one spurious wakeup check |
1606 |
< |
LockSupport.park(this); |
1587 |
> |
else if (rc <= 0 && pred != 0 && phase == (int)c) { |
1588 |
> |
long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred); |
1589 |
> |
long d = keepAlive + System.currentTimeMillis(); |
1590 |
> |
LockSupport.parkUntil(this, d); |
1591 |
> |
if (ctl == c && // drop on timeout if all idle |
1592 |
> |
d - System.currentTimeMillis() <= TIMEOUT_SLOP && |
1593 |
> |
CTL.compareAndSet(this, c, nc)) { |
1594 |
> |
w.phase = QUIET; |
1595 |
> |
break; |
1596 |
|
} |
1597 |
|
} |
1598 |
+ |
else if (w.phase < 0) |
1599 |
+ |
LockSupport.park(this); // OK if spuriously woken |
1600 |
|
w.source = 0; // disable signal |
1601 |
|
} |
1602 |
|
} |
1612 |
|
WorkQueue[] ws; int n; |
1613 |
|
if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) { |
1614 |
|
for (int m = n - 1, j = r & m;;) { |
1615 |
< |
WorkQueue q; int b, s; |
1616 |
< |
if ((q = ws[j]) != null && (s = q.top) != (b = q.base)) { |
1615 |
> |
WorkQueue q; int b; |
1616 |
> |
if ((q = ws[j]) != null && q.top != (b = q.base)) { |
1617 |
|
int qid = q.id; |
1618 |
|
ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t; |
1619 |
|
if ((a = q.array) != null && (cap = a.length) > 0) { |
1622 |
|
QA.compareAndSet(a, k, t, null)) { |
1623 |
|
q.base = b; |
1624 |
|
w.source = qid; |
1625 |
< |
if (s != b && a[(cap - 1) & b] != null) |
1626 |
< |
signalWork(q); // help signal if more tasks |
1625 |
> |
if (q.top - b > 0) |
1626 |
> |
signalWork(); |
1627 |
|
w.topLevelExec(t, q, // random fairness bound |
1628 |
< |
(r | (1 << TOP_BOUND_SHIFT)) & SMASK); |
1628 |
> |
r & ((n << TOP_BOUND_SHIFT) - 1)); |
1629 |
|
} |
1630 |
|
} |
1631 |
|
return true; |
1871 |
|
r = ThreadLocalRandom.advanceProbe(r); |
1872 |
|
else { |
1873 |
|
if (q.lockedPush(task)) |
1874 |
< |
signalWork(null); |
1874 |
> |
signalWork(); |
1875 |
|
return; |
1876 |
|
} |
1877 |
|
} |