--- jsr166/src/jsr166y/ForkJoinPool.java 2010/08/13 16:21:23 1.63 +++ jsr166/src/jsr166y/ForkJoinPool.java 2010/08/17 18:30:32 1.64 @@ -259,29 +259,24 @@ public class ForkJoinPool extends Abstra * workers that previously could not find a task to now find one: * Submission of a new task to the pool, or another worker pushing * a task onto a previously empty queue. (We also use this - * mechanism for termination actions that require wakeups of idle - * workers). Each worker maintains its last known event count, - * and blocks when a scan for work did not find a task AND its - * lastEventCount matches the current eventCount. Waiting idle - * workers are recorded in a variant of Treiber stack headed by - * field eventWaiters which, when nonzero, encodes the thread - * index and count awaited for by the worker thread most recently - * calling eventSync. This thread in turn has a record (field - * nextEventWaiter) for the next waiting worker. In addition to - * allowing simpler decisions about need for wakeup, the event - * count bits in eventWaiters serve the role of tags to avoid ABA - * errors in Treiber stacks. To reduce delays in task diffusion, - * workers not otherwise occupied may invoke method - * releaseEventWaiters, that removes and signals (unparks) workers - * not waiting on current count. To reduce stalls, To minimize - * task production stalls associate with signalling, any worker - * pushing a task on an empty queue invokes the weaker method - * signalWork, that only releases idle workers until it detects - * interference by other threads trying to release, and lets them - * take over. The net effect is a tree-like diffusion of signals, - * where released threads (and possibly others) help with unparks. - * To further reduce contention effects a bit, failed CASes to - * increment field eventCount are tolerated without retries. + * mechanism for configuration and termination actions that + * require wakeups of idle workers). 
Each worker maintains its + * last known event count, and blocks when a scan for work did not + * find a task AND its lastEventCount matches the current + * eventCount. Waiting idle workers are recorded in a variant of + * Treiber stack headed by field eventWaiters which, when nonzero, + * encodes the thread index and count awaited for by the worker + * thread most recently calling eventSync. This thread in turn has + * a record (field nextEventWaiter) for the next waiting worker. + * In addition to allowing simpler decisions about need for + * wakeup, the event count bits in eventWaiters serve the role of + * tags to avoid ABA errors in Treiber stacks. Upon any wakeup, + * released threads also try to release others (but give up upon + * contention to reduce useless flailing). The net effect is a + * tree-like diffusion of signals, where released threads (and + * possibly others) help with unparks. To further reduce + * contention effects a bit, failed CASes to increment field + * eventCount are tolerated without retries in signalWork. * Conceptually they are merged into the same event, which is OK * when their only purpose is to enable workers to scan for work. * @@ -304,15 +299,19 @@ public class ForkJoinPool extends Abstra * may become a spare at about the same time as another is * needlessly being created. We counteract this and related slop * in part by requiring resumed spares to immediately recheck (in - * preStep) to see whether they they should re-suspend. To avoid - * long-term build-up of spares, the oldest spare (see - * ForkJoinWorkerThread.suspendAsSpare) occasionally wakes up if - * not signalled and calls tryTrimSpare, which uses two different - * thresholds: Always killing if the number of spares is greater - * that 25% of total, and killing others only at a slower rate - * (UNUSED_SPARE_TRIM_RATE_NANOS). + * preStep) to see whether they should re-suspend. * - * 6. Deciding when to create new workers. The main dynamic + * 6. 
Killing off unneeded workers. The Spare and Event queues use + * similar mechanisms to shed unused workers: The oldest (first) + * waiter uses a timed rather than hard wait. When this wait times + * out without a normal wakeup, it tries to shutdown any one (for + * convenience the newest) other waiter via tryShutdownSpare or + * tryShutdownWaiter, respectively. The wakeup rates for spares + * are much shorter than for waiters. Together, they will + * eventually reduce the number of worker threads to a minimum of + * one after a long enough period without use. + * + * 7. Deciding when to create new workers. The main dynamic * control in this class is deciding when to create extra threads * in method helpMaintainParallelism. We would like to keep * exactly #parallelism threads running, which is an impossble @@ -326,8 +325,10 @@ public class ForkJoinPool extends Abstra * impedes accuracy. Our main defense is to allow some slack in * creation thresholds, using rules that reflect the fact that the * more threads we have running, the more likely that we are - * underestimating the number running threads. The rules also - * better cope with the fact that some of the methods in this + * underestimating the number of running threads. (We also include + * some heuristic use of Thread.yield when all workers appear to + * be busy, to improve likelihood of counts settling.) The rules + * also better cope with the fact that some of the methods in this * class tend to never become compiled (but are interpreted), so * some components of the entire set of controls might execute 100 * times faster than others. And similarly for cases where the @@ -419,6 +420,16 @@ public class ForkJoinPool extends Abstra new AtomicInteger(); /** + * The wakeup interval (in nanoseconds) after which the oldest + * worker waiting for an event invokes tryShutdownWaiter to shrink + * the number of workers. 
The exact value does not matter too + * much, but should be long enough to slowly release resources + * during long periods without use without disrupting normal use. + */ + private static final long SHRINK_RATE_NANOS = + 60L * 1000L * 1000L * 1000L; // one minute + + /** * Absolute bound for parallelism level. Twice this number plus * one (i.e., 0xfff) must fit into a 16bit field to enable * word-packing for some counts and indices. @@ -463,17 +474,6 @@ public class ForkJoinPool extends Abstra private volatile long stealCount; /** - * The last nanoTime that a spare thread was trimmed - */ - private volatile long trimTime; - - /** - * The rate at which to trim unused spares - */ - static final long UNUSED_SPARE_TRIM_RATE_NANOS = - 1000L * 1000L * 1000L; // 1 sec - - /** * Encoded record of top of treiber stack of threads waiting for * events. The top 32 bits contain the count being waited for. The * bottom 16 bits contains one plus the pool index of waiting @@ -514,8 +514,12 @@ public class ForkJoinPool extends Abstra * These are bundled together to ensure consistent read for * termination checks (i.e., that runLevel is at least SHUTDOWN * and active threads is zero). + * + * Notes: Most direct CASes are dependent on these bitfield + * positions. Also, this field is non-private to enable direct + * performance-sensitive CASes in ForkJoinWorkerThread. */ - private volatile int runState; + volatile int runState; // Note: The order among run level values matters. 
private static final int RUNLEVEL_SHIFT = 16; @@ -523,7 +527,6 @@ public class ForkJoinPool extends Abstra private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1); private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2); private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1; - private static final int ONE_ACTIVE = 1; // active update delta /** * Holds number of total (i.e., created and not yet terminated) @@ -565,8 +568,8 @@ public class ForkJoinPool extends Abstra private final int poolNumber; - // Utilities for CASing fields. Note that several of these - // are manually inlined by callers + // Utilities for CASing fields. Note that most of these + // are usually manually inlined by callers /** * Increments running count part of workerCounts @@ -599,11 +602,12 @@ public class ForkJoinPool extends Abstra private void decrementWorkerCounts(int dr, int dt) { for (;;) { int wc = workerCounts; - if (wc == 0 && (runState & TERMINATED) != 0) - return; // lagging termination on a backout if ((wc & RUNNING_COUNT_MASK) - dr < 0 || - (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) + (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) { + if ((runState & TERMINATED) != 0) + return; // lagging termination on a backout Thread.yield(); + } if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, wc - (dr + dt))) return; @@ -628,7 +632,7 @@ public class ForkJoinPool extends Abstra final boolean tryIncrementActiveCount() { int c; return UNSAFE.compareAndSwapInt(this, runStateOffset, - c = runState, c + ONE_ACTIVE); + c = runState, c + 1); } /** @@ -638,7 +642,7 @@ public class ForkJoinPool extends Abstra final boolean tryDecrementActiveCount() { int c; return UNSAFE.compareAndSwapInt(this, runStateOffset, - c = runState, c - ONE_ACTIVE); + c = runState, c - 1); } /** @@ -705,8 +709,10 @@ public class ForkJoinPool extends Abstra * Tries to create and add new worker. Assumes that worker counts * are already updated to accommodate the worker, so adjusts on * failure. 
+ * + * @return the worker, or null on failure */ - private void addWorker() { + private ForkJoinWorkerThread addWorker() { ForkJoinWorkerThread w = null; try { w = factory.newThread(this); @@ -716,8 +722,11 @@ public class ForkJoinPool extends Abstra tryTerminate(false); // in case of failure during shutdown } } - if (w != null) + if (w != null) { w.start(recordWorker(w), ueh); + advanceEventCount(); + } + return w; } /** @@ -740,24 +749,22 @@ public class ForkJoinPool extends Abstra /** * Releases workers blocked on a count not equal to current count. * Normally called after precheck that eventWaiters isn't zero to - * avoid wasted array checks. - * - * @param signalling true if caller is a signalling worker so can - * exit upon (conservatively) detected contention by other threads - * who will continue to release + * avoid wasted array checks. Gives up upon a change in count or + * contention, letting other workers take over. */ - private void releaseEventWaiters(boolean signalling) { + private void releaseEventWaiters() { ForkJoinWorkerThread[] ws = workers; int n = ws.length; - long h; // head of stack - ForkJoinWorkerThread w; int id, ec; - while ((id = ((int)((h = eventWaiters) & WAITER_ID_MASK)) - 1) >= 0 && - (int)(h >>> EVENT_COUNT_SHIFT) != (ec = eventCount) && - id < n && (w = ws[id]) != null) { - if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, - h, h = w.nextWaiter)) - LockSupport.unpark(w); - if (signalling && (eventCount != ec || eventWaiters != h)) + long h = eventWaiters; + int ec = eventCount; + ForkJoinWorkerThread w; int id; + while ((int)(h >>> EVENT_COUNT_SHIFT) != ec && + (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 && + id < n && (w = ws[id]) != null && + UNSAFE.compareAndSwapLong(this, eventWaitersOffset, + h, h = w.nextWaiter)) { + LockSupport.unpark(w); + if (eventWaiters != h || eventCount != ec) break; } } @@ -770,37 +777,99 @@ public class ForkJoinPool extends Abstra int c; // try to increment event count -- CAS failure OK 
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1); if (eventWaiters != 0L) - releaseEventWaiters(true); + releaseEventWaiters(); } /** - * Blocks worker until terminating or event count - * advances from last value held by worker + * Adds the given worker to event queue and blocks until + * terminating or event count advances from the worker's + * lastEventCount value * * @param w the calling worker thread */ private void eventSync(ForkJoinWorkerThread w) { - int wec = w.lastEventCount; - long nh = (((long)wec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1)); + int ec = w.lastEventCount; + long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1)); long h; while ((runState < SHUTDOWN || !tryTerminate(false)) && - ((h = eventWaiters) == 0L || - (int)(h >>> EVENT_COUNT_SHIFT) == wec) && - eventCount == wec) { + (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 || + (int)(h >>> EVENT_COUNT_SHIFT) == ec) && + eventCount == ec) { if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset, w.nextWaiter = h, nh)) { - while (runState < TERMINATING && eventCount == wec) { - if (!tryAccumulateStealCount(w)) // transfer while idle - continue; - Thread.interrupted(); // clear/ignore interrupt - if (eventCount != wec) - break; + awaitEvent(w, ec); + break; + } + } + } + + /** + * Blocks the given worker (that has already been entered as an + * event waiter) until terminating or event count advances from + * the given value. The oldest (first) waiter uses a timed wait to + * occasionally one-by-one shrink the number of workers (to a + * minimum of one) if the pool has not been used for extended + * periods. + * + * @param w the calling worker thread + * @param ec the count + */ + private void awaitEvent(ForkJoinWorkerThread w, int ec) { + while (eventCount == ec) { + if (tryAccumulateStealCount(w)) { // transfer while idle + boolean untimed = (w.nextWaiter != 0L || + (workerCounts & RUNNING_COUNT_MASK) <= 1); + long startTime = untimed? 
0 : System.nanoTime(); + Thread.interrupted(); // clear/ignore interrupt + if (eventCount != ec || !w.isRunning() || + runState >= TERMINATING) // recheck after clear + break; + if (untimed) LockSupport.park(w); + else { + LockSupport.parkNanos(w, SHRINK_RATE_NANOS); + if (eventCount != ec || !w.isRunning() || + runState >= TERMINATING) + break; + if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS) + tryShutdownWaiter(ec); } - break; } } - w.lastEventCount = eventCount; + } + + /** + * Callback from the oldest waiter in awaitEvent waking up after a + * period of non-use. Tries (once) to shutdown an event waiter (or + * a spare, if one exists). Note that we don't need CAS or locks + * here because the method is called only from one thread + * occasionally waking (and even misfires are OK). Note that + * until the shutdown worker fully terminates, workerCounts + * will overestimate total count, which is tolerable. + * + * @param ec the event count waited on by caller (to abort + * attempt if count has since changed). 
+ */ + private void tryShutdownWaiter(int ec) { + if (spareWaiters != 0) { // prefer killing spares + tryShutdownSpare(); + return; + } + ForkJoinWorkerThread[] ws = workers; + int n = ws.length; + long h = eventWaiters; + ForkJoinWorkerThread w; int id; long nh; + if (runState == 0 && + submissionQueue.isEmpty() && + eventCount == ec && + (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 && + id < n && (w = ws[id]) != null && + (nh = w.nextWaiter) != 0L && // keep at least one worker + UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) { + w.shutdown(); + LockSupport.unpark(w); + } + releaseEventWaiters(); } // Maintaining spares @@ -809,166 +878,206 @@ public class ForkJoinPool extends Abstra * Pushes worker onto the spare stack */ final void pushSpare(ForkJoinWorkerThread w) { - int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex+1); + int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1); do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset, w.nextSpare = spareWaiters,ns)); } /** - * Tries (once) to resume a spare if running count is less than - * target parallelism. Fails on contention or stale workers. + * Callback from oldest spare occasionally waking up. Tries + * (once) to shutdown a spare. Same idea as tryShutdownWaiter. 
*/ - private void tryResumeSpare() { + final void tryShutdownSpare() { int sw, id; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws; if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 && id < (ws = workers).length && (w = ws[id]) != null && - (workerCounts & RUNNING_COUNT_MASK) < parallelism && - eventWaiters == 0L && - spareWaiters == sw && + (workerCounts & RUNNING_COUNT_MASK) >= parallelism && UNSAFE.compareAndSwapInt(this, spareWaitersOffset, - sw, w.nextSpare) && - w.tryUnsuspend()) { - int c; // try increment; if contended, finish after unpark - boolean inc = UNSAFE.compareAndSwapInt(this, workerCountsOffset, - c = workerCounts, - c + ONE_RUNNING); + sw, w.nextSpare)) { + w.shutdown(); LockSupport.unpark(w); - if (!inc) { - do {} while(!UNSAFE.compareAndSwapInt(this, workerCountsOffset, - c = workerCounts, - c + ONE_RUNNING)); + advanceEventCount(); + } + } + + /** + * Tries (once) to resume a spare if worker counts match + * the given count. + * + * @param wc workerCounts value on invocation of this method + */ + private void tryResumeSpare(int wc) { + ForkJoinWorkerThread[] ws = workers; + int n = ws.length; + int sw, id, rs; ForkJoinWorkerThread w; + if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 && + id < n && (w = ws[id]) != null && + (rs = runState) < TERMINATING && + eventWaiters == 0L && workerCounts == wc) { + // In case all workers busy, heuristically back off to let settle + Thread.yield(); + if (eventWaiters == 0L && runState == rs && // recheck + workerCounts == wc && spareWaiters == sw && + UNSAFE.compareAndSwapInt(this, spareWaitersOffset, + sw, w.nextSpare)) { + int c; // increment running count before resume + do {} while(!UNSAFE.compareAndSwapInt + (this, workerCountsOffset, + c = workerCounts, c + ONE_RUNNING)); + if (w.tryUnsuspend()) + LockSupport.unpark(w); + else // back out if w was shutdown + decrementWorkerCounts(ONE_RUNNING, 0); } } } + // adding workers on demand + /** - * Callback from oldest spare occasionally 
waking up. Tries - * (once) to shutdown a spare if more than 25% spare overage, or - * if UNUSED_SPARE_TRIM_RATE_NANOS have elapsed and there are at - * least #parallelism running threads. Note that we don't need CAS - * or locks here because the method is called only from the oldest - * suspended spare occasionally waking (and even misfires are OK). - * - * @param now the wake up nanoTime of caller - */ - final void tryTrimSpare(long now) { - long lastTrim = trimTime; - trimTime = now; - helpMaintainParallelism(); // first, help wake up any needed spares - int sw, id; - ForkJoinWorkerThread w; - ForkJoinWorkerThread[] ws; + * Adds one or more workers if needed to establish target parallelism. + * Retries upon contention. + */ + private void addWorkerIfBelowTarget() { int pc = parallelism; - int wc = workerCounts; - if ((wc & RUNNING_COUNT_MASK) >= pc && - (((wc >>> TOTAL_COUNT_SHIFT) - pc) > (pc >>> 2) + 1 ||// approx 25% - now - lastTrim >= UNUSED_SPARE_TRIM_RATE_NANOS) && - (id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 && - id < (ws = workers).length && (w = ws[id]) != null && - UNSAFE.compareAndSwapInt(this, spareWaitersOffset, - sw, w.nextSpare)) - w.shutdown(false); + int wc; + while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < pc && + runState < TERMINATING) { + if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, + wc + (ONE_RUNNING|ONE_TOTAL))) { + if (addWorker() == null) + break; + } + } + } + + /** + * Tries (once) to add a new worker if all existing workers are + * busy, and there are either no running workers or the deficit is + * at least twice the surplus. 
+ * + * @param wc workerCounts value on invocation of this method + */ + private void tryAddWorkerIfBusy(int wc) { + int tc, rc, rs; + int pc = parallelism; + if ((tc = wc >>> TOTAL_COUNT_SHIFT) < MAX_WORKERS && + ((rc = wc & RUNNING_COUNT_MASK) == 0 || + rc < pc - ((tc - pc) << 1)) && + (rs = runState) < TERMINATING && + (rs & ACTIVE_COUNT_MASK) == tc) { + // Since all workers busy, heuristically back off to let settle + Thread.yield(); + if (eventWaiters == 0L && spareWaiters == 0 && // recheck + runState == rs && workerCounts == wc && + UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, + wc + (ONE_RUNNING|ONE_TOTAL))) + addWorker(); + } } /** * Does at most one of: * * 1. Help wake up existing workers waiting for work via - * releaseEventWaiters. (If any exist, then it probably doesn't + * releaseEventWaiters. (If any exist, then it doesn't * matter right now if under target parallelism level.) * - * 2. If below parallelism level and a spare exists, try (once) - * to resume it via tryResumeSpare. + * 2. If a spare exists, try (once) to resume it via tryResumeSpare. * - * 3. If neither of the above, tries (once) to add a new - * worker if either there are not enough total, or if all - * existing workers are busy, there are either no running - * workers or the deficit is at least twice the surplus. + * 3. If there are not enough total workers, add some + * via addWorkerIfBelowTarget; + * + * 4. 
Try (once) to add a new worker if all existing workers + * are busy, via tryAddWorkerIfBusy */ private void helpMaintainParallelism() { - // uglified to work better when not compiled - int pc, wc, rc, tc, rs; long h; - if ((h = eventWaiters) != 0L) { + long h; int pc, wc; + if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0) { if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount) - releaseEventWaiters(false); // avoid useless call + releaseEventWaiters(); // avoid useless call } else if ((pc = parallelism) > - (rc = ((wc = workerCounts) & RUNNING_COUNT_MASK))) { + ((wc = workerCounts) & RUNNING_COUNT_MASK)) { if (spareWaiters != 0) - tryResumeSpare(); - else if ((rs = runState) < TERMINATING && - ((tc = wc >>> TOTAL_COUNT_SHIFT) < pc || - (tc == (rs & ACTIVE_COUNT_MASK) && // all busy - (rc == 0 || // must add - rc < pc - ((tc - pc) << 1)) && // within slack - tc < MAX_WORKERS && runState == rs)) && // recheck busy - workerCounts == wc && - UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, - wc + (ONE_RUNNING|ONE_TOTAL))) - addWorker(); + tryResumeSpare(wc); + else if ((wc >>> TOTAL_COUNT_SHIFT) < pc) + addWorkerIfBelowTarget(); + else + tryAddWorkerIfBusy(wc); } } /** * Callback from workers invoked upon each top-level action (i.e., - * stealing a task or taking a submission and running - * it). Performs one or more of the following: + * stealing a task or taking a submission and running it). + * Performs one or more of the following: * - * 1. If the worker cannot find work (misses > 0), updates its - * active status to inactive and updates activeCount unless - * this is the first miss and there is contention, in which - * case it may try again (either in this or a subsequent - * call). + * 1. If the worker is active, try to set its active status to + * inactive and update activeCount. On contention, we may try + * again on this or subsequent call. * - * 2. If there are at least 2 misses, awaits the next task event - * via eventSync + * 2. 
Release any existing event waiters that are now releasable * - * 3. If there are too many running threads, suspends this worker - * (first forcing inactivation if necessary). If it is not + * 3. If there are too many running threads, suspend this worker + * (first forcing inactive if necessary). If it is not * needed, it may be killed while suspended via - * tryTrimSpare. Otherwise, upon resume it rechecks to make + * tryShutdownSpare. Otherwise, upon resume it rechecks to make * sure that it is still needed. * - * 4. Helps release and/or reactivate other workers via - * helpMaintainParallelism + * 4. If more than 1 miss, await the next task event via + * eventSync (first forcing inactivation if necessary), upon + * which worker may also be killed, via tryShutdownWaiter. + * + * 5. Help reactivate other workers via helpMaintainParallelism * * @param w the worker * @param misses the number of scans by caller failing to find work - * (saturating at 2 just to avoid wraparound) + * (saturating at 2 to avoid wraparound) */ final void preStep(ForkJoinWorkerThread w, int misses) { boolean active = w.active; int pc = parallelism; for (;;) { - int wc = workerCounts; - int rc = wc & RUNNING_COUNT_MASK; - if (active && (misses > 0 || rc > pc)) { - int rs; // try inactivate - if (UNSAFE.compareAndSwapInt(this, runStateOffset, - rs = runState, rs - ONE_ACTIVE)) - active = w.active = false; - else if (misses > 1 || rc > pc || - (rs & ACTIVE_COUNT_MASK) >= pc) - continue; // force inactivate + int rs, wc, rc, ec; long h; + if (active && UNSAFE.compareAndSwapInt(this, runStateOffset, + rs = runState, rs - 1)) + active = w.active = false; + if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0 && + (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) { + releaseEventWaiters(); + if (misses > 1) + continue; // clear before sync below } - if (misses > 1) { - misses = 0; // don't re-sync - eventSync(w); // continue loop to recheck rc - } - else if (rc > pc) { - if (workerCounts == wc && // 
try to suspend as spare + if ((rc = ((wc = workerCounts) & RUNNING_COUNT_MASK)) > pc) { + if (!active && // must inactivate to suspend + workerCounts == wc && // try to suspend as spare UNSAFE.compareAndSwapInt(this, workerCountsOffset, - wc, wc - ONE_RUNNING) && - !w.suspendAsSpare()) // false if killed - break; + wc, wc - ONE_RUNNING)) { + w.suspendAsSpare(); + if (!w.isRunning()) + break; // was killed while spare + } + continue; } - else { - if (rc < pc || eventWaiters != 0L) - helpMaintainParallelism(); - break; + if (misses > 0) { + if ((ec = eventCount) == w.lastEventCount && misses > 1) { + if (!active) { // must inactivate to sync + eventSync(w); + if (w.isRunning()) + misses = 1; // don't re-sync + else + break; // was killed while waiting + } + continue; + } + w.lastEventCount = ec; } + if (rc < pc) + helpMaintainParallelism(); + break; } } @@ -1073,10 +1182,10 @@ public class ForkJoinPool extends Abstra * Actions on transition to TERMINATING * * Runs up to four passes through workers: (0) shutting down each - * quietly (without waking up if parked) to quickly spread - * notifications without unnecessary bouncing around event queues - * etc (1) wake up and help cancel tasks (2) interrupt (3) mop up - * races with interrupted workers + * (without waking up if parked) to quickly spread notifications + * without unnecessary bouncing around event queues etc (1) wake + * up and help cancel tasks (2) interrupt (3) mop up races with + * interrupted workers */ private void startTerminating() { cancelSubmissions(); @@ -1089,7 +1198,7 @@ public class ForkJoinPool extends Abstra for (int i = 0; i < n; ++i) { ForkJoinWorkerThread w = ws[i]; if (w != null) { - w.shutdown(true); + w.shutdown(); if (passes > 0 && !w.isTerminated()) { w.cancelTasks(); LockSupport.unpark(w); @@ -1237,7 +1346,6 @@ public class ForkJoinPool extends Abstra this.workerLock = new ReentrantLock(); this.termination = new Phaser(1); this.poolNumber = poolNumberGenerator.incrementAndGet(); - 
this.trimTime = System.nanoTime(); } /** @@ -1266,7 +1374,10 @@ public class ForkJoinPool extends Abstra throw new RejectedExecutionException(); submissionQueue.offer(task); advanceEventCount(); - helpMaintainParallelism(); // start or wake up workers + if (eventWaiters != 0L) + releaseEventWaiters(); + if ((workerCounts >>> TOTAL_COUNT_SHIFT) < parallelism) + addWorkerIfBelowTarget(); } /** @@ -1285,9 +1396,6 @@ public class ForkJoinPool extends Abstra /** * Arranges for (asynchronous) execution of the given task. - * If the caller is already engaged in a fork/join computation in - * the current pool, this method is equivalent in effect to - * {@link ForkJoinTask#fork}. * * @param task the task * @throws NullPointerException if the task is null @@ -1316,9 +1424,6 @@ public class ForkJoinPool extends Abstra /** * Submits a ForkJoinTask for execution. - * If the caller is already engaged in a fork/join computation in - * the current pool, this method is equivalent in effect to - * {@link ForkJoinTask#fork}. * * @param task the task to submit * @return the task