--- jsr166/src/jsr166y/ForkJoinWorkerThread.java	2010/07/23 14:09:17	1.37
+++ jsr166/src/jsr166y/ForkJoinWorkerThread.java	2010/07/24 20:28:18	1.39
@@ -97,19 +97,20 @@ public class ForkJoinWorkerThread extend
      * technique for implementing efficient futures" SIGPLAN Notices,
      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
      * in that: (1) We only maintain dependency links across workers
-     * upon steals, rather than maintain per-task bookkeeping. This
-     * may require a linear scan of workers array to locate stealers,
-     * but usually doesn't because stealers leave hints (that may
-     * become stale/wrong) of where to locate them. This
-     * isolates cost to when it is needed, rather than adding to
-     * per-task overhead. (2) It is "shallow", ignoring nesting and
-     * potentially cyclic mutual steals. (3) It is intentionally
-     * racy: field currentJoin is updated only while actively joining,
-     * which means that we could miss links in the chain during
-     * long-lived tasks, GC stalls etc. (4) We bound the number of
-     * attempts to find work (see MAX_HELP_DEPTH) and fall back to
-     * suspending the worker and if necessary replacing it with a
-     * spare (see ForkJoinPool.tryAwaitJoin).
+     * upon steals, rather than use per-task bookkeeping. This may
+     * require a linear scan of workers array to locate stealers, but
+     * usually doesn't because stealers leave hints (that may become
+     * stale/wrong) of where to locate them. This isolates cost to
+     * when it is needed, rather than adding to per-task overhead.
+     * (2) It is "shallow", ignoring nesting and potentially cyclic
+     * mutual steals. (3) It is intentionally racy: field currentJoin
+     * is updated only while actively joining, which means that we
+     * miss links in the chain during long-lived tasks, GC stalls etc
+     * (which is OK since blocking in such cases is usually a good
+     * idea). (4) We bound the number of attempts to find work (see
+     * MAX_HELP_DEPTH) and fall back to suspending the worker and if
+     * necessary replacing it with a spare (see
+     * ForkJoinPool.tryAwaitJoin).
      *
      * Efficient implementation of these algorithms currently relies
      * on an uncomfortable amount of "Unsafe" mechanics. To maintain
@@ -256,7 +257,6 @@ public class ForkJoinWorkerThread extend
      */
     private int seed;
-
     /**
      * Activity status. When true, this worker is considered active.
      * Accessed directly by pool. Must be false upon construction.
      */
@@ -265,8 +265,7 @@ public class ForkJoinWorkerThread extend
 
     /**
      * True if use local fifo, not default lifo, for local polling.
-     * Shadows value from ForkJoinPool, which resets it if changed
-     * pool-wide.
+     * Shadows value from ForkJoinPool.
      */
     private final boolean locallyFifo;
 
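For illustration of point (1): the hint-then-scan stealer lookup the revised
overview describes can be modeled in isolation. The following is a
hypothetical standalone sketch, not jsr166 code (Worker, currentlyExecuting,
and findStealer are invented names): the hint makes the common case O(1), the
linear scan is only a fallback, and stale or wrong hints merely cost a scan.

class StealerLookupSketch {
    static final class Worker {
        final int index;
        volatile Object currentlyExecuting; // task this worker appears to run; may be stale
        Worker(int index) { this.index = index; }
    }

    // Returns a worker that appears to be executing task, or null.
    static Worker findStealer(Worker[] ws, Object task, int hint) {
        if (hint >= 0 && hint < ws.length && ws[hint] != null &&
            ws[hint].currentlyExecuting == task)
            return ws[hint];                 // hint was right: O(1)
        for (Worker w : ws)                  // fallback: linear scan of workers array
            if (w != null && w.currentlyExecuting == task)
                return w;
        return null;                         // racy misses are tolerated by callers
    }

    public static void main(String[] args) {
        Worker[] ws = new Worker[4];
        for (int i = 0; i < ws.length; ++i) ws[i] = new Worker(i);
        Object task = new Object();
        ws[2].currentlyExecuting = task;     // worker 2 holds the stolen task
        System.out.println(findStealer(ws, task, 0).index); // stale hint 0; scan finds 2
    }
}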
@@ -321,9 +320,9 @@ public class ForkJoinWorkerThread extend
      */
     final void start(int poolIndex, UncaughtExceptionHandler ueh) {
         this.poolIndex = poolIndex;
+        setDaemon(true);
         if (ueh != null)
             setUncaughtExceptionHandler(ueh);
-        setDaemon(true);
         start();
     }
 
@@ -761,7 +760,7 @@ public class ForkJoinWorkerThread extend
     }
 
     /**
-     * Instrumented version of park used by ForkJoinPool.awaitEvent
+     * Instrumented version of park used by ForkJoinPool.eventSync
      */
     final void doPark() {
         ++parkCount;
@@ -802,20 +801,21 @@ public class ForkJoinWorkerThread extend
                                               s | SUSPENDED))
                 break;
         }
+        int pc = pool.parallelism;
+        pool.accumulateStealCount(this);
         boolean timed;
         long nanos;
         long startTime;
-        if (poolIndex < pool.parallelism) {
+        if (poolIndex < pc) {    // untimed wait for core threads
             timed = false;
             nanos = 0L;
             startTime = 0L;
         }
-        else {
+        else {                   // timed wait for added threads
             timed = true;
             nanos = SPARE_KEEPALIVE_NANOS;
             startTime = System.nanoTime();
         }
-        pool.accumulateStealCount(this);
         lastEventCount = 0;      // reset upon resume
         interrupted();           // clear/ignore interrupts
         while ((runState & SUSPENDED) != 0) {
@@ -905,8 +905,12 @@ public class ForkJoinWorkerThread extend
      * @return a task, if available
      */
     final ForkJoinTask<?> pollTask() {
-        ForkJoinTask<?> t;
-        return (t = pollLocalTask()) != null ? t : scan();
+        ForkJoinTask<?> t = pollLocalTask();
+        if (t == null) {
+            t = scan();
+            currentSteal = null; // cannot retain/track
+        }
+        return t;
     }
 
     /**
@@ -920,32 +924,24 @@ public class ForkJoinWorkerThread extend
      * @param joinMe the task to join
      * @return task status on exit
      */
-    final int joinTask(ForkJoinTask<?> joinMe) {
+    final int joinTask(ForkJoinTask<?> joinMe) {
         int stat;
         ForkJoinTask<?> prevJoin = currentJoin;
-        currentJoin = joinMe;
+        // Only written by this thread; only need ordered store
+        UNSAFE.putOrderedObject(this, currentJoinOffset, joinMe);
         if ((stat = joinMe.status) >= 0 &&
             (sp == base ||
              (stat = localHelpJoinTask(joinMe)) >= 0)) {
-            ForkJoinPool p = pool;
-            int helpRetries = 2;          // initial patience values
-            int awaitRetries = -1;        // -1 is sentinel for replace-check only
-            do {
-                helpJoinTask(joinMe, helpRetries);
+            for (int retries = 0; ; ++retries) {
+                helpJoinTask(joinMe, retries);
                 if ((stat = joinMe.status) < 0)
                     break;
-                boolean busy = p.tryAwaitJoin(joinMe, awaitRetries);
+                pool.tryAwaitJoin(joinMe, retries);
                 if ((stat = joinMe.status) < 0)
                     break;
-                if (awaitRetries == -1)
-                    awaitRetries = 0;
-                else if (busy)
-                    ++awaitRetries;
-                if (helpRetries < p.parallelism)
-                    helpRetries <<= 1;
                 Thread.yield(); // tame unbounded loop
-            } while (joinMe.status >= 0);
+            }
         }
-        currentJoin = prevJoin;
+        UNSAFE.putOrderedObject(this, currentJoinOffset, prevJoin);
        return stat;
     }
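The rewritten joinTask collapses the old helpRetries/awaitRetries tuning into
a single retries counter passed to both helpJoinTask and
ForkJoinPool.tryAwaitJoin. A rough standalone model of that control flow,
with invented stand-ins (an AtomicInteger whose negative value means
"completed" plays the task status, helpJoin plays helpJoinTask, and a
parkNanos back-off plays tryAwaitJoin):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

class JoinLoopSketch {
    // Help, recheck status, block briefly, recheck, yield, repeat.
    static int joinTask(AtomicInteger status) {
        int stat;
        for (int retries = 0; (stat = status.get()) >= 0; ++retries) {
            helpJoin(status, retries);       // stand-in for helpJoinTask
            if ((stat = status.get()) < 0)
                break;
            LockSupport.parkNanos(1L << Math.min(20, 6 + retries)); // stand-in for tryAwaitJoin
            if ((stat = status.get()) < 0)
                break;
            Thread.yield();                  // tame unbounded loop
        }
        return stat;
    }

    static void helpJoin(AtomicInteger status, int retries) {
        // A real helper would locate and run the join target's stolen
        // subtasks; this model deliberately does nothing.
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger status = new AtomicInteger(0);
        Thread completer = new Thread(() -> {
            LockSupport.parkNanos(10_000_000L); // simulate 10ms of work
            status.set(-1);                     // mark completion
        });
        completer.start();
        System.out.println(joinTask(status));   // prints -1 once complete
        completer.join();
    }
}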
@@ -997,14 +993,14 @@ public class ForkJoinWorkerThread extend
      * caller anyway) to slightly simplify control paths.
      *
      * @param joinMe the task to join
+     * @param rescans the number of times to recheck for work
      */
-    final void helpJoinTask(ForkJoinTask<?> joinMe, int retries) {
+    private void helpJoinTask(ForkJoinTask<?> joinMe, int rescans) {
         ForkJoinWorkerThread[] ws = pool.workers;
         int n;
         if (ws == null || (n = ws.length) <= 1)
             return;                   // need at least 2 workers
-
-        restart:while (joinMe.status >= 0 && --retries >= 0) {
+        restart:while (rescans-- >= 0 && joinMe.status >= 0) {
             ForkJoinTask<?> task = joinMe;        // base of chain
             ForkJoinWorkerThread thread = this;   // thread with stolen task
             for (int depth = 0; depth < MAX_HELP_DEPTH; ++depth) {
@@ -1029,10 +1025,9 @@ public class ForkJoinWorkerThread extend
                     ForkJoinTask<?> t = q[i];
                     if (task.status < 0)          // stale
                         continue restart;
-                    if (v.base == b) {            // recheck after reading t
-                        if (t == null)            // producer stalled
-                            continue restart;     // retry via restart
-                        if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
+                    if (t != null) {
+                        if (v.base == b &&
+                            UNSAFE.compareAndSwapObject(q, u, t, null)) {
                             if (joinMe.status < 0) {
                                 UNSAFE.putObjectVolatile(q, u, t);
                                 return;           // back out on cancel
@@ -1045,12 +1040,14 @@ public class ForkJoinWorkerThread extend
                             currentSteal = prevSteal;
                         }
                     }
+                    else if (v.base == b)         // producer stalled
+                        continue restart;         // retry via restart
                     if (joinMe.status < 0)
                         return;
                 }
                 // Try to descend to find v's stealer
                 ForkJoinTask<?> next = v.currentJoin;
-                if (next == null || task.status < 0)
+                if (next == null || next == task || task.status < 0)
                     continue restart;             // no descendent or stale
                 if (joinMe.status < 0)
                     return;
@@ -1141,10 +1138,10 @@ public class ForkJoinWorkerThread extend
 
     private static final sun.misc.Unsafe UNSAFE = getUnsafe();
     private static final long runStateOffset =
         objectFieldOffset("runState", ForkJoinWorkerThread.class);
+    private static final long currentJoinOffset =
+        objectFieldOffset("currentJoin", ForkJoinWorkerThread.class);
     private static final long qBase =
         UNSAFE.arrayBaseOffset(ForkJoinTask[].class);
-    private static final long threadStatusOffset =
-        objectFieldOffset("threadStatus", Thread.class);
     private static final int qShift;
     static {
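The new currentJoinOffset exists so joinTask can write currentJoin with
UNSAFE.putOrderedObject rather than a full volatile store: the field has a
single writer (the owning thread), and the racy readers in helpJoinTask
tolerate staleness, so release ordering suffices. On modern JDKs the same
single-writer idiom is spelled VarHandle.setRelease; a minimal sketch under
that assumption (this class and its names are illustrative, not jsr166 code):

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

class OrderedStoreSketch {
    volatile Object currentJoin;             // written by owner, read by helpers

    private static final VarHandle CURRENT_JOIN;
    static {
        try {
            CURRENT_JOIN = MethodHandles.lookup().findVarHandle(
                OrderedStoreSketch.class, "currentJoin", Object.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Owner-thread-only write; a release store is the analogue of
    // Unsafe.putOrderedObject and avoids the volatile write's full fence.
    void setCurrentJoin(Object task) {
        CURRENT_JOIN.setRelease(this, task);
    }

    public static void main(String[] args) {
        OrderedStoreSketch w = new OrderedStoreSketch();
        w.setCurrentJoin("joinMe");
        System.out.println(w.currentJoin);   // volatile read observes the store
    }
}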