ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.127 by dl, Sun Mar 4 15:52:45 2012 UTC vs.
Revision 1.128 by dl, Mon Apr 9 13:11:44 2012 UTC

# Line 629 | Line 629 | public class ForkJoinPool extends Abstra
629          final ForkJoinPool pool;   // the containing pool (may be null)
630          final ForkJoinWorkerThread owner; // owning thread or null if shared
631          volatile Thread parker;    // == owner during call to park; else null
632 <        ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
632 >        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
633          ForkJoinTask<?> currentSteal; // current non-local task being executed
634          // Heuristic padding to ameliorate unfortunate memory placements
635          Object p00, p01, p02, p03, p04, p05, p06, p07;
# Line 920 | Line 920 | public class ForkJoinPool extends Abstra
920           * any other cancelled task. Returns (true) immediately on any CAS
921           * or consistency check failure so caller can retry.
922           *
923 <         * @return false if no progress can be made
923 >         * @return 0 if no progress can be made, else positive
924 >         * (this unusual convention simplifies use with tryHelpStealer.)
925           */
926 <        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
927 <            boolean removed = false, empty = true, progress = true;
926 >        final int tryRemoveAndExec(ForkJoinTask<?> task) {
927 >            int stat = 1;
928 >            boolean removed = false, empty = true;
929              ForkJoinTask<?>[] a; int m, s, b, n;
930              if ((a = array) != null && (m = a.length - 1) >= 0 &&
931                  (n = (s = top) - (b = base)) > 0) {
# Line 953 | Line 955 | public class ForkJoinPool extends Abstra
955                      }
956                      if (--n == 0) {
957                          if (!empty && base == b)
958 <                            progress = false;
958 >                            stat = 0;
959                          break;
960                      }
961                  }
962              }
963              if (removed)
964                  task.doExec();
965 <            return progress;
965 >            return stat;
966          }
967  
968          /**
# Line 1045 | Line 1047 | public class ForkJoinPool extends Abstra
1047              ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
1048          }
1049      }
1048
1050      /**
1051       * Per-thread records for threads that submit to pools. Currently
1052       * holds only pseudo-random seed / index that is used to choose
# Line 1138 | Line 1139 | public class ForkJoinPool extends Abstra
1139       * traversal parameters at the expense of sometimes blocking when
1140       * we could be helping.
1141       */
1142 <    private static final int MAX_HELP = 32;
1142 >    private static final int MAX_HELP = 64;
1143  
1144      /**
1145       * Secondary time-based bound (in nanosecs) for helping attempts
# Line 1148 | Line 1149 | public class ForkJoinPool extends Abstra
1149       * value should roughly approximate the time required to create
1150       * and/or activate a worker thread.
1151       */
1152 <    private static final long COMPENSATION_DELAY = 100L * 1000L; // 0.1 millisec
1152 >    private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec
1153  
1154      /**
1155       * Increment for seed generators. See class ThreadLocal for
# Line 1545 | Line 1546 | public class ForkJoinPool extends Abstra
1546                  if (--j < 0)
1547                      break;
1548              }
1549 +
1550              long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns;
1551              if (e < 0)                        // decode ctl on empty scan
1552                  w.runState = -1;              // pool is terminating
# Line 1647 | Line 1649 | public class ForkJoinPool extends Abstra
1649       * leaves hints in workers to speed up subsequent calls. The
1650       * implementation is very branchy to cope with potential
1651       * inconsistencies or loops encountering chains that are stale,
1652 <     * unknown, or so long that they are likely cyclic.  All of these
1651 <     * cases are dealt with by just retrying by caller.
1652 >     * unknown, or so long that they are likely cyclic.
1653       *
1654       * @param joiner the joining worker
1655       * @param task the task to join
1656 <     * @return true if found or ran a task (and so is immediately retryable)
1656 >     * @return 0 if no progress can be made, negative if task
1657 >     * known complete, else positive
1658       */
1659 <    private boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1660 <        WorkQueue[] ws;
1661 <        int m, depth = MAX_HELP;                // remaining chain depth
1662 <        boolean progress = false;
1663 <        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 &&
1664 <            task.status >= 0) {
1665 <            ForkJoinTask<?> subtask = task;     // current target
1666 <            outer: for (WorkQueue j = joiner;;) {
1667 <                WorkQueue stealer = null;       // find stealer of subtask
1668 <                WorkQueue v = ws[j.stealHint & m]; // try hint
1669 <                if (v != null && v.currentSteal == subtask)
1670 <                    stealer = v;
1671 <                else {                          // scan
1672 <                    for (int i = 1; i <= m; i += 2) {
1673 <                        if ((v = ws[i]) != null && v.currentSteal == subtask &&
1674 <                            v != joiner) {
1675 <                            stealer = v;
1676 <                            j.stealHint = i;    // save hint
1677 <                            break;
1659 >    private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1660 >        int stat = 0, steps = 0;                    // bound to avoid cycles
1661 >        if (joiner != null && task != null) {       // hoist null checks
1662 >            restart: for (;;) {
1663 >                ForkJoinTask<?> subtask = task;     // current target
1664 >                for (WorkQueue j = joiner, v;;) {   // v is stealer of subtask
1665 >                    WorkQueue[] ws; int m, s, h;
1666 >                    if ((s = task.status) < 0) {
1667 >                        stat = s;
1668 >                        break restart;
1669 >                    }
1670 >                    if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
1671 >                        break restart;              // shutting down
1672 >                    if ((v = ws[h = (j.stealHint | 1) & m]) == null ||
1673 >                        v.currentSteal != subtask) {
1674 >                        for (int origin = h;;) {    // find stealer
1675 >                            if (((h = (h + 2) & m) & 15) == 1 &&
1676 >                                (subtask.status < 0 || j.currentJoin != subtask))
1677 >                                continue restart;   // occasional staleness check
1678 >                            if ((v = ws[h]) != null &&
1679 >                                v.currentSteal == subtask) {
1680 >                                j.stealHint = h;    // save hint
1681 >                                break;
1682 >                            }
1683 >                            if (h == origin)
1684 >                                break restart;      // cannot find stealer
1685                          }
1686                      }
1687 <                    if (stealer == null)
1688 <                        break;
1689 <                }
1690 <
1691 <                for (WorkQueue q = stealer;;) { // try to help stealer
1692 <                    ForkJoinTask[] a; ForkJoinTask<?> t; int b;
1693 <                    if (task.status < 0)
1694 <                        break outer;
1695 <                    if ((b = q.base) - q.top < 0 && (a = q.array) != null) {
1696 <                        progress = true;
1697 <                        int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1698 <                        t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1699 <                        if (subtask.status < 0) // must recheck before taking
1700 <                            break outer;
1701 <                        if (t != null &&
1702 <                            q.base == b &&
1703 <                            U.compareAndSwapObject(a, i, t, null)) {
1704 <                            q.base = b + 1;
1705 <                            joiner.runSubtask(t);
1687 >                    for (;;) { // help stealer or descend to its stealer
1688 >                        ForkJoinTask[] a;  int b;
1689 >                        if (subtask.status < 0)     // surround probes with
1690 >                            continue restart;       //   consistency checks
1691 >                        if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
1692 >                            int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1693 >                            ForkJoinTask<?> t =
1694 >                                (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1695 >                            if (subtask.status < 0 || j.currentJoin != subtask ||
1696 >                                v.currentSteal != subtask)
1697 >                                continue restart;   // stale
1698 >                            stat = 1;               // apparent progress
1699 >                            if (t != null && v.base == b &&
1700 >                                U.compareAndSwapObject(a, i, t, null)) {
1701 >                                v.base = b + 1;     // help stealer
1702 >                                joiner.runSubtask(t);
1703 >                            }
1704 >                            else if (v.base == b && ++steps == MAX_HELP)
1705 >                                break restart;      // v apparently stalled
1706 >                        }
1707 >                        else {                      // empty -- try to descend
1708 >                            ForkJoinTask<?> next = v.currentJoin;
1709 >                            if (subtask.status < 0 || j.currentJoin != subtask ||
1710 >                                v.currentSteal != subtask)
1711 >                                continue restart;   // stale
1712 >                            else if (next == null || ++steps == MAX_HELP)
1713 >                                break restart;      // dead-end or maybe cyclic
1714 >                            else {
1715 >                                subtask = next;
1716 >                                j = v;
1717 >                                break;
1718 >                            }
1719                          }
1698                        else if (q.base == b)
1699                            break outer;        // possibly stalled
1700                    }
1701                    else {                      // descend
1702                        ForkJoinTask<?> next = stealer.currentJoin;
1703                        if (--depth <= 0 || subtask.status < 0 ||
1704                            next == null || next == subtask)
1705                            break outer;        // stale, dead-end, or cyclic
1706                        subtask = next;
1707                        j = stealer;
1708                        break;
1720                      }
1721                  }
1722              }
1723          }
1724 <        return progress;
1724 >        return stat;
1725      }
1726  
1727      /**
# Line 1811 | Line 1822 | public class ForkJoinPool extends Abstra
1822       */
1823      final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
1824          int s;
1814        ForkJoinTask<?> prevJoin = joiner.currentJoin;
1825          if ((s = task.status) >= 0) {
1826 +            ForkJoinTask<?> prevJoin = joiner.currentJoin;
1827              joiner.currentJoin = task;
1828              long startTime = 0L;
1829              for (int k = 0;;) {
1830 <                if ((joiner.isEmpty() ?                  // try to help
1831 <                     !tryHelpStealer(joiner, task) :
1832 <                     !joiner.tryRemoveAndExec(task))) {
1830 >                if ((s = (joiner.isEmpty() ?           // try to help
1831 >                          tryHelpStealer(joiner, task) :
1832 >                          joiner.tryRemoveAndExec(task))) == 0 &&
1833 >                    (s = task.status) >= 0) {
1834                      if (k == 0) {
1835                          startTime = System.nanoTime();
1836                          tryPollForAndExec(joiner, task); // check uncommon case
# Line 1827 | Line 1839 | public class ForkJoinPool extends Abstra
1839                               System.nanoTime() - startTime >=
1840                               COMPENSATION_DELAY &&
1841                               tryCompensate(task, null)) {
1842 <                        if (task.trySetSignal() && task.status >= 0) {
1842 >                        if (task.trySetSignal()) {
1843                              synchronized (task) {
1844                                  if (task.status >= 0) {
1845                                      try {                // see ForkJoinTask
# Line 1844 | Line 1856 | public class ForkJoinPool extends Abstra
1856                                       (this, CTL, c = ctl, c + AC_UNIT));
1857                      }
1858                  }
1859 <                if ((s = task.status) < 0) {
1859 >                if (s < 0 || (s = task.status) < 0) {
1860                      joiner.currentJoin = prevJoin;
1861                      break;
1862                  }
# Line 1869 | Line 1881 | public class ForkJoinPool extends Abstra
1881          while ((s = task.status) >= 0 &&
1882                 (joiner.isEmpty() ?
1883                  tryHelpStealer(joiner, task) :
1884 <                joiner.tryRemoveAndExec(task)))
1884 >                joiner.tryRemoveAndExec(task)) != 0)
1885              ;
1886          return s;
1887      }
# Line 1901 | Line 1913 | public class ForkJoinPool extends Abstra
1913          }
1914      }
1915  
1916 +
1917      /**
1918       * Runs tasks until {@code isQuiescent()}. We piggyback on
1919       * active count ctl maintenance, but rather than blocking

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines