416 |
|
* if to its current value). This would be extremely costly. So |
417 |
|
* we relax it in several ways: (1) Producers only signal when |
418 |
|
* their queue is possibly empty at some point during a push |
419 |
< |
* operation (which requires conservatively checking size zero or |
420 |
< |
* one to cover races). (2) Other workers propagate this signal |
419 |
> |
* operation. (2) Other workers propagate this signal |
420 |
|
* when they find tasks in a queue with size greater than one. (3) |
421 |
|
* Workers only enqueue after scanning (see below) and not finding |
422 |
|
* any tasks. (4) Rather than CASing ctl to its current value in |
732 |
|
|
733 |
|
/** |
734 |
|
* The maximum number of top-level polls per worker before |
735 |
< |
* checking other queues, expressed as a bit shift to, in effect, |
736 |
< |
* multiply by pool size, and then use as random value mask, so |
738 |
< |
* average bound is about poolSize*(1<<TOP_BOUND_SHIFT). See |
739 |
< |
* above for rationale. |
735 |
> |
* checking other queues, expressed as a bit shift. See above for |
736 |
> |
* rationale. |
737 |
|
*/ |
738 |
|
static final int TOP_BOUND_SHIFT = 10; |
739 |
|
|
809 |
|
*/ |
810 |
|
final void push(ForkJoinTask<?> task) { |
811 |
|
ForkJoinTask<?>[] a; |
812 |
< |
int s = top, d, cap, m; |
812 |
> |
int s = top, d = s - base, cap, m; |
813 |
|
ForkJoinPool p = pool; |
814 |
|
if ((a = array) != null && (cap = a.length) > 0) { |
815 |
|
QA.setRelease(a, (m = cap - 1) & s, task); |
816 |
|
top = s + 1; |
817 |
< |
if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 && |
821 |
< |
p != null) { // size 0 or 1 |
822 |
< |
VarHandle.fullFence(); |
823 |
< |
p.signalWork(); |
824 |
< |
} |
825 |
< |
else if (d == m) |
817 |
> |
if (d == m) |
818 |
|
growArray(false); |
819 |
+ |
else if (QA.getAcquire(a, m & (s - 1)) == null && p != null) { |
820 |
+ |
VarHandle.fullFence(); // was empty |
821 |
+ |
p.signalWork(null); |
822 |
+ |
} |
823 |
|
} |
824 |
|
} |
825 |
|
|
830 |
|
final boolean lockedPush(ForkJoinTask<?> task) { |
831 |
|
ForkJoinTask<?>[] a; |
832 |
|
boolean signal = false; |
833 |
< |
int s = top, b = base, cap, d; |
833 |
> |
int s = top, d = s - base, cap, m; |
834 |
|
if ((a = array) != null && (cap = a.length) > 0) { |
835 |
< |
a[(cap - 1) & s] = task; |
835 |
> |
a[(m = (cap - 1)) & s] = task; |
836 |
|
top = s + 1; |
837 |
< |
if (b - s + cap - 1 == 0) |
837 |
> |
if (d == m) |
838 |
|
growArray(true); |
839 |
|
else { |
840 |
|
phase = 0; // full volatile unlock |
841 |
< |
if (((s - base) & ~1) == 0) // size 0 or 1 |
842 |
< |
signal = true; |
841 |
> |
if (a[m & (s - 1)] == null) |
842 |
> |
signal = true; // was empty |
843 |
|
} |
844 |
|
} |
845 |
|
return signal; |
981 |
|
* queue, up to bound n (to avoid infinite unfairness). |
982 |
|
*/ |
983 |
|
final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) { |
984 |
< |
if (t != null && q != null) { // hoist checks |
985 |
< |
int nstolen = 1; |
986 |
< |
for (;;) { |
984 |
> |
int nstolen = 1; |
985 |
> |
for (int j = 0;;) { |
986 |
> |
if (t != null) |
987 |
|
t.doExec(); |
988 |
< |
if (n-- < 0) |
989 |
< |
break; |
990 |
< |
else if ((t = nextLocalTask()) == null) { |
991 |
< |
if ((t = q.poll()) == null) |
992 |
< |
break; |
993 |
< |
else |
994 |
< |
++nstolen; |
988 |
> |
if (j++ <= n) |
989 |
> |
t = nextLocalTask(); |
990 |
> |
else { |
991 |
> |
j = 0; |
992 |
> |
t = null; |
993 |
> |
} |
994 |
> |
if (t == null) { |
995 |
> |
if (q != null && (t = q.poll()) != null) { |
996 |
> |
++nstolen; |
997 |
> |
j = 0; |
998 |
|
} |
999 |
+ |
else if (j != 0) |
1000 |
+ |
break; |
1001 |
|
} |
1001 |
– |
ForkJoinWorkerThread thread = owner; |
1002 |
– |
nsteals += nstolen; |
1003 |
– |
source = 0; |
1004 |
– |
if (thread != null) |
1005 |
– |
thread.afterTopLevelExec(); |
1002 |
|
} |
1003 |
+ |
ForkJoinWorkerThread thread = owner; |
1004 |
+ |
nsteals += nstolen; |
1005 |
+ |
source = 0; |
1006 |
+ |
if (thread != null) |
1007 |
+ |
thread.afterTopLevelExec(); |
1008 |
|
} |
1009 |
|
|
1010 |
|
/** |
1427 |
|
|
1428 |
|
if (!tryTerminate(false, false) && // possibly replace worker |
1429 |
|
w != null && w.array != null) // avoid repeated failures |
1430 |
< |
signalWork(); |
1430 |
> |
signalWork(null); |
1431 |
|
|
1432 |
|
if (ex == null) // help clean on way out |
1433 |
|
ForkJoinTask.helpExpungeStaleExceptions(); |
1437 |
|
|
1438 |
|
/** |
1439 |
|
* Tries to create or release a worker if too few are running. |
1440 |
+ |
* @param q if non-null, the queue to recheck on CAS failure; retrying stops if it is empty |
1441 |
|
*/ |
1442 |
< |
final void signalWork() { |
1442 |
> |
final void signalWork(WorkQueue q) { |
1443 |
|
for (;;) { |
1444 |
|
long c; int sp; WorkQueue[] ws; int i; WorkQueue v; |
1445 |
|
if ((c = ctl) >= 0L) // enough workers |
1466 |
|
LockSupport.unpark(vt); |
1467 |
|
break; |
1468 |
|
} |
1469 |
+ |
else if (q != null && q.isEmpty()) // no need to retry |
1470 |
+ |
break; |
1471 |
|
} |
1472 |
|
} |
1473 |
|
} |
1588 |
|
else if (rc <= 0 && (md & SHUTDOWN) != 0 && |
1589 |
|
tryTerminate(false, false)) |
1590 |
|
break; // quiescent shutdown |
1591 |
< |
else if (rc <= 0 && pred != 0 && phase == (int)c) { |
1592 |
< |
long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred); |
1593 |
< |
long d = keepAlive + System.currentTimeMillis(); |
1594 |
< |
LockSupport.parkUntil(this, d); |
1595 |
< |
if (ctl == c && // drop on timeout if all idle |
1596 |
< |
d - System.currentTimeMillis() <= TIMEOUT_SLOP && |
1597 |
< |
CTL.compareAndSet(this, c, nc)) { |
1598 |
< |
w.phase = QUIET; |
1599 |
< |
break; |
1591 |
> |
else if (w.phase < 0) { |
1592 |
> |
if (rc <= 0 && pred != 0 && phase == (int)c) { |
1593 |
> |
long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred); |
1594 |
> |
long d = keepAlive + System.currentTimeMillis(); |
1595 |
> |
LockSupport.parkUntil(this, d); |
1596 |
> |
if (ctl == c && // drop on timeout if all idle |
1597 |
> |
d - System.currentTimeMillis() <= TIMEOUT_SLOP && |
1598 |
> |
CTL.compareAndSet(this, c, nc)) { |
1599 |
> |
w.phase = QUIET; |
1600 |
> |
break; |
1601 |
> |
} |
1602 |
> |
} |
1603 |
> |
else { |
1604 |
> |
LockSupport.park(this); |
1605 |
> |
if (w.phase < 0) // one spurious wakeup check |
1606 |
> |
LockSupport.park(this); |
1607 |
|
} |
1608 |
|
} |
1598 |
– |
else if (w.phase < 0) |
1599 |
– |
LockSupport.park(this); // OK if spuriously woken |
1609 |
|
w.source = 0; // disable signal |
1610 |
|
} |
1611 |
|
} |
1631 |
|
QA.compareAndSet(a, k, t, null)) { |
1632 |
|
q.base = b; |
1633 |
|
w.source = qid; |
1634 |
< |
if (q.top - b > 0) |
1635 |
< |
signalWork(); |
1634 |
> |
if (a[(cap - 1) & b] != null) |
1635 |
> |
signalWork(q); // help signal if more tasks |
1636 |
|
w.topLevelExec(t, q, // random fairness bound |
1637 |
< |
r & ((n << TOP_BOUND_SHIFT) - 1)); |
1637 |
> |
(r | (1 << TOP_BOUND_SHIFT)) & SMASK); |
1638 |
|
} |
1639 |
|
} |
1640 |
|
return true; |
1880 |
|
r = ThreadLocalRandom.advanceProbe(r); |
1881 |
|
else { |
1882 |
|
if (q.lockedPush(task)) |
1883 |
< |
signalWork(); |
1883 |
> |
signalWork(null); |
1884 |
|
return; |
1885 |
|
} |
1886 |
|
} |