ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.155 by jsr166, Sat Dec 8 20:49:24 2012 UTC vs.
Revision 1.164 by dl, Tue Dec 18 21:46:16 2012 UTC

# Line 987 | Line 987 | public class ForkJoinPool extends Abstra
987              if (t != null) {
988                  (currentSteal = t).doExec();
989                  currentSteal = null;
990 +                ++nsteals;
991                  if (base - top < 0) {       // process remaining local tasks
992                      if (mode == 0)
993                          popAndExecAll();
994                      else
995                          pollAndExecAll();
996                  }
996                ++nsteals;
997                hint = -1;
997              }
998          }
999  
# Line 1021 | Line 1020 | public class ForkJoinPool extends Abstra
1020                      s != Thread.State.TIMED_WAITING);
1021          }
1022  
1024        /**
1025         * If this owned and is not already interrupted, try to
1026         * interrupt and/or unpark, ignoring exceptions.
1027         */
1028        final void interruptOwner() {
1029            Thread wt, p;
1030            if ((wt = owner) != null && !wt.isInterrupted()) {
1031                try {
1032                    wt.interrupt();
1033                } catch (SecurityException ignore) {
1034                }
1035            }
1036            if ((p = parker) != null)
1037                U.unpark(p);
1038        }
1039
1023          // Unsafe mechanics
1024          private static final sun.misc.Unsafe U;
1025          private static final long QLOCK;
# Line 1131 | Line 1114 | public class ForkJoinPool extends Abstra
1114      /**
1115       * Tolerance for idle timeouts, to cope with timer undershoots
1116       */
1117 <    private static final long TIMEOUT_SLOP = 2000000L; // 20ms
1117 >    private static final long TIMEOUT_SLOP = 2000000L;
1118  
1119      /**
1120       * The maximum stolen->joining link depth allowed in method
# Line 1268 | Line 1251 | public class ForkJoinPool extends Abstra
1251       * fails. This acts as a spinLock for normal cases, but falls back
1252       * to builtin monitor to block when (rarely) needed. This would be
1253       * a terrible idea for a highly contended lock, but works fine as
1254 <     * a more conservative alternative to a pure spinlock.  See
1272 <     * internal ConcurrentHashMap documentation for further
1273 <     * explanation of nearly the same construction.
1254 >     * a more conservative alternative to a pure spinlock.
1255       */
1256      private int acquirePlock() {
1257          int spins = PL_SPINS, r = 0, ps, nps;
# Line 1466 | Line 1447 | public class ForkJoinPool extends Abstra
1447              }
1448          }
1449  
1450 <        long c;                             // adjust ctl counts
1450 >        long c;                          // adjust ctl counts
1451          do {} while (!U.compareAndSwapLong
1452                       (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
1453                                             ((c - TC_UNIT) & TC_MASK) |
1454                                             (c & ~(AC_MASK|TC_MASK)))));
1455  
1456          if (!tryTerminate(false, false) && w != null && w.array != null) {
1457 <            w.cancelAll();                  // cancel remaining tasks
1458 <            int e, u, i, n; WorkQueue[] ws; WorkQueue v; Thread p;
1459 <            while ((u = (int)((c = ctl) >>> 32)) < 0) {
1460 <                if ((e = (int)c) > 0) {     // activate or create replacement
1461 <                    if ((ws = workQueues) != null &&
1462 <                        ws.length > (i = e & SMASK) &&
1463 <                        (v = ws[i]) != null && v.eventCount == (e | INT_SIGN)) {
1483 <                        long nc = (((long)(v.nextWait & E_MASK)) |
1484 <                                   ((long)(u + UAC_UNIT) << 32));
1485 <                        if (U.compareAndSwapLong(this, CTL, c, nc)) {
1486 <                            v.eventCount = (e + E_SEQ) & E_MASK;
1487 <                            if ((p = v.parker) != null)
1488 <                                U.unpark(p);
1489 <                            break;
1490 <                        }
1491 <                    }
1492 <                    else
1457 >            w.cancelAll();               // cancel remaining tasks
1458 >            WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
1459 >            while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
1460 >                if (e > 0) {             // activate or create replacement
1461 >                    if ((ws = workQueues) == null ||
1462 >                        (i = e & SMASK) >= ws.length ||
1463 >                        (v = ws[i]) == null)
1464                          break;
1465 +                    long nc = (((long)(v.nextWait & E_MASK)) |
1466 +                               ((long)(u + UAC_UNIT) << 32));
1467 +                    if (v.eventCount != (e | INT_SIGN))
1468 +                        break;
1469 +                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1470 +                        v.eventCount = (e + E_SEQ) & E_MASK;
1471 +                        if ((p = v.parker) != null)
1472 +                            U.unpark(p);
1473 +                        break;
1474 +                    }
1475                  }
1476                  else {
1477                      if ((short)u < 0)
# Line 1686 | Line 1667 | public class ForkJoinPool extends Abstra
1667       *
1668       * * If not already enqueued, try to inactivate and enqueue the
1669       * worker on wait queue. Or, if inactivating has caused the pool
1670 <     * to be quiescent, relay to idleAwaitWork to check for
1671 <     * termination and possibly shrink pool.
1670 >     * to be quiescent, relay to idleAwaitWork to possibly shrink
1671 >     * pool.
1672       *
1673       * * If already enqueued and none of the above apply, possibly
1674 <     * (with 1/2 probability) park awaiting signal, else lingering to
1675 <     * help scan and signal.
1674 >     * park awaiting signal, else lingering to help scan and signal.
1675 >     *
1676 >     * * If a non-empty queue discovered or left as a hint,
1677 >     * help wake up other workers before return
1678       *
1679       * @param w the worker (via its WorkQueue)
1680       * @return a task or null if none found
# Line 1702 | Line 1685 | public class ForkJoinPool extends Abstra
1685          if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1686              int ec = w.eventCount;               // ec is negative if inactive
1687              int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1688 +            w.hint = -1;                         // update seed and clear hint
1689              int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
1690              do {
1691                  WorkQueue q; ForkJoinTask<?>[] a; int b;
# Line 1723 | Line 1707 | public class ForkJoinPool extends Abstra
1707                  }
1708              } while (--j >= 0);
1709  
1710 <            long c, sc; int e, ns, h;
1711 <            if ((h = w.hint) < 0) {
1712 <                if ((ns = w.nsteals) != 0) {
1713 <                    if (U.compareAndSwapLong(this, STEALCOUNT,
1714 <                                             sc = stealCount, sc + ns))
1715 <                        w.nsteals = 0;           // collect steals
1716 <                }
1717 <                else if (plock != ps)            // consistency check
1718 <                    ;                            // skip
1719 <                else if ((e = (int)(c = ctl)) < 0)
1720 <                    w.qlock = -1;                // pool is terminating
1721 <                else if (ec >= 0) {              // try to enqueue/inactivate
1722 <                    long nc = ((long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)));
1723 <                    w.nextWait = e;              // link and mark inactive
1724 <                    w.eventCount = ec | INT_SIGN;
1725 <                    if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1726 <                        w.eventCount = ec;       // unmark on CAS failure
1727 <                    else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1728 <                        idleAwaitWork(w, nc, c);
1729 <                }
1730 <                else if (w.eventCount < 0) {     // block
1731 <                    Thread wt = Thread.currentThread();
1732 <                    Thread.interrupted();        // clear status
1733 <                    U.putObject(wt, PARKBLOCKER, this);
1734 <                    w.parker = wt;               // emulate LockSupport.park
1735 <                    if (w.eventCount < 0)        // recheck
1736 <                        U.park(false, 0L);
1737 <                    w.parker = null;
1738 <                    U.putObject(wt, PARKBLOCKER, null);
1710 >            int h, e, ns; long c, sc; WorkQueue q;
1711 >            if ((ns = w.nsteals) != 0) {
1712 >                if (U.compareAndSwapLong(this, STEALCOUNT,
1713 >                                         sc = stealCount, sc + ns))
1714 >                    w.nsteals = 0;               // collect steals and rescan
1715 >            }
1716 >            else if (plock != ps)                // consistency check
1717 >                ;                                // skip
1718 >            else if ((e = (int)(c = ctl)) < 0)
1719 >                w.qlock = -1;                    // pool is terminating
1720 >            else {
1721 >                if ((h = w.hint) < 0) {
1722 >                    if (ec >= 0) {               // try to enqueue/inactivate
1723 >                        long nc = (((long)ec |
1724 >                                    ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1725 >                        w.nextWait = e;          // link and mark inactive
1726 >                        w.eventCount = ec | INT_SIGN;
1727 >                        if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1728 >                            w.eventCount = ec;   // unmark on CAS failure
1729 >                        else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1730 >                            idleAwaitWork(w, nc, c);
1731 >                    }
1732 >                    else if (w.eventCount < 0 && !tryTerminate(false, false) &&
1733 >                             ctl == c) {         // block
1734 >                        Thread wt = Thread.currentThread();
1735 >                        Thread.interrupted();    // clear status
1736 >                        U.putObject(wt, PARKBLOCKER, this);
1737 >                        w.parker = wt;           // emulate LockSupport.park
1738 >                        if (w.eventCount < 0)    // recheck
1739 >                            U.park(false, 0L);
1740 >                        w.parker = null;
1741 >                        U.putObject(wt, PARKBLOCKER, null);
1742 >                    }
1743 >                }
1744 >                if ((h >= 0 || (h = w.hint) >= 0) &&
1745 >                    (ws = workQueues) != null && h < ws.length &&
1746 >                    (q = ws[h]) != null) {      // signal others before retry
1747 >                    WorkQueue v; Thread p; int u, i, s;
1748 >                    for (int n = (config & SMASK) >>> 1;;) {
1749 >                        int idleCount = (w.eventCount < 0) ? 0 : -1;
1750 >                        if (((s = idleCount - q.base + q.top) <= n &&
1751 >                             (n = s) <= 0) ||
1752 >                            (u = (int)((c = ctl) >>> 32)) >= 0 ||
1753 >                            (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1754 >                            (v = ws[i]) == null)
1755 >                            break;
1756 >                        long nc = (((long)(v.nextWait & E_MASK)) |
1757 >                                   ((long)(u + UAC_UNIT) << 32));
1758 >                        if (v.eventCount != (e | INT_SIGN) ||
1759 >                            !U.compareAndSwapLong(this, CTL, c, nc))
1760 >                            break;
1761 >                        v.hint = h;
1762 >                        v.eventCount = (e + E_SEQ) & E_MASK;
1763 >                        if ((p = v.parker) != null)
1764 >                            U.unpark(p);
1765 >                        if (--n <= 0)
1766 >                            break;
1767 >                    }
1768                  }
1769              }
1757            if (h >= 0 || w.hint >= 0)           // signal others before retry
1758                helpSignalHint(w);
1770          }
1771          return null;
1772      }
# Line 1826 | Line 1837 | public class ForkJoinPool extends Abstra
1837                          break outer;
1838                      long nc = (((long)(w.nextWait & E_MASK)) |
1839                                 ((long)(u + UAC_UNIT) << 32));
1840 <                    if (w.eventCount == (e | INT_SIGN) &&
1841 <                        U.compareAndSwapLong(this, CTL, c, nc)) {
1840 >                    if (w.eventCount != (e | INT_SIGN))
1841 >                        break outer;
1842 >                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1843                          w.eventCount = (e + E_SEQ) & E_MASK;
1844                          if ((p = w.parker) != null)
1845                              U.unpark(p);
# Line 1840 | Line 1852 | public class ForkJoinPool extends Abstra
1852      }
1853  
1854      /**
1843     * Signals other workers if tasks are present in hinted queue.
1844     *
1845     * @param caller the worker with the hint
1846     */
1847    private void helpSignalHint(WorkQueue caller) {
1848        WorkQueue[] ws; WorkQueue q, w; Thread p; long c; int h, m, u, e, i, s;
1849        if (caller != null && (h = caller.hint) >= 0 &&
1850            (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1851            (ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
1852            (q = ws[h & m]) != null) {
1853            caller.hint = -1;
1854            for (int n = 2;;) { // limit to at most 2 signals
1855                int idleCount = (caller.eventCount < 0) ? 0 : -1;
1856                if (((s = idleCount - q.base + q.top) <= n && (n = s) <= 0) ||
1857                    (u = (int)((c = ctl) >>> 32)) >= 0 ||
1858                    (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1859                    (w = ws[i]) == null)
1860                    break;
1861                long nc = (((long)(w.nextWait & E_MASK)) |
1862                           ((long)(u + UAC_UNIT) << 32));
1863                if (w.eventCount == (e | INT_SIGN) &&
1864                    U.compareAndSwapLong(this, CTL, c, nc)) {
1865                    w.hint = h;
1866                    w.eventCount = (e + E_SEQ) & E_MASK;
1867                    if ((p = w.parker) != null)
1868                        U.unpark(p);
1869                    if (--n <= 0)
1870                        break;
1871                }
1872            }
1873        }
1874    }
1875
1876    /**
1855       * Tries to locate and execute tasks for a stealer of the given
1856       * task, or in turn one of its stealers, Traces currentSteal ->
1857       * currentJoin links looking for a thread working on a descendant
# Line 2125 | Line 2103 | public class ForkJoinPool extends Abstra
2103       */
2104      private WorkQueue findNonEmptyStealQueue(int r) {
2105          for (WorkQueue[] ws;;) {
2106 <            int ps = plock, m, n;
2106 >            int ps = plock, m;
2107              if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
2108                  return null;
2109              for (int j = (m + 1) << 2; ;) {
2110                  WorkQueue q = ws[(((r + j) << 1) | 1) & m];
2111 <                if (q != null && (n = q.base - q.top) < 0) {
2134 <                    if (n < -1)
2135 <                        signalWork(q);
2111 >                if (q != null && q.base - q.top < 0)
2112                      return q;
2137                }
2113                  else if (--j < 0) {
2114                      if (plock == ps)
2115                          return null;
# Line 2165 | Line 2140 | public class ForkJoinPool extends Abstra
2140                      do {} while (!U.compareAndSwapLong
2141                                   (this, CTL, c = ctl, c + AC_UNIT));
2142                  }
2143 <                if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2143 >                if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2144 >                    if (q.base - q.top < 0)
2145 >                        signalWork(q);
2146                      w.runSubtask(t);
2147 +                }
2148              }
2149              else {
2150                  long c;
# Line 2198 | Line 2176 | public class ForkJoinPool extends Abstra
2176                  return t;
2177              if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
2178                  return null;
2179 <            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
2179 >            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
2180 >                if (q.base - q.top < 0)
2181 >                    signalWork(q);
2182                  return t;
2183 +            }
2184          }
2185      }
2186  
# Line 2299 | Line 2280 | public class ForkJoinPool extends Abstra
2280                  if (((ps = plock) & PL_LOCK) != 0 ||
2281                      !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
2282                      ps = acquirePlock();
2283 <                int nps = SHUTDOWN;
2284 <                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
2304 <                    releasePlock(nps);
2283 >                if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN))
2284 >                    releasePlock(SHUTDOWN);
2285              }
2286              if (!now) {                             // check if idle & no tasks
2287                  if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
# Line 2320 | Line 2300 | public class ForkJoinPool extends Abstra
2300                  for (int pass = 0; pass < 3; ++pass) {
2301                      WorkQueue[] ws = workQueues;
2302                      if (ws != null) {
2303 <                        WorkQueue w;
2303 >                        WorkQueue w; Thread wt;
2304                          int n = ws.length;
2305                          for (int i = 0; i < n; ++i) {
2306                              if ((w = ws[i]) != null) {
2307                                  w.qlock = -1;
2308                                  if (pass > 0) {
2309                                      w.cancelAll();
2310 <                                    if (pass > 1)
2311 <                                        w.interruptOwner();
2310 >                                    if (pass > 1 && (wt = w.owner) != null) {
2311 >                                        if (!wt.isInterrupted()) {
2312 >                                            try {
2313 >                                                wt.interrupt();
2314 >                                            } catch (SecurityException ignore) {
2315 >                                            }
2316 >                                        }
2317 >                                        U.unpark(wt);
2318 >                                    }
2319                                  }
2320                              }
2321                          }
# Line 2491 | Line 2478 | public class ForkJoinPool extends Abstra
2478          if ((p = commonPool) != null &&
2479              (q = p.findNonEmptyStealQueue(1)) != null &&
2480              (b = q.base) - q.top < 0 &&
2481 <            (t = q.pollAt(b)) != null)
2481 >            (t = q.pollAt(b)) != null) {
2482 >            if (q.base - q.top < 0)
2483 >                p.signalWork(q);
2484              t.doExec();
2485 +        }
2486      }
2487  
2488      // Exported methods
# Line 2593 | Line 2583 | public class ForkJoinPool extends Abstra
2583      }
2584  
2585      /**
2586 <     * Returns the common pool instance.
2586 >     * Returns the common pool instance. This pool is statically
2587 >     * constructed; its run state is unaffected by attempts to
2588 >     * {@link #shutdown} or {@link #shutdownNow}.
2589       *
2590       * @return the common pool instance
2591       */

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines