ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.153 by jsr166, Mon Nov 26 14:11:54 2012 UTC vs.
Revision 1.154 by dl, Sat Dec 8 14:08:51 2012 UTC

# Line 1129 | Line 1129 | public class ForkJoinPool extends Abstra
1129      private static final long FAST_IDLE_TIMEOUT =  200L * 1000L * 1000L;
1130  
1131      /**
1132 +     * Tolerance for idle timeouts, to cope with timer undershoots
1133 +     */
1134 +    private static final long TIMEOUT_SLOP = 2000000L; // 20ms
1135 +
1136 +    /**
1137       * The maximum stolen->joining link depth allowed in method
1138       * tryHelpStealer.  Must be a power of two.  Depths for legitimate
1139       * chains are unbounded, but we use a fixed constant to avoid
# Line 1350 | Line 1355 | public class ForkJoinPool extends Abstra
1355      }
1356  
1357      /**
1358 <     * Tries to create and start one worker. Adjusts counts etc on
1359 <     * failure.
1358 >     * Tries to create and start one worker if fewer than target
1359 >     * parallelism level exist. Adjusts counts etc on failure.
1360       */
1361      private void tryAddWorker() {
1362          long c; int u;
# Line 1467 | Line 1472 | public class ForkJoinPool extends Abstra
1472                                             ((c - TC_UNIT) & TC_MASK) |
1473                                             (c & ~(AC_MASK|TC_MASK)))));
1474  
1475 <        if (!tryTerminate(false, false) && w != null) {
1475 >        if (!tryTerminate(false, false) && w != null && w.array != null) {
1476              w.cancelAll();                  // cancel remaining tasks
1477 <            if (w.array != null)            // suppress signal if never ran
1478 <                tryAddWorker();             // create replacement
1479 <            if (ex == null)                 // help clean refs on way out
1480 <                ForkJoinTask.helpExpungeStaleExceptions();
1477 >            int e, u, i, n; WorkQueue[] ws; WorkQueue v; Thread p;
1478 >            while ((u = (int)((c = ctl) >>> 32)) < 0) {
1479 >                if ((e = (int)c) > 0) {     // activate or create replacement
1480 >                    if ((ws = workQueues) != null &&
1481 >                        ws.length > (i = e & SMASK) &&
1482 >                        (v = ws[i]) != null && v.eventCount == (e | INT_SIGN)) {
1483 >                        long nc = (((long)(v.nextWait & E_MASK)) |
1484 >                                   ((long)(u + UAC_UNIT) << 32));
1485 >                        if (U.compareAndSwapLong(this, CTL, c, nc)) {
1486 >                            v.eventCount = (e + E_SEQ) & E_MASK;
1487 >                            if ((p = v.parker) != null)
1488 >                                U.unpark(p);
1489 >                            break;
1490 >                        }
1491 >                    }
1492 >                    else
1493 >                        break;
1494 >                }
1495 >                else {
1496 >                    if ((short)u < 0)
1497 >                        tryAddWorker();
1498 >                    break;
1499 >                }
1500 >            }
1501          }
1502 <
1503 <        if (ex != null)                     // rethrow
1502 >        if (ex == null)                     // help clean refs on way out
1503 >            ForkJoinTask.helpExpungeStaleExceptions();
1504 >        else                                // rethrow
1505              ForkJoinTask.rethrow(ex);
1506      }
1507  
# Line 1728 | Line 1754 | public class ForkJoinPool extends Abstra
1754                      U.putObject(wt, PARKBLOCKER, null);
1755                  }
1756              }
1757 <            if (h >= 0 || (h = w.hint) >= 0) {   // signal others before retry
1758 <                w.hint = -1;                     // reset
1733 <                helpSignal(null, h, true);
1734 <            }
1757 >            if (h >= 0 || w.hint >= 0)           // signal others before retry
1758 >                helpSignalHint(w);
1759          }
1760          return null;
1761      }
# Line 1753 | Line 1777 | public class ForkJoinPool extends Abstra
1777              !tryTerminate(false, false) && (int)prevCtl != 0) {
1778              int dc = -(short)(currentCtl >>> TC_SHIFT);
1779              long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1780 <            long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop
1780 >            long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1781              Thread wt = Thread.currentThread();
1782              while (ctl == currentCtl) {
1783                  Thread.interrupted();  // timed variant of version in scan()
# Line 1776 | Line 1800 | public class ForkJoinPool extends Abstra
1800      }
1801  
1802      /**
1803 <     * Scans through queues looking for work (optionally, while
1804 <     * joining a task); if any present, signals. May return early if
1805 <     * more signalling is detectably unneeded.
1803 >     * Scans through queues looking for work while joining a task; if
1804 >     * any present, signals. May return early if more signalling is
1805 >     * detectably unneeded.
1806       *
1807 <     * @param task if non-null, return early if done
1807 >     * @param task return early if done
1808       * @param origin an index to start scan
1785     * @param once if only the origin should be checked
1809       */
1810 <    private void helpSignal(ForkJoinTask<?> task, int origin, boolean once) {
1810 >    private void helpSignal(ForkJoinTask<?> task, int origin) {
1811          WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
1812 <        if ((u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1812 >        if (task != null && task.status >= 0 &&
1813 >            (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1814              (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1815 <            outer: for (int k = origin, j = once ? 0 : m; j >= 0; --j) {
1815 >            outer: for (int k = origin, j = m; j >= 0; --j) {
1816                  WorkQueue q = ws[k++ & m];
1817                  for (int n = m;;) { // limit to at most m signals
1818 <                    if (task != null && task.status < 0)
1818 >                    if (task.status < 0)
1819                          break outer;
1820                      if (q == null ||
1821 <                        ((s = (task == null ? -1 : 0) - q.base + q.top) <= n &&
1798 <                         (n = s) <= 0))
1821 >                        ((s = -q.base + q.top) <= n && (n = s) <= 0))
1822                          break;
1823                      if ((u = (int)((c = ctl) >>> 32)) >= 0 ||
1824                          (e = (int)c) <= 0 || m < (i = e & SMASK) ||
# Line 1817 | Line 1840 | public class ForkJoinPool extends Abstra
1840      }
1841  
1842      /**
1843 +     * Signals other workers if tasks are present in hinted queue.
1844 +     *
1845 +     * @param caller the worker with the hint
1846 +     */
1847 +    private void helpSignalHint(WorkQueue caller) {
1848 +        WorkQueue[] ws; WorkQueue q, w; Thread p; long c; int h, m, u, e, i, s;
1849 +        if (caller != null && (h = caller.hint) >= 0 &&
1850 +            (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1851 +            (ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
1852 +            (q = ws[h & m]) != null) {
1853 +            caller.hint = -1;
1854 +            for (int n = 2;;) { // limit to at most 2 signals
1855 +                int idleCount = (caller.eventCount < 0)? 0 : -1;
1856 +                if (((s = idleCount - q.base + q.top) <= n && (n = s) <= 0) ||
1857 +                    (u = (int)((c = ctl) >>> 32)) >= 0 ||
1858 +                    (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1859 +                    (w = ws[i]) == null)
1860 +                    break;
1861 +                long nc = (((long)(w.nextWait & E_MASK)) |
1862 +                           ((long)(u + UAC_UNIT) << 32));
1863 +                if (w.eventCount == (e | INT_SIGN) &&
1864 +                    U.compareAndSwapLong(this, CTL, c, nc)) {
1865 +                    w.hint = h;
1866 +                    w.eventCount = (e + E_SEQ) & E_MASK;
1867 +                    if ((p = w.parker) != null)
1868 +                        U.unpark(p);
1869 +                    if (--n <= 0)
1870 +                        break;
1871 +                }
1872 +            }
1873 +        }
1874 +    }
1875 +
1876 +    /**
1877       * Tries to locate and execute tasks for a stealer of the given
1878       * task, or in turn one of its stealers, Traces currentSteal ->
1879       * currentJoin links looking for a thread working on a descendant
# Line 1996 | Line 2053 | public class ForkJoinPool extends Abstra
2053              do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2054                           joiner.tryRemoveAndExec(task)); // process local tasks
2055              if (s >= 0 && (s = task.status) >= 0) {
2056 <                helpSignal(task, joiner.poolIndex, false);
2056 >                helpSignal(task, joiner.poolIndex);
2057                  if ((s = task.status) >= 0 &&
2058                      (task instanceof CountedCompleter))
2059                      s = helpComplete(task, LIFO_QUEUE);
# Line 2005 | Line 2062 | public class ForkJoinPool extends Abstra
2062                  if ((!joiner.isEmpty() ||           // try helping
2063                       (s = tryHelpStealer(joiner, task)) == 0) &&
2064                      (s = task.status) >= 0) {
2065 <                    helpSignal(task, joiner.poolIndex, false);
2065 >                    helpSignal(task, joiner.poolIndex);
2066                      if ((s = task.status) >= 0 && tryCompensate()) {
2067                          if (task.trySetSignal() && (s = task.status) >= 0) {
2068                              synchronized (task) {
# Line 2046 | Line 2103 | public class ForkJoinPool extends Abstra
2103              do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2104                           joiner.tryRemoveAndExec(task));
2105              if (s >= 0 && (s = task.status) >= 0) {
2106 <                helpSignal(task, joiner.poolIndex, false);
2106 >                helpSignal(task, joiner.poolIndex);
2107                  if ((s = task.status) >= 0 &&
2108                      (task instanceof CountedCompleter))
2109                      s = helpComplete(task, LIFO_QUEUE);
# Line 2378 | Line 2435 | public class ForkJoinPool extends Abstra
2435                      (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
2436                      break;
2437                  if (task == null) {
2438 <                    helpSignal(root, q.poolIndex, false);
2438 >                    helpSignal(root, q.poolIndex);
2439                      if (root.status >= 0)
2440                          helpComplete(root, SHARED_QUEUE);
2441                      break;
# Line 2421 | Line 2478 | public class ForkJoinPool extends Abstra
2478                  if (t instanceof CountedCompleter)
2479                      p.externalHelpComplete(q, t);
2480                  else
2481 <                    p.helpSignal(t, q.poolIndex, false);
2481 >                    p.helpSignal(t, q.poolIndex);
2482              }
2483          }
2484      }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines