ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/ForkJoinPool.java (file contents):
Revision 1.365 by dl, Mon Jan 20 15:51:54 2020 UTC vs.
Revision 1.366 by dl, Sat Feb 1 20:20:17 2020 UTC

# Line 561 | Line 561 | public class ForkJoinPool extends Abstra
561       * eligibility is determined by checking completion chains rather
562       * than tracking stealers.
563       *
564 +     * Joining under timeouts (ForkJoinTask timed get) uses a
565 +     * constrained mixture of helping and compensating in part because
566 +     * pools (actually, only the common pool) may not have any
567 +     * available threads: If the pool is saturated (all available
568 +     * workers are busy), the caller tries to remove and otherwise
569 +     * help; else it blocks under compensation so that it may time out
570 +     * independently of any tasks.
571 +     *
572       * Compensation does not by default aim to keep exactly the target
573       * parallelism number of unblocked threads running at any given
574       * time. Some previous versions of this class employed immediate
# Line 884 | Line 892 | public class ForkJoinPool extends Abstra
892          }
893  
894          /**
895 <         * Provides a more accurate estimate of whether this queue has
896 <         * any tasks than does queueSize, by checking whether an
889 <         * apparently near-empty queue has at least one unclaimed
890 <         * task.
895 >         * Provides a more conservative estimate of whether this queue
896 >         * has any tasks than does queueSize.
897           */
898          final boolean isEmpty() {
899 <            VarHandle.acquireFence();
894 <            int s = top, b = base, cap;
895 <            ForkJoinTask<?>[] a = array;
896 <            return s - b <= 1 && (a == null || (cap = a.length) == 0 ||
897 <                                  (a[(cap - 1) & b] == null &&
898 <                                   a[(cap - 1) & (s - 1)] == null));
899 >            return !((source != 0 && owner == null) || top - base > 0);
900          }
901  
902          /**
# Line 1376 | Line 1377 | public class ForkJoinPool extends Abstra
1377          Throwable ex = null;
1378          ForkJoinWorkerThread wt = null;
1379          try {
1380 <            if (fac != null && (wt = fac.newThread(this)) != null) {
1380 >            if (mode >= 0 && fac != null && (wt = fac.newThread(this)) != null) {
1381                  wt.start();
1382                  return true;
1383              }
# Line 1586 | Line 1587 | public class ForkJoinPool extends Abstra
1587      private int awaitWork(WorkQueue w) {
1588          if (w == null)
1589              return -1;                       // already terminated
1590 <        int phase, ac;                       // advance phase
1590 >        int phase, ac, md, rc;                   // advance phase
1591          w.phase = (phase = w.phase + SS_SEQ) | UNSIGNALLED;
1592          long prevCtl = ctl, c;               // enqueue
1593          do {
# Line 1596 | Line 1597 | public class ForkJoinPool extends Abstra
1597  
1598          LockSupport.setCurrentBlocker(this); // prepare to block (exit also OK)
1599          long deadline = 0L;                  // use timed wait if nonzero
1600 <        if ((ac = (int)(c >> RC_SHIFT)) + (mode & SMASK) <= 0) { // quiescent
1600 >        if ((rc = (ac = (int)(c >> RC_SHIFT)) + ((md = mode) & SMASK)) <= 0) {
1601              if ((deadline = System.currentTimeMillis() + keepAlive) == 0L)
1602                  deadline = 1L;               // avoid zero
1603              WorkQueue[] qs = queues;         // check for racing submission
# Line 1607 | Line 1608 | public class ForkJoinPool extends Abstra
1608                      (cap = a.length) > 0 && a[(cap - 1) & q.base] != null) {
1609                      if (ctl == c && compareAndSetCtl(c, prevCtl))
1610                          w.phase = phase;     // self-signal
1611 <                    break;                   // else lost race
1611 >                    break;
1612                  }
1613              }
1614          }
1615          for (;;) {                           // await activation or termination
1616 <            if (w.phase >= 0)
1616 >            if ((md = mode) < 0)
1617 >                return -1;
1618 >            else if (w.phase >= 0)
1619                  break;
1620 <            else if (tryTerminate(false, false))
1620 >            else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
1621 >                     tryTerminate(false, false))
1622                  return -1;
1623              else if ((int)(ctl >> RC_SHIFT) > ac)
1624                  Thread.onSpinWait();         // signal in progress
1625 <            else if (deadline != 0L)
1626 <                LockSupport.parkUntil(deadline);
1627 <            else
1628 <                LockSupport.park();
1629 <            if (w.phase >= 0)
1630 <                break;
1631 <            else if (deadline != 0L &&
1632 <                     deadline - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1633 <                     compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) |
1634 <                                          (w.stackPred & SP_MASK)))) {
1635 <                w.phase = QUIET;
1636 <                return -1;                   // drop on timeout
1625 >            else {
1626 >                if (rc <= 0)
1627 >                    LockSupport.parkUntil(deadline);
1628 >                else
1629 >                    LockSupport.park();
1630 >                if ((int)(ctl >> RC_SHIFT) <= ac &&
1631 >                    !Thread.interrupted() && rc <= 0 &&
1632 >                    deadline - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1633 >                    compareAndSetCtl(c, ((UC_MASK & (c - TC_UNIT)) |
1634 >                                         (w.stackPred & SP_MASK)))) {
1635 >                    w.phase = QUIET;
1636 >                    return -1;               // drop on timeout
1637 >                }
1638              }
1634            else
1635                Thread.interrupted();        // clear status before repark
1639          }
1640          LockSupport.setCurrentBlocker(null);
1641          return 0;
1642      }
1643  
1644 +    // Utilities used by ForkJoinTask
1645 +
1646 +    /**
1647 +     * Returns true if all workers are busy
1648 +     */
1649 +    final boolean isSaturated() {
1650 +        long c;
1651 +        return (int)((c = ctl) >> RC_SHIFT) >= 0 && ((int)c & ~UNSIGNALLED) == 0;
1652 +    }
1653 +
1654 +    /**
1655 +     * Returns true if terminated or terminating
1656 +     */
1657 +    final boolean isStopping() {
1658 +        return mode < 0;
1659 +    }
1660 +
1661 +    /**
1662 +     * Returns true if can start terminating if enabled, or already terminated
1663 +     */
1664 +    final boolean canStop() {
1665 +        outer: for (long oldSum = 0L;;) { // repeat until stable
1666 +            int md; WorkQueue[] qs; WorkQueue q;
1667 +            long c = ctl, checkSum = c;
1668 +            if (((md = mode) & STOP) != 0 || (qs = queues) == null)
1669 +                return true;
1670 +            if ((md & SMASK) + (int)(c >> RC_SHIFT) > 0)
1671 +                break;
1672 +            for (int i = 1, s; i < qs.length; i += 2) { // scan submitters
1673 +                long u = ((long)i) << 32;
1674 +                if ((q = qs[i]) == null)
1675 +                    checkSum += u;
1676 +                else if (q.source == 0 && (s = q.top) == q.base)
1677 +                    checkSum += u + s;
1678 +                else
1679 +                    break outer;
1680 +            }
1681 +            if (oldSum == (oldSum = checkSum))
1682 +                return true;
1683 +        }
1684 +        return (mode & STOP) != 0; // recheck mode on false return
1685 +    }
1686 +
1687      /**
1688       * Tries to decrement counts (sometimes implicitly) and possibly
1689       * arrange for a compensating worker in preparation for
# Line 1655 | Line 1701 | public class ForkJoinPool extends Abstra
1701          int minActive = (short)(b & SMASK),
1702              maxTotal  = b >>> SWIDTH,
1703              active    = (int)(c >> RC_SHIFT),
1704 <            total     = (short)(c >>> TC_SHIFT), sp;
1705 <        if ((sp = (int)c & ~UNSIGNALLED) != 0) {   // activate idle worker
1706 <            WorkQueue[] qs; int n; WorkQueue v;
1707 <            if ((qs = queues) != null && (n = qs.length) > 0 &&
1708 <                (v = qs[sp & (n - 1)]) != null) {
1709 <                Thread vt = v.owner;
1710 <                long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c);
1711 <                if (compareAndSetCtl(c, nc)) {
1712 <                    v.phase = sp;
1713 <                    LockSupport.unpark(vt);
1714 <                    return ADJUST;
1704 >            total     = (short)(c >>> TC_SHIFT),
1705 >            sp        = (int)c & ~UNSIGNALLED;
1706 >        if (total >= 0) {
1707 >            if (sp != 0) {                        // activate idle worker
1708 >                WorkQueue[] qs; int n; WorkQueue v;
1709 >                if ((qs = queues) != null && (n = qs.length) > 0 &&
1710 >                    (v = qs[sp & (n - 1)]) != null) {
1711 >                    Thread vt = v.owner;
1712 >                    long nc = ((long)v.stackPred & SP_MASK) | (UC_MASK & c);
1713 >                    if (compareAndSetCtl(c, nc)) {
1714 >                        v.phase = sp;
1715 >                        LockSupport.unpark(vt);
1716 >                        return ADJUST;
1717 >                    }
1718                  }
1719 +                return -1;                        // retry
1720 +            }
1721 +            else if (active > minActive) {        // reduce parallelism
1722 +                long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1723 +                return compareAndSetCtl(c, nc) ? ADJUST : -1;
1724              }
1671            return -1;                               // retry
1672        }
1673        else if (total >= 0 && active > minActive) { // reduce parallelism
1674            long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1675            return compareAndSetCtl(c, nc) ? ADJUST : -1;
1725          }
1726 <        else if (total < maxTotal) {                 // expand pool
1726 >        if (total < maxTotal) {                   // expand pool
1727              long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
1728              return !compareAndSetCtl(c, nc) ? -1 : !createWorker() ? 0 : ADJUST;
1729          }
1730 <        else if (!compareAndSetCtl(c, c))            // validate
1730 >        else if (!compareAndSetCtl(c, c))         // validate
1731              return -1;
1732          else if ((sat = saturate) != null && sat.test(this))
1733              return 0;
# Line 1712 | Line 1761 | public class ForkJoinPool extends Abstra
1761              outer: for (;;) {
1762                  if ((s = task.status) < 0)
1763                      break;
1764 +                else if (mode < 0)
1765 +                    ForkJoinTask.cancelIgnoringExceptions(task);
1766                  else if (!scan && c == (c = ctl)) {
1767 <                    if (mode < 0)
1717 <                        ForkJoinTask.cancelIgnoringExceptions(task);
1718 <                    else if ((s = tryCompensate(c)) >= 0)
1767 >                    if ((s = tryCompensate(c)) >= 0)
1768                          break;                    // block
1769                  }
1770                  else {                            // scan for subtasks
# Line 1763 | Line 1812 | public class ForkJoinPool extends Abstra
1812      }
1813  
1814      /**
1815 <     * Version of helpJoin for CountedCompleters, also usable with
1816 <     * external submitter threads. Scans for and runs subtasks of the
1768 <     * given root task, compensating and blocking if none are found.
1815 >     * Extra helpJoin steps for CountedCompleters.  Scans for and runs
1816 >     * subtasks of the given root task, returning if none are found.
1817       *
1818       * @param task root of CountedCompleter computation
1819       * @param w caller's WorkQueue
1820       * @param owned true if owned by a ForkJoinWorkerThread
1821 <     * @return task status on exit, or ADJUST for compensated blocking
1821 >     * @return task status on exit
1822       */
1823      final int helpComplete(ForkJoinTask<?> task, WorkQueue w, boolean owned) {
1824          int s = 0;
# Line 1786 | Line 1834 | public class ForkJoinPool extends Abstra
1834                  }
1835                  else if ((s = task.status) < 0)
1836                      break;
1837 <                else if (!scan && c == (c = ctl)) {
1838 <                    if (mode < 0)
1839 <                        ForkJoinTask.cancelIgnoringExceptions(task);
1840 <                    else if (!owned || (s = tryCompensate(c)) >= 0)
1793 <                        break;                    // block
1794 <                }
1837 >                else if (!scan && c == (c = ctl))
1838 >                    break;
1839 >                else if (mode < 0)
1840 >                    ForkJoinTask.cancelIgnoringExceptions(task);
1841                  else {                            // scan for subtasks
1842                      scan = false;
1843                      WorkQueue[] qs = queues;
# Line 1833 | Line 1879 | public class ForkJoinPool extends Abstra
1879      }
1880  
1881      /**
1836     * Runs tasks until {@code isQuiescent()}. Rather than blocking
1837     * when tasks cannot be found, rescans until all others cannot
1838     * find tasks either.
1839     */
1840    final void helpQuiescePool(WorkQueue w) {
1841        if (w != null) {
1842            int prevSrc = w.source, wsrc = prevSrc, cfg = w.config, r = cfg + 1;
1843            for (boolean active = true, locals = true;;) {
1844                boolean busy = false, scan = false;
1845                if (locals) {  // run local tasks before (re)polling
1846                    locals = false;
1847                    for (ForkJoinTask<?> u; (u = w.nextLocalTask(cfg)) != null;)
1848                        u.doExec();
1849                }
1850                WorkQueue[] qs = queues;
1851                int n = (qs == null) ? 0 : qs.length;
1852                for (int i = n; i > 0; --i, ++r) {
1853                    int j, b, cap; WorkQueue q; ForkJoinTask<?>[] a;
1854                    if ((q = qs[j = (n - 1) & r]) != null && q != w &&
1855                        (a = q.array) != null && (cap = a.length) > 0) {
1856                        int k = (cap - 1) & (b = q.base);
1857                        int nextBase = b + 1, src = j | SRC;
1858                        ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
1859                        if (q.base != b)
1860                            busy = scan = true;
1861                        else if (t != null) {
1862                            busy = scan = true;
1863                            if (!active) {    // increment before taking
1864                                active = true;
1865                                getAndAddCtl(RC_UNIT);
1866                            }
1867                            if (WorkQueue.casSlotToNull(a, k, t)) {
1868                                q.base = nextBase;
1869                                w.source = src;
1870                                t.doExec();
1871                                w.source = wsrc = prevSrc;
1872                                locals = true;
1873                            }
1874                            break;
1875                        }
1876                        else if (!busy) {
1877                            if (q.top != b || a[nextBase & (cap - 1)] != null)
1878                                busy = scan = true;
1879                            else if (q.source != QUIET && q.phase >= 0)
1880                                busy = true;
1881                        }
1882                    }
1883                }
1884                VarHandle.acquireFence();
1885                if (!scan && queues == qs) {
1886                    if (!busy) {
1887                        w.source = prevSrc;
1888                        if (!active)
1889                            getAndAddCtl(RC_UNIT);
1890                        break;
1891                    }
1892                    if (wsrc != QUIET)
1893                        w.source = wsrc = QUIET;
1894                    if (active) {                 // decrement
1895                        active = false;
1896                        getAndAddCtl(RC_MASK & -RC_UNIT);
1897                    }
1898                    else
1899                        Thread.yield();           // no tasks but others busy
1900                }
1901            }
1902        }
1903    }
1904
1905    /**
1882       * Scans for and returns a polled task, if available.  Used only
1883       * for untracked polls. Begins scan at an index (scanRover)
1884       * advanced on each call, to avoid systematic unfairness.
# Line 1943 | Line 1919 | public class ForkJoinPool extends Abstra
1919      }
1920  
1921      /**
1922 +     * Runs tasks until {@code isQuiescent()}. Rather than blocking
1923 +     * when tasks cannot be found, rescans until all others cannot
1924 +     * find tasks either.
1925 +     *
1926 +     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
1927 +     * @param interruptible true if return on interrupt
1928 +     * @return positive if quiescent, negative if interrupted, else 0
1929 +     */
1930 +    final int helpQuiescePool(WorkQueue w, long nanos, boolean interruptible) {
1931 +        if (w == null)
1932 +            return 0;
1933 +        long startTime = System.nanoTime(), parkTime = 0L;
1934 +        int prevSrc = w.source, wsrc = prevSrc, cfg = w.config, r = cfg + 1;
1935 +        for (boolean active = true, locals = true;;) {
1936 +            boolean busy = false, scan = false;
1937 +            if (locals) {  // run local tasks before (re)polling
1938 +                locals = false;
1939 +                for (ForkJoinTask<?> u; (u = w.nextLocalTask(cfg)) != null;)
1940 +                    u.doExec();
1941 +            }
1942 +            WorkQueue[] qs = queues;
1943 +            int n = (qs == null) ? 0 : qs.length;
1944 +            for (int i = n; i > 0; --i, ++r) {
1945 +                int j, b, cap; WorkQueue q; ForkJoinTask<?>[] a;
1946 +                if ((q = qs[j = (n - 1) & r]) != null && q != w &&
1947 +                    (a = q.array) != null && (cap = a.length) > 0) {
1948 +                    int k = (cap - 1) & (b = q.base);
1949 +                    int nextBase = b + 1, src = j | SRC;
1950 +                    ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
1951 +                    if (q.base != b)
1952 +                        busy = scan = true;
1953 +                    else if (t != null) {
1954 +                        busy = scan = true;
1955 +                        if (!active) {    // increment before taking
1956 +                            active = true;
1957 +                            getAndAddCtl(RC_UNIT);
1958 +                        }
1959 +                        if (WorkQueue.casSlotToNull(a, k, t)) {
1960 +                            q.base = nextBase;
1961 +                            w.source = src;
1962 +                            t.doExec();
1963 +                            w.source = wsrc = prevSrc;
1964 +                            locals = true;
1965 +                        }
1966 +                        break;
1967 +                    }
1968 +                    else if (!busy) {
1969 +                        if (q.top != b || a[nextBase & (cap - 1)] != null)
1970 +                            busy = scan = true;
1971 +                        else if (q.source != QUIET && q.phase >= 0)
1972 +                            busy = true;
1973 +                    }
1974 +                }
1975 +            }
1976 +            VarHandle.acquireFence();
1977 +            if (!scan && queues == qs) {
1978 +                boolean interrupted;
1979 +                if (!busy) {
1980 +                    w.source = prevSrc;
1981 +                    if (!active)
1982 +                        getAndAddCtl(RC_UNIT);
1983 +                    return 1;
1984 +                }
1985 +                if (wsrc != QUIET)
1986 +                    w.source = wsrc = QUIET;
1987 +                if (active) {                 // decrement
1988 +                    active = false;
1989 +                    parkTime = 0L;
1990 +                    getAndAddCtl(RC_MASK & -RC_UNIT);
1991 +                }
1992 +                else if (parkTime == 0L) {
1993 +                    parkTime = 1L << 10; // initially about 1 usec
1994 +                    Thread.yield();
1995 +                }
1996 +                else if ((interrupted = interruptible && Thread.interrupted()) ||
1997 +                         System.nanoTime() - startTime > nanos) {
1998 +                    getAndAddCtl(RC_UNIT);
1999 +                    return interrupted ? -1 : 0;
2000 +                }
2001 +                else {
2002 +                    LockSupport.parkNanos(this, parkTime);
2003 +                    if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
2004 +                        parkTime <<= 1;  // max sleep approx 1 sec or 1% nanos
2005 +                }
2006 +            }
2007 +        }
2008 +    }
2009 +
2010 +    /**
2011 +     * Helps quiesce from external caller until done, interrupted, or timeout
2012 +     *
2013 +     * @param nanos max wait time (Long.MAX_VALUE if effectively untimed)
2014 +     * @param interruptible true if return on interrupt
2015 +     * @return positive if quiescent, negative if interrupted, else 0
2016 +     */
2017 +    final int externalHelpQuiescePool(long nanos, boolean interruptible) {
2018 +        for (long startTime = System.nanoTime(), parkTime = 0L;;) {
2019 +            ForkJoinTask<?> t;
2020 +            if ((t = pollScan(false)) != null) {
2021 +                t.doExec();
2022 +                parkTime = 0L;
2023 +            }
2024 +            else if (canStop())
2025 +                return 1;
2026 +            else if (parkTime == 0L) {
2027 +                parkTime = 1L << 10;
2028 +                Thread.yield();
2029 +            }
2030 +            else if ((System.nanoTime() - startTime) > nanos)
2031 +                return 0;
2032 +            else if (interruptible && Thread.interrupted())
2033 +                return -1;
2034 +            else {
2035 +                LockSupport.parkNanos(this, parkTime);
2036 +                if (parkTime < nanos >>> 8 && parkTime < 1L << 20)
2037 +                    parkTime <<= 1;
2038 +            }
2039 +        }
2040 +    }
2041 +
2042 +    /**
2043       * Gets and removes a local or stolen task for the given worker.
2044       *
2045       * @return a task, if available
# Line 2124 | Line 2221 | public class ForkJoinPool extends Abstra
2221              md = getAndBitwiseOrMode(SHUTDOWN);
2222          }
2223          if ((md & STOP) == 0) {
2224 <            if (!now && !isQuiescent())
2224 >            if (!now && !canStop())
2225                  return false;
2226              md = getAndBitwiseOrMode(STOP);
2227          }
2228          if ((md & TERMINATED) == 0) {
2229              for (ForkJoinTask<?> t; (t = pollScan(false)) != null; )
2230 <                ForkJoinTask.cancelIgnoringExceptions(t); // cancel tasks
2231 <            WorkQueue[] qs; WorkQueue q; Thread t;
2232 <            int n = ((qs = queues) == null) ? 0 : qs.length;
2233 <            for (int i = 1; i < n; i += 2) { // unblock parked workers
2234 <                if ((q = qs[i]) != null && (t = q.owner) != null &&
2235 <                    !t.isInterrupted()) {
2236 <                    try {
2237 <                        t.interrupt();
2238 <                    } catch (Throwable ignore) {
2230 >                ForkJoinTask.cancelIgnoringExceptions(t); // help cancel tasks
2231 >
2232 >            WorkQueue[] qs; int n; WorkQueue q; Thread thread;
2233 >            if ((qs = queues) != null && (n = qs.length) > 0) {
2234 >                for (int j = 1; j < n; j += 2) { // unblock other workers
2235 >                    if ((q = qs[j]) != null && (thread = q.owner) != null &&
2236 >                        !thread.isInterrupted()) {
2237 >                        try {
2238 >                            thread.interrupt();
2239 >                        } catch (Throwable ignore) {
2240 >                        }
2241                      }
2242                  }
2243              }
2244  
2245 <            ReentrantLock lock; Condition cond; // finish if no workers
2245 >            ReentrantLock lock; Condition cond; // signal when no workers
2246              if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) <= 0 &&
2247                  (getAndBitwiseOrMode(TERMINATED) & TERMINATED) == 0 &&
2248                  (lock = registrationLock) != null) {
# Line 2508 | Line 2607 | public class ForkJoinPool extends Abstra
2607      }
2608  
2609      /**
2511     * Task class, plus some helper methods, for invokeAll and invokeAny.
2512     */
2513    static final class BulkTask<E> extends CountedCompleter<E> {
2514        private static final long serialVersionUID = 2838392045355241008L;
2515        @SuppressWarnings("serial") // Conditionally serializable
2516        final Callable<E> callable;
2517        @SuppressWarnings("serial") // Conditionally serializable
2518        E result;
2519        final boolean invokeAny;    // false if performing invokeAll
2520        BulkTask(BulkTask<E> parent, Callable<E> callable, boolean invokeAny) {
2521            super(parent);
2522            this.callable = callable;
2523            this.invokeAny = invokeAny;
2524        }
2525        public E getRawResult() { return result; }
2526        public void setRawResult(E r) { result = r; }
2527
2528        public void compute() {
2529            try {
2530                E r = callable.call();
2531                @SuppressWarnings("unchecked") CountedCompleter<E> p =
2532                    invokeAny ? (CountedCompleter<E>)getCompleter() : null;
2533                if (p != null)
2534                    p.complete(r);
2535                else
2536                    complete(r);
2537            } catch (Throwable ex) {
2538                completeExceptionally(ex);
2539            }
2540        }
2541
2542        static void cancelAll(BulkTask<?>[] fs) { // cancel all nonnull tasks
2543            if (fs != null) {
2544                for (BulkTask<?> f: fs) {
2545                    if (f != null)
2546                        f.cancel(false);
2547                }
2548            }
2549        }
2550
2551        /**
2552         * Creates, records, and forks a BulkTask for each Callable;
2553         * returns the array, with first element root task (if nonempty).
2554         */
2555        static <T> BulkTask<T>[] forkAll(Collection<? extends Callable<T>> cs,
2556                                         boolean invokeAny) {
2557            int n = cs.size();
2558            @SuppressWarnings("unchecked")
2559            BulkTask<T>[] fs = (BulkTask<T>[])new BulkTask<?>[n];
2560            BulkTask<T> root = null; // parent completer for all others
2561            Iterator<? extends Callable<T>> it = cs.iterator();
2562            int i = 0; // ignores extra elements if cs.size() inconsistent
2563            while (i < n && it.hasNext()) {
2564                Callable<T> c; BulkTask<T> f;
2565                if ((c = it.next()) == null) {
2566                    cancelAll(fs);
2567                    throw new NullPointerException();
2568                }
2569                fs[i++] = f = new BulkTask<T>(root, c, invokeAny);
2570                if (root == null)
2571                    (root = f).setPendingCount(n);
2572                f.fork();
2573            }
2574            return fs;
2575        }
2576
2577        /**
2578         * If completed abnormally, throws any exception encountered
2579         * by any task in array, or a CancellationException if none,
2580         * wrapped in ExecutionException. Else returns result.
2581         */
2582        E reportInvokeAnyResult(BulkTask<?>[] fs) throws ExecutionException {
2583            E r = getRawResult();
2584            if (r == null && isCompletedAbnormally()) {
2585                Throwable ex = null;
2586                if (fs != null) {
2587                    for (BulkTask<?> f: fs) {
2588                        if (f != null && (ex = f.getException()) != null)
2589                            break;
2590                    }
2591                }
2592                if (ex == null)
2593                    ex = new CancellationException();
2594                throw new ExecutionException(ex);
2595            }
2596            return r;
2597        }
2598    }
2599
2600    /**
2610       * @throws NullPointerException       {@inheritDoc}
2611       * @throws RejectedExecutionException {@inheritDoc}
2612       */
2613      @Override
2614      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2615 <        BulkTask<T>[] fs; BulkTask<T> root;
2616 <        if ((fs = BulkTask.forkAll(tasks, false)) != null && fs.length > 0 &&
2617 <            (root = fs[0]) != null)
2618 <            root.quietlyJoin();
2619 <        return Arrays.asList(fs);
2615 >        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2616 >        try {
2617 >            for (Callable<T> t : tasks) {
2618 >                ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2619 >                futures.add(f);
2620 >                externalSubmit(f);
2621 >            }
2622 >            for (int i = futures.size() - 1; i >= 0; --i)
2623 >                ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2624 >            return futures;
2625 >        } catch (Throwable t) {
2626 >            for (Future<T> e : futures)
2627 >                ForkJoinTask.cancelIgnoringExceptions(e);
2628 >            throw t;
2629 >        }
2630      }
2631  
2632      @Override
2633      public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
2634                                           long timeout, TimeUnit unit)
2635          throws InterruptedException {
2636 <        BulkTask<T>[] fs; BulkTask<T> root;
2637 <        long deadline = unit.toNanos(timeout) + System.nanoTime();
2638 <        if ((fs = BulkTask.forkAll(tasks, false)) != null && fs.length > 0 &&
2639 <            (root = fs[0]) != null) {
2640 <            try {
2641 <                root.get(deadline, TimeUnit.NANOSECONDS);
2642 <            } catch (Exception ex) {
2643 <                BulkTask.cancelAll(fs);
2636 >        long nanos = unit.toNanos(timeout);
2637 >        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2638 >        try {
2639 >            for (Callable<T> t : tasks) {
2640 >                ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2641 >                futures.add(f);
2642 >                externalSubmit(f);
2643 >            }
2644 >            long startTime = System.nanoTime(), ns = nanos;
2645 >            boolean timedOut = (ns < 0L);
2646 >            for (int i = futures.size() - 1; i >= 0; --i) {
2647 >                Future<T> f = futures.get(i);
2648 >                if (!f.isDone()) {
2649 >                    if (timedOut)
2650 >                        ForkJoinTask.cancelIgnoringExceptions(f);
2651 >                    else {
2652 >                        try {
2653 >                            f.get(ns, TimeUnit.NANOSECONDS);
2654 >                        } catch (CancellationException | TimeoutException |
2655 >                                 ExecutionException ok) {
2656 >                        }
2657 >                        if ((ns = nanos - (System.nanoTime() - startTime)) < 0L)
2658 >                            timedOut = true;
2659 >                    }
2660 >                }
2661              }
2662 +            return futures;
2663 +        } catch (Throwable t) {
2664 +            for (Future<T> e : futures)
2665 +                ForkJoinTask.cancelIgnoringExceptions(e);
2666 +            throw t;
2667          }
2627        return Arrays.asList(fs);
2628    }
2629
2630    @Override
2631    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
2632        throws InterruptedException, ExecutionException {
2633        BulkTask<T>[] fs; BulkTask<T> root;
2634        if ((fs = BulkTask.forkAll(tasks, true)) != null && fs.length > 0 &&
2635            (root = fs[0]) != null) {
2636            root.quietlyJoin();
2637            BulkTask.cancelAll(fs);
2638            return root.reportInvokeAnyResult(fs);
2639        }
2640        else
2641            throw new IllegalArgumentException(); // no tasks
2642    }
2643
2644    @Override
2645    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
2646                           long timeout, TimeUnit unit)
2647        throws InterruptedException, ExecutionException, TimeoutException {
2648        BulkTask<T>[] fs; BulkTask<T> root;
2649        long deadline = unit.toNanos(timeout) + System.nanoTime();
2650        if ((fs = BulkTask.forkAll(tasks, true)) != null && fs.length > 0 &&
2651            (root = fs[0]) != null) {
2652            TimeoutException tex = null;
2653            try {
2654                root.get(deadline, TimeUnit.NANOSECONDS);
2655            } catch (TimeoutException tx) {
2656                tex = tx;
2657            } catch (Throwable ignore) {
2658            }
2659            BulkTask.cancelAll(fs);
2660            if (tex != null)
2661                throw tex;
2662            return root.reportInvokeAnyResult(fs);
2663        }
2664        else
2665            throw new IllegalArgumentException();
2668      }
2669  
2670      /**
# Line 2771 | Line 2773 | public class ForkJoinPool extends Abstra
2773       * @return {@code true} if all threads are currently idle
2774       */
2775      public boolean isQuiescent() {
2776 <        int m, p;
2775 <        return ((((m = mode) & STOP) != 0) ||
2776 <                ((p = m & SMASK) + (int)(ctl >> RC_SHIFT) <= 0 &&
2777 <                 !hasQueuedSubmissions() && p + (int)(ctl >> RC_SHIFT) <= 0) ||
2778 <                (mode & STOP) != 0); // recheck
2776 >        return canStop();
2777      }
2778  
2779      /**
# Line 3046 | Line 3044 | public class ForkJoinPool extends Abstra
3044       */
3045      public boolean awaitTermination(long timeout, TimeUnit unit)
3046          throws InterruptedException {
3047 +        ReentrantLock lock; Condition cond;
3048          long nanos = unit.toNanos(timeout);
3049 <        ReentrantLock lock; Condition cond; // construct only if waiting
3051 <        if (Thread.interrupted())
3052 <            throw new InterruptedException();
3049 >        boolean terminated = false;
3050          if (this == common) {
3051 <            awaitQuiescence(timeout, unit);
3052 <            return false;
3051 >            Thread t; ForkJoinWorkerThread wt; int q;
3052 >            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
3053 >                (wt = (ForkJoinWorkerThread)t).pool == this)
3054 >                q = helpQuiescePool(wt.workQueue, nanos, true);
3055 >            else
3056 >                q = externalHelpQuiescePool(nanos, true);
3057 >            if (q < 0)
3058 >                throw new InterruptedException();
3059          }
3060 <        if (isTerminated() || (lock = registrationLock) == null)
3061 <            return true;
3062 <        lock.lock();
3063 <        try {
3064 <            if ((cond = termination) == null)
3065 <                termination = cond = lock.newCondition();
3066 <            while (!isTerminated() && nanos > 0L)
3067 <                nanos = cond.awaitNanos(nanos);
3068 <        } finally {
3069 <            lock.unlock();
3060 >        else if (!(terminated = isTerminated()) &&
3061 >                 (lock = registrationLock) != null) {
3062 >            lock.lock();
3063 >            try {
3064 >                if ((cond = termination) == null)
3065 >                    termination = cond = lock.newCondition();
3066 >                while (!(terminated = isTerminated()) && nanos > 0L)
3067 >                    nanos = cond.awaitNanos(nanos);
3068 >            } finally {
3069 >                lock.unlock();
3070 >            }
3071          }
3072 <        return isTerminated();
3072 >        return terminated;
3073      }
3074  
3075      /**
# Line 3080 | Line 3084 | public class ForkJoinPool extends Abstra
3084       * timeout elapsed.
3085       */
3086      public boolean awaitQuiescence(long timeout, TimeUnit unit) {
3087 <        Thread thread; ForkJoinWorkerThread wt;
3087 >        Thread t; ForkJoinWorkerThread wt; int q;
3088          long nanos = unit.toNanos(timeout);
3089 <        if ((thread = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
3090 <            (wt = (ForkJoinWorkerThread)thread).pool == this) {
3091 <            helpQuiescePool(wt.workQueue);
3092 <            return true;
3093 <        }
3094 <        // else cannot block, so use exponential sleeps
3091 <        boolean quiesced = false, interrupted = false;
3092 <        for (long startTime = System.nanoTime(), parkTime = 0L;;) {
3093 <            ForkJoinTask<?> t;
3094 <            if ((t = pollScan(false)) != null) {
3095 <                t.doExec();
3096 <                parkTime = 0L;
3097 <            }
3098 <            else if (quiesced = isQuiescent())
3099 <                break;
3100 <            else if ((System.nanoTime() - startTime) > nanos)
3101 <                break;
3102 <            else if (parkTime == 0L) {
3103 <                parkTime = 1L << 10; // initially about 1 usec
3104 <                Thread.yield();
3105 <            }
3106 <            else if (Thread.interrupted())
3107 <                interrupted = true;
3108 <            else {
3109 <                LockSupport.parkNanos(this, parkTime);
3110 <                if (parkTime < nanos >>> 8) // max sleep approx 1% nanos
3111 <                    parkTime <<= 1;
3112 <            }
3113 <        }
3114 <        if (interrupted)
3115 <            Thread.currentThread().interrupt();
3116 <        return quiesced;
3117 <    }
3118 <
3119 <    /**
3120 <     * Waits and/or attempts to assist performing tasks indefinitely
3121 <     * until the {@link #commonPool()} {@link #isQuiescent}.
3122 <     */
3123 <    static void quiesceCommonPool() {
3124 <        common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3089 >        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
3090 >            (wt = (ForkJoinWorkerThread)t).pool == this)
3091 >            q = helpQuiescePool(wt.workQueue, nanos, false);
3092 >        else
3093 >            q = externalHelpQuiescePool(nanos, false);
3094 >        return (q > 0);
3095      }
3096  
3097      /**

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines