ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.159 by dl, Sat Dec 15 22:15:51 2012 UTC vs.
Revision 1.160 by dl, Sun Dec 16 19:57:04 2012 UTC

# Line 987 | Line 987 | public class ForkJoinPool extends Abstra
987              if (t != null) {
988                  (currentSteal = t).doExec();
989                  currentSteal = null;
990 +                ++nsteals;
991                  if (base - top < 0) {       // process remaining local tasks
992                      if (mode == 0)
993                          popAndExecAll();
994                      else
995                          pollAndExecAll();
996                  }
996                ++nsteals;
997                hint = -1;
997              }
998          }
999  
# Line 1021 | Line 1020 | public class ForkJoinPool extends Abstra
1020                      s != Thread.State.TIMED_WAITING);
1021          }
1022  
1024        /**
1025         * If this owned and is not already interrupted, try to
1026         * interrupt and/or unpark, ignoring exceptions.
1027         */
1028        final void interruptOwner() {
1029            Thread wt, p;
1030            if ((wt = owner) != null && !wt.isInterrupted()) {
1031                try {
1032                    wt.interrupt();
1033                } catch (SecurityException ignore) {
1034                }
1035            }
1036            if ((p = parker) != null)
1037                U.unpark(p);
1038        }
1039
1023          // Unsafe mechanics
1024          private static final sun.misc.Unsafe U;
1025          private static final long QLOCK;
# Line 1268 | Line 1251 | public class ForkJoinPool extends Abstra
1251       * fails. This acts as a spinLock for normal cases, but falls back
1252       * to builtin monitor to block when (rarely) needed. This would be
1253       * a terrible idea for a highly contended lock, but works fine as
1254 <     * a more conservative alternative to a pure spinlock.  See
1272 <     * internal ConcurrentHashMap documentation for further
1273 <     * explanation of nearly the same construction.
1254 >     * a more conservative alternative to a pure spinlock.
1255       */
1256      private int acquirePlock() {
1257          int spins = PL_SPINS, r = 0, ps, nps;
# Line 1466 | Line 1447 | public class ForkJoinPool extends Abstra
1447              }
1448          }
1449  
1450 <        long c;                             // adjust ctl counts
1450 >        long c;                          // adjust ctl counts
1451          do {} while (!U.compareAndSwapLong
1452                       (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
1453                                             ((c - TC_UNIT) & TC_MASK) |
1454                                             (c & ~(AC_MASK|TC_MASK)))));
1455  
1456          if (!tryTerminate(false, false) && w != null && w.array != null) {
1457 <            w.cancelAll();                  // cancel remaining tasks
1458 <            int e, u, i, n; WorkQueue[] ws; WorkQueue v; Thread p;
1459 <            while ((u = (int)((c = ctl) >>> 32)) < 0) {
1460 <                if ((e = (int)c) > 0) {     // activate or create replacement
1461 <                    if ((ws = workQueues) != null &&
1462 <                        ws.length > (i = e & SMASK) &&
1463 <                        (v = ws[i]) != null && v.eventCount == (e | INT_SIGN)) {
1483 <                        long nc = (((long)(v.nextWait & E_MASK)) |
1484 <                                   ((long)(u + UAC_UNIT) << 32));
1485 <                        if (U.compareAndSwapLong(this, CTL, c, nc)) {
1486 <                            v.eventCount = (e + E_SEQ) & E_MASK;
1487 <                            if ((p = v.parker) != null)
1488 <                                U.unpark(p);
1489 <                            break;
1490 <                        }
1491 <                    }
1492 <                    else
1457 >            w.cancelAll();               // cancel remaining tasks
1458 >            WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
1459 >            while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
1460 >                if (e > 0) {             // activate or create replacement
1461 >                    if ((ws = workQueues) == null ||
1462 >                        (i = e & SMASK) >= ws.length ||
1463 >                        (v = ws[i]) != null)
1464                          break;
1465 +                    long nc = (((long)(v.nextWait & E_MASK)) |
1466 +                               ((long)(u + UAC_UNIT) << 32));
1467 +                    if (v.eventCount != (e | INT_SIGN))
1468 +                        break;
1469 +                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1470 +                        v.eventCount = (e + E_SEQ) & E_MASK;
1471 +                        if ((p = v.parker) != null)
1472 +                            U.unpark(p);
1473 +                        break;
1474 +                    }
1475                  }
1476                  else {
1477                      if ((short)u < 0)
# Line 1686 | Line 1667 | public class ForkJoinPool extends Abstra
1667       *
1668       * * If not already enqueued, try to inactivate and enqueue the
1669       * worker on wait queue. Or, if inactivating has caused the pool
1670 <     * to be quiescent, relay to idleAwaitWork to check for
1671 <     * termination and possibly shrink pool.
1670 >     * to be quiescent, relay to idleAwaitWork to possibly shrink
1671 >     * pool.
1672       *
1673       * * If already enqueued and none of the above apply, possibly
1674 <     * (with 1/2 probability) park awaiting signal, else lingering to
1675 <     * help scan and signal.
1674 >     * park awaiting signal, else lingering to help scan and signal.
1675 >     *
1676 >     * * If a non-empty queue discovered or left as a hint,
1677 >     * help wake up other workers before return
1678       *
1679       * @param w the worker (via its WorkQueue)
1680       * @return a task or null if none found
# Line 1702 | Line 1685 | public class ForkJoinPool extends Abstra
1685          if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1686              int ec = w.eventCount;               // ec is negative if inactive
1687              int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
1688 +            w.hint = -1;                         // update seed and clear hint
1689              int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
1690              do {
1691                  WorkQueue q; ForkJoinTask<?>[] a; int b;
# Line 1723 | Line 1707 | public class ForkJoinPool extends Abstra
1707                  }
1708              } while (--j >= 0);
1709  
1710 <            long c, sc; int e, ns, h;
1711 <            if ((h = w.hint) < 0) {
1712 <                if ((ns = w.nsteals) != 0) {
1713 <                    if (U.compareAndSwapLong(this, STEALCOUNT,
1714 <                                             sc = stealCount, sc + ns))
1715 <                        w.nsteals = 0;           // collect steals
1716 <                }
1717 <                else if (plock != ps)            // consistency check
1718 <                    ;                            // skip
1719 <                else if ((e = (int)(c = ctl)) < 0)
1720 <                    w.qlock = -1;                // pool is terminating
1721 <                else if (ec >= 0) {              // try to enqueue/inactivate
1722 <                    long nc = ((long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)));
1723 <                    w.nextWait = e;              // link and mark inactive
1724 <                    w.eventCount = ec | INT_SIGN;
1725 <                    if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1726 <                        w.eventCount = ec;       // unmark on CAS failure
1727 <                    else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1728 <                        idleAwaitWork(w, nc, c);
1729 <                }
1730 <                else if (w.eventCount < 0) {     // block
1731 <                    Thread wt = Thread.currentThread();
1732 <                    Thread.interrupted();        // clear status
1733 <                    U.putObject(wt, PARKBLOCKER, this);
1734 <                    w.parker = wt;               // emulate LockSupport.park
1735 <                    if (w.eventCount < 0)        // recheck
1736 <                        U.park(false, 0L);
1737 <                    w.parker = null;
1738 <                    U.putObject(wt, PARKBLOCKER, null);
1710 >            int h, e, ns; long c, sc; WorkQueue q;
1711 >            if ((ns = w.nsteals) != 0) {
1712 >                if (U.compareAndSwapLong(this, STEALCOUNT,
1713 >                                         sc = stealCount, sc + ns))
1714 >                    w.nsteals = 0;               // collect steals and rescan
1715 >            }
1716 >            else if (plock != ps)                // consistency check
1717 >                ;                                // skip
1718 >            else if ((e = (int)(c = ctl)) < 0)
1719 >                w.qlock = -1;                    // pool is terminating
1720 >            else {
1721 >                if ((h = w.hint) < 0) {
1722 >                    if (ec >= 0) {               // try to enqueue/inactivate
1723 >                        long nc = (((long)ec |
1724 >                                    ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
1725 >                        w.nextWait = e;          // link and mark inactive
1726 >                        w.eventCount = ec | INT_SIGN;
1727 >                        if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
1728 >                            w.eventCount = ec;   // unmark on CAS failure
1729 >                        else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
1730 >                            idleAwaitWork(w, nc, c);
1731 >                    }
1732 >                    else if (w.eventCount < 0 && !tryTerminate(false, false) &&
1733 >                             ctl == c) {         // block
1734 >                        Thread wt = Thread.currentThread();
1735 >                        Thread.interrupted();    // clear status
1736 >                        U.putObject(wt, PARKBLOCKER, this);
1737 >                        w.parker = wt;           // emulate LockSupport.park
1738 >                        if (w.eventCount < 0)    // recheck
1739 >                            U.park(false, 0L);
1740 >                        w.parker = null;
1741 >                        U.putObject(wt, PARKBLOCKER, null);
1742 >                    }
1743 >                }
1744 >                if ((h >= 0 || (h = w.hint) >= 0) &&
1745 >                    (ws = workQueues) != null && h < ws.length &&
1746 >                    (q = ws[h]) != null) {      // signal others before retry
1747 >                    WorkQueue v; Thread p; int u, i, s;
1748 >                    for (int n = (config & SMASK) >>> 1;;) {
1749 >                        int idleCount = (w.eventCount < 0) ? 0 : -1;
1750 >                        if (((s = idleCount - q.base + q.top) <= n &&
1751 >                             (n = s) <= 0) ||
1752 >                            (u = (int)((c = ctl) >>> 32)) >= 0 ||
1753 >                            (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1754 >                            (v = ws[i]) == null)
1755 >                            break;
1756 >                        long nc = (((long)(v.nextWait & E_MASK)) |
1757 >                                   ((long)(u + UAC_UNIT) << 32));
1758 >                        if (v.eventCount != (e | INT_SIGN) ||
1759 >                            !U.compareAndSwapLong(this, CTL, c, nc))
1760 >                            break;
1761 >                        v.hint = h;
1762 >                        v.eventCount = (e + E_SEQ) & E_MASK;
1763 >                        if ((p = v.parker) != null)
1764 >                            U.unpark(p);
1765 >                        if (--n <= 0)
1766 >                            break;
1767 >                    }
1768                  }
1769              }
1757            if (h >= 0 || w.hint >= 0)           // signal others before retry
1758                helpSignalHint(w);
1770          }
1771          return null;
1772      }
# Line 1826 | Line 1837 | public class ForkJoinPool extends Abstra
1837                          break outer;
1838                      long nc = (((long)(w.nextWait & E_MASK)) |
1839                                 ((long)(u + UAC_UNIT) << 32));
1840 <                    if (w.eventCount == (e | INT_SIGN) &&
1841 <                        U.compareAndSwapLong(this, CTL, c, nc)) {
1842 <                        w.eventCount = (e + E_SEQ) & E_MASK;
1832 <                        if ((p = w.parker) != null)
1833 <                            U.unpark(p);
1834 <                        if (--n <= 0)
1835 <                            break;
1836 <                    }
1837 <                }
1838 <            }
1839 <        }
1840 <    }
1841 <
1842 <    /**
1843 <     * Signals other workers if tasks are present in hinted queue.
1844 <     *
1845 <     * @param caller the worker with the hint
1846 <     */
1847 <    private void helpSignalHint(WorkQueue caller) {
1848 <        WorkQueue[] ws; WorkQueue q, w; Thread p; long c; int h, m, u, e, i, s;
1849 <        if (caller != null && (h = caller.hint) >= 0) {
1850 <            caller.hint = -1;
1851 <            if ((u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1852 <                (ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
1853 <                (q = ws[h & m]) != null) {
1854 <                for (int n = 2;;) { // limit to at most 2 signals
1855 <                    int idleCount = (caller.eventCount < 0) ? 0 : -1;
1856 <                    if (((s = idleCount - q.base + q.top) <= n &&
1857 <                         (n = s) <= 0) ||
1858 <                        (u = (int)((c = ctl) >>> 32)) >= 0 ||
1859 <                        (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1860 <                        (w = ws[i]) == null)
1861 <                        break;
1862 <                    long nc = (((long)(w.nextWait & E_MASK)) |
1863 <                               ((long)(u + UAC_UNIT) << 32));
1864 <                    if (w.eventCount == (e | INT_SIGN) &&
1865 <                        U.compareAndSwapLong(this, CTL, c, nc)) {
1866 <                        w.hint = h;
1840 >                    if (w.eventCount != (e | INT_SIGN))
1841 >                        break outer;
1842 >                    if (U.compareAndSwapLong(this, CTL, c, nc)) {
1843                          w.eventCount = (e + E_SEQ) & E_MASK;
1844                          if ((p = w.parker) != null)
1845                              U.unpark(p);
# Line 2301 | Line 2277 | public class ForkJoinPool extends Abstra
2277                  if (((ps = plock) & PL_LOCK) != 0 ||
2278                      !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
2279                      ps = acquirePlock();
2280 <                int nps = SHUTDOWN;
2281 <                if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
2306 <                    releasePlock(nps);
2280 >                if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN))
2281 >                    releasePlock(SHUTDOWN);
2282              }
2283              if (!now) {                             // check if idle & no tasks
2284                  if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
# Line 2322 | Line 2297 | public class ForkJoinPool extends Abstra
2297                  for (int pass = 0; pass < 3; ++pass) {
2298                      WorkQueue[] ws = workQueues;
2299                      if (ws != null) {
2300 <                        WorkQueue w;
2300 >                        WorkQueue w; Thread wt;
2301                          int n = ws.length;
2302                          for (int i = 0; i < n; ++i) {
2303                              if ((w = ws[i]) != null) {
2304                                  w.qlock = -1;
2305                                  if (pass > 0) {
2306                                      w.cancelAll();
2307 <                                    if (pass > 1)
2308 <                                        w.interruptOwner();
2307 >                                    if (pass > 1 && (wt = w.owner) != null) {
2308 >                                        if (!wt.isInterrupted()) {
2309 >                                            try {
2310 >                                                wt.interrupt();
2311 >                                            } catch (SecurityException ignore) {
2312 >                                            }
2313 >                                        }
2314 >                                        U.unpark(wt);
2315 >                                    }
2316                                  }
2317                              }
2318                          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines