325 |
|
* |
326 |
|
* Signalling. We create or wake up workers only when there |
327 |
|
* appears to be at least one task they might be able to find and |
328 |
< |
* execute. However, many other threads may notice the same task |
329 |
< |
* and each signal to wake up a thread that might take it. So in |
330 |
< |
* general, pools will be over-signalled. When a submission is |
331 |
< |
* added or another worker adds a task to a queue that has fewer |
332 |
< |
* than two tasks, they signal waiting workers (or trigger |
333 |
< |
* creation of new ones if fewer than the given parallelism level |
334 |
< |
* -- signalWork), and may leave a hint to the unparked worker to |
335 |
< |
* help signal others upon wakeup). These primary signals are |
336 |
< |
* buttressed by others (see method helpSignal) whenever other |
337 |
< |
* threads scan for work or do not have a task to process. On |
338 |
< |
* most platforms, signalling (unpark) overhead time is noticeably |
339 |
< |
* long, and the time between signalling a thread and it actually |
340 |
< |
* making progress can be very noticeably long, so it is worth |
341 |
< |
* offloading these delays from critical paths as much as |
342 |
< |
* possible. |
328 |
> |
* execute. |
329 |
|
* |
330 |
|
* Trimming workers. To release resources after periods of lack of |
331 |
|
* use, a worker starting to wait when the pool is quiescent will |
680 |
|
*/ |
681 |
|
final void push(ForkJoinTask<?> task) { |
682 |
|
ForkJoinTask<?>[] a; ForkJoinPool p; |
683 |
< |
int s = top, m, n; |
683 |
> |
int s = top, m; |
684 |
|
if ((a = array) != null) { // ignore if queue removed |
685 |
|
int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE; |
686 |
|
U.putOrderedObject(a, j, task); |
687 |
< |
if ((n = (top = s + 1) - base) <= 2) { |
702 |
< |
if ((p = pool) != null) |
703 |
< |
p.signalWork(this); |
704 |
< |
} |
705 |
< |
else if (n >= m) |
687 |
> |
if ((top = s + 1) - base >= m) |
688 |
|
growArray(); |
689 |
+ |
else if ((p = pool) != null) |
690 |
+ |
p.signalWork(this); |
691 |
|
} |
692 |
|
} |
693 |
|
|
1179 |
|
static final int FIFO_QUEUE = 1; |
1180 |
|
static final int SHARED_QUEUE = -1; |
1181 |
|
|
1182 |
< |
// bounds for #steps in scan loop -- must be power 2 minus 1 |
1183 |
< |
private static final int MIN_SCAN = 0x7ff; // cover estimation slop |
1200 |
< |
private static final int MAX_SCAN = 0x1ffff; // 4 * max workers |
1182 |
> |
// Mininum number of scans before blocking in scan() and related methods |
1183 |
> |
private static final int MIN_RESCANS = 0x0fff; |
1184 |
|
|
1185 |
|
// Instance fields |
1186 |
|
volatile long stealCount; // collects worker counts |
1406 |
|
(ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && |
1407 |
|
(q = ws[m & z & SQMASK]) != null && |
1408 |
|
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock |
1409 |
< |
int b = q.base, s = q.top, n, an; |
1410 |
< |
if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) { |
1411 |
< |
int j = (((an - 1) & s) << ASHIFT) + ABASE; |
1409 |
> |
int s = q.top, am; |
1410 |
> |
if ((a = q.array) != null && (am = a.length - 1) > s - q.base) { |
1411 |
> |
int j = ((am & s) << ASHIFT) + ABASE; |
1412 |
|
U.putOrderedObject(a, j, task); |
1413 |
|
q.top = s + 1; // push on to deque |
1414 |
|
q.qlock = 0; |
1415 |
< |
if (n <= 2) |
1433 |
< |
signalWork(q); |
1415 |
> |
signalWork(q); |
1416 |
|
return; |
1417 |
|
} |
1418 |
|
q.qlock = 0; |
1519 |
|
/** |
1520 |
|
* Tries to create or activate a worker if too few are active. |
1521 |
|
* |
1522 |
< |
* @param q the (non-null) queue holding tasks to be signalled |
1522 |
> |
* @param q if non-null, the queue holding tasks to be processed |
1523 |
|
*/ |
1524 |
|
final void signalWork(WorkQueue q) { |
1525 |
< |
int hint = q.poolIndex; |
1525 |
> |
long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p; |
1526 |
> |
if ((u = (int)((c = ctl) >>> 32)) < 0) { |
1527 |
> |
if ((e = (int)c) > 0) { |
1528 |
> |
if ((ws = workQueues) != null && ws.length > (i = e & SMASK) && |
1529 |
> |
(w = ws[i]) != null) { |
1530 |
> |
long nc = (((long)(w.nextWait & E_MASK)) | |
1531 |
> |
((long)(u + UAC_UNIT) << 32)); |
1532 |
> |
if (w.eventCount == (e | INT_SIGN) && |
1533 |
> |
U.compareAndSwapLong(this, CTL, c, nc)) { |
1534 |
> |
w.eventCount = (e + E_SEQ) & E_MASK; |
1535 |
> |
if ((p = w.parker) != null) |
1536 |
> |
U.unpark(p); |
1537 |
> |
} |
1538 |
> |
else |
1539 |
> |
retrySignalWork(q); |
1540 |
> |
} |
1541 |
> |
} |
1542 |
> |
else if ((short)u < 0) |
1543 |
> |
tryAddWorker(); |
1544 |
> |
} |
1545 |
> |
} |
1546 |
> |
|
1547 |
> |
/** |
1548 |
> |
* Fallback version of signalWork, triggered if release fails |
1549 |
> |
* and the calling queue is non-empty; |
1550 |
> |
*/ |
1551 |
> |
final void retrySignalWork(WorkQueue q) { |
1552 |
|
long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p; |
1553 |
< |
while ((u = (int)((c = ctl) >>> 32)) < 0) { |
1553 |
> |
while ((q != null && !q.isEmpty()) && |
1554 |
> |
(u = (int)((c = ctl) >>> 32)) < 0) { |
1555 |
|
if ((e = (int)c) > 0) { |
1556 |
|
if ((ws = workQueues) != null && ws.length > (i = e & SMASK) && |
1557 |
< |
(w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) { |
1557 |
> |
(w = ws[i]) != null) { |
1558 |
|
long nc = (((long)(w.nextWait & E_MASK)) | |
1559 |
|
((long)(u + UAC_UNIT) << 32)); |
1560 |
< |
if (U.compareAndSwapLong(this, CTL, c, nc)) { |
1561 |
< |
w.hint = hint; |
1560 |
> |
if (w.eventCount == (e | INT_SIGN) && |
1561 |
> |
U.compareAndSwapLong(this, CTL, c, nc)) { |
1562 |
|
w.eventCount = (e + E_SEQ) & E_MASK; |
1563 |
|
if ((p = w.parker) != null) |
1564 |
|
U.unpark(p); |
1565 |
|
break; |
1566 |
|
} |
1558 |
– |
if (q.top - q.base <= 0) |
1559 |
– |
break; |
1567 |
|
} |
1568 |
|
else |
1569 |
|
break; |
1629 |
|
if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) { |
1630 |
|
int ec = w.eventCount; // ec is negative if inactive |
1631 |
|
int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5; |
1632 |
< |
w.hint = -1; // update seed and clear hint |
1626 |
< |
int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN; |
1632 |
> |
int j = (m << 1) | (ec < 0 ? MIN_RESCANS : 1); |
1633 |
|
do { |
1634 |
< |
WorkQueue q; ForkJoinTask<?>[] a; int b; |
1635 |
< |
if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 && |
1636 |
< |
(a = q.array) != null) { // probably nonempty |
1637 |
< |
long i = (((a.length - 1) & b) << ASHIFT) + ABASE; |
1638 |
< |
if (ec >= 0) { |
1639 |
< |
ForkJoinTask<?> t = (ForkJoinTask<?>) |
1640 |
< |
U.getObjectVolatile(a, i); |
1634 |
> |
WorkQueue q; int b; |
1635 |
> |
if ((q = ws[(r - j) & m]) != null && |
1636 |
> |
(b = q.base) - q.top < 0) { // probably nonempty |
1637 |
> |
ForkJoinTask<?>[] a = q.array; |
1638 |
> |
if ((ec >= 0 || (ec = w.eventCount) >= 0) && a != null) { |
1639 |
> |
long i = (((a.length - 1) & b) << ASHIFT) + ABASE; |
1640 |
> |
ForkJoinTask<?> t = |
1641 |
> |
(ForkJoinTask<?>)U.getObjectVolatile(a, i); |
1642 |
|
if (q.base == b && t != null && |
1643 |
|
U.compareAndSwapObject(a, i, t, null)) { |
1644 |
< |
U.putOrderedInt(q, QBASE, ++b); |
1638 |
< |
if (b - q.top < 0) |
1639 |
< |
signalWork(q); |
1644 |
> |
U.putOrderedInt(q, QBASE, b + 1); |
1645 |
|
return t; // taken |
1646 |
|
} |
1647 |
|
} |
1648 |
< |
if ((ec < 0 || j < m) && (int)(ctl >> AC_SHIFT) <= 0) { |
1649 |
< |
w.hint = (r + j) & m; // help signal below |
1645 |
< |
break; // cannot take |
1646 |
< |
} |
1648 |
> |
if (j < m) // must restart to revisit |
1649 |
> |
break; |
1650 |
|
} |
1651 |
|
} while (--j >= 0); |
1652 |
|
|
1653 |
< |
int h, e, ns; long c, sc; WorkQueue q; |
1654 |
< |
if ((ns = w.nsteals) != 0) { |
1653 |
> |
int e, ns; long c, sc; |
1654 |
> |
if (j >= 0 || plock != ps) { // incomplete scan |
1655 |
> |
if (w.eventCount < 0) // help activate for next time |
1656 |
> |
signalWork(null); |
1657 |
> |
} |
1658 |
> |
else if ((e = (int)(c = ctl)) < 0) |
1659 |
> |
w.qlock = -1; // pool is terminating |
1660 |
> |
else if (ec >= 0) { // try to enqueue/inactivate |
1661 |
> |
long nc = (((long)ec | |
1662 |
> |
((c - AC_UNIT) & (AC_MASK|TC_MASK)))); |
1663 |
> |
w.nextWait = e; // link and mark inactive |
1664 |
> |
w.eventCount = ec | INT_SIGN; |
1665 |
> |
if (!U.compareAndSwapLong(this, CTL, c, nc)) |
1666 |
> |
w.eventCount = ec; // unmark on CAS failure |
1667 |
> |
else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK)) |
1668 |
> |
idleAwaitWork(w, nc, c); |
1669 |
> |
} |
1670 |
> |
else if ((ns = w.nsteals) != 0) { |
1671 |
|
if (U.compareAndSwapLong(this, STEALCOUNT, |
1672 |
|
sc = stealCount, sc + ns)) |
1673 |
|
w.nsteals = 0; // collect steals and rescan |
1674 |
|
} |
1675 |
< |
else if (plock != ps) // consistency check |
1676 |
< |
; // skip |
1677 |
< |
else if ((e = (int)(c = ctl)) < 0) |
1678 |
< |
w.qlock = -1; // pool is terminating |
1679 |
< |
else { |
1680 |
< |
if ((h = w.hint) < 0) { |
1681 |
< |
if (ec >= 0) { // try to enqueue/inactivate |
1682 |
< |
long nc = (((long)ec | |
1683 |
< |
((c - AC_UNIT) & (AC_MASK|TC_MASK)))); |
1665 |
< |
w.nextWait = e; // link and mark inactive |
1666 |
< |
w.eventCount = ec | INT_SIGN; |
1667 |
< |
if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc)) |
1668 |
< |
w.eventCount = ec; // unmark on CAS failure |
1669 |
< |
else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK)) |
1670 |
< |
idleAwaitWork(w, nc, c); |
1671 |
< |
} |
1672 |
< |
else if (w.eventCount < 0 && ctl == c) { |
1673 |
< |
Thread wt = Thread.currentThread(); |
1674 |
< |
Thread.interrupted(); // clear status |
1675 |
< |
U.putObject(wt, PARKBLOCKER, this); |
1676 |
< |
w.parker = wt; // emulate LockSupport.park |
1677 |
< |
if (w.eventCount < 0) // recheck |
1678 |
< |
U.park(false, 0L); // block |
1679 |
< |
w.parker = null; |
1680 |
< |
U.putObject(wt, PARKBLOCKER, null); |
1681 |
< |
} |
1682 |
< |
} |
1683 |
< |
if ((h >= 0 || (h = w.hint) >= 0) && |
1684 |
< |
(ws = workQueues) != null && h < ws.length && |
1685 |
< |
(q = ws[h]) != null) { // signal others before retry |
1686 |
< |
WorkQueue v; Thread p; int u, i, s; |
1687 |
< |
for (int n = (config & SMASK) - 1;;) { |
1688 |
< |
int idleCount = (w.eventCount < 0) ? 0 : -1; |
1689 |
< |
if (((s = idleCount - q.base + q.top) <= n && |
1690 |
< |
(n = s) <= 0) || |
1691 |
< |
(u = (int)((c = ctl) >>> 32)) >= 0 || |
1692 |
< |
(e = (int)c) <= 0 || m < (i = e & SMASK) || |
1693 |
< |
(v = ws[i]) == null) |
1694 |
< |
break; |
1695 |
< |
long nc = (((long)(v.nextWait & E_MASK)) | |
1696 |
< |
((long)(u + UAC_UNIT) << 32)); |
1697 |
< |
if (v.eventCount != (e | INT_SIGN) || |
1698 |
< |
!U.compareAndSwapLong(this, CTL, c, nc)) |
1699 |
< |
break; |
1700 |
< |
v.hint = h; |
1701 |
< |
v.eventCount = (e + E_SEQ) & E_MASK; |
1702 |
< |
if ((p = v.parker) != null) |
1703 |
< |
U.unpark(p); |
1704 |
< |
if (--n <= 0) |
1705 |
< |
break; |
1706 |
< |
} |
1707 |
< |
} |
1675 |
> |
else if (w.eventCount < 0 && ctl == c) { |
1676 |
> |
Thread wt = Thread.currentThread(); |
1677 |
> |
Thread.interrupted(); // clear status |
1678 |
> |
U.putObject(wt, PARKBLOCKER, this); |
1679 |
> |
w.parker = wt; // emulate LockSupport.park |
1680 |
> |
if (w.eventCount < 0 && ctl == c)// recheck |
1681 |
> |
U.park(false, 0L); // block |
1682 |
> |
w.parker = null; |
1683 |
> |
U.putObject(wt, PARKBLOCKER, null); |
1684 |
|
} |
1685 |
|
} |
1686 |
|
return null; |
1699 |
|
* @param prevCtl the ctl value to restore if thread is terminated |
1700 |
|
*/ |
1701 |
|
private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) { |
1702 |
< |
if (w != null && w.eventCount < 0 && |
1703 |
< |
!tryTerminate(false, false) && (int)prevCtl != 0 && |
1704 |
< |
ctl == currentCtl) { |
1702 |
> |
if (w != null && w.eventCount < 0 && !tryTerminate(false, false) && |
1703 |
> |
(int)prevCtl != 0 && ctl == currentCtl) { |
1704 |
> |
int ns = w.nsteals; |
1705 |
> |
if (ns != 0) { |
1706 |
> |
w.nsteals = 0; |
1707 |
> |
long sc; |
1708 |
> |
do {} while (!U.compareAndSwapLong(this, STEALCOUNT, |
1709 |
> |
sc = stealCount, sc + ns)); |
1710 |
> |
} |
1711 |
|
int dc = -(short)(currentCtl >>> TC_SHIFT); |
1712 |
|
long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT; |
1713 |
|
long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; |
1714 |
|
Thread wt = Thread.currentThread(); |
1715 |
+ |
int spins = MIN_RESCANS; // poll before blocking |
1716 |
|
while (ctl == currentCtl) { |
1717 |
< |
Thread.interrupted(); // timed variant of version in scan() |
1718 |
< |
U.putObject(wt, PARKBLOCKER, this); |
1719 |
< |
w.parker = wt; |
1737 |
< |
if (ctl == currentCtl) |
1738 |
< |
U.park(false, parkTime); |
1739 |
< |
w.parker = null; |
1740 |
< |
U.putObject(wt, PARKBLOCKER, null); |
1741 |
< |
if (ctl != currentCtl) |
1742 |
< |
break; |
1743 |
< |
if (deadline - System.nanoTime() <= 0L && |
1744 |
< |
U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { |
1745 |
< |
w.eventCount = (w.eventCount + E_SEQ) | E_MASK; |
1746 |
< |
w.hint = -1; |
1747 |
< |
w.qlock = -1; // shrink |
1748 |
< |
break; |
1717 |
> |
if (spins >= 0) { |
1718 |
> |
if (w.nextSeed() < 0) |
1719 |
> |
--spins; |
1720 |
|
} |
1721 |
< |
} |
1722 |
< |
} |
1723 |
< |
} |
1724 |
< |
|
1725 |
< |
/** |
1726 |
< |
* Scans through queues looking for work while joining a task; if |
1727 |
< |
* any present, signals. May return early if more signalling is |
1728 |
< |
* detectably unneeded. |
1729 |
< |
* |
1730 |
< |
* @param task return early if done |
1731 |
< |
* @param origin an index to start scan |
1732 |
< |
*/ |
1733 |
< |
private void helpSignal(ForkJoinTask<?> task, int origin) { |
1734 |
< |
WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s; |
1764 |
< |
if (task != null && task.status >= 0 && |
1765 |
< |
(u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 && |
1766 |
< |
(ws = workQueues) != null && (m = ws.length - 1) >= 0) { |
1767 |
< |
outer: for (int k = origin, j = m; j >= 0; --j) { |
1768 |
< |
WorkQueue q = ws[k++ & m]; |
1769 |
< |
for (int n = m;;) { // limit to at most m signals |
1770 |
< |
if (task.status < 0) |
1771 |
< |
break outer; |
1772 |
< |
if (q == null || |
1773 |
< |
((s = -q.base + q.top) <= n && (n = s) <= 0)) |
1721 |
> |
else { |
1722 |
> |
Thread.interrupted(); // timed variant of version in scan() |
1723 |
> |
U.putObject(wt, PARKBLOCKER, this); |
1724 |
> |
w.parker = wt; |
1725 |
> |
if (ctl == currentCtl) |
1726 |
> |
U.park(false, parkTime); |
1727 |
> |
w.parker = null; |
1728 |
> |
U.putObject(wt, PARKBLOCKER, null); |
1729 |
> |
if (ctl != currentCtl) |
1730 |
> |
break; |
1731 |
> |
if (deadline - System.nanoTime() <= 0L && |
1732 |
> |
U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) { |
1733 |
> |
w.eventCount = (w.eventCount + E_SEQ) | E_MASK; |
1734 |
> |
w.qlock = -1; // shrink |
1735 |
|
break; |
1775 |
– |
if ((u = (int)((c = ctl) >>> 32)) >= 0 || |
1776 |
– |
(e = (int)c) <= 0 || m < (i = e & SMASK) || |
1777 |
– |
(w = ws[i]) == null) |
1778 |
– |
break outer; |
1779 |
– |
long nc = (((long)(w.nextWait & E_MASK)) | |
1780 |
– |
((long)(u + UAC_UNIT) << 32)); |
1781 |
– |
if (w.eventCount != (e | INT_SIGN)) |
1782 |
– |
break outer; |
1783 |
– |
if (U.compareAndSwapLong(this, CTL, c, nc)) { |
1784 |
– |
w.eventCount = (e + E_SEQ) & E_MASK; |
1785 |
– |
if ((p = w.parker) != null) |
1786 |
– |
U.unpark(p); |
1787 |
– |
if (--n <= 0) |
1788 |
– |
break; |
1736 |
|
} |
1737 |
|
} |
1738 |
|
} |
1835 |
|
WorkQueue[] ws; int m; |
1836 |
|
if (task != null && (ws = workQueues) != null && |
1837 |
|
(m = ws.length - 1) >= 0) { |
1838 |
< |
for (int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;;) { |
1838 |
> |
int scans = m | MIN_RESCANS; |
1839 |
> |
for (int j = 1, k = scans;;) { |
1840 |
|
WorkQueue q; int s; |
1841 |
|
if ((s = task.status) < 0) |
1842 |
|
return s; |
1843 |
< |
if (((q = ws[j & m]) == null || !q.pollAndExecCC(task)) && |
1844 |
< |
(j -= 2) <= 0) |
1843 |
> |
if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) |
1844 |
> |
k = scans; |
1845 |
> |
else if (--k >= 0) |
1846 |
> |
j += 2; |
1847 |
> |
else |
1848 |
|
break; |
1849 |
|
} |
1850 |
|
} |
1915 |
|
joiner.currentJoin = task; |
1916 |
|
do {} while ((s = task.status) >= 0 && !joiner.isEmpty() && |
1917 |
|
joiner.tryRemoveAndExec(task)); // process local tasks |
1918 |
< |
if (s >= 0 && (s = task.status) >= 0) { |
1919 |
< |
helpSignal(task, joiner.poolIndex); |
1920 |
< |
if ((s = task.status) >= 0 && |
1970 |
< |
(task instanceof CountedCompleter)) |
1971 |
< |
s = helpComplete(task); |
1972 |
< |
} |
1918 |
> |
if (s >= 0 && (s = task.status) >= 0 && |
1919 |
> |
(task instanceof CountedCompleter)) |
1920 |
> |
s = helpComplete(task); |
1921 |
|
while (s >= 0 && (s = task.status) >= 0) { |
1922 |
|
if ((!joiner.isEmpty() || // try helping |
1923 |
|
(s = tryHelpStealer(joiner, task)) == 0) && |
1924 |
|
(s = task.status) >= 0) { |
1925 |
< |
helpSignal(task, joiner.poolIndex); |
1978 |
< |
if ((s = task.status) >= 0 && tryCompensate()) { |
1925 |
> |
if (tryCompensate()) { |
1926 |
|
if (task.trySetSignal() && (s = task.status) >= 0) { |
1927 |
|
synchronized (task) { |
1928 |
|
if (task.status >= 0) { |
1935 |
|
task.notifyAll(); |
1936 |
|
} |
1937 |
|
} |
1938 |
< |
long c; // re-activate |
1939 |
< |
do {} while (!U.compareAndSwapLong |
1940 |
< |
(this, CTL, c = ctl, c + AC_UNIT)); |
1938 |
> |
// reactivate |
1939 |
> |
if (false) { // possible hotspot bug? |
1940 |
> |
U.getAndAddLong(this, CTL, AC_UNIT); |
1941 |
> |
} |
1942 |
> |
else { |
1943 |
> |
long c; |
1944 |
> |
do {} while (!U.compareAndSwapLong |
1945 |
> |
(this, CTL, c = ctl, c + AC_UNIT)); |
1946 |
> |
} |
1947 |
|
} |
1948 |
|
} |
1949 |
|
} |
1967 |
|
joiner.currentJoin = task; |
1968 |
|
do {} while ((s = task.status) >= 0 && !joiner.isEmpty() && |
1969 |
|
joiner.tryRemoveAndExec(task)); |
1970 |
< |
if (s >= 0 && (s = task.status) >= 0) { |
1971 |
< |
helpSignal(task, joiner.poolIndex); |
1972 |
< |
if ((s = task.status) >= 0 && |
2020 |
< |
(task instanceof CountedCompleter)) |
2021 |
< |
s = helpComplete(task); |
2022 |
< |
} |
1970 |
> |
if (s >= 0 && (s = task.status) >= 0 && |
1971 |
> |
(task instanceof CountedCompleter)) |
1972 |
> |
s = helpComplete(task); |
1973 |
|
if (s >= 0 && joiner.isEmpty()) { |
1974 |
|
do {} while (task.status >= 0 && |
1975 |
|
tryHelpStealer(joiner, task) > 0); |
2008 |
|
final void helpQuiescePool(WorkQueue w) { |
2009 |
|
for (boolean active = true;;) { |
2010 |
|
long c; WorkQueue q; ForkJoinTask<?> t; int b; |
2011 |
< |
while ((t = w.nextLocalTask()) != null) { |
2062 |
< |
if (w.base - w.top < 0) |
2063 |
< |
signalWork(w); |
2011 |
> |
while ((t = w.nextLocalTask()) != null) |
2012 |
|
t.doExec(); |
2065 |
– |
} |
2013 |
|
if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) { |
2014 |
|
if (!active) { // re-establish active count |
2015 |
|
active = true; |
2016 |
|
do {} while (!U.compareAndSwapLong |
2017 |
|
(this, CTL, c = ctl, c + AC_UNIT)); |
2018 |
|
} |
2019 |
< |
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { |
2073 |
< |
if (q.base - q.top < 0) |
2074 |
< |
signalWork(q); |
2019 |
> |
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) |
2020 |
|
w.runSubtask(t); |
2076 |
– |
} |
2021 |
|
} |
2022 |
|
else if (active) { // decrement active count without queuing |
2023 |
|
long nc = (c = ctl) - AC_UNIT; |
2044 |
|
return t; |
2045 |
|
if ((q = findNonEmptyStealQueue(w.nextSeed())) == null) |
2046 |
|
return null; |
2047 |
< |
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) { |
2104 |
< |
if (q.base - q.top < 0) |
2105 |
< |
signalWork(q); |
2047 |
> |
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) |
2048 |
|
return t; |
2107 |
– |
} |
2049 |
|
} |
2050 |
|
} |
2051 |
|
|
2157 |
|
for (int i = 0; i < ws.length; ++i) { |
2158 |
|
if ((w = ws[i]) != null) { |
2159 |
|
if (!w.isEmpty()) { // signal unprocessed tasks |
2160 |
< |
signalWork(w); |
2160 |
> |
signalWork(null); |
2161 |
|
return false; |
2162 |
|
} |
2163 |
|
if ((i & 1) != 0 && w.eventCount >= 0) |
2313 |
|
else |
2314 |
|
break; |
2315 |
|
} |
2316 |
< |
helpComplete(root); |
2316 |
> |
if (root.status >= 0) |
2317 |
> |
helpComplete(root); |
2318 |
|
} |
2319 |
|
} |
2320 |
|
|
2348 |
|
q.qlock = 0; |
2349 |
|
} |
2350 |
|
} |
2351 |
< |
if (t.status >= 0) { |
2352 |
< |
if (t instanceof CountedCompleter) |
2411 |
< |
p.externalHelpComplete(q, t); |
2412 |
< |
else |
2413 |
< |
p.helpSignal(t, q.poolIndex); |
2414 |
< |
} |
2351 |
> |
if (t.status >= 0 && (t instanceof CountedCompleter)) |
2352 |
> |
p.externalHelpComplete(q, t); |
2353 |
|
} |
2354 |
|
} |
2355 |
|
|
3064 |
|
ForkJoinTask<?> t; WorkQueue q; int b; |
3065 |
|
if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) { |
3066 |
|
found = true; |
3067 |
< |
if ((t = q.pollAt(b)) != null) { |
3130 |
< |
if (q.base - q.top < 0) |
3131 |
< |
signalWork(q); |
3067 |
> |
if ((t = q.pollAt(b)) != null) |
3068 |
|
t.doExec(); |
3133 |
– |
} |
3069 |
|
break; |
3070 |
|
} |
3071 |
|
} |
3180 |
|
Thread t = Thread.currentThread(); |
3181 |
|
if (t instanceof ForkJoinWorkerThread) { |
3182 |
|
ForkJoinPool p = ((ForkJoinWorkerThread)t).pool; |
3183 |
< |
while (!blocker.isReleasable()) { // variant of helpSignal |
3249 |
< |
WorkQueue[] ws; WorkQueue q; int m, u; |
3250 |
< |
if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) { |
3251 |
< |
for (int i = 0; i <= m; ++i) { |
3252 |
< |
if (blocker.isReleasable()) |
3253 |
< |
return; |
3254 |
< |
if ((q = ws[i]) != null && q.base - q.top < 0) { |
3255 |
< |
p.signalWork(q); |
3256 |
< |
if ((u = (int)(p.ctl >>> 32)) >= 0 || |
3257 |
< |
(u >> UAC_SHIFT) >= 0) |
3258 |
< |
break; |
3259 |
< |
} |
3260 |
< |
} |
3261 |
< |
} |
3183 |
> |
while (!blocker.isReleasable()) { |
3184 |
|
if (p.tryCompensate()) { |
3185 |
|
try { |
3186 |
|
do {} while (!blocker.isReleasable() && |