--- jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/01/07 20:51:36 1.4 +++ jsr166/src/jsr166y/ForkJoinWorkerThread.java 2009/01/12 17:16:18 1.5 @@ -21,10 +21,6 @@ import java.lang.reflect.*; * do create such a subclass, you will also need to supply a custom * ForkJoinWorkerThreadFactory to use it in a ForkJoinPool. * - *

This class also provides methods for generating per-thread - * random numbers, with the same properties as {@link - * java.util.Random} but with each generator isolated from those of - * other threads. */ public class ForkJoinWorkerThread extends Thread { /* @@ -134,17 +130,20 @@ public class ForkJoinWorkerThread extend /** * Maximum work-stealing queue array size. Must be less than or - * equal to 1 << 30 to ensure lack of index wraparound. + * equal to 1 << 28 to ensure lack of index wraparound. (This + * is less than usual bounds, because we need leftshift by 3 + * to be in int range). */ - private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 30; + private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 28; /** - * Generator of seeds for per-thread random numbers. + * The pool this thread works in. Accessed directly by ForkJoinTask */ - private static final Random randomSeedGenerator = new Random(); + final ForkJoinPool pool; /** * The work-stealing queue array. Size must be a power of two. + * Initialized when thread starts, to improve memory locality. */ private ForkJoinTask[] queue; @@ -163,15 +162,12 @@ public class ForkJoinWorkerThread extend private volatile int base; /** - * The pool this thread works in. - */ - final ForkJoinPool pool; - - /** - * Index of this worker in pool array. Set once by pool before - * running, and accessed directly by pool during cleanup etc + * Activity status. When true, this worker is considered active. + * Must be false upon construction. It must be true when executing + * tasks, and BEFORE stealing a task. It must be false before + * calling pool.sync */ - int poolIndex; + private boolean active; /** * Run state of this worker. Supports simple versions of the usual @@ -179,19 +175,11 @@ public class ForkJoinWorkerThread extend */ private volatile int runState; - // Runstate values. Order matters - private static final int RUNNING = 0; - private static final int SHUTDOWN = 1; - private static final int TERMINATING = 2; - private static final int TERMINATED = 3; - /** - * Activity status. When true, this worker is considered active. - * Must be false upon construction. It must be true when executing - * tasks, and BEFORE stealing a task. It must be false before - * blocking on the Pool Barrier. + * Seed for random number generator for choosing steal victims. + * Uses Marsaglia xorshift. Must be nonzero upon initialization. */ - private boolean active; + private int seed; /** * Number of steals, transferred to pool when idle @@ -199,19 +187,16 @@ public class ForkJoinWorkerThread extend private int stealCount; /** - * Seed for random number generator for choosing steal victims - */ - private int randomVictimSeed; - - /** - * Seed for embedded Jurandom + * Index of this worker in pool array. Set once by pool before + * running, and accessed directly by pool during cleanup etc */ - private long juRandomSeed; + int poolIndex; /** - * The last barrier event waited for + * The last barrier event waited for. Accessed in pool callback + * methods, but only by current thread. */ - private long eventCount; + long lastEventCount; /** * Creates a ForkJoinWorkerThread operating in the given pool. @@ -221,10 +206,11 @@ public class ForkJoinWorkerThread extend protected ForkJoinWorkerThread(ForkJoinPool pool) { if (pool == null) throw new NullPointerException(); this.pool = pool; - // remaining initialization deferred to onStart + // Note: poolIndex is set by pool during construction + // Remaining initialization is deferred to onStart } - // public access methods + // Public access methods /** * Returns the pool hosting this thread @@ -239,44 +225,167 @@ public class ForkJoinWorkerThread extend * returned value ranges from zero to the maximum number of * threads (minus one) that have ever been created in the pool. * This method may be useful for applications that track status or - * collect results on a per-worker basis. + * collect results per-worker rather than per-task. * @return the index number. */ public int getPoolIndex() { return poolIndex; } - // Access methods used by Pool + + // Runstate management + + // Runstate values. Order matters + private static final int RUNNING = 0; + private static final int SHUTDOWN = 1; + private static final int TERMINATING = 2; + private static final int TERMINATED = 3; + + final boolean isShutdown() { return runState >= SHUTDOWN; } + final boolean isTerminating() { return runState >= TERMINATING; } + final boolean isTerminated() { return runState == TERMINATED; } + final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); } + final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } /** - * Get and clear steal count for accumulation by pool. Called - * only when known to be idle (in pool.sync and termination). + * Transition to at least the given state. Return true if not + * already at least given state. */ - final int getAndClearStealCount() { - int sc = stealCount; - stealCount = 0; - return sc; + private boolean transitionRunStateTo(int state) { + for (;;) { + int s = runState; + if (s >= state) + return false; + if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state)) + return true; + } } /** - * Returns estimate of the number of tasks in the queue, without - * correcting for transient negative values + * Try to set status to active; fail on contention */ - final int getRawQueueSize() { - return sp - base; + private boolean tryActivate() { + if (!active) { + if (!pool.tryIncrementActiveCount()) + return false; + active = true; + } + return true; } - // Intrinsics-based support for queue operations. - // Currently these three (setSp, setSlot, casSlotNull) are - // usually manually inlined to improve performance + /** + * Try to set status to active; fail on contention + */ + private boolean tryInactivate() { + if (active) { + if (!pool.tryDecrementActiveCount()) + return false; + active = false; + } + return true; + } /** - * Sets sp in store-order. + * Computes next value for random victim probe. Scans don't + * require a very high quality generator, but also not a crummy + * one. Marsaglia xor-shift is cheap and works well. */ - private void setSp(int s) { - _unsafe.putOrderedInt(this, spOffset, s); + private static int xorShift(int r) { + r ^= r << 1; + r ^= r >>> 3; + r ^= r << 10; + return r; } + // Lifecycle methods + + /** + * This method is required to be public, but should never be + * called explicitly. It performs the main run loop to execute + * ForkJoinTasks. + */ + public void run() { + Throwable exception = null; + try { + onStart(); + pool.sync(this); // await first pool event + mainLoop(); + } catch (Throwable ex) { + exception = ex; + } finally { + onTermination(exception); + } + } + + /** + * Execute tasks until shut down. + */ + private void mainLoop() { + while (!isShutdown()) { + ForkJoinTask t = pollTask(); + if (t != null || (t = pollSubmission()) != null) + t.quietlyExec(); + else if (tryInactivate()) + pool.sync(this); + } + } + + /** + * Initializes internal state after construction but before + * processing any tasks. If you override this method, you must + * invoke super.onStart() at the beginning of the method. + * Initialization requires care: Most fields must have legal + * default values, to ensure that attempted accesses from other + * threads work correctly even before this thread starts + * processing tasks. + */ + protected void onStart() { + // Allocate while starting to improve chances of thread-local + // isolation + queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; + // Initial value of seed need not be especially random but + // should differ across workers and must be nonzero + int p = poolIndex + 1; + seed = p + (p << 8) + (p << 16) + (p << 24); // spread bits + } + + /** + * Perform cleanup associated with termination of this worker + * thread. If you override this method, you must invoke + * super.onTermination at the end of the overridden method. + * + * @param exception the exception causing this thread to abort due + * to an unrecoverable error, or null if completed normally. + */ + protected void onTermination(Throwable exception) { + // Execute remaining local tasks unless aborting or terminating + while (exception == null && !pool.isTerminating() && base != sp) { + try { + ForkJoinTask t = popTask(); + if (t != null) + t.quietlyExec(); + } catch(Throwable ex) { + exception = ex; + } + } + // Cancel other tasks, transition status, notify pool, and + // propagate exception to uncaught exception handler + try { + do;while (!tryInactivate()); // ensure inactive + cancelTasks(); + runState = TERMINATED; + pool.workerTerminated(this); + } catch (Throwable ex) { // Shouldn't ever happen + if (exception == null) // but if so, at least rethrown + exception = ex; + } finally { + if (exception != null) + ForkJoinTask.rethrowException(exception); + } + } + + // Intrinsics-based support for queue operations. + /** * Add in store-order the given task at given slot of q to * null. Caller must ensure q is nonnull and index is in range. @@ -295,6 +404,13 @@ public class ForkJoinWorkerThread extend return _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null); } + /** + * Sets sp in store-order. + */ + private void storeSp(int s) { + _unsafe.putOrderedInt(this, spOffset, s); + } + // Main queue methods /** @@ -305,10 +421,10 @@ public class ForkJoinWorkerThread extend ForkJoinTask[] q = queue; int mask = q.length - 1; int s = sp; - _unsafe.putOrderedObject(q, ((s & mask) << qShift) + qBase, t); - _unsafe.putOrderedInt(this, spOffset, ++s); + setSlot(q, s & mask, t); + storeSp(++s); if ((s -= base) == 1) - pool.signalNonEmptyWorkerQueue(); + pool.signalWork(); else if (s >= mask) growQueue(); } @@ -319,14 +435,14 @@ public class ForkJoinWorkerThread extend * @return a task, or null if none or contended. */ private ForkJoinTask deqTask() { - ForkJoinTask[] q; ForkJoinTask t; + ForkJoinTask[] q; int i; int b; if (sp != (b = base) && (q = queue) != null && // must read q after b (t = q[i = (q.length - 1) & b]) != null && - _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) { + casSlotNull(q, i, t)) { base = b + 1; return t; } @@ -334,20 +450,22 @@ public class ForkJoinWorkerThread extend } /** - * Returns a popped task, or null if empty. Called only by - * current thread. + * Returns a popped task, or null if empty. Ensures active status + * if nonnull. Called only by current thread. */ final ForkJoinTask popTask() { - ForkJoinTask t; - int i; - ForkJoinTask[] q = queue; - int mask = q.length - 1; int s = sp; - if (s != base && - (t = q[i = (s - 1) & mask]) != null && - _unsafe.compareAndSwapObject(q, (i << qShift) + qBase, t, null)) { - _unsafe.putOrderedInt(this, spOffset, s - 1); - return t; + while (s != base) { + if (tryActivate()) { + ForkJoinTask[] q = queue; + int mask = q.length - 1; + int i = (s - 1) & mask; + ForkJoinTask t = q[i]; + if (t == null || !casSlotNull(q, i, t)) + break; + storeSp(s - 1); + return t; + } } return null; } @@ -355,16 +473,15 @@ public class ForkJoinWorkerThread extend /** * Specialized version of popTask to pop only if * topmost element is the given task. Called only - * by current thread. + * by current thread while active. * @param t the task. Caller must ensure nonnull */ final boolean unpushTask(ForkJoinTask t) { ForkJoinTask[] q = queue; int mask = q.length - 1; int s = sp - 1; - if (_unsafe.compareAndSwapObject(q, ((s & mask) << qShift) + qBase, - t, null)) { - _unsafe.putOrderedInt(this, spOffset, s); + if (casSlotNull(q, s & mask, t)) { + storeSp(s); return true; } return false; @@ -402,319 +519,130 @@ public class ForkJoinWorkerThread extend t = null; setSlot(newQ, b & newMask, t); } while (++b != bf); - pool.signalIdleWorkers(false); + pool.signalWork(); } - // Runstate management - - final boolean isShutdown() { return runState >= SHUTDOWN; } - final boolean isTerminating() { return runState >= TERMINATING; } - final boolean isTerminated() { return runState == TERMINATED; } - final boolean shutdown() { return transitionRunStateTo(SHUTDOWN); } - final boolean shutdownNow() { return transitionRunStateTo(TERMINATING); } - /** - * Transition to at least the given state. Return true if not - * already at least given state. - */ - private boolean transitionRunStateTo(int state) { - for (;;) { - int s = runState; - if (s >= state) - return false; - if (_unsafe.compareAndSwapInt(this, runStateOffset, s, state)) - return true; - } - } - - /** - * Ensure status is active and if necessary adjust pool active count - */ - final void activate() { - if (!active) { - active = true; - pool.incrementActiveCount(); - } - } - - /** - * Ensure status is inactive and if necessary adjust pool active count - */ - final void inactivate() { - if (active) { - active = false; - pool.decrementActiveCount(); - } - } - - // Lifecycle methods - - /** - * Initializes internal state after construction but before - * processing any tasks. If you override this method, you must - * invoke super.onStart() at the beginning of the method. - * Initialization requires care: Most fields must have legal - * default values, to ensure that attempted accesses from other - * threads work correctly even before this thread starts - * processing tasks. - */ - protected void onStart() { - juRandomSeed = randomSeedGenerator.nextLong(); - do;while((randomVictimSeed = nextRandomInt()) == 0); // must be nonzero - if (queue == null) - queue = new ForkJoinTask[INITIAL_QUEUE_CAPACITY]; - - // Heuristically allow one initial thread to warm up; others wait - if (poolIndex < pool.getParallelism() - 1) { - eventCount = pool.sync(this, 0); - activate(); - } - } - - /** - * Perform cleanup associated with termination of this worker - * thread. If you override this method, you must invoke - * super.onTermination at the end of the overridden method. - * - * @param exception the exception causing this thread to abort due - * to an unrecoverable error, or null if completed normally. - */ - protected void onTermination(Throwable exception) { - try { - clearLocalTasks(); - inactivate(); - cancelTasks(); - } finally { - terminate(exception); - } - } - - /** - * Notify pool of termination and, if exception is nonnull, - * rethrow it to trigger this thread's uncaughtExceptionHandler - */ - private void terminate(Throwable exception) { - transitionRunStateTo(TERMINATED); - try { - pool.workerTerminated(this); - } finally { - if (exception != null) - ForkJoinTask.rethrowException(exception); - } - } - - /** - * Run local tasks on exit from main. - */ - private void clearLocalTasks() { - while (base != sp && !pool.isTerminating()) { - ForkJoinTask t = popTask(); - if (t != null) { - activate(); // ensure active status - t.quietlyExec(); - } - } - } - - /** - * Removes and cancels all tasks in queue. Can be called from any - * thread. - */ - final void cancelTasks() { - while (base != sp) { - ForkJoinTask t = deqTask(); - if (t != null) - t.cancelIgnoringExceptions(); - } - } - - /** - * This method is required to be public, but should never be - * called explicitly. It performs the main run loop to execute - * ForkJoinTasks. - */ - public void run() { - Throwable exception = null; - try { - onStart(); - while (!isShutdown()) - step(); - } catch (Throwable ex) { - exception = ex; - } finally { - onTermination(exception); - } - } - - /** - * Main top-level action. - */ - private void step() { - ForkJoinTask t = sp != base? popTask() : null; - if (t != null || (t = scan(null, true)) != null) { - activate(); - t.quietlyExec(); - } - else { - inactivate(); - eventCount = pool.sync(this, eventCount); - } - } - - // scanning for and stealing tasks - - /** - * Computes next value for random victim probe. Scans don't - * require a very high quality generator, but also not a crummy - * one. Marsaglia xor-shift is cheap and works well. - * - * This is currently unused, and manually inlined - */ - private static int xorShift(int r) { - r ^= r << 1; - r ^= r >>> 3; - r ^= r << 10; - return r; - } - - /** - * Tries to steal a task from another worker and/or, if enabled, - * submission queue. Starts at a random index of workers array, - * and probes workers until finding one with non-empty queue or - * finding that all are empty. It randomly selects the first n-1 - * probes. If these are empty, it resorts to full circular - * traversal, which is necessary to accurately set active status - * by caller. Also restarts if pool barrier has tripped since last - * scan, which forces refresh of workers array, in case barrier - * was associated with resize. + * Tries to steal a task from another worker. Starts at a random + * index of workers array, and probes workers until finding one + * with non-empty queue or finding that all are empty. It + * randomly selects the first n probes. If these are empty, it + * resorts to a full circular traversal, which is necessary to + * accurately set active status by caller. Also restarts if pool + * events occurred since last scan, which forces refresh of + * workers array, in case barrier was associated with resize. * * This method must be both fast and quiet -- usually avoiding * memory accesses that could disrupt cache sharing etc other than * those needed to check for and take tasks. This accounts for, * among other things, updating random seed in place without - * storing it until exit. (Note that we only need to store it if - * we found a task; otherwise it doesn't matter if we start at the - * same place next time.) + * storing it until exit. * - * @param joinMe if non null; exit early if done - * @param checkSubmissions true if OK to take submissions * @return a task, or null if none found */ - private ForkJoinTask scan(ForkJoinTask joinMe, - boolean checkSubmissions) { - ForkJoinPool p = pool; - if (p == null) // Never null, but avoids - return null; // implicit nullchecks below - int r = randomVictimSeed; // extract once to keep scan quiet - restart: // outer loop refreshes ws array - while (joinMe == null || joinMe.status >= 0) { - int mask; - ForkJoinWorkerThread[] ws = p.workers; - if (ws != null && (mask = ws.length - 1) > 0) { - int probes = -mask; // use random index while negative + private ForkJoinTask scan() { + ForkJoinTask t = null; + int r = seed; // extract once to keep scan quiet + ForkJoinWorkerThread[] ws; // refreshed on outer loop + int mask; // must be power 2 minus 1 and > 0 + outer:do { + if ((ws = pool.workers) != null && (mask = ws.length - 1) > 0) { int idx = r; + int probes = ~mask; // use random index while negative for (;;) { - ForkJoinWorkerThread v; - // inlined xorshift to update seed - r ^= r << 1; r ^= r >>> 3; r ^= r << 10; - if ((v = ws[mask & idx]) != null && v.sp != v.base) { - ForkJoinTask t; - activate(); - if ((joinMe == null || joinMe.status >= 0) && - (t = v.deqTask()) != null) { - randomVictimSeed = r; - ++stealCount; - return t; - } - continue restart; // restart on contention + r = xorShift(r); // update random seed + ForkJoinWorkerThread v = ws[mask & idx]; + if (v == null || v.sp == v.base) { + if (probes <= mask) + idx = (probes++ < 0)? r : (idx + 1); + else + break; } - if ((probes >> 1) <= mask) // n-1 random then circular - idx = (probes++ < 0)? r : (idx + 1); + else if (!tryActivate() || (t = v.deqTask()) == null) + continue outer; // restart on contention else - break; + break outer; } } - if (checkSubmissions && p.hasQueuedSubmissions()) { - activate(); - ForkJoinTask t = p.pollSubmission(); - if (t != null) - return t; - } - else { - long ec = eventCount; // restart on pool event - if ((eventCount = p.getEventCount()) == ec) - break; - } - } - return null; + } while (pool.hasNewSyncEvent(this)); // retry on pool events + seed = r; + return t; } /** - * Callback from pool.sync to rescan before blocking. If a - * task is found, it is pushed so it can be executed upon return. - * @return true if found and pushed a task - */ - final boolean prescan() { - ForkJoinTask t = scan(null, true); - if (t != null) { - pushTask(t); - return true; - } - else { - inactivate(); - return false; - } + * Pops or steals a task + * @return a task, if available + */ + final ForkJoinTask pollTask() { + ForkJoinTask t = popTask(); + if (t == null && (t = scan()) != null) + ++stealCount; + return t; } - // Support for ForkJoinTask methods - /** - * Scan, returning early if joinMe done + * Returns a pool submission, if one exists, activating first. + * @return a submission, if available */ - final ForkJoinTask scanWhileJoining(ForkJoinTask joinMe) { - ForkJoinTask t = scan(joinMe, false); - if (t != null && joinMe.status < 0 && sp == base) { - pushTask(t); // unsteal if done and this task would be stealable - t = null; + private ForkJoinTask pollSubmission() { + ForkJoinPool p = pool; + while (p.hasQueuedSubmissions()) { + ForkJoinTask t; + if (tryActivate() && (t = p.pollSubmission()) != null) + return t; } - return t; + return null; } - + + // Methods accessed only by Pool + /** - * Pops or steals a task - * @return task, or null if none available + * Removes and cancels all tasks in queue. Can be called from any + * thread. */ - final ForkJoinTask pollLocalOrStolenTask() { + final void cancelTasks() { ForkJoinTask t; - return (t = popTask()) == null? scan(null, false) : t; + while (base != sp && (t = deqTask()) != null) + t.cancelIgnoringExceptions(); } /** - * Runs tasks until pool isQuiescent + * Get and clear steal count for accumulation by pool. Called + * only when known to be idle (in pool.sync and termination). */ - final void helpQuiescePool() { - for (;;) { - ForkJoinTask t = pollLocalOrStolenTask(); - if (t != null) { - activate(); - t.quietlyExec(); - } - else { - inactivate(); - if (pool.isQuiescent()) { - activate(); // re-activate on exit - break; + final int getAndClearStealCount() { + int sc = stealCount; + stealCount = 0; + return sc; + } + + /** + * Returns true if at least one worker in the given array appears + * to have at least one queued task. + * @param ws array of workers + */ + static boolean hasQueuedTasks(ForkJoinWorkerThread[] ws) { + if (ws != null) { + int len = ws.length; + for (int j = 0; j < 2; ++j) { // need two passes for clean sweep + for (int i = 0; i < len; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null && w.sp != w.base) + return true; } } } + return false; } + // Support methods for ForkJoinTask + /** * Returns an estimate of the number of tasks in the queue. */ final int getQueueSize() { int n = sp - base; - return n <= 0? 0 : n; // suppress momentarily negative values + return n < 0? 0 : n; // suppress momentarily negative values } /** @@ -726,126 +654,30 @@ public class ForkJoinWorkerThread extend return (sp - base) - (pool.getIdleThreadCount() >>> 1); } - // Per-worker exported random numbers - - // Same constants as java.util.Random - final static long JURandomMultiplier = 0x5DEECE66DL; - final static long JURandomAddend = 0xBL; - final static long JURandomMask = (1L << 48) - 1; - - private final int nextJURandom(int bits) { - long next = (juRandomSeed * JURandomMultiplier + JURandomAddend) & - JURandomMask; - juRandomSeed = next; - return (int)(next >>> (48 - bits)); - } - - private final int nextJURandomInt(int n) { - if (n <= 0) - throw new IllegalArgumentException("n must be positive"); - int bits = nextJURandom(31); - if ((n & -n) == n) - return (int)((n * (long)bits) >> 31); - - for (;;) { - int val = bits % n; - if (bits - val + (n-1) >= 0) - return val; - bits = nextJURandom(31); - } - } - - private final long nextJURandomLong() { - return ((long)(nextJURandom(32)) << 32) + nextJURandom(32); - } - - private final long nextJURandomLong(long n) { - if (n <= 0) - throw new IllegalArgumentException("n must be positive"); - long offset = 0; - while (n >= Integer.MAX_VALUE) { // randomly pick half range - int bits = nextJURandom(2); // 2nd bit for odd vs even split - long half = n >>> 1; - long nextn = ((bits & 2) == 0)? half : n - half; - if ((bits & 1) == 0) - offset += n - nextn; - n = nextn; - } - return offset + nextJURandomInt((int)n); - } - - private final double nextJURandomDouble() { - return (((long)(nextJURandom(26)) << 27) + nextJURandom(27)) - / (double)(1L << 53); - } - /** - * Returns a random integer using a per-worker random - * number generator with the same properties as - * {@link java.util.Random#nextInt} - * @return the next pseudorandom, uniformly distributed {@code int} - * value from this worker's random number generator's sequence - */ - public static int nextRandomInt() { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - nextJURandom(32); - } - - /** - * Returns a random integer using a per-worker random - * number generator with the same properties as - * {@link java.util.Random#nextInt(int)} - * @param n the bound on the random number to be returned. Must be - * positive. - * @return the next pseudorandom, uniformly distributed {@code int} - * value between {@code 0} (inclusive) and {@code n} (exclusive) - * from this worker's random number generator's sequence - * @throws IllegalArgumentException if n is not positive - */ - public static int nextRandomInt(int n) { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - nextJURandomInt(n); - } - - /** - * Returns a random long using a per-worker random - * number generator with the same properties as - * {@link java.util.Random#nextLong} - * @return the next pseudorandom, uniformly distributed {@code long} - * value from this worker's random number generator's sequence - */ - public static long nextRandomLong() { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - nextJURandomLong(); - } - - /** - * Returns a random integer using a per-worker random - * number generator with the same properties as - * {@link java.util.Random#nextInt(int)} - * @param n the bound on the random number to be returned. Must be - * positive. - * @return the next pseudorandom, uniformly distributed {@code int} - * value between {@code 0} (inclusive) and {@code n} (exclusive) - * from this worker's random number generator's sequence - * @throws IllegalArgumentException if n is not positive + * Scan, returning early if joinMe done */ - public static long nextRandomLong(long n) { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - nextJURandomLong(n); + final ForkJoinTask scanWhileJoining(ForkJoinTask joinMe) { + ForkJoinTask t = pollTask(); + if (t != null && joinMe.status < 0 && sp == base) { + pushTask(t); // unsteal if done and this task would be stealable + t = null; + } + return t; } - + /** - * Returns a random double using a per-worker random - * number generator with the same properties as - * {@link java.util.Random#nextDouble} - * @return the next pseudorandom, uniformly distributed {@code double} - * value between {@code 0.0} and {@code 1.0} from this - * worker's random number generator's sequence + * Runs tasks until pool isQuiescent */ - public static double nextRandomDouble() { - return ((ForkJoinWorkerThread)(Thread.currentThread())). - nextJURandomDouble(); + final void helpQuiescePool() { + for (;;) { + ForkJoinTask t = pollTask(); + if (t != null) + t.quietlyExec(); + else if (tryInactivate() && pool.isQuiescent()) + break; + } + do;while (!tryActivate()); // re-activate on exit } // Temporary Unsafe mechanics for preliminary release @@ -853,9 +685,9 @@ public class ForkJoinWorkerThread extend static final Unsafe _unsafe; static final long baseOffset; static final long spOffset; + static final long runStateOffset; static final long qBase; static final int qShift; - static final long runStateOffset; static { try { if (ForkJoinWorkerThread.class.getClassLoader() != null) {