ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.127 by dl, Sun Mar 4 15:52:45 2012 UTC vs.
Revision 1.131 by jsr166, Tue Aug 14 06:00:55 2012 UTC

# Line 629 | Line 629 | public class ForkJoinPool extends Abstra
629          final ForkJoinPool pool;   // the containing pool (may be null)
630          final ForkJoinWorkerThread owner; // owning thread or null if shared
631          volatile Thread parker;    // == owner during call to park; else null
632 <        ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
632 >        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
633          ForkJoinTask<?> currentSteal; // current non-local task being executed
634          // Heuristic padding to ameliorate unfortunate memory placements
635          Object p00, p01, p02, p03, p04, p05, p06, p07;
# Line 920 | Line 920 | public class ForkJoinPool extends Abstra
920           * any other cancelled task. Returns (true) immediately on any CAS
921           * or consistency check failure so caller can retry.
922           *
923 <         * @return false if no progress can be made
923 >         * @return 0 if no progress can be made, else positive
924 >         * (this unusual convention simplifies use with tryHelpStealer.)
925           */
926 <        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
927 <            boolean removed = false, empty = true, progress = true;
926 >        final int tryRemoveAndExec(ForkJoinTask<?> task) {
927 >            int stat = 1;
928 >            boolean removed = false, empty = true;
929              ForkJoinTask<?>[] a; int m, s, b, n;
930              if ((a = array) != null && (m = a.length - 1) >= 0 &&
931                  (n = (s = top) - (b = base)) > 0) {
# Line 953 | Line 955 | public class ForkJoinPool extends Abstra
955                      }
956                      if (--n == 0) {
957                          if (!empty && base == b)
958 <                            progress = false;
958 >                            stat = 0;
959                          break;
960                      }
961                  }
962              }
963              if (removed)
964                  task.doExec();
965 <            return progress;
965 >            return stat;
966          }
967  
968          /**
# Line 1102 | Line 1104 | public class ForkJoinPool extends Abstra
1104      private static final RuntimePermission modifyThreadPermission;
1105  
1106      /**
1107 <     * Per-thread submission bookeeping. Shared across all pools
1107 >     * Per-thread submission bookkeeping. Shared across all pools
1108       * to reduce ThreadLocal pollution and because random motion
1109       * to avoid contention in one pool is likely to hold for others.
1110       */
# Line 1138 | Line 1140 | public class ForkJoinPool extends Abstra
1140       * traversal parameters at the expense of sometimes blocking when
1141       * we could be helping.
1142       */
1143 <    private static final int MAX_HELP = 32;
1143 >    private static final int MAX_HELP = 64;
1144  
1145      /**
1146       * Secondary time-based bound (in nanosecs) for helping attempts
# Line 1148 | Line 1150 | public class ForkJoinPool extends Abstra
1150       * value should roughly approximate the time required to create
1151       * and/or activate a worker thread.
1152       */
1153 <    private static final long COMPENSATION_DELAY = 100L * 1000L; // 0.1 millisec
1153 >    private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec
1154  
1155      /**
1156       * Increment for seed generators. See class ThreadLocal for
# Line 1534 | Line 1536 | public class ForkJoinPool extends Abstra
1536                      t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1537                      if (q.base == b && ec >= 0 && t != null &&
1538                          U.compareAndSwapObject(a, i, t, null)) {
1539 <                        q.base = b + 1;       // specialization of pollAt
1539 >                        if (q.top - (q.base = b + 1) > 1)
1540 >                            signalWork();    // help pushes signal
1541                          return t;
1542                      }
1543                      else if (ec < 0 || j <= m) {
# Line 1545 | Line 1548 | public class ForkJoinPool extends Abstra
1548                  if (--j < 0)
1549                      break;
1550              }
1551 +
1552              long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns;
1553              if (e < 0)                        // decode ctl on empty scan
1554                  w.runState = -1;              // pool is terminating
# Line 1647 | Line 1651 | public class ForkJoinPool extends Abstra
1651       * leaves hints in workers to speed up subsequent calls. The
1652       * implementation is very branchy to cope with potential
1653       * inconsistencies or loops encountering chains that are stale,
1654 <     * unknown, or so long that they are likely cyclic.  All of these
1651 <     * cases are dealt with by just retrying by caller.
1654 >     * unknown, or so long that they are likely cyclic.
1655       *
1656       * @param joiner the joining worker
1657       * @param task the task to join
1658 <     * @return true if found or ran a task (and so is immediately retryable)
1658 >     * @return 0 if no progress can be made, negative if task
1659 >     * known complete, else positive
1660       */
1661 <    private boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1662 <        WorkQueue[] ws;
1663 <        int m, depth = MAX_HELP;                // remaining chain depth
1664 <        boolean progress = false;
1665 <        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 &&
1666 <            task.status >= 0) {
1667 <            ForkJoinTask<?> subtask = task;     // current target
1668 <            outer: for (WorkQueue j = joiner;;) {
1669 <                WorkQueue stealer = null;       // find stealer of subtask
1670 <                WorkQueue v = ws[j.stealHint & m]; // try hint
1671 <                if (v != null && v.currentSteal == subtask)
1672 <                    stealer = v;
1673 <                else {                          // scan
1674 <                    for (int i = 1; i <= m; i += 2) {
1675 <                        if ((v = ws[i]) != null && v.currentSteal == subtask &&
1676 <                            v != joiner) {
1677 <                            stealer = v;
1678 <                            j.stealHint = i;    // save hint
1679 <                            break;
1661 >    private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1662 >        int stat = 0, steps = 0;                    // bound to avoid cycles
1663 >        if (joiner != null && task != null) {       // hoist null checks
1664 >            restart: for (;;) {
1665 >                ForkJoinTask<?> subtask = task;     // current target
1666 >                for (WorkQueue j = joiner, v;;) {   // v is stealer of subtask
1667 >                    WorkQueue[] ws; int m, s, h;
1668 >                    if ((s = task.status) < 0) {
1669 >                        stat = s;
1670 >                        break restart;
1671 >                    }
1672 >                    if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
1673 >                        break restart;              // shutting down
1674 >                    if ((v = ws[h = (j.stealHint | 1) & m]) == null ||
1675 >                        v.currentSteal != subtask) {
1676 >                        for (int origin = h;;) {    // find stealer
1677 >                            if (((h = (h + 2) & m) & 15) == 1 &&
1678 >                                (subtask.status < 0 || j.currentJoin != subtask))
1679 >                                continue restart;   // occasional staleness check
1680 >                            if ((v = ws[h]) != null &&
1681 >                                v.currentSteal == subtask) {
1682 >                                j.stealHint = h;    // save hint
1683 >                                break;
1684 >                            }
1685 >                            if (h == origin)
1686 >                                break restart;      // cannot find stealer
1687                          }
1688                      }
1689 <                    if (stealer == null)
1690 <                        break;
1691 <                }
1692 <
1693 <                for (WorkQueue q = stealer;;) { // try to help stealer
1694 <                    ForkJoinTask[] a; ForkJoinTask<?> t; int b;
1695 <                    if (task.status < 0)
1696 <                        break outer;
1697 <                    if ((b = q.base) - q.top < 0 && (a = q.array) != null) {
1698 <                        progress = true;
1699 <                        int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1700 <                        t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1701 <                        if (subtask.status < 0) // must recheck before taking
1702 <                            break outer;
1703 <                        if (t != null &&
1704 <                            q.base == b &&
1705 <                            U.compareAndSwapObject(a, i, t, null)) {
1706 <                            q.base = b + 1;
1707 <                            joiner.runSubtask(t);
1689 >                    for (;;) { // help stealer or descend to its stealer
1690 >                        ForkJoinTask[] a;  int b;
1691 >                        if (subtask.status < 0)     // surround probes with
1692 >                            continue restart;       //   consistency checks
1693 >                        if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
1694 >                            int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1695 >                            ForkJoinTask<?> t =
1696 >                                (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1697 >                            if (subtask.status < 0 || j.currentJoin != subtask ||
1698 >                                v.currentSteal != subtask)
1699 >                                continue restart;   // stale
1700 >                            stat = 1;               // apparent progress
1701 >                            if (t != null && v.base == b &&
1702 >                                U.compareAndSwapObject(a, i, t, null)) {
1703 >                                v.base = b + 1;     // help stealer
1704 >                                joiner.runSubtask(t);
1705 >                            }
1706 >                            else if (v.base == b && ++steps == MAX_HELP)
1707 >                                break restart;      // v apparently stalled
1708 >                        }
1709 >                        else {                      // empty -- try to descend
1710 >                            ForkJoinTask<?> next = v.currentJoin;
1711 >                            if (subtask.status < 0 || j.currentJoin != subtask ||
1712 >                                v.currentSteal != subtask)
1713 >                                continue restart;   // stale
1714 >                            else if (next == null || ++steps == MAX_HELP)
1715 >                                break restart;      // dead-end or maybe cyclic
1716 >                            else {
1717 >                                subtask = next;
1718 >                                j = v;
1719 >                                break;
1720 >                            }
1721                          }
1698                        else if (q.base == b)
1699                            break outer;        // possibly stalled
1700                    }
1701                    else {                      // descend
1702                        ForkJoinTask<?> next = stealer.currentJoin;
1703                        if (--depth <= 0 || subtask.status < 0 ||
1704                            next == null || next == subtask)
1705                            break outer;        // stale, dead-end, or cyclic
1706                        subtask = next;
1707                        j = stealer;
1708                        break;
1722                      }
1723                  }
1724              }
1725          }
1726 <        return progress;
1726 >        return stat;
1727      }
1728  
1729      /**
# Line 1811 | Line 1824 | public class ForkJoinPool extends Abstra
1824       */
1825      final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
1826          int s;
1814        ForkJoinTask<?> prevJoin = joiner.currentJoin;
1827          if ((s = task.status) >= 0) {
1828 +            ForkJoinTask<?> prevJoin = joiner.currentJoin;
1829              joiner.currentJoin = task;
1830              long startTime = 0L;
1831              for (int k = 0;;) {
1832 <                if ((joiner.isEmpty() ?                  // try to help
1833 <                     !tryHelpStealer(joiner, task) :
1834 <                     !joiner.tryRemoveAndExec(task))) {
1832 >                if ((s = (joiner.isEmpty() ?           // try to help
1833 >                          tryHelpStealer(joiner, task) :
1834 >                          joiner.tryRemoveAndExec(task))) == 0 &&
1835 >                    (s = task.status) >= 0) {
1836                      if (k == 0) {
1837                          startTime = System.nanoTime();
1838                          tryPollForAndExec(joiner, task); // check uncommon case
# Line 1827 | Line 1841 | public class ForkJoinPool extends Abstra
1841                               System.nanoTime() - startTime >=
1842                               COMPENSATION_DELAY &&
1843                               tryCompensate(task, null)) {
1844 <                        if (task.trySetSignal() && task.status >= 0) {
1844 >                        if (task.trySetSignal()) {
1845                              synchronized (task) {
1846                                  if (task.status >= 0) {
1847                                      try {                // see ForkJoinTask
# Line 1844 | Line 1858 | public class ForkJoinPool extends Abstra
1858                                       (this, CTL, c = ctl, c + AC_UNIT));
1859                      }
1860                  }
1861 <                if ((s = task.status) < 0) {
1861 >                if (s < 0 || (s = task.status) < 0) {
1862                      joiner.currentJoin = prevJoin;
1863                      break;
1864                  }
# Line 1869 | Line 1883 | public class ForkJoinPool extends Abstra
1883          while ((s = task.status) >= 0 &&
1884                 (joiner.isEmpty() ?
1885                  tryHelpStealer(joiner, task) :
1886 <                joiner.tryRemoveAndExec(task)))
1886 >                joiner.tryRemoveAndExec(task)) != 0)
1887              ;
1888          return s;
1889      }
# Line 1901 | Line 1915 | public class ForkJoinPool extends Abstra
1915          }
1916      }
1917  
1918 +
1919      /**
1920       * Runs tasks until {@code isQuiescent()}. We piggyback on
1921       * active count ctl maintenance, but rather than blocking

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines