--- jsr166/src/jsr166y/ForkJoinPool.java 2012/03/04 15:52:45 1.127 +++ jsr166/src/jsr166y/ForkJoinPool.java 2012/04/09 13:11:44 1.128 @@ -629,7 +629,7 @@ public class ForkJoinPool extends Abstra final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared volatile Thread parker; // == owner during call to park; else null - ForkJoinTask currentJoin; // task being joined in awaitJoin + volatile ForkJoinTask currentJoin; // task being joined in awaitJoin ForkJoinTask currentSteal; // current non-local task being executed // Heuristic padding to ameliorate unfortunate memory placements Object p00, p01, p02, p03, p04, p05, p06, p07; @@ -920,10 +920,12 @@ public class ForkJoinPool extends Abstra * any other cancelled task. Returns (true) immediately on any CAS * or consistency check failure so caller can retry. * - * @return false if no progress can be made + * @return 0 if no progress can be made, else positive + * (this unusual convention simplifies use with tryHelpStealer.) */ - final boolean tryRemoveAndExec(ForkJoinTask task) { - boolean removed = false, empty = true, progress = true; + final int tryRemoveAndExec(ForkJoinTask task) { + int stat = 1; + boolean removed = false, empty = true; ForkJoinTask[] a; int m, s, b, n; if ((a = array) != null && (m = a.length - 1) >= 0 && (n = (s = top) - (b = base)) > 0) { @@ -953,14 +955,14 @@ public class ForkJoinPool extends Abstra } if (--n == 0) { if (!empty && base == b) - progress = false; + stat = 0; break; } } } if (removed) task.doExec(); - return progress; + return stat; } /** @@ -1045,7 +1047,6 @@ public class ForkJoinPool extends Abstra ASHIFT = 31 - Integer.numberOfLeadingZeros(s); } } - /** * Per-thread records for threads that submit to pools. Currently * holds only pseudo-random seed / index that is used to choose @@ -1138,7 +1139,7 @@ public class ForkJoinPool extends Abstra * traversal parameters at the expense of sometimes blocking when * we could be helping. */ - private static final int MAX_HELP = 32; + private static final int MAX_HELP = 64; /** * Secondary time-based bound (in nanosecs) for helping attempts @@ -1148,7 +1149,7 @@ public class ForkJoinPool extends Abstra * value should roughly approximate the time required to create * and/or activate a worker thread. */ - private static final long COMPENSATION_DELAY = 100L * 1000L; // 0.1 millisec + private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec /** * Increment for seed generators. See class ThreadLocal for @@ -1545,6 +1546,7 @@ public class ForkJoinPool extends Abstra if (--j < 0) break; } + long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns; if (e < 0) // decode ctl on empty scan w.runState = -1; // pool is terminating @@ -1647,70 +1649,79 @@ public class ForkJoinPool extends Abstra * leaves hints in workers to speed up subsequent calls. The * implementation is very branchy to cope with potential * inconsistencies or loops encountering chains that are stale, - * unknown, or so long that they are likely cyclic. All of these - * cases are dealt with by just retrying by caller. + * unknown, or so long that they are likely cyclic. * * @param joiner the joining worker * @param task the task to join - * @return true if found or ran a task (and so is immediately retryable) + * @return 0 if no progress can be made, negative if task + * known complete, else positive */ - private boolean tryHelpStealer(WorkQueue joiner, ForkJoinTask task) { - WorkQueue[] ws; - int m, depth = MAX_HELP; // remaining chain depth - boolean progress = false; - if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && - task.status >= 0) { - ForkJoinTask subtask = task; // current target - outer: for (WorkQueue j = joiner;;) { - WorkQueue stealer = null; // find stealer of subtask - WorkQueue v = ws[j.stealHint & m]; // try hint - if (v != null && v.currentSteal == subtask) - stealer = v; - else { // scan - for (int i = 1; i <= m; i += 2) { - if ((v = ws[i]) != null && v.currentSteal == subtask && - v != joiner) { - stealer = v; - j.stealHint = i; // save hint - break; + private int tryHelpStealer(WorkQueue joiner, ForkJoinTask task) { + int stat = 0, steps = 0; // bound to avoid cycles + if (joiner != null && task != null) { // hoist null checks + restart: for (;;) { + ForkJoinTask subtask = task; // current target + for (WorkQueue j = joiner, v;;) { // v is stealer of subtask + WorkQueue[] ws; int m, s, h; + if ((s = task.status) < 0) { + stat = s; + break restart; + } + if ((ws = workQueues) == null || (m = ws.length - 1) <= 0) + break restart; // shutting down + if ((v = ws[h = (j.stealHint | 1) & m]) == null || + v.currentSteal != subtask) { + for (int origin = h;;) { // find stealer + if (((h = (h + 2) & m) & 15) == 1 && + (subtask.status < 0 || j.currentJoin != subtask)) + continue restart; // occasional staleness check + if ((v = ws[h]) != null && + v.currentSteal == subtask) { + j.stealHint = h; // save hint + break; + } + if (h == origin) + break restart; // cannot find stealer } } - if (stealer == null) - break; - } - - for (WorkQueue q = stealer;;) { // try to help stealer - ForkJoinTask[] a; ForkJoinTask t; int b; - if (task.status < 0) - break outer; - if ((b = q.base) - q.top < 0 && (a = q.array) != null) { - progress = true; - int i = (((a.length - 1) & b) << ASHIFT) + ABASE; - t = (ForkJoinTask)U.getObjectVolatile(a, i); - if (subtask.status < 0) // must recheck before taking - break outer; - if (t != null && - q.base == b && - U.compareAndSwapObject(a, i, t, null)) { - q.base = b + 1; - joiner.runSubtask(t); + for (;;) { // help stealer or descend to its stealer + ForkJoinTask[] a; int b; + if (subtask.status < 0) // surround probes with + continue restart; // consistency checks + if ((b = v.base) - v.top < 0 && (a = v.array) != null) { + int i = (((a.length - 1) & b) << ASHIFT) + ABASE; + ForkJoinTask t = + (ForkJoinTask)U.getObjectVolatile(a, i); + if (subtask.status < 0 || j.currentJoin != subtask || + v.currentSteal != subtask) + continue restart; // stale + stat = 1; // apparent progress + if (t != null && v.base == b && + U.compareAndSwapObject(a, i, t, null)) { + v.base = b + 1; // help stealer + joiner.runSubtask(t); + } + else if (v.base == b && ++steps == MAX_HELP) + break restart; // v apparently stalled + } + else { // empty -- try to descend + ForkJoinTask next = v.currentJoin; + if (subtask.status < 0 || j.currentJoin != subtask || + v.currentSteal != subtask) + continue restart; // stale + else if (next == null || ++steps == MAX_HELP) + break restart; // dead-end or maybe cyclic + else { + subtask = next; + j = v; + break; + } } - else if (q.base == b) - break outer; // possibly stalled - } - else { // descend - ForkJoinTask next = stealer.currentJoin; - if (--depth <= 0 || subtask.status < 0 || - next == null || next == subtask) - break outer; // stale, dead-end, or cyclic - subtask = next; - j = stealer; - break; } } } } - return progress; + return stat; } /** @@ -1811,14 +1822,15 @@ public class ForkJoinPool extends Abstra */ final int awaitJoin(WorkQueue joiner, ForkJoinTask task) { int s; - ForkJoinTask prevJoin = joiner.currentJoin; if ((s = task.status) >= 0) { + ForkJoinTask prevJoin = joiner.currentJoin; joiner.currentJoin = task; long startTime = 0L; for (int k = 0;;) { - if ((joiner.isEmpty() ? // try to help - !tryHelpStealer(joiner, task) : - !joiner.tryRemoveAndExec(task))) { + if ((s = (joiner.isEmpty() ? // try to help + tryHelpStealer(joiner, task) : + joiner.tryRemoveAndExec(task))) == 0 && + (s = task.status) >= 0) { if (k == 0) { startTime = System.nanoTime(); tryPollForAndExec(joiner, task); // check uncommon case @@ -1827,7 +1839,7 @@ public class ForkJoinPool extends Abstra System.nanoTime() - startTime >= COMPENSATION_DELAY && tryCompensate(task, null)) { - if (task.trySetSignal() && task.status >= 0) { + if (task.trySetSignal()) { synchronized (task) { if (task.status >= 0) { try { // see ForkJoinTask @@ -1844,7 +1856,7 @@ public class ForkJoinPool extends Abstra (this, CTL, c = ctl, c + AC_UNIT)); } } - if ((s = task.status) < 0) { + if (s < 0 || (s = task.status) < 0) { joiner.currentJoin = prevJoin; break; } @@ -1869,7 +1881,7 @@ public class ForkJoinPool extends Abstra while ((s = task.status) >= 0 && (joiner.isEmpty() ? tryHelpStealer(joiner, task) : - joiner.tryRemoveAndExec(task))) + joiner.tryRemoveAndExec(task)) != 0) ; return s; } @@ -1901,6 +1913,7 @@ public class ForkJoinPool extends Abstra } } + /** * Runs tasks until {@code isQuiescent()}. We piggyback on * active count ctl maintenance, but rather than blocking