ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166e/ForkJoinPool.java (file contents):
Revision 1.23 by jsr166, Thu Nov 22 18:14:57 2012 UTC vs.
Revision 1.36 by dl, Tue Dec 18 21:46:09 2012 UTC

# Line 51 | Line 51 | import java.util.concurrent.TimeUnit;
51   * dynamically adding, suspending, or resuming internal worker
52   * threads, even if some tasks are stalled waiting to join
53   * others. However, no such adjustments are guaranteed in the face of
54 < * blocked IO or other unmanaged synchronization. The nested {@link
54 > * blocked I/O or other unmanaged synchronization. The nested {@link
55   * ManagedBlocker} interface enables extension of the kinds of
56   * synchronization accommodated.
57   *
# Line 987 | Line 987 | public class ForkJoinPool extends Abstra
987              if (t != null) {
988                  (currentSteal = t).doExec();
989                  currentSteal = null;
990 +                ++nsteals;
991                  if (base - top < 0) {       // process remaining local tasks
992                      if (mode == 0)
993                          popAndExecAll();
994                      else
995                          pollAndExecAll();
996                  }
996                ++nsteals;
997                hint = -1;
997              }
998          }
999  
# Line 1021 | Line 1020 | public class ForkJoinPool extends Abstra
1020                      s != Thread.State.TIMED_WAITING);
1021          }
1022  
1024        /**
1025         * If this owned and is not already interrupted, try to
1026         * interrupt and/or unpark, ignoring exceptions.
1027         */
1028        final void interruptOwner() {
1029            Thread wt, p;
1030            if ((wt = owner) != null && !wt.isInterrupted()) {
1031                try {
1032                    wt.interrupt();
1033                } catch (SecurityException ignore) {
1034                }
1035            }
1036            if ((p = parker) != null)
1037                U.unpark(p);
1038        }
1039
1023          // Unsafe mechanics
1024          private static final sun.misc.Unsafe U;
1025          private static final long QLOCK;
# Line 1129 | Line 1112 | public class ForkJoinPool extends Abstra
1112      private static final long FAST_IDLE_TIMEOUT =  200L * 1000L * 1000L;
1113  
1114      /**
1115 +     * Tolerance for idle timeouts, to cope with timer undershoots
1116 +     */
1117 +    private static final long TIMEOUT_SLOP = 2000000L;
1118 +
1119 +    /**
1120       * The maximum stolen->joining link depth allowed in method
1121       * tryHelpStealer.  Must be a power of two.  Depths for legitimate
1122       * chains are unbounded, but we use a fixed constant to avoid
# Line 1263 | Line 1251 | public class ForkJoinPool extends Abstra
1251       * fails. This acts as a spinLock for normal cases, but falls back
1252       * to builtin monitor to block when (rarely) needed. This would be
1253       * a terrible idea for a highly contended lock, but works fine as
1254 <     * a more conservative alternative to a pure spinlock.  See
1267 <     * internal ConcurrentHashMap documentation for further
1268 <     * explanation of nearly the same construction.
1254 >     * a more conservative alternative to a pure spinlock.
1255       */
1256      private int acquirePlock() {
1257          int spins = PL_SPINS, r = 0, ps, nps;
# Line 1326 | Line 1312 | public class ForkJoinPool extends Abstra
1312       * wrap around zero, this method harmlessly fails to reinitialize
1313       * if workQueues exists, while still advancing plock.
1314       *
1315 <     * Additonally tries to create the first worker.
1315 >     * Additionally tries to create the first worker.
1316       */
1317      private void initWorkers() {
1318          WorkQueue[] ws, nws; int ps;
# Line 1350 | Line 1336 | public class ForkJoinPool extends Abstra
1336      }
1337  
1338      /**
1339 <     * Tries to create and start one worker. Adjusts counts etc on
1340 <     * failure.
1339 >     * Tries to create and start one worker if fewer than target
1340 >     * parallelism level exist. Adjusts counts etc on failure.
1341       */
1342      private void tryAddWorker() {
1343          long c; int u;
# Line 1461 | Line 1447 | public class ForkJoinPool extends Abstra
1447              }
1448          }
1449  
1450 <        long c;                             // adjust ctl counts
1450 >        long c;                          // adjust ctl counts
1451          do {} while (!U.compareAndSwapLong
1452                       (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
1453                                             ((c - TC_UNIT) & TC_MASK) |
1454                                             (c & ~(AC_MASK|TC_MASK)))));
1455  
1456 <        if (!tryTerminate(false, false) && w != null) {
1457 <            w.cancelAll();                  // cancel remaining tasks
1458 <            if (w.array != null)            // suppress signal if never ran
1459 <                tryAddWorker();             // create replacement
1460 <            if (ex == null)                 // help clean refs on way out
1461 <                ForkJoinTask.helpExpungeStaleExceptions();
1456 >        if (!tryTerminate(false, false) && w != null && w.array != null) {
1457 >            w.cancelAll();               // cancel remaining tasks
1458 >            WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
1459 >            while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
1460 >                if (e > 0) {             // activate or create replacement
1461 >                    if ((ws = workQueues) == null ||
1462 >                        (i = e & SMASK) >= ws.length ||
1463 >                        (v = ws[i]) == null)
1464 >                        break;
1465 >                    long nc = (((long)(v.nextWait & E_MASK)) |
1466 >                               ((long)(u + UAC_UNIT) << 32));
1467 >                    if (v.eventCount != (e | INT_SIGN))
1468 >                        break;
1469 >                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1470 >                        v.eventCount = (e + E_SEQ) & E_MASK;
1471 >                        if ((p = v.parker) != null)
1472 >                            U.unpark(p);
1473 >                        break;
1474 >                    }
1475 >                }
1476 >                else {
1477 >                    if ((short)u < 0)
1478 >                        tryAddWorker();
1479 >                    break;
1480 >                }
1481 >            }
1482          }
1483 <
1484 <        if (ex != null)                     // rethrow
1483 >        if (ex == null)                     // help clean refs on way out
1484 >            ForkJoinTask.helpExpungeStaleExceptions();
1485 >        else                                // rethrow
1486              ForkJoinTask.rethrow(ex);
1487      }
1488  
# Line 1660 | Line 1667 | public class ForkJoinPool extends Abstra
1667       *
1668       * * If not already enqueued, try to inactivate and enqueue the
1669       * worker on wait queue. Or, if inactivating has caused the pool
1670 <     * to be quiescent, relay to idleAwaitWork to check for
1671 <     * termination and possibly shrink pool.
1670 >     * to be quiescent, relay to idleAwaitWork to possibly shrink
1671 >     * pool.
1672       *
1673       * * If already enqueued and none of the above apply, possibly
1674 <     * (with 1/2 probability) park awaiting signal, else lingering to
1675 <     * help scan and signal.
1674 >     * park awaiting signal, else lingering to help scan and signal.
1675 >     *
1676 >     * * If a non-empty queue discovered or left as a hint,
1677 >     * help wake up other workers before return
1678       *
1679       * @param w the worker (via its WorkQueue)
1680       * @return a task or null if none found
# Line 1676 | Line 1685 | public class ForkJoinPool extends Abstra
1685          if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1686              int ec = w.eventCount;               // ec is negative if inactive
1687              int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1688 +            w.hint = -1;                         // update seed and clear hint
1689              int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
1690              do {
1691                  WorkQueue q; ForkJoinTask<?>[] a; int b;
# Line 1697 | Line 1707 | public class ForkJoinPool extends Abstra
1707                  }
1708              } while (--j >= 0);
1709  
1710 <            long c, sc; int e, ns, h;
1711 <            if ((h = w.hint) < 0) {
1712 <                if ((ns = w.nsteals) != 0) {
1713 <                    if (U.compareAndSwapLong(this, STEALCOUNT,
1714 <                                             sc = stealCount, sc + ns))
1715 <                        w.nsteals = 0;           // collect steals
1716 <                }
1717 <                else if (plock != ps)            // consistency check
1718 <                    ;                            // skip
1719 <                else if ((e = (int)(c = ctl)) < 0)
1720 <                    w.qlock = -1;                // pool is terminating
1721 <                else if (ec >= 0) {              // try to enqueue/inactivate
1722 <                    long nc = ((long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)));
1723 <                    w.nextWait = e;              // link and mark inactive
1724 <                    w.eventCount = ec | INT_SIGN;
1725 <                    if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1726 <                        w.eventCount = ec;       // unmark on CAS failure
1727 <                    else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1728 <                        idleAwaitWork(w, nc, c);
1729 <                }
1730 <                else if (w.eventCount < 0) {     // block
1731 <                    Thread wt = Thread.currentThread();
1732 <                    Thread.interrupted();        // clear status
1733 <                    U.putObject(wt, PARKBLOCKER, this);
1734 <                    w.parker = wt;               // emulate LockSupport.park
1735 <                    if (w.eventCount < 0)        // recheck
1736 <                        U.park(false, 0L);
1737 <                    w.parker = null;
1738 <                    U.putObject(wt, PARKBLOCKER, null);
1739 <                }
1740 <            }
1741 <            if (h >= 0 || (h = w.hint) >= 0) {   // signal others before retry
1742 <                w.hint = -1;                     // reset
1743 <                helpSignal(null, h, true);
1710 >            int h, e, ns; long c, sc; WorkQueue q;
1711 >            if ((ns = w.nsteals) != 0) {
1712 >                if (U.compareAndSwapLong(this, STEALCOUNT,
1713 >                                         sc = stealCount, sc + ns))
1714 >                    w.nsteals = 0;               // collect steals and rescan
1715 >            }
1716 >            else if (plock != ps)                // consistency check
1717 >                ;                                // skip
1718 >            else if ((e = (int)(c = ctl)) < 0)
1719 >                w.qlock = -1;                    // pool is terminating
1720 >            else {
1721 >                if ((h = w.hint) < 0) {
1722 >                    if (ec >= 0) {               // try to enqueue/inactivate
1723 >                        long nc = (((long)ec |
1724 >                                    ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1725 >                        w.nextWait = e;          // link and mark inactive
1726 >                        w.eventCount = ec | INT_SIGN;
1727 >                        if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1728 >                            w.eventCount = ec;   // unmark on CAS failure
1729 >                        else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1730 >                            idleAwaitWork(w, nc, c);
1731 >                    }
1732 >                    else if (w.eventCount < 0 && !tryTerminate(false, false) &&
1733 >                             ctl == c) {         // block
1734 >                        Thread wt = Thread.currentThread();
1735 >                        Thread.interrupted();    // clear status
1736 >                        U.putObject(wt, PARKBLOCKER, this);
1737 >                        w.parker = wt;           // emulate LockSupport.park
1738 >                        if (w.eventCount < 0)    // recheck
1739 >                            U.park(false, 0L);
1740 >                        w.parker = null;
1741 >                        U.putObject(wt, PARKBLOCKER, null);
1742 >                    }
1743 >                }
1744 >                if ((h >= 0 || (h = w.hint) >= 0) &&
1745 >                    (ws = workQueues) != null && h < ws.length &&
1746 >                    (q = ws[h]) != null) {      // signal others before retry
1747 >                    WorkQueue v; Thread p; int u, i, s;
1748 >                    for (int n = (config & SMASK) >>> 1;;) {
1749 >                        int idleCount = (w.eventCount < 0) ? 0 : -1;
1750 >                        if (((s = idleCount - q.base + q.top) <= n &&
1751 >                             (n = s) <= 0) ||
1752 >                            (u = (int)((c = ctl) >>> 32)) >= 0 ||
1753 >                            (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1754 >                            (v = ws[i]) == null)
1755 >                            break;
1756 >                        long nc = (((long)(v.nextWait & E_MASK)) |
1757 >                                   ((long)(u + UAC_UNIT) << 32));
1758 >                        if (v.eventCount != (e | INT_SIGN) ||
1759 >                            !U.compareAndSwapLong(this, CTL, c, nc))
1760 >                            break;
1761 >                        v.hint = h;
1762 >                        v.eventCount = (e + E_SEQ) & E_MASK;
1763 >                        if ((p = v.parker) != null)
1764 >                            U.unpark(p);
1765 >                        if (--n <= 0)
1766 >                            break;
1767 >                    }
1768 >                }
1769              }
1770          }
1771          return null;
# Line 1753 | Line 1788 | public class ForkJoinPool extends Abstra
1788              !tryTerminate(false, false) && (int)prevCtl != 0) {
1789              int dc = -(short)(currentCtl >>> TC_SHIFT);
1790              long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1791 <            long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop
1791 >            long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1792              Thread wt = Thread.currentThread();
1793              while (ctl == currentCtl) {
1794                  Thread.interrupted();  // timed variant of version in scan()
# Line 1776 | Line 1811 | public class ForkJoinPool extends Abstra
1811      }
1812  
1813      /**
1814 <     * Scans through queues looking for work (optionally, while
1815 <     * joining a task); if any present, signals. May return early if
1816 <     * more signalling is detectably unneeded.
1814 >     * Scans through queues looking for work while joining a task; if
1815 >     * any present, signals. May return early if more signalling is
1816 >     * detectably unneeded.
1817       *
1818 <     * @param task if non-null, return early if done
1818 >     * @param task return early if done
1819       * @param origin an index to start scan
1785     * @param once if only the origin should be checked
1820       */
1821 <    private void helpSignal(ForkJoinTask<?> task, int origin, boolean once) {
1821 >    private void helpSignal(ForkJoinTask<?> task, int origin) {
1822          WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
1823 <        if ((u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1823 >        if (task != null && task.status >= 0 &&
1824 >            (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1825              (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1826 <            outer: for (int k = origin, j = once ? 0 : m; j >= 0; --j) {
1826 >            outer: for (int k = origin, j = m; j >= 0; --j) {
1827                  WorkQueue q = ws[k++ & m];
1828                  for (int n = m;;) { // limit to at most m signals
1829 <                    if (task != null && task.status < 0)
1829 >                    if (task.status < 0)
1830                          break outer;
1831                      if (q == null ||
1832 <                        ((s = (task == null ? -1 : 0) - q.base + q.top) <= n &&
1798 <                         (n = s) <= 0))
1832 >                        ((s = -q.base + q.top) <= n && (n = s) <= 0))
1833                          break;
1834                      if ((u = (int)((c = ctl) >>> 32)) >= 0 ||
1835                          (e = (int)c) <= 0 || m < (i = e & SMASK) ||
# Line 1803 | Line 1837 | public class ForkJoinPool extends Abstra
1837                          break outer;
1838                      long nc = (((long)(w.nextWait & E_MASK)) |
1839                                 ((long)(u + UAC_UNIT) << 32));
1840 <                    if (w.eventCount == (e | INT_SIGN) &&
1841 <                        U.compareAndSwapLong(this, CTL, c, nc)) {
1840 >                    if (w.eventCount != (e | INT_SIGN))
1841 >                        break outer;
1842 >                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1843                          w.eventCount = (e + E_SEQ) & E_MASK;
1844                          if ((p = w.parker) != null)
1845                              U.unpark(p);
# Line 1996 | Line 2031 | public class ForkJoinPool extends Abstra
2031              do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2032                           joiner.tryRemoveAndExec(task)); // process local tasks
2033              if (s >= 0 && (s = task.status) >= 0) {
2034 <                helpSignal(task, joiner.poolIndex, false);
2034 >                helpSignal(task, joiner.poolIndex);
2035                  if ((s = task.status) >= 0 &&
2036                      (task instanceof CountedCompleter))
2037                      s = helpComplete(task, LIFO_QUEUE);
# Line 2005 | Line 2040 | public class ForkJoinPool extends Abstra
2040                  if ((!joiner.isEmpty() ||           // try helping
2041                       (s = tryHelpStealer(joiner, task)) == 0) &&
2042                      (s = task.status) >= 0) {
2043 <                    helpSignal(task, joiner.poolIndex, false);
2043 >                    helpSignal(task, joiner.poolIndex);
2044                      if ((s = task.status) >= 0 && tryCompensate()) {
2045                          if (task.trySetSignal() && (s = task.status) >= 0) {
2046                              synchronized (task) {
# Line 2046 | Line 2081 | public class ForkJoinPool extends Abstra
2081              do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2082                           joiner.tryRemoveAndExec(task));
2083              if (s >= 0 && (s = task.status) >= 0) {
2084 <                helpSignal(task, joiner.poolIndex, false);
2084 >                helpSignal(task, joiner.poolIndex);
2085                  if ((s = task.status) >= 0 &&
2086                      (task instanceof CountedCompleter))
2087                      s = helpComplete(task, LIFO_QUEUE);
# Line 2068 | Line 2103 | public class ForkJoinPool extends Abstra
2103       */
2104      private WorkQueue findNonEmptyStealQueue(int r) {
2105          for (WorkQueue[] ws;;) {
2106 <            int ps = plock, m, n;
2106 >            int ps = plock, m;
2107              if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
2108                  return null;
2109              for (int j = (m + 1) << 2; ;) {
2110                  WorkQueue q = ws[(((r + j) << 1) | 1) & m];
2111 <                if (q != null && (n = q.base - q.top) < 0) {
2077 <                    if (n < -1)
2078 <                        signalWork(q);
2111 >                if (q != null && q.base - q.top < 0)
2112                      return q;
2080                }
2113                  else if (--j < 0) {
2114                      if (plock == ps)
2115                          return null;
# Line 2108 | Line 2140 | public class ForkJoinPool extends Abstra
2140                      do {} while (!U.compareAndSwapLong
2141                                   (this, CTL, c = ctl, c + AC_UNIT));
2142                  }
2143 <                if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2143 >                if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2144 >                    if (q.base - q.top < 0)
2145 >                        signalWork(q);
2146                      w.runSubtask(t);
2147 +                }
2148              }
2149              else {
2150                  long c;
# Line 2141 | Line 2176 | public class ForkJoinPool extends Abstra
2176                  return t;
2177              if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
2178                  return null;
2179 <            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2179 >            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2180 >                if (q.base - q.top < 0)
2181 >                    signalWork(q);
2182                  return t;
2183 +            }
2184          }
2185      }
2186  
# Line 2242 | Line 2280 | public class ForkJoinPool extends Abstra
2280                  if (((ps = plock) & PL_LOCK) != 0 ||
2281                      !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
2282                      ps = acquirePlock();
2283 <                int nps = SHUTDOWN;
2284 <                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
2247 <                    releasePlock(nps);
2283 >                if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN))
2284 >                    releasePlock(SHUTDOWN);
2285              }
2286              if (!now) {                             // check if idle & no tasks
2287                  if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
# Line 2263 | Line 2300 | public class ForkJoinPool extends Abstra
2300                  for (int pass = 0; pass < 3; ++pass) {
2301                      WorkQueue[] ws = workQueues;
2302                      if (ws != null) {
2303 <                        WorkQueue w;
2303 >                        WorkQueue w; Thread wt;
2304                          int n = ws.length;
2305                          for (int i = 0; i < n; ++i) {
2306                              if ((w = ws[i]) != null) {
2307                                  w.qlock = -1;
2308                                  if (pass > 0) {
2309                                      w.cancelAll();
2310 <                                    if (pass > 1)
2311 <                                        w.interruptOwner();
2310 >                                    if (pass > 1 && (wt = w.owner) != null) {
2311 >                                        if (!wt.isInterrupted()) {
2312 >                                            try {
2313 >                                                wt.interrupt();
2314 >                                            } catch (SecurityException ignore) {
2315 >                                            }
2316 >                                        }
2317 >                                        U.unpark(wt);
2318 >                                    }
2319                                  }
2320                              }
2321                          }
# Line 2378 | Line 2422 | public class ForkJoinPool extends Abstra
2422                      (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
2423                      break;
2424                  if (task == null) {
2425 <                    helpSignal(root, q.poolIndex, false);
2425 >                    helpSignal(root, q.poolIndex);
2426                      if (root.status >= 0)
2427                          helpComplete(root, SHARED_QUEUE);
2428                      break;
# Line 2421 | Line 2465 | public class ForkJoinPool extends Abstra
2465                  if (t instanceof CountedCompleter)
2466                      p.externalHelpComplete(q, t);
2467                  else
2468 <                    p.helpSignal(t, q.poolIndex, false);
2468 >                    p.helpSignal(t, q.poolIndex);
2469              }
2470          }
2471      }
# Line 2434 | Line 2478 | public class ForkJoinPool extends Abstra
2478          if ((p = commonPool) != null &&
2479              (q = p.findNonEmptyStealQueue(1)) != null &&
2480              (b = q.base) - q.top < 0 &&
2481 <            (t = q.pollAt(b)) != null)
2481 >            (t = q.pollAt(b)) != null) {
2482 >            if (q.base - q.top < 0)
2483 >                p.signalWork(q);
2484              t.doExec();
2485 +        }
2486      }
2487  
2488      // Exported methods
# Line 2536 | Line 2583 | public class ForkJoinPool extends Abstra
2583      }
2584  
2585      /**
2586 <     * Returns the common pool instance.
2586 >     * Returns the common pool instance. This pool is statically
2587 >     * constructed; its run state is unaffected by attempts to
2588 >     * {@link #shutdown} or {@link #shutdownNow}.
2589       *
2590       * @return the common pool instance
2591       */
# Line 3036 | Line 3085 | public class ForkJoinPool extends Abstra
3085       * commenced but not yet completed.  This method may be useful for
3086       * debugging. A return of {@code true} reported a sufficient
3087       * period after shutdown may indicate that submitted tasks have
3088 <     * ignored or suppressed interruption, or are waiting for IO,
3088 >     * ignored or suppressed interruption, or are waiting for I/O,
3089       * causing this executor not to properly terminate. (See the
3090       * advisory notes for class {@link ForkJoinTask} stating that
3091       * tasks should not normally entail blocking operations.  But if

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines