561 |
|
* eligibility is determined by checking completion chains rather |
562 |
|
* than tracking stealers. |
563 |
|
* |
564 |
+ |
* Joining under timeouts (ForkJoinTask timed get) uses a |
565 |
+ |
* constrained mixture of helping and compensating in part because |
566 |
+ |
* pools (actually, only the common pool) may not have any |
567 |
+ |
* available threads: If the pool is saturated (all available |
568 |
+ |
* workers are busy), the caller tries to remove and otherwise |
569 |
+ |
* help; else it blocks under compensation so that it may time out |
570 |
+ |
* independently of any tasks. |
571 |
+ |
* |
572 |
|
* Compensation does not by default aim to keep exactly the target |
573 |
|
* parallelism number of unblocked threads running at any given |
574 |
|
* time. Some previous versions of this class employed immediate |
892 |
|
} |
893 |
|
|
894 |
|
/** |
895 |
< |
* Provides a more accurate estimate of whether this queue has |
896 |
< |
* any tasks than does queueSize, by checking whether an |
889 |
< |
* apparently near-empty queue has at least one unclaimed |
890 |
< |
* task. |
895 |
> |
* Provides a more conservative estimate of whether this queue |
896 |
> |
* has any tasks than does queueSize. |
897 |
|
*/ |
898 |
|
final boolean isEmpty() { |
899 |
< |
VarHandle.acquireFence(); |
894 |
< |
int s = top, b = base, cap; |
895 |
< |
ForkJoinTask<?>[] a = array; |
896 |
< |
return s - b <= 1 && (a == null || (cap = a.length) == 0 || |
897 |
< |
(a[(cap - 1) & b] == null && |
898 |
< |
a[(cap - 1) & (s - 1)] == null)); |
899 |
> |
return !((source != 0 && owner == null) || top - base > 0); |
900 |
|
} |
901 |
|
|
902 |
|
/** |
1377 |
|
Throwable ex = null; |
1378 |
|
ForkJoinWorkerThread wt = null; |
1379 |
|
try { |
1380 |
< |
if (fac != null && (wt = fac.newThread(this)) != null) { |
1380 |
> |
if (mode >= 0 && fac != null && (wt = fac.newThread(this)) != null) { |
1381 |
|
wt.start(); |
1382 |
|
return true; |
1383 |
|
} |
1587 |
|
private int awaitWork(WorkQueue w) { |
1588 |
|
if (w == null) |
1589 |
|
return -1; // already terminated |
1590 |
< |
int phase, ac; // advance phase |
1590 |
> |
int phase, ac, md, rc; // advance phase |
1591 |
|
w.phase = (phase = w.phase + SS_SEQ) | UNSIGNALLED; |
1592 |
|
long prevCtl = ctl, c; // enqueue |
1593 |
|
do { |
1597 |
|
|
1598 |
|
LockSupport.setCurrentBlocker(this); // prepare to block (exit also OK) |
1599 |
|
long deadline = 0L; // use timed wait if nonzero |
1600 |
< |
if ((ac = (int)(c >> RC_SHIFT)) + (mode & SMASK) <= 0) { // quiescent |
1600 |
> |
if ((rc = (ac = (int)(c >> RC_SHIFT)) + ((md = mode) & SMASK)) <= 0) { |
1601 |
|
if ((deadline = System.currentTimeMillis() + keepAlive) == 0L) |
1602 |
|
deadline = 1L; // avoid zero |
1603 |
|
WorkQueue[] qs = queues; // check for racing submission |
1608 |
|
(cap = a.length) > 0 && a[(cap - 1) & q.base] != null) { |
1609 |
|
if (ctl == c && compareAndSetCtl(c, prevCtl)) |
1610 |
|
w.phase = phase; // self-signal |
1611 |
< |
break; // else lost race |
1611 |
> |
break; |
1612 |
|
} |
1613 |
|
} |
1614 |
|
} |
1615 |
|
for (;;) { // await activation or termination |
1616 |
< |
if (w.phase >= 0) |
1616 |
> |
if ((md = mode) < 0) |
1617 |
> |
return -1; |
1618 |
> |
else if (w.phase >= 0) |
1619 |
|
break; |
1620 |
< |
else if (tryTerminate(false, false)) |
1620 |
> |
else if (rc <= 0 && (md & SHUTDOWN) != 0 && |
1621 |
> |
tryTerminate(false, false)) |
1622 |
|
return -1; |
1623 |
|
else if ((int)(ctl >> RC_SHIFT) > ac) |
1624 |
|
Thread.onSpinWait(); // signal in progress |
1625 |
< |
else if (deadline != 0L) |
1626 |
< |
LockSupport.parkUntil(deadline); |
1627 |
< |
else |
1628 |
< |
LockSupport.park(); |
1629 |
< |
if (w.phase >= 0) |
1630 |
< |
break; |
1631 |
< |
else if (deadline != 0L && |
1632 |
< |
deadline - System.currentTimeMillis() <= TIMEOUT_SLOP && |
1633 |
< |
compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) | |
1634 |
< |
(w.stackPred & SP_MASK)))) { |
1635 |
< |
w.phase = QUIET; |
1636 |
< |
return -1; // drop on timeout |
1625 |
> |
else { |
1626 |
> |
if (rc <= 0) |
1627 |
> |
LockSupport.parkUntil(deadline); |
1628 |
> |
else |
1629 |
> |
LockSupport.park(); |
1630 |
> |
if ((int)(ctl >> RC_SHIFT) <= ac && |
1631 |
> |
!Thread.interrupted() && rc <= 0 && |
1632 |
> |
deadline - System.currentTimeMillis() <= TIMEOUT_SLOP && |
1633 |
> |
compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) | |
1634 |
> |
(w.stackPred & SP_MASK)))) { |
1635 |
> |
w.phase = QUIET; |
1636 |
> |
return -1; // drop on timeout |
1637 |
> |
} |
1638 |
|
} |
1634 |
– |
else |
1635 |
– |
Thread.interrupted(); // clear status before repark |
1639 |
|
} |
1640 |
|
LockSupport.setCurrentBlocker(null); |
1641 |
|
return 0; |
1642 |
|
} |
1643 |
|
|
1644 |
+ |
// Utilities used by ForkJoinTask |
1645 |
+ |
|
1646 |
+ |
/** |
1647 |
+ |
* Returns true if all workers are busy |
1648 |
+ |
*/ |
1649 |
+ |
final boolean isSaturated() { |
1650 |
+ |
long c; |
1651 |
+ |
return (int)((c = ctl) >> RC_SHIFT) >= 0 && ((int)c & ~UNSIGNALLED) == 0; |
1652 |
+ |
} |
1653 |
+ |
|
1654 |
+ |
/** |
1655 |
+ |
* Returns true if terminated or terminating |
1656 |
+ |
*/ |
1657 |
+ |
final boolean isStopping() { |
1658 |
+ |
return mode < 0; |
1659 |
+ |
} |
1660 |
+ |
|
1661 |
+ |
/** |
1662 |
+ |
* Returns true if can start terminating if enabled, or already terminated |
1663 |
+ |
*/ |
1664 |
+ |
final boolean canStop() { |
1665 |
+ |
outer: for (long oldSum = 0L;;) { // repeat until stable |
1666 |
+ |
int md; WorkQueue[] qs; WorkQueue q; |
1667 |
+ |
long c = ctl, checkSum = c; |
1668 |
+ |
if (((md = mode) & STOP) != 0 || (qs = queues) == null) |
1669 |
+ |
return true; |
1670 |
+ |
if ((md & SMASK) + (int)(c >> RC_SHIFT) > 0) |
1671 |
+ |
break; |
1672 |
+ |
for (int i = 1, s; i < qs.length; i += 2) { // scan submitters |
1673 |
+ |
long u = ((long)i) << 32; |
1674 |
+ |
if ((q = qs[i]) == null) |
1675 |
+ |
checkSum += u; |
1676 |
+ |
else if (q.source == 0 && (s = q.top) == q.base) |
1677 |
+ |
checkSum += u + s; |
1678 |
+ |
else |
1679 |
+ |
break outer; |
1680 |
+ |
} |
1681 |
+ |
if (oldSum == (oldSum = checkSum)) |
1682 |
+ |
return true; |
1683 |
+ |
} |
1684 |
+ |
return (mode & STOP) != 0; // recheck mode on false return |
1685 |
+ |
} |
1686 |
+ |
|
1687 |
|
/** |
1688 |
|
* Tries to decrement counts (sometimes implicitly) and possibly |
1689 |
|
* arrange for a compensating worker in preparation for |
1701 |
|
int minActive = (short)(b & SMASK), |
1702 |
|
maxTotal = b >>> SWIDTH, |
1703 |
|
active = (int)(c >> RC_SHIFT), |
1704 |
< |
total = (short)(c >>> TC_SHIFT), sp; |
1705 |
< |
if ((sp = (int)c & ~UNSIGNALLED) != 0) { // activate idle worker |
1706 |
< |
WorkQueue[] qs; int n; WorkQueue v; |
1707 |
< |
if ((qs = queues) != null && (n = qs.length) > 0 && |
1708 |
< |
(v = qs[sp & (n - 1)]) != null) { |
1709 |
< |
Thread vt = v.owner; |
1710 |
< |
long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c); |
1711 |
< |
if (compareAndSetCtl(c, nc)) { |
1712 |
< |
v.phase = sp; |
1713 |
< |
LockSupport.unpark(vt); |
1714 |
< |
return ADJUST; |
1704 |
> |
total = (short)(c >>> TC_SHIFT), |
1705 |
> |
sp = (int)c & ~UNSIGNALLED; |
1706 |
> |
if (total >= 0) { |
1707 |
> |
if (sp != 0) { // activate idle worker |
1708 |
> |
WorkQueue[] qs; int n; WorkQueue v; |
1709 |
> |
if ((qs = queues) != null && (n = qs.length) > 0 && |
1710 |
> |
(v = qs[sp & (n - 1)]) != null) { |
1711 |
> |
Thread vt = v.owner; |
1712 |
> |
long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c); |
1713 |
> |
if (compareAndSetCtl(c, nc)) { |
1714 |
> |
v.phase = sp; |
1715 |
> |
LockSupport.unpark(vt); |
1716 |
> |
return ADJUST; |
1717 |
> |
} |
1718 |
|
} |
1719 |
+ |
return -1; // retry |
1720 |
+ |
} |
1721 |
+ |
else if (active > minActive) { // reduce parallelism |
1722 |
+ |
long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); |
1723 |
+ |
return compareAndSetCtl(c, nc) ? ADJUST : -1; |
1724 |
|
} |
1671 |
– |
return -1; // retry |
1672 |
– |
} |
1673 |
– |
else if (total >= 0 && active > minActive) { // reduce parallelism |
1674 |
– |
long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c)); |
1675 |
– |
return compareAndSetCtl(c, nc) ? ADJUST : -1; |
1725 |
|
} |
1726 |
< |
else if (total < maxTotal) { // expand pool |
1726 |
> |
if (total < maxTotal) { // expand pool |
1727 |
|
long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); |
1728 |
|
return !compareAndSetCtl(c, nc) ? -1 : !createWorker() ? 0 : ADJUST; |
1729 |
|
} |
1730 |
< |
else if (!compareAndSetCtl(c, c)) // validate |
1730 |
> |
else if (!compareAndSetCtl(c, c)) // validate |
1731 |
|
return -1; |
1732 |
|
else if ((sat = saturate) != null && sat.test(this)) |
1733 |
|
return 0; |
1761 |
|
outer: for (;;) { |
1762 |
|
if ((s = task.status) < 0) |
1763 |
|
break; |
1764 |
+ |
else if (mode < 0) |
1765 |
+ |
ForkJoinTask.cancelIgnoringExceptions(task); |
1766 |
|
else if (!scan && c == (c = ctl)) { |
1767 |
< |
if (mode < 0) |
1717 |
< |
ForkJoinTask.cancelIgnoringExceptions(task); |
1718 |
< |
else if ((s = tryCompensate(c)) >= 0) |
1767 |
> |
if ((s = tryCompensate(c)) >= 0) |
1768 |
|
break; // block |
1769 |
|
} |
1770 |
|
else { // scan for subtasks |
1812 |
|
} |
1813 |
|
|
1814 |
|
/** |
1815 |
< |
* Version of helpJoin for CountedCompleters, also usable with |
1816 |
< |
* external submitter threads. Scans for and runs subtasks of the |
1768 |
< |
* given root task, compensating and blocking if none are found. |
1815 |
> |
* Extra helpJoin steps for CountedCompleters. Scans for and runs |
1816 |
> |
* subtasks of the given root task, returning if none are found. |
1817 |
|
* |
1818 |
|
* @param task root of CountedCompleter computation |
1819 |
|
* @param w caller's WorkQueue |
1820 |
|
* @param owned true if owned by a ForkJoinWorkerThread |
1821 |
< |
* @return task status on exit, or ADJUST for compensated blocking |
1821 |
> |
* @return task status on exit |
1822 |
|
*/ |
1823 |
|
final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean owned) { |
1824 |
|
int s = 0; |
1834 |
|
} |
1835 |
|
else if ((s = task.status) < 0) |
1836 |
|
break; |
1837 |
< |
else if (!scan && c == (c = ctl)) { |
1838 |
< |
if (mode < 0) |
1839 |
< |
ForkJoinTask.cancelIgnoringExceptions(task); |
1840 |
< |
else if (!owned || (s = tryCompensate(c)) >= 0) |
1793 |
< |
break; // block |
1794 |
< |
} |
1837 |
> |
else if (!scan && c == (c = ctl)) |
1838 |
> |
break; |
1839 |
> |
else if (mode < 0) |
1840 |
> |
ForkJoinTask.cancelIgnoringExceptions(task); |
1841 |
|
else { // scan for subtasks |
1842 |
|
scan = false; |
1843 |
|
WorkQueue[] qs = queues; |
1879 |
|
} |
1880 |
|
|
1881 |
|
/** |
1836 |
– |
* Runs tasks until {@code isQuiescent()}. Rather than blocking |
1837 |
– |
* when tasks cannot be found, rescans until all others cannot |
1838 |
– |
* find tasks either. |
1839 |
– |
*/ |
1840 |
– |
final void helpQuiescePool(WorkQueue w) { |
1841 |
– |
if (w != null) { |
1842 |
– |
int prevSrc = w.source, wsrc = prevSrc, cfg = w.config, r = cfg + 1; |
1843 |
– |
for (boolean active = true, locals = true;;) { |
1844 |
– |
boolean busy = false, scan = false; |
1845 |
– |
if (locals) { // run local tasks before (re)polling |
1846 |
– |
locals = false; |
1847 |
– |
for (ForkJoinTask<?> u; (u = w.nextLocalTask(cfg)) != null;) |
1848 |
– |
u.doExec(); |
1849 |
– |
} |
1850 |
– |
WorkQueue[] qs = queues; |
1851 |
– |
int n = (qs == null) ? 0 : qs.length; |
1852 |
– |
for (int i = n; i > 0; --i, ++r) { |
1853 |
– |
int j, b, cap; WorkQueue q; ForkJoinTask<?>[] a; |
1854 |
– |
if ((q = qs[j = (n - 1) & r]) != null && q != w && |
1855 |
– |
(a = q.array) != null && (cap = a.length) > 0) { |
1856 |
– |
int k = (cap - 1) & (b = q.base); |
1857 |
– |
int nextBase = b + 1, src = j | SRC; |
1858 |
– |
ForkJoinTask<?> t = WorkQueue.getSlot(a, k); |
1859 |
– |
if (q.base != b) |
1860 |
– |
busy = scan = true; |
1861 |
– |
else if (t != null) { |
1862 |
– |
busy = scan = true; |
1863 |
– |
if (!active) { // increment before taking |
1864 |
– |
active = true; |
1865 |
– |
getAndAddCtl(RC_UNIT); |
1866 |
– |
} |
1867 |
– |
if (WorkQueue.casSlotToNull(a, k, t)) { |
1868 |
– |
q.base = nextBase; |
1869 |
– |
w.source = src; |
1870 |
– |
t.doExec(); |
1871 |
– |
w.source = wsrc = prevSrc; |
1872 |
– |
locals = true; |
1873 |
– |
} |
1874 |
– |
break; |
1875 |
– |
} |
1876 |
– |
else if (!busy) { |
1877 |
– |
if (q.top != b || a[nextBase & (cap - 1)] != null) |
1878 |
– |
busy = scan = true; |
1879 |
– |
else if (q.source != QUIET && q.phase >= 0) |
1880 |
– |
busy = true; |
1881 |
– |
} |
1882 |
– |
} |
1883 |
– |
} |
1884 |
– |
VarHandle.acquireFence(); |
1885 |
– |
if (!scan && queues == qs) { |
1886 |
– |
if (!busy) { |
1887 |
– |
w.source = prevSrc; |
1888 |
– |
if (!active) |
1889 |
– |
getAndAddCtl(RC_UNIT); |
1890 |
– |
break; |
1891 |
– |
} |
1892 |
– |
if (wsrc != QUIET) |
1893 |
– |
w.source = wsrc = QUIET; |
1894 |
– |
if (active) { // decrement |
1895 |
– |
active = false; |
1896 |
– |
getAndAddCtl(RC_MASK & -RC_UNIT); |
1897 |
– |
} |
1898 |
– |
else |
1899 |
– |
Thread.yield(); // no tasks but others busy |
1900 |
– |
} |
1901 |
– |
} |
1902 |
– |
} |
1903 |
– |
} |
1904 |
– |
|
1905 |
– |
/** |
1882 |
|
* Scans for and returns a polled task, if available. Used only |
1883 |
|
* for untracked polls. Begins scan at an index (scanRover) |
1884 |
|
* advanced on each call, to avoid systematic unfairness. |
1919 |
|
} |
1920 |
|
|
1921 |
|
/** |
1922 |
+ |
* Runs tasks until {@code isQuiescent()}. Rather than blocking |
1923 |
+ |
* when tasks cannot be found, rescans until all others cannot |
1924 |
+ |
* find tasks either. |
1925 |
+ |
* |
1926 |
+ |
* @param nanos max wait time (Long.MAX_VALUE if effectively untimed) |
1927 |
+ |
* @param interruptible true if return on interrupt |
1928 |
+ |
* @return positive if quiescent, negative if interrupted, else 0 |
1929 |
+ |
*/ |
1930 |
+ |
final int helpQuiescePool(WorkQueue w, long nanos, boolean interruptible) { |
1931 |
+ |
if (w == null) |
1932 |
+ |
return 0; |
1933 |
+ |
long startTime = System.nanoTime(), parkTime = 0L; |
1934 |
+ |
int prevSrc = w.source, wsrc = prevSrc, cfg = w.config, r = cfg + 1; |
1935 |
+ |
for (boolean active = true, locals = true;;) { |
1936 |
+ |
boolean busy = false, scan = false; |
1937 |
+ |
if (locals) { // run local tasks before (re)polling |
1938 |
+ |
locals = false; |
1939 |
+ |
for (ForkJoinTask<?> u; (u = w.nextLocalTask(cfg)) != null;) |
1940 |
+ |
u.doExec(); |
1941 |
+ |
} |
1942 |
+ |
WorkQueue[] qs = queues; |
1943 |
+ |
int n = (qs == null) ? 0 : qs.length; |
1944 |
+ |
for (int i = n; i > 0; --i, ++r) { |
1945 |
+ |
int j, b, cap; WorkQueue q; ForkJoinTask<?>[] a; |
1946 |
+ |
if ((q = qs[j = (n - 1) & r]) != null && q != w && |
1947 |
+ |
(a = q.array) != null && (cap = a.length) > 0) { |
1948 |
+ |
int k = (cap - 1) & (b = q.base); |
1949 |
+ |
int nextBase = b + 1, src = j | SRC; |
1950 |
+ |
ForkJoinTask<?> t = WorkQueue.getSlot(a, k); |
1951 |
+ |
if (q.base != b) |
1952 |
+ |
busy = scan = true; |
1953 |
+ |
else if (t != null) { |
1954 |
+ |
busy = scan = true; |
1955 |
+ |
if (!active) { // increment before taking |
1956 |
+ |
active = true; |
1957 |
+ |
getAndAddCtl(RC_UNIT); |
1958 |
+ |
} |
1959 |
+ |
if (WorkQueue.casSlotToNull(a, k, t)) { |
1960 |
+ |
q.base = nextBase; |
1961 |
+ |
w.source = src; |
1962 |
+ |
t.doExec(); |
1963 |
+ |
w.source = wsrc = prevSrc; |
1964 |
+ |
locals = true; |
1965 |
+ |
} |
1966 |
+ |
break; |
1967 |
+ |
} |
1968 |
+ |
else if (!busy) { |
1969 |
+ |
if (q.top != b || a[nextBase & (cap - 1)] != null) |
1970 |
+ |
busy = scan = true; |
1971 |
+ |
else if (q.source != QUIET && q.phase >= 0) |
1972 |
+ |
busy = true; |
1973 |
+ |
} |
1974 |
+ |
} |
1975 |
+ |
} |
1976 |
+ |
VarHandle.acquireFence(); |
1977 |
+ |
if (!scan && queues == qs) { |
1978 |
+ |
boolean interrupted; |
1979 |
+ |
if (!busy) { |
1980 |
+ |
w.source = prevSrc; |
1981 |
+ |
if (!active) |
1982 |
+ |
getAndAddCtl(RC_UNIT); |
1983 |
+ |
return 1; |
1984 |
+ |
} |
1985 |
+ |
if (wsrc != QUIET) |
1986 |
+ |
w.source = wsrc = QUIET; |
1987 |
+ |
if (active) { // decrement |
1988 |
+ |
active = false; |
1989 |
+ |
parkTime = 0L; |
1990 |
+ |
getAndAddCtl(RC_MASK & -RC_UNIT); |
1991 |
+ |
} |
1992 |
+ |
else if (parkTime == 0L) { |
1993 |
+ |
parkTime = 1L << 10; // initially about 1 usec |
1994 |
+ |
Thread.yield(); |
1995 |
+ |
} |
1996 |
+ |
else if ((interrupted = interruptible && Thread.interrupted()) || |
1997 |
+ |
System.nanoTime() - startTime > nanos) { |
1998 |
+ |
getAndAddCtl(RC_UNIT); |
1999 |
+ |
return interrupted ? -1 : 0; |
2000 |
+ |
} |
2001 |
+ |
else { |
2002 |
+ |
LockSupport.parkNanos(this, parkTime); |
2003 |
+ |
if (parkTime < nanos >>> 8 && parkTime < 1L << 20) |
2004 |
+ |
parkTime <<= 1; // max sleep approx 1 sec or 1% nanos |
2005 |
+ |
} |
2006 |
+ |
} |
2007 |
+ |
} |
2008 |
+ |
} |
2009 |
+ |
|
2010 |
+ |
/** |
2011 |
+ |
* Helps quiesce from external caller until done, interrupted, or timeout |
2012 |
+ |
* |
2013 |
+ |
* @param nanos max wait time (Long.MAX_VALUE if effectively untimed) |
2014 |
+ |
* @param interruptible true if return on interrupt |
2015 |
+ |
* @return positive if quiescent, negative if interrupted, else 0 |
2016 |
+ |
*/ |
2017 |
+ |
final int externalHelpQuiescePool(long nanos, boolean interruptible) { |
2018 |
+ |
for (long startTime = System.nanoTime(), parkTime = 0L;;) { |
2019 |
+ |
ForkJoinTask<?> t; |
2020 |
+ |
if ((t = pollScan(false)) != null) { |
2021 |
+ |
t.doExec(); |
2022 |
+ |
parkTime = 0L; |
2023 |
+ |
} |
2024 |
+ |
else if (canStop()) |
2025 |
+ |
return 1; |
2026 |
+ |
else if (parkTime == 0L) { |
2027 |
+ |
parkTime = 1L << 10; |
2028 |
+ |
Thread.yield(); |
2029 |
+ |
} |
2030 |
+ |
else if ((System.nanoTime() - startTime) > nanos) |
2031 |
+ |
return 0; |
2032 |
+ |
else if (interruptible && Thread.interrupted()) |
2033 |
+ |
return -1; |
2034 |
+ |
else { |
2035 |
+ |
LockSupport.parkNanos(this, parkTime); |
2036 |
+ |
if (parkTime < nanos >>> 8 && parkTime < 1L << 20) |
2037 |
+ |
parkTime <<= 1; |
2038 |
+ |
} |
2039 |
+ |
} |
2040 |
+ |
} |
2041 |
+ |
|
2042 |
+ |
/** |
2043 |
|
* Gets and removes a local or stolen task for the given worker. |
2044 |
|
* |
2045 |
|
* @return a task, if available |
2221 |
|
md = getAndBitwiseOrMode(SHUTDOWN); |
2222 |
|
} |
2223 |
|
if ((md & STOP) == 0) { |
2224 |
< |
if (!now && !isQuiescent()) |
2224 |
> |
if (!now && !canStop()) |
2225 |
|
return false; |
2226 |
|
md = getAndBitwiseOrMode(STOP); |
2227 |
|
} |
2228 |
|
if ((md & TERMINATED) == 0) { |
2229 |
|
for (ForkJoinTask<?> t; (t = pollScan(false)) != null; ) |
2230 |
< |
ForkJoinTask.cancelIgnoringExceptions(t); // cancel tasks |
2231 |
< |
WorkQueue[] qs; WorkQueue q; Thread t; |
2232 |
< |
int n = ((qs = queues) == null) ? 0 : qs.length; |
2233 |
< |
for (int i = 1; i < n; i += 2) { // unblock parked workers |
2234 |
< |
if ((q = qs[i]) != null && (t = q.owner) != null && |
2235 |
< |
!t.isInterrupted()) { |
2236 |
< |
try { |
2237 |
< |
t.interrupt(); |
2238 |
< |
} catch (Throwable ignore) { |
2230 |
> |
ForkJoinTask.cancelIgnoringExceptions(t); // help cancel tasks |
2231 |
> |
|
2232 |
> |
WorkQueue[] qs; int n; WorkQueue q; Thread thread; |
2233 |
> |
if ((qs = queues) != null && (n = qs.length) > 0) { |
2234 |
> |
for (int j = 1; j < n; j += 2) { // unblock other workers |
2235 |
> |
if ((q = qs[j]) != null && (thread = q.owner) != null && |
2236 |
> |
!thread.isInterrupted()) { |
2237 |
> |
try { |
2238 |
> |
thread.interrupt(); |
2239 |
> |
} catch (Throwable ignore) { |
2240 |
> |
} |
2241 |
|
} |
2242 |
|
} |
2243 |
|
} |
2244 |
|
|
2245 |
< |
ReentrantLock lock; Condition cond; // finish if no workers |
2245 |
> |
ReentrantLock lock; Condition cond; // signal when no workers |
2246 |
|
if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 && |
2247 |
|
(getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 && |
2248 |
|
(lock = registrationLock) != null) { |
2607 |
|
} |
2608 |
|
|
2609 |
|
/** |
2511 |
– |
* Task class, plus some helper methods, for invokeAll and invokeAny. |
2512 |
– |
*/ |
2513 |
– |
static final class BulkTask<E> extends CountedCompleter<E> { |
2514 |
– |
private static final long serialVersionUID = 2838392045355241008L; |
2515 |
– |
@SuppressWarnings("serial") // Conditionally serializable |
2516 |
– |
final Callable<E> callable; |
2517 |
– |
@SuppressWarnings("serial") // Conditionally serializable |
2518 |
– |
E result; |
2519 |
– |
final boolean invokeAny; // false if performing invokeAll |
2520 |
– |
BulkTask(BulkTask<E> parent, Callable<E> callable, boolean invokeAny) { |
2521 |
– |
super(parent); |
2522 |
– |
this.callable = callable; |
2523 |
– |
this.invokeAny = invokeAny; |
2524 |
– |
} |
2525 |
– |
public E getRawResult() { return result; } |
2526 |
– |
public void setRawResult(E r) { result = r; } |
2527 |
– |
|
2528 |
– |
public void compute() { |
2529 |
– |
try { |
2530 |
– |
E r = callable.call(); |
2531 |
– |
@SuppressWarnings("unchecked") CountedCompleter<E> p = |
2532 |
– |
invokeAny ? (CountedCompleter<E>)getCompleter() : null; |
2533 |
– |
if (p != null) |
2534 |
– |
p.complete(r); |
2535 |
– |
else |
2536 |
– |
complete(r); |
2537 |
– |
} catch (Throwable ex) { |
2538 |
– |
completeExceptionally(ex); |
2539 |
– |
} |
2540 |
– |
} |
2541 |
– |
|
2542 |
– |
static void cancelAll(BulkTask<?>[] fs) { // cancel all nonnull tasks |
2543 |
– |
if (fs != null) { |
2544 |
– |
for (BulkTask<?> f: fs) { |
2545 |
– |
if (f != null) |
2546 |
– |
f.cancel(false); |
2547 |
– |
} |
2548 |
– |
} |
2549 |
– |
} |
2550 |
– |
|
2551 |
– |
/** |
2552 |
– |
* Creates, records, and forks a BulkTask for each Callable; |
2553 |
– |
* returns the array, with first element root task (if nonempty). |
2554 |
– |
*/ |
2555 |
– |
static <T> BulkTask<T>[] forkAll(Collection<? extends Callable<T>> cs, |
2556 |
– |
boolean invokeAny) { |
2557 |
– |
int n = cs.size(); |
2558 |
– |
@SuppressWarnings("unchecked") |
2559 |
– |
BulkTask<T>[] fs = (BulkTask<T>[])new BulkTask<?>[n]; |
2560 |
– |
BulkTask<T> root = null; // parent completer for all others |
2561 |
– |
Iterator<? extends Callable<T>> it = cs.iterator(); |
2562 |
– |
int i = 0; // ignores extra elements if cs.size() inconsistent |
2563 |
– |
while (i < n && it.hasNext()) { |
2564 |
– |
Callable<T> c; BulkTask<T> f; |
2565 |
– |
if ((c = it.next()) == null) { |
2566 |
– |
cancelAll(fs); |
2567 |
– |
throw new NullPointerException(); |
2568 |
– |
} |
2569 |
– |
fs[i++] = f = new BulkTask<T>(root, c, invokeAny); |
2570 |
– |
if (root == null) |
2571 |
– |
(root = f).setPendingCount(n); |
2572 |
– |
f.fork(); |
2573 |
– |
} |
2574 |
– |
return fs; |
2575 |
– |
} |
2576 |
– |
|
2577 |
– |
/** |
2578 |
– |
* If completed abnormally, throws any exception encountered |
2579 |
– |
* by any task in array, or a CancellationException if none, |
2580 |
– |
* wrapped in ExecutionException. Else returns result. |
2581 |
– |
*/ |
2582 |
– |
E reportInvokeAnyResult(BulkTask<?>[] fs) throws ExecutionException { |
2583 |
– |
E r = getRawResult(); |
2584 |
– |
if (r == null && isCompletedAbnormally()) { |
2585 |
– |
Throwable ex = null; |
2586 |
– |
if (fs != null) { |
2587 |
– |
for (BulkTask<?> f: fs) { |
2588 |
– |
if (f != null && (ex = f.getException()) != null) |
2589 |
– |
break; |
2590 |
– |
} |
2591 |
– |
} |
2592 |
– |
if (ex == null) |
2593 |
– |
ex = new CancellationException(); |
2594 |
– |
throw new ExecutionException(ex); |
2595 |
– |
} |
2596 |
– |
return r; |
2597 |
– |
} |
2598 |
– |
} |
2599 |
– |
|
2600 |
– |
/** |
2610 |
|
* @throws NullPointerException {@inheritDoc} |
2611 |
|
* @throws RejectedExecutionException {@inheritDoc} |
2612 |
|
*/ |
2613 |
|
@Override |
2614 |
|
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) { |
2615 |
< |
BulkTask<T>[] fs; BulkTask<T> root; |
2616 |
< |
if ((fs = BulkTask.forkAll(tasks, false)) != null && fs.length > 0 && |
2617 |
< |
(root = fs[0]) != null) |
2618 |
< |
root.quietlyJoin(); |
2619 |
< |
return Arrays.asList(fs); |
2615 |
> |
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); |
2616 |
> |
try { |
2617 |
> |
for (Callable<T> t : tasks) { |
2618 |
> |
ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); |
2619 |
> |
futures.add(f); |
2620 |
> |
externalSubmit(f); |
2621 |
> |
} |
2622 |
> |
for (int i = futures.size() - 1; i >= 0; --i) |
2623 |
> |
((ForkJoinTask<?>)futures.get(i)).quietlyJoin(); |
2624 |
> |
return futures; |
2625 |
> |
} catch (Throwable t) { |
2626 |
> |
for (Future<T> e : futures) |
2627 |
> |
ForkJoinTask.cancelIgnoringExceptions(e); |
2628 |
> |
throw t; |
2629 |
> |
} |
2630 |
|
} |
2631 |
|
|
2632 |
|
@Override |
2633 |
|
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, |
2634 |
|
long timeout, TimeUnit unit) |
2635 |
|
throws InterruptedException { |
2636 |
< |
BulkTask<T>[] fs; BulkTask<T> root; |
2637 |
< |
long deadline = unit.toNanos(timeout) + System.nanoTime(); |
2638 |
< |
if ((fs = BulkTask.forkAll(tasks, false)) != null && fs.length > 0 && |
2639 |
< |
(root = fs[0]) != null) { |
2640 |
< |
try { |
2641 |
< |
root.get(deadline, TimeUnit.NANOSECONDS); |
2642 |
< |
} catch (Exception ex) { |
2643 |
< |
BulkTask.cancelAll(fs); |
2636 |
> |
long nanos = unit.toNanos(timeout); |
2637 |
> |
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size()); |
2638 |
> |
try { |
2639 |
> |
for (Callable<T> t : tasks) { |
2640 |
> |
ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t); |
2641 |
> |
futures.add(f); |
2642 |
> |
externalSubmit(f); |
2643 |
> |
} |
2644 |
> |
long startTime = System.nanoTime(), ns = nanos; |
2645 |
> |
boolean timedOut = (ns < 0L); |
2646 |
> |
for (int i = futures.size() - 1; i >= 0; --i) { |
2647 |
> |
Future<T> f = futures.get(i); |
2648 |
> |
if (!f.isDone()) { |
2649 |
> |
if (timedOut) |
2650 |
> |
ForkJoinTask.cancelIgnoringExceptions(f); |
2651 |
> |
else { |
2652 |
> |
try { |
2653 |
> |
f.get(ns, TimeUnit.NANOSECONDS); |
2654 |
> |
} catch (CancellationException | TimeoutException | |
2655 |
> |
ExecutionException ok) { |
2656 |
> |
} |
2657 |
> |
if ((ns = nanos - (System.nanoTime() - startTime)) < 0L) |
2658 |
> |
timedOut = true; |
2659 |
> |
} |
2660 |
> |
} |
2661 |
|
} |
2662 |
+ |
return futures; |
2663 |
+ |
} catch (Throwable t) { |
2664 |
+ |
for (Future<T> e : futures) |
2665 |
+ |
ForkJoinTask.cancelIgnoringExceptions(e); |
2666 |
+ |
throw t; |
2667 |
|
} |
2627 |
– |
return Arrays.asList(fs); |
2628 |
– |
} |
2629 |
– |
|
2630 |
– |
@Override |
2631 |
– |
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) |
2632 |
– |
throws InterruptedException, ExecutionException { |
2633 |
– |
BulkTask<T>[] fs; BulkTask<T> root; |
2634 |
– |
if ((fs = BulkTask.forkAll(tasks, true)) != null && fs.length > 0 && |
2635 |
– |
(root = fs[0]) != null) { |
2636 |
– |
root.quietlyJoin(); |
2637 |
– |
BulkTask.cancelAll(fs); |
2638 |
– |
return root.reportInvokeAnyResult(fs); |
2639 |
– |
} |
2640 |
– |
else |
2641 |
– |
throw new IllegalArgumentException(); // no tasks |
2642 |
– |
} |
2643 |
– |
|
2644 |
– |
@Override |
2645 |
– |
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, |
2646 |
– |
long timeout, TimeUnit unit) |
2647 |
– |
throws InterruptedException, ExecutionException, TimeoutException { |
2648 |
– |
BulkTask<T>[] fs; BulkTask<T> root; |
2649 |
– |
long deadline = unit.toNanos(timeout) + System.nanoTime(); |
2650 |
– |
if ((fs = BulkTask.forkAll(tasks, true)) != null && fs.length > 0 && |
2651 |
– |
(root = fs[0]) != null) { |
2652 |
– |
TimeoutException tex = null; |
2653 |
– |
try { |
2654 |
– |
root.get(deadline, TimeUnit.NANOSECONDS); |
2655 |
– |
} catch (TimeoutException tx) { |
2656 |
– |
tex = tx; |
2657 |
– |
} catch (Throwable ignore) { |
2658 |
– |
} |
2659 |
– |
BulkTask.cancelAll(fs); |
2660 |
– |
if (tex != null) |
2661 |
– |
throw tex; |
2662 |
– |
return root.reportInvokeAnyResult(fs); |
2663 |
– |
} |
2664 |
– |
else |
2665 |
– |
throw new IllegalArgumentException(); |
2668 |
|
} |
2669 |
|
|
2670 |
|
/** |
2773 |
|
* @return {@code true} if all threads are currently idle |
2774 |
|
*/ |
2775 |
|
public boolean isQuiescent() { |
2776 |
< |
int m, p; |
2775 |
< |
return ((((m = mode) & STOP) != 0) || |
2776 |
< |
((p = m & SMASK) + (int)(ctl >> RC_SHIFT) <= 0 && |
2777 |
< |
!hasQueuedSubmissions() && p + (int)(ctl >> RC_SHIFT) <= 0) || |
2778 |
< |
(mode & STOP) != 0); // recheck |
2776 |
> |
return canStop(); |
2777 |
|
} |
2778 |
|
|
2779 |
|
/** |
3044 |
|
*/ |
3045 |
|
public boolean awaitTermination(long timeout, TimeUnit unit) |
3046 |
|
throws InterruptedException { |
3047 |
+ |
ReentrantLock lock; Condition cond; |
3048 |
|
long nanos = unit.toNanos(timeout); |
3049 |
< |
ReentrantLock lock; Condition cond; // construct only if waiting |
3051 |
< |
if (Thread.interrupted()) |
3052 |
< |
throw new InterruptedException(); |
3049 |
> |
boolean terminated = false; |
3050 |
|
if (this == common) { |
3051 |
< |
awaitQuiescence(timeout, unit); |
3052 |
< |
return false; |
3051 |
> |
Thread t; ForkJoinWorkerThread wt; int q; |
3052 |
> |
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && |
3053 |
> |
(wt = (ForkJoinWorkerThread)t).pool == this) |
3054 |
> |
q = helpQuiescePool(wt.workQueue, nanos, true); |
3055 |
> |
else |
3056 |
> |
q = externalHelpQuiescePool(nanos, true); |
3057 |
> |
if (q < 0) |
3058 |
> |
throw new InterruptedException(); |
3059 |
|
} |
3060 |
< |
if (isTerminated() || (lock = registrationLock) == null) |
3061 |
< |
return true; |
3062 |
< |
lock.lock(); |
3063 |
< |
try { |
3064 |
< |
if ((cond = termination) == null) |
3065 |
< |
termination = cond = lock.newCondition(); |
3066 |
< |
while (!isTerminated() && nanos > 0L) |
3067 |
< |
nanos = cond.awaitNanos(nanos); |
3068 |
< |
} finally { |
3069 |
< |
lock.unlock(); |
3060 |
> |
else if (!(terminated = isTerminated()) && |
3061 |
> |
(lock = registrationLock) != null) { |
3062 |
> |
lock.lock(); |
3063 |
> |
try { |
3064 |
> |
if ((cond = termination) == null) |
3065 |
> |
termination = cond = lock.newCondition(); |
3066 |
> |
while (!(terminated = isTerminated()) && nanos > 0L) |
3067 |
> |
nanos = cond.awaitNanos(nanos); |
3068 |
> |
} finally { |
3069 |
> |
lock.unlock(); |
3070 |
> |
} |
3071 |
|
} |
3072 |
< |
return isTerminated(); |
3072 |
> |
return terminated; |
3073 |
|
} |
3074 |
|
|
3075 |
|
/** |
3084 |
|
* timeout elapsed. |
3085 |
|
*/ |
3086 |
|
public boolean awaitQuiescence(long timeout, TimeUnit unit) { |
3087 |
< |
Thread thread; ForkJoinWorkerThread wt; |
3087 |
> |
Thread t; ForkJoinWorkerThread wt; int q; |
3088 |
|
long nanos = unit.toNanos(timeout); |
3089 |
< |
if ((thread = Thread.currentThread()) instanceof ForkJoinWorkerThread && |
3090 |
< |
(wt = (ForkJoinWorkerThread)thread).pool == this) { |
3091 |
< |
helpQuiescePool(wt.workQueue); |
3092 |
< |
return true; |
3093 |
< |
} |
3094 |
< |
// else cannot block, so use exponential sleeps |
3091 |
< |
boolean quiesced = false, interrupted = false; |
3092 |
< |
for (long startTime = System.nanoTime(), parkTime = 0L;;) { |
3093 |
< |
ForkJoinTask<?> t; |
3094 |
< |
if ((t = pollScan(false)) != null) { |
3095 |
< |
t.doExec(); |
3096 |
< |
parkTime = 0L; |
3097 |
< |
} |
3098 |
< |
else if (quiesced = isQuiescent()) |
3099 |
< |
break; |
3100 |
< |
else if ((System.nanoTime() - startTime) > nanos) |
3101 |
< |
break; |
3102 |
< |
else if (parkTime == 0L) { |
3103 |
< |
parkTime = 1L << 10; // initially about 1 usec |
3104 |
< |
Thread.yield(); |
3105 |
< |
} |
3106 |
< |
else if (Thread.interrupted()) |
3107 |
< |
interrupted = true; |
3108 |
< |
else { |
3109 |
< |
LockSupport.parkNanos(this, parkTime); |
3110 |
< |
if (parkTime < nanos >>> 8) // max sleep approx 1% nanos |
3111 |
< |
parkTime <<= 1; |
3112 |
< |
} |
3113 |
< |
} |
3114 |
< |
if (interrupted) |
3115 |
< |
Thread.currentThread().interrupt(); |
3116 |
< |
return quiesced; |
3117 |
< |
} |
3118 |
< |
|
3119 |
< |
/** |
3120 |
< |
* Waits and/or attempts to assist performing tasks indefinitely |
3121 |
< |
* until the {@link #commonPool()} {@link #isQuiescent}. |
3122 |
< |
*/ |
3123 |
< |
static void quiesceCommonPool() { |
3124 |
< |
common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS); |
3089 |
> |
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread && |
3090 |
> |
(wt = (ForkJoinWorkerThread)t).pool == this) |
3091 |
> |
q = helpQuiescePool(wt.workQueue, nanos, false); |
3092 |
> |
else |
3093 |
> |
q = externalHelpQuiescePool(nanos, false); |
3094 |
> |
return (q > 0); |
3095 |
|
} |
3096 |
|
|
3097 |
|
/** |