ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166e/ForkJoinPool.java (file contents):
Revision 1.23 by jsr166, Thu Nov 22 18:14:57 2012 UTC vs.
Revision 1.31 by dl, Sat Dec 15 22:15:47 2012 UTC

# Line 51 | Line 51 | import java.util.concurrent.TimeUnit;
51   * dynamically adding, suspending, or resuming internal worker
52   * threads, even if some tasks are stalled waiting to join
53   * others. However, no such adjustments are guaranteed in the face of
54 < * blocked IO or other unmanaged synchronization. The nested {@link
54 > * blocked I/O or other unmanaged synchronization. The nested {@link
55   * ManagedBlocker} interface enables extension of the kinds of
56   * synchronization accommodated.
57   *
# Line 1129 | Line 1129 | public class ForkJoinPool extends Abstra
1129      private static final long FAST_IDLE_TIMEOUT =  200L * 1000L * 1000L;
1130  
1131      /**
1132 +     * Tolerance for idle timeouts, to cope with timer undershoots
1133 +     */
1134 +    private static final long TIMEOUT_SLOP = 2000000L; // 20ms
1135 +
1136 +    /**
1137       * The maximum stolen->joining link depth allowed in method
1138       * tryHelpStealer.  Must be a power of two.  Depths for legitimate
1139       * chains are unbounded, but we use a fixed constant to avoid
# Line 1326 | Line 1331 | public class ForkJoinPool extends Abstra
1331       * wrap around zero, this method harmlessly fails to reinitialize
1332       * if workQueues exists, while still advancing plock.
1333       *
1334 <     * Additonally tries to create the first worker.
1334 >     * Additionally tries to create the first worker.
1335       */
1336      private void initWorkers() {
1337          WorkQueue[] ws, nws; int ps;
# Line 1350 | Line 1355 | public class ForkJoinPool extends Abstra
1355      }
1356  
1357      /**
1358 <     * Tries to create and start one worker. Adjusts counts etc on
1359 <     * failure.
1358 >     * Tries to create and start one worker if fewer than target
1359 >     * parallelism level exist. Adjusts counts etc on failure.
1360       */
1361      private void tryAddWorker() {
1362          long c; int u;
# Line 1467 | Line 1472 | public class ForkJoinPool extends Abstra
1472                                             ((c - TC_UNIT) & TC_MASK) |
1473                                             (c & ~(AC_MASK|TC_MASK)))));
1474  
1475 <        if (!tryTerminate(false, false) && w != null) {
1475 >        if (!tryTerminate(false, false) && w != null && w.array != null) {
1476              w.cancelAll();                  // cancel remaining tasks
1477 <            if (w.array != null)            // suppress signal if never ran
1478 <                tryAddWorker();             // create replacement
1479 <            if (ex == null)                 // help clean refs on way out
1480 <                ForkJoinTask.helpExpungeStaleExceptions();
1477 >            int e, u, i, n; WorkQueue[] ws; WorkQueue v; Thread p;
1478 >            while ((u = (int)((c = ctl) >>> 32)) < 0) {
1479 >                if ((e = (int)c) > 0) {     // activate or create replacement
1480 >                    if ((ws = workQueues) != null &&
1481 >                        ws.length > (i = e & SMASK) &&
1482 >                        (v = ws[i]) != null && v.eventCount == (e | INT_SIGN)) {
1483 >                        long nc = (((long)(v.nextWait & E_MASK)) |
1484 >                                   ((long)(u + UAC_UNIT) << 32));
1485 >                        if (U.compareAndSwapLong(this, CTL, c, nc)) {
1486 >                            v.eventCount = (e + E_SEQ) & E_MASK;
1487 >                            if ((p = v.parker) != null)
1488 >                                U.unpark(p);
1489 >                            break;
1490 >                        }
1491 >                    }
1492 >                    else
1493 >                        break;
1494 >                }
1495 >                else {
1496 >                    if ((short)u < 0)
1497 >                        tryAddWorker();
1498 >                    break;
1499 >                }
1500 >            }
1501          }
1502 <
1503 <        if (ex != null)                     // rethrow
1502 >        if (ex == null)                     // help clean refs on way out
1503 >            ForkJoinTask.helpExpungeStaleExceptions();
1504 >        else                                // rethrow
1505              ForkJoinTask.rethrow(ex);
1506      }
1507  
# Line 1728 | Line 1754 | public class ForkJoinPool extends Abstra
1754                      U.putObject(wt, PARKBLOCKER, null);
1755                  }
1756              }
1757 <            if (h >= 0 || (h = w.hint) >= 0) {   // signal others before retry
1758 <                w.hint = -1;                     // reset
1733 <                helpSignal(null, h, true);
1734 <            }
1757 >            if (h >= 0 || w.hint >= 0)           // signal others before retry
1758 >                helpSignalHint(w);
1759          }
1760          return null;
1761      }
# Line 1753 | Line 1777 | public class ForkJoinPool extends Abstra
1777              !tryTerminate(false, false) && (int)prevCtl != 0) {
1778              int dc = -(short)(currentCtl >>> TC_SHIFT);
1779              long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
1780 <            long deadline = System.nanoTime() + parkTime - 100000L; // 1ms slop
1780 >            long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
1781              Thread wt = Thread.currentThread();
1782              while (ctl == currentCtl) {
1783                  Thread.interrupted();  // timed variant of version in scan()
# Line 1776 | Line 1800 | public class ForkJoinPool extends Abstra
1800      }
1801  
1802      /**
1803 <     * Scans through queues looking for work (optionally, while
1804 <     * joining a task); if any present, signals. May return early if
1805 <     * more signalling is detectably unneeded.
1803 >     * Scans through queues looking for work while joining a task; if
1804 >     * any present, signals. May return early if more signalling is
1805 >     * detectably unneeded.
1806       *
1807 <     * @param task if non-null, return early if done
1807 >     * @param task return early if done
1808       * @param origin an index to start scan
1785     * @param once if only the origin should be checked
1809       */
1810 <    private void helpSignal(ForkJoinTask<?> task, int origin, boolean once) {
1810 >    private void helpSignal(ForkJoinTask<?> task, int origin) {
1811          WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
1812 <        if ((u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1812 >        if (task != null && task.status >= 0 &&
1813 >            (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1814              (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
1815 <            outer: for (int k = origin, j = once ? 0 : m; j >= 0; --j) {
1815 >            outer: for (int k = origin, j = m; j >= 0; --j) {
1816                  WorkQueue q = ws[k++ & m];
1817                  for (int n = m;;) { // limit to at most m signals
1818 <                    if (task != null && task.status < 0)
1818 >                    if (task.status < 0)
1819                          break outer;
1820                      if (q == null ||
1821 <                        ((s = (task == null ? -1 : 0) - q.base + q.top) <= n &&
1798 <                         (n = s) <= 0))
1821 >                        ((s = -q.base + q.top) <= n && (n = s) <= 0))
1822                          break;
1823                      if ((u = (int)((c = ctl) >>> 32)) >= 0 ||
1824                          (e = (int)c) <= 0 || m < (i = e & SMASK) ||
# Line 1817 | Line 1840 | public class ForkJoinPool extends Abstra
1840      }
1841  
1842      /**
1843 +     * Signals other workers if tasks are present in hinted queue.
1844 +     *
1845 +     * @param caller the worker with the hint
1846 +     */
1847 +    private void helpSignalHint(WorkQueue caller) {
1848 +        WorkQueue[] ws; WorkQueue q, w; Thread p; long c; int h, m, u, e, i, s;
1849 +        if (caller != null && (h = caller.hint) >= 0) {
1850 +            caller.hint = -1;
1851 +            if ((u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
1852 +                (ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
1853 +                (q = ws[h & m]) != null) {
1854 +                for (int n = 2;;) { // limit to at most 2 signals
1855 +                    int idleCount = (caller.eventCount < 0) ? 0 : -1;
1856 +                    if (((s = idleCount - q.base + q.top) <= n &&
1857 +                         (n = s) <= 0) ||
1858 +                        (u = (int)((c = ctl) >>> 32)) >= 0 ||
1859 +                        (e = (int)c) <= 0 || m < (i = e & SMASK) ||
1860 +                        (w = ws[i]) == null)
1861 +                        break;
1862 +                    long nc = (((long)(w.nextWait & E_MASK)) |
1863 +                               ((long)(u + UAC_UNIT) << 32));
1864 +                    if (w.eventCount == (e | INT_SIGN) &&
1865 +                        U.compareAndSwapLong(this, CTL, c, nc)) {
1866 +                        w.hint = h;
1867 +                        w.eventCount = (e + E_SEQ) & E_MASK;
1868 +                        if ((p = w.parker) != null)
1869 +                            U.unpark(p);
1870 +                        if (--n <= 0)
1871 +                            break;
1872 +                    }
1873 +                }
1874 +            }
1875 +        }
1876 +    }
1877 +
1878 +    /**
1879       * Tries to locate and execute tasks for a stealer of the given
1880       * task, or in turn one of its stealers, Traces currentSteal ->
1881       * currentJoin links looking for a thread working on a descendant
# Line 1996 | Line 2055 | public class ForkJoinPool extends Abstra
2055              do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2056                           joiner.tryRemoveAndExec(task)); // process local tasks
2057              if (s >= 0 && (s = task.status) >= 0) {
2058 <                helpSignal(task, joiner.poolIndex, false);
2058 >                helpSignal(task, joiner.poolIndex);
2059                  if ((s = task.status) >= 0 &&
2060                      (task instanceof CountedCompleter))
2061                      s = helpComplete(task, LIFO_QUEUE);
# Line 2005 | Line 2064 | public class ForkJoinPool extends Abstra
2064                  if ((!joiner.isEmpty() ||           // try helping
2065                       (s = tryHelpStealer(joiner, task)) == 0) &&
2066                      (s = task.status) >= 0) {
2067 <                    helpSignal(task, joiner.poolIndex, false);
2067 >                    helpSignal(task, joiner.poolIndex);
2068                      if ((s = task.status) >= 0 && tryCompensate()) {
2069                          if (task.trySetSignal() && (s = task.status) >= 0) {
2070                              synchronized (task) {
# Line 2046 | Line 2105 | public class ForkJoinPool extends Abstra
2105              do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
2106                           joiner.tryRemoveAndExec(task));
2107              if (s >= 0 && (s = task.status) >= 0) {
2108 <                helpSignal(task, joiner.poolIndex, false);
2108 >                helpSignal(task, joiner.poolIndex);
2109                  if ((s = task.status) >= 0 &&
2110                      (task instanceof CountedCompleter))
2111                      s = helpComplete(task, LIFO_QUEUE);
# Line 2378 | Line 2437 | public class ForkJoinPool extends Abstra
2437                      (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
2438                      break;
2439                  if (task == null) {
2440 <                    helpSignal(root, q.poolIndex, false);
2440 >                    helpSignal(root, q.poolIndex);
2441                      if (root.status >= 0)
2442                          helpComplete(root, SHARED_QUEUE);
2443                      break;
# Line 2421 | Line 2480 | public class ForkJoinPool extends Abstra
2480                  if (t instanceof CountedCompleter)
2481                      p.externalHelpComplete(q, t);
2482                  else
2483 <                    p.helpSignal(t, q.poolIndex, false);
2483 >                    p.helpSignal(t, q.poolIndex);
2484              }
2485          }
2486      }
# Line 3036 | Line 3095 | public class ForkJoinPool extends Abstra
3095       * commenced but not yet completed.  This method may be useful for
3096       * debugging. A return of {@code true} reported a sufficient
3097       * period after shutdown may indicate that submitted tasks have
3098 <     * ignored or suppressed interruption, or are waiting for IO,
3098 >     * ignored or suppressed interruption, or are waiting for I/O,
3099       * causing this executor not to properly terminate. (See the
3100       * advisory notes for class {@link ForkJoinTask} stating that
3101       * tasks should not normally entail blocking operations.  But if

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines