ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166y/ForkJoinPool.java
(Generate patch)

Comparing jsr166/src/jsr166y/ForkJoinPool.java (file contents):
Revision 1.123 by dl, Mon Feb 20 18:20:06 2012 UTC vs.
Revision 1.133 by jsr166, Sun Oct 21 04:14:31 2012 UTC

# Line 413 | Line 413 | public class ForkJoinPool extends Abstra
413       * unblocked threads to the point that we know they are available)
414       * leading to more situations requiring more threads, and so
415       * on. This aspect of control can be seen as an (analytically
416 <     * intractible) game with an opponent that may choose the worst
416 >     * intractable) game with an opponent that may choose the worst
417       * (for us) active thread to stall at any time.  We take several
418       * precautions to bound losses (and thus bound gains), mainly in
419       * methods tryCompensate and awaitJoin: (1) We only try
# Line 629 | Line 629 | public class ForkJoinPool extends Abstra
629          final ForkJoinPool pool;   // the containing pool (may be null)
630          final ForkJoinWorkerThread owner; // owning thread or null if shared
631          volatile Thread parker;    // == owner during call to park; else null
632 <        ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
632 >        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
633          ForkJoinTask<?> currentSteal; // current non-local task being executed
634          // Heuristic padding to ameliorate unfortunate memory placements
635          Object p00, p01, p02, p03, p04, p05, p06, p07;
# Line 721 | Line 721 | public class ForkJoinPool extends Abstra
721           * version of this method because it is never needed.)
722           */
723          final ForkJoinTask<?> pop() {
724 <            ForkJoinTask<?> t; int m;
725 <            ForkJoinTask<?>[] a = array;
726 <            if (a != null && (m = a.length - 1) >= 0) {
724 >            ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
725 >            if ((a = array) != null && (m = a.length - 1) >= 0) {
726                  for (int s; (s = top - 1) - base >= 0;) {
727 <                    int j = ((m & s) << ASHIFT) + ABASE;
728 <                    if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) == null)
727 >                    long j = ((m & s) << ASHIFT) + ABASE;
728 >                    if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
729                          break;
730                      if (U.compareAndSwapObject(a, j, t, null)) {
731                          top = s;
# Line 830 | Line 829 | public class ForkJoinPool extends Abstra
829          }
830  
831          /**
833         * If present, removes from queue and executes the given task, or
834         * any other cancelled task. Returns (true) immediately on any CAS
835         * or consistency check failure so caller can retry.
836         *
837         * @return false if no progress can be made
838         */
839        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
840            boolean removed = false, empty = true, progress = true;
841            ForkJoinTask<?>[] a; int m, s, b, n;
842            if ((a = array) != null && (m = a.length - 1) >= 0 &&
843                (n = (s = top) - (b = base)) > 0) {
844                for (ForkJoinTask<?> t;;) {           // traverse from s to b
845                    int j = ((--s & m) << ASHIFT) + ABASE;
846                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
847                    if (t == null)                    // inconsistent length
848                        break;
849                    else if (t == task) {
850                        if (s + 1 == top) {           // pop
851                            if (!U.compareAndSwapObject(a, j, task, null))
852                                break;
853                            top = s;
854                            removed = true;
855                        }
856                        else if (base == b)           // replace with proxy
857                            removed = U.compareAndSwapObject(a, j, task,
858                                                             new EmptyTask());
859                        break;
860                    }
861                    else if (t.status >= 0)
862                        empty = false;
863                    else if (s + 1 == top) {          // pop and throw away
864                        if (U.compareAndSwapObject(a, j, t, null))
865                            top = s;
866                        break;
867                    }
868                    if (--n == 0) {
869                        if (!empty && base == b)
870                            progress = false;
871                        break;
872                    }
873                }
874            }
875            if (removed)
876                task.doExec();
877            return progress;
878        }
879
880        /**
832           * Initializes or doubles the capacity of array. Call either
833           * by owner or with lock held -- it is OK for base, but not
834           * top, to move while resizings are in progress.
# Line 939 | Line 890 | public class ForkJoinPool extends Abstra
890          // Execution methods
891  
892          /**
893 <         * Removes and runs tasks until empty, using local mode
894 <         * ordering. Normally called only after checking for apparent
895 <         * non-emptiness.
896 <         */
897 <        final void runLocalTasks() {
898 <            // hoist checks from repeated pop/poll
899 <            ForkJoinTask<?>[] a; int m;
900 <            if ((a = array) != null && (m = a.length - 1) >= 0) {
901 <                if (mode == 0) {
902 <                    for (int s; (s = top - 1) - base >= 0;) {
903 <                        int j = ((m & s) << ASHIFT) + ABASE;
904 <                        ForkJoinTask<?> t =
905 <                            (ForkJoinTask<?>)U.getObjectVolatile(a, j);
955 <                        if (t != null) {
956 <                            if (U.compareAndSwapObject(a, j, t, null)) {
957 <                                top = s;
958 <                                t.doExec();
959 <                            }
960 <                        }
961 <                        else
962 <                            break;
963 <                    }
893 >         * Pops and runs tasks until empty.
894 >         */
895 >        private void popAndExecAll() {
896 >            // A bit faster than repeated pop calls
897 >            ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t;
898 >            while ((a = array) != null && (m = a.length - 1) >= 0 &&
899 >                   (s = top - 1) - base >= 0 &&
900 >                   (t = ((ForkJoinTask<?>)
901 >                         U.getObject(a, j = ((m & s) << ASHIFT) + ABASE)))
902 >                   != null) {
903 >                if (U.compareAndSwapObject(a, j, t, null)) {
904 >                    top = s;
905 >                    t.doExec();
906                  }
907 <                else {
908 <                    for (int b; (b = base) - top < 0;) {
909 <                        int j = ((m & b) << ASHIFT) + ABASE;
910 <                        ForkJoinTask<?> t =
911 <                            (ForkJoinTask<?>)U.getObjectVolatile(a, j);
912 <                        if (t != null) {
913 <                            if (base == b &&
914 <                                U.compareAndSwapObject(a, j, t, null)) {
915 <                                base = b + 1;
916 <                                t.doExec();
917 <                            }
918 <                        } else if (base == b) {
919 <                            if (b + 1 == top)
907 >            }
908 >        }
909 >
910 >        /**
911 >         * Polls and runs tasks until empty.
912 >         */
913 >        private void pollAndExecAll() {
914 >            for (ForkJoinTask<?> t; (t = poll()) != null;)
915 >                t.doExec();
916 >        }
917 >
918 >        /**
919 >         * If present, removes from queue and executes the given task, or
920 >         * any other cancelled task. Returns (true) immediately on any CAS
921 >         * or consistency check failure so caller can retry.
922 >         *
923 >         * @return 0 if no progress can be made, else positive
924 >         * (this unusual convention simplifies use with tryHelpStealer.)
925 >         */
926 >        final int tryRemoveAndExec(ForkJoinTask<?> task) {
927 >            int stat = 1;
928 >            boolean removed = false, empty = true;
929 >            ForkJoinTask<?>[] a; int m, s, b, n;
930 >            if ((a = array) != null && (m = a.length - 1) >= 0 &&
931 >                (n = (s = top) - (b = base)) > 0) {
932 >                for (ForkJoinTask<?> t;;) {           // traverse from s to b
933 >                    int j = ((--s & m) << ASHIFT) + ABASE;
934 >                    t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
935 >                    if (t == null)                    // inconsistent length
936 >                        break;
937 >                    else if (t == task) {
938 >                        if (s + 1 == top) {           // pop
939 >                            if (!U.compareAndSwapObject(a, j, task, null))
940                                  break;
941 <                            Thread.yield(); // wait for lagging update
941 >                            top = s;
942 >                            removed = true;
943                          }
944 +                        else if (base == b)           // replace with proxy
945 +                            removed = U.compareAndSwapObject(a, j, task,
946 +                                                             new EmptyTask());
947 +                        break;
948 +                    }
949 +                    else if (t.status >= 0)
950 +                        empty = false;
951 +                    else if (s + 1 == top) {          // pop and throw away
952 +                        if (U.compareAndSwapObject(a, j, t, null))
953 +                            top = s;
954 +                        break;
955 +                    }
956 +                    if (--n == 0) {
957 +                        if (!empty && base == b)
958 +                            stat = 0;
959 +                        break;
960                      }
961                  }
962              }
963 +            if (removed)
964 +                task.doExec();
965 +            return stat;
966          }
967  
968          /**
969           * Executes a top-level task and any local tasks remaining
970           * after execution.
989         *
990         * @return true unless terminating
971           */
972 <        final boolean runTask(ForkJoinTask<?> t) {
993 <            boolean alive = true;
972 >        final void runTask(ForkJoinTask<?> t) {
973              if (t != null) {
974                  currentSteal = t;
975                  t.doExec();
976 <                if (top != base)        // conservative guard
977 <                    runLocalTasks();
976 >                if (top != base) {       // process remaining local tasks
977 >                    if (mode == 0)
978 >                        popAndExecAll();
979 >                    else
980 >                        pollAndExecAll();
981 >                }
982                  ++nsteals;
983                  currentSteal = null;
984              }
1002            else if (runState < 0)      // terminating
1003                alive = false;
1004            return alive;
985          }
986  
987          /**
# Line 1124 | Line 1104 | public class ForkJoinPool extends Abstra
1104      private static final RuntimePermission modifyThreadPermission;
1105  
1106      /**
1107 <     * Per-thread submission bookeeping. Shared across all pools
1107 >     * Per-thread submission bookkeeping. Shared across all pools
1108       * to reduce ThreadLocal pollution and because random motion
1109       * to avoid contention in one pool is likely to hold for others.
1110       */
# Line 1160 | Line 1140 | public class ForkJoinPool extends Abstra
1140       * traversal parameters at the expense of sometimes blocking when
1141       * we could be helping.
1142       */
1143 <    private static final int MAX_HELP = 32;
1143 >    private static final int MAX_HELP = 64;
1144  
1145      /**
1146       * Secondary time-based bound (in nanosecs) for helping attempts
# Line 1170 | Line 1150 | public class ForkJoinPool extends Abstra
1150       * value should roughly approximate the time required to create
1151       * and/or activate a worker thread.
1152       */
1153 <    private static final long COMPENSATION_DELAY = 100L * 1000L; // 0.1 millisec
1153 >    private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec
1154  
1155      /**
1156       * Increment for seed generators. See class ThreadLocal for
# Line 1321 | Line 1301 | public class ForkJoinPool extends Abstra
1301       *
1302       * @param w the worker's queue
1303       */
1304 +
1305      final void registerWorker(WorkQueue w) {
1306          Mutex lock = this.lock;
1307          lock.lock();
1308          try {
1309              WorkQueue[] ws = workQueues;
1310              if (w != null && ws != null) {          // skip on shutdown/failure
1311 <                int rs, n;
1331 <                while ((n = ws.length) <            // ensure can hold total
1332 <                       (parallelism + (short)(ctl >>> TC_SHIFT) << 1))
1333 <                    workQueues = ws = Arrays.copyOf(ws, n << 1);
1334 <                int m = n - 1;
1311 >                int rs, n = ws.length, m = n - 1;
1312                  int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence
1313                  w.seed = (s == 0) ? 1 : s;          // ensure non-zero seed
1314                  int r = (s << 1) | 1;               // use odd-numbered indices
1315 <                while (ws[r &= m] != null)          // step by approx half size
1316 <                    r += ((n >>> 1) & SQMASK) + 2;
1315 >                if (ws[r &= m] != null) {           // collision
1316 >                    int probes = 0;                 // step by approx half size
1317 >                    int step = (n <= 4) ? 2 : ((n >>> 1) & SQMASK) + 2;
1318 >                    while (ws[r = (r + step) & m] != null) {
1319 >                        if (++probes >= n) {
1320 >                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
1321 >                            m = n - 1;
1322 >                            probes = 0;
1323 >                        }
1324 >                    }
1325 >                }
1326                  w.eventCount = w.poolIndex = r;     // establish before recording
1327                  ws[r] = w;                          // also update seq
1328                  runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN);
# Line 1399 | Line 1385 | public class ForkJoinPool extends Abstra
1385       * range). If no queue exists at the index, one is created.  If
1386       * the queue is busy, another index is randomly chosen. The
1387       * submitMask bounds the effective number of queues to the
1388 <     * (nearest poswer of two for) parallelism level.
1388 >     * (nearest power of two for) parallelism level.
1389       *
1390       * @param task the task. Caller must ensure non-null.
1391       */
# Line 1483 | Line 1469 | public class ForkJoinPool extends Abstra
1469          }
1470      }
1471  
1486
1472      // Scanning for tasks
1473  
1474      /**
# Line 1491 | Line 1476 | public class ForkJoinPool extends Abstra
1476       */
1477      final void runWorker(WorkQueue w) {
1478          w.growArray(false);         // initialize queue array in this thread
1479 <        do {} while (w.runTask(scan(w)));
1479 >        do { w.runTask(scan(w)); } while (w.runState >= 0);
1480      }
1481  
1482      /**
# Line 1534 | Line 1519 | public class ForkJoinPool extends Abstra
1519       * awaiting signal,
1520       *
1521       * @param w the worker (via its WorkQueue)
1522 <     * @return a task or null of none found
1522 >     * @return a task or null if none found
1523       */
1524      private final ForkJoinTask<?> scan(WorkQueue w) {
1525          WorkQueue[] ws;                       // first update random seed
# Line 1551 | Line 1536 | public class ForkJoinPool extends Abstra
1536                      t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1537                      if (q.base == b && ec >= 0 && t != null &&
1538                          U.compareAndSwapObject(a, i, t, null)) {
1539 <                        q.base = b + 1;       // specialization of pollAt
1539 >                        if (q.top - (q.base = b + 1) > 1)
1540 >                            signalWork();    // help pushes signal
1541                          return t;
1542                      }
1543 <                    else if ((t != null || b + 1 != q.top) &&
1558 <                             (ec < 0 || j <= m)) {
1543 >                    else if (ec < 0 || j <= m) {
1544                          rs = 0;               // mark scan as imcomplete
1545                          break;                // caller can retry after release
1546                      }
# Line 1563 | Line 1548 | public class ForkJoinPool extends Abstra
1548                  if (--j < 0)
1549                      break;
1550              }
1551 +
1552              long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns;
1553              if (e < 0)                        // decode ctl on empty scan
1554                  w.runState = -1;              // pool is terminating
# Line 1588 | Line 1574 | public class ForkJoinPool extends Abstra
1574                  else {
1575                      if ((ns = w.nsteals) != 0) {
1576                          w.nsteals = 0;        // set rescans if ran task
1577 <                        w.rescans = (a > 0)? 0 : a + parallelism;
1577 >                        w.rescans = (a > 0) ? 0 : a + parallelism;
1578                          w.totalSteals += ns;
1579                      }
1580                      if (a == 1 - parallelism) // quiescent
# Line 1630 | Line 1616 | public class ForkJoinPool extends Abstra
1616       */
1617      private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
1618          if (w.eventCount < 0 && !tryTerminate(false, false) &&
1619 <            (int)prevCtl != 0 && ctl == currentCtl) {
1619 >            (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) {
1620              Thread wt = Thread.currentThread();
1621              Thread.yield();            // yield before block
1622              while (ctl == currentCtl) {
# Line 1665 | Line 1651 | public class ForkJoinPool extends Abstra
1651       * leaves hints in workers to speed up subsequent calls. The
1652       * implementation is very branchy to cope with potential
1653       * inconsistencies or loops encountering chains that are stale,
1654 <     * unknown, or so long that they are likely cyclic.  All of these
1669 <     * cases are dealt with by just retrying by caller.
1654 >     * unknown, or so long that they are likely cyclic.
1655       *
1656       * @param joiner the joining worker
1657       * @param task the task to join
1658 <     * @return true if found or ran a task (and so is immediately retryable)
1658 >     * @return 0 if no progress can be made, negative if task
1659 >     * known complete, else positive
1660       */
1661 <    private boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1662 <        WorkQueue[] ws;
1663 <        int m, depth = MAX_HELP;                // remaining chain depth
1664 <        boolean progress = false;
1665 <        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 &&
1666 <            task.status >= 0) {
1667 <            ForkJoinTask<?> subtask = task;     // current target
1668 <            outer: for (WorkQueue j = joiner;;) {
1669 <                WorkQueue stealer = null;       // find stealer of subtask
1670 <                WorkQueue v = ws[j.stealHint & m]; // try hint
1671 <                if (v != null && v.currentSteal == subtask)
1672 <                    stealer = v;
1673 <                else {                          // scan
1674 <                    for (int i = 1; i <= m; i += 2) {
1675 <                        if ((v = ws[i]) != null && v.currentSteal == subtask &&
1676 <                            v != joiner) {
1677 <                            stealer = v;
1678 <                            j.stealHint = i;    // save hint
1679 <                            break;
1661 >    private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
1662 >        int stat = 0, steps = 0;                    // bound to avoid cycles
1663 >        if (joiner != null && task != null) {       // hoist null checks
1664 >            restart: for (;;) {
1665 >                ForkJoinTask<?> subtask = task;     // current target
1666 >                for (WorkQueue j = joiner, v;;) {   // v is stealer of subtask
1667 >                    WorkQueue[] ws; int m, s, h;
1668 >                    if ((s = task.status) < 0) {
1669 >                        stat = s;
1670 >                        break restart;
1671 >                    }
1672 >                    if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
1673 >                        break restart;              // shutting down
1674 >                    if ((v = ws[h = (j.stealHint | 1) & m]) == null ||
1675 >                        v.currentSteal != subtask) {
1676 >                        for (int origin = h;;) {    // find stealer
1677 >                            if (((h = (h + 2) & m) & 15) == 1 &&
1678 >                                (subtask.status < 0 || j.currentJoin != subtask))
1679 >                                continue restart;   // occasional staleness check
1680 >                            if ((v = ws[h]) != null &&
1681 >                                v.currentSteal == subtask) {
1682 >                                j.stealHint = h;    // save hint
1683 >                                break;
1684 >                            }
1685 >                            if (h == origin)
1686 >                                break restart;      // cannot find stealer
1687                          }
1688                      }
1689 <                    if (stealer == null)
1690 <                        break;
1691 <                }
1692 <
1693 <                for (WorkQueue q = stealer;;) { // try to help stealer
1694 <                    ForkJoinTask[] a; ForkJoinTask<?> t; int b;
1695 <                    if (task.status < 0)
1696 <                        break outer;
1697 <                    if ((b = q.base) - q.top < 0 && (a = q.array) != null) {
1698 <                        progress = true;
1699 <                        int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1700 <                        t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1701 <                        if (subtask.status < 0) // must recheck before taking
1702 <                            break outer;
1703 <                        if (t != null &&
1704 <                            q.base == b &&
1705 <                            U.compareAndSwapObject(a, i, t, null)) {
1706 <                            q.base = b + 1;
1707 <                            joiner.runSubtask(t);
1689 >                    for (;;) { // help stealer or descend to its stealer
1690 >                        ForkJoinTask[] a;  int b;
1691 >                        if (subtask.status < 0)     // surround probes with
1692 >                            continue restart;       //   consistency checks
1693 >                        if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
1694 >                            int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
1695 >                            ForkJoinTask<?> t =
1696 >                                (ForkJoinTask<?>)U.getObjectVolatile(a, i);
1697 >                            if (subtask.status < 0 || j.currentJoin != subtask ||
1698 >                                v.currentSteal != subtask)
1699 >                                continue restart;   // stale
1700 >                            stat = 1;               // apparent progress
1701 >                            if (t != null && v.base == b &&
1702 >                                U.compareAndSwapObject(a, i, t, null)) {
1703 >                                v.base = b + 1;     // help stealer
1704 >                                joiner.runSubtask(t);
1705 >                            }
1706 >                            else if (v.base == b && ++steps == MAX_HELP)
1707 >                                break restart;      // v apparently stalled
1708 >                        }
1709 >                        else {                      // empty -- try to descend
1710 >                            ForkJoinTask<?> next = v.currentJoin;
1711 >                            if (subtask.status < 0 || j.currentJoin != subtask ||
1712 >                                v.currentSteal != subtask)
1713 >                                continue restart;   // stale
1714 >                            else if (next == null || ++steps == MAX_HELP)
1715 >                                break restart;      // dead-end or maybe cyclic
1716 >                            else {
1717 >                                subtask = next;
1718 >                                j = v;
1719 >                                break;
1720 >                            }
1721                          }
1716                        else if (q.base == b)
1717                            break outer;        // possibly stalled
1718                    }
1719                    else {                      // descend
1720                        ForkJoinTask<?> next = stealer.currentJoin;
1721                        if (--depth <= 0 || subtask.status < 0 ||
1722                            next == null || next == subtask)
1723                            break outer;        // stale, dead-end, or cyclic
1724                        subtask = next;
1725                        j = stealer;
1726                        break;
1722                      }
1723                  }
1724              }
1725          }
1726 <        return progress;
1726 >        return stat;
1727      }
1728  
1729      /**
# Line 1757 | Line 1752 | public class ForkJoinPool extends Abstra
1752       * adds a new thread if no idle workers are available and either
1753       * pool would become completely starved or: (at least half
1754       * starved, and fewer than 50% spares exist, and there is at least
1755 <     * one task apparently available). Even though the availablity
1755 >     * one task apparently available). Even though the availability
1756       * check requires a full scan, it is worthwhile in reducing false
1757       * alarms.
1758       *
1759 <     * @param task if nonnull, a task being waited for
1760 <     * @param blocker if nonnull, a blocker being waited for
1759 >     * @param task if non-null, a task being waited for
1760 >     * @param blocker if non-null, a blocker being waited for
1761       * @return true if the caller can block, else should recheck and retry
1762       */
1763      final boolean tryCompensate(ForkJoinTask<?> task, ManagedBlocker blocker) {
# Line 1821 | Line 1816 | public class ForkJoinPool extends Abstra
1816      }
1817  
1818      /**
1819 <     * Helps and/or blocks until the given task is done
1819 >     * Helps and/or blocks until the given task is done.
1820       *
1821       * @param joiner the joining worker
1822       * @param task the task
1823       * @return task status on exit
1824       */
1825      final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
1826 <        ForkJoinTask<?> prevJoin = joiner.currentJoin;
1827 <        joiner.currentJoin = task;
1828 <        long startTime = 0L;
1829 <        for (int k = 0, s; ; ++k) {
1830 <            if ((joiner.isEmpty() ?                  // try to help
1831 <                 !tryHelpStealer(joiner, task) :
1832 <                 !joiner.tryRemoveAndExec(task))) {
1833 <                if (k == 0) {
1834 <                    startTime = System.nanoTime();
1835 <                    tryPollForAndExec(joiner, task); // check uncommon case
1836 <                }
1837 <                else if ((k & (MAX_HELP - 1)) == 0 &&
1838 <                         System.nanoTime() - startTime >= COMPENSATION_DELAY &&
1839 <                         tryCompensate(task, null)) {
1840 <                    if (task.trySetSignal() && task.status >= 0) {
1841 <                        synchronized (task) {
1842 <                            if (task.status >= 0) {
1843 <                                try {                // see ForkJoinTask
1844 <                                    task.wait();     //  for explanation
1845 <                                } catch (InterruptedException ie) {
1826 >        int s;
1827 >        if ((s = task.status) >= 0) {
1828 >            ForkJoinTask<?> prevJoin = joiner.currentJoin;
1829 >            joiner.currentJoin = task;
1830 >            long startTime = 0L;
1831 >            for (int k = 0;;) {
1832 >                if ((s = (joiner.isEmpty() ?           // try to help
1833 >                          tryHelpStealer(joiner, task) :
1834 >                          joiner.tryRemoveAndExec(task))) == 0 &&
1835 >                    (s = task.status) >= 0) {
1836 >                    if (k == 0) {
1837 >                        startTime = System.nanoTime();
1838 >                        tryPollForAndExec(joiner, task); // check uncommon case
1839 >                    }
1840 >                    else if ((k & (MAX_HELP - 1)) == 0 &&
1841 >                             System.nanoTime() - startTime >=
1842 >                             COMPENSATION_DELAY &&
1843 >                             tryCompensate(task, null)) {
1844 >                        if (task.trySetSignal()) {
1845 >                            synchronized (task) {
1846 >                                if (task.status >= 0) {
1847 >                                    try {                // see ForkJoinTask
1848 >                                        task.wait();     //  for explanation
1849 >                                    } catch (InterruptedException ie) {
1850 >                                    }
1851                                  }
1852 +                                else
1853 +                                    task.notifyAll();
1854                              }
1853                            else
1854                                task.notifyAll();
1855                          }
1856 +                        long c;                          // re-activate
1857 +                        do {} while (!U.compareAndSwapLong
1858 +                                     (this, CTL, c = ctl, c + AC_UNIT));
1859                      }
1857                    long c;                          // re-activate
1858                    do {} while (!U.compareAndSwapLong
1859                                 (this, CTL, c = ctl, c + AC_UNIT));
1860                  }
1861 +                if (s < 0 || (s = task.status) < 0) {
1862 +                    joiner.currentJoin = prevJoin;
1863 +                    break;
1864 +                }
1865 +                else if ((k++ & (MAX_HELP - 1)) == MAX_HELP >>> 1)
1866 +                    Thread.yield();                     // for politeness
1867              }
1862            if ((s = task.status) < 0) {
1863                joiner.currentJoin = prevJoin;
1864                return s;
1865            }
1866            else if ((k & (MAX_HELP - 1)) == MAX_HELP >>> 1)
1867                Thread.yield();                     // for politeness
1868          }
1869 +        return s;
1870      }
1871  
1872      /**
# Line 1882 | Line 1883 | public class ForkJoinPool extends Abstra
1883          while ((s = task.status) >= 0 &&
1884                 (joiner.isEmpty() ?
1885                  tryHelpStealer(joiner, task) :
1886 <                joiner.tryRemoveAndExec(task)))
1886 >                joiner.tryRemoveAndExec(task)) != 0)
1887              ;
1888          return s;
1889      }
# Line 1914 | Line 1915 | public class ForkJoinPool extends Abstra
1915          }
1916      }
1917  
1918 +
1919      /**
1920       * Runs tasks until {@code isQuiescent()}. We piggyback on
1921       * active count ctl maintenance, but rather than blocking
# Line 1922 | Line 1924 | public class ForkJoinPool extends Abstra
1924       */
1925      final void helpQuiescePool(WorkQueue w) {
1926          for (boolean active = true;;) {
1927 <            if (w.base - w.top < 0)
1928 <                w.runLocalTasks();  // exhaust local queue
1927 >            ForkJoinTask<?> localTask; // exhaust local queue
1928 >            while ((localTask = w.nextLocalTask()) != null)
1929 >                localTask.doExec();
1930              WorkQueue q = findNonEmptyStealQueue(w);
1931              if (q != null) {
1932                  ForkJoinTask<?> t; int b;

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines