--- jsr166/src/jsr166y/ForkJoinPool.java	2010/08/11 18:45:12	1.61
+++ jsr166/src/jsr166y/ForkJoinPool.java	2010/08/18 14:05:27	1.65
@@ -110,7 +110,7 @@ import java.util.concurrent.CountDownLat
 *
 *
This implementation rejects submitted tasks (that is, by throwing
* {@link RejectedExecutionException}) only when the pool is shut down
- * or internal resources have been exhuasted.
+ * or internal resources have been exhausted.
*
* @since 1.7
* @author Doug Lea
@@ -259,29 +259,24 @@ public class ForkJoinPool extends Abstra
* workers that previously could not find a task to now find one:
* Submission of a new task to the pool, or another worker pushing
* a task onto a previously empty queue. (We also use this
- * mechanism for termination actions that require wakeups of idle
- * workers). Each worker maintains its last known event count,
- * and blocks when a scan for work did not find a task AND its
- * lastEventCount matches the current eventCount. Waiting idle
- * workers are recorded in a variant of Treiber stack headed by
- * field eventWaiters which, when nonzero, encodes the thread
- * index and count awaited for by the worker thread most recently
- * calling eventSync. This thread in turn has a record (field
- * nextEventWaiter) for the next waiting worker. In addition to
- * allowing simpler decisions about need for wakeup, the event
- * count bits in eventWaiters serve the role of tags to avoid ABA
- * errors in Treiber stacks. To reduce delays in task diffusion,
- * workers not otherwise occupied may invoke method
- * releaseEventWaiters, that removes and signals (unparks) workers
- * not waiting on current count. To reduce stalls, To minimize
- * task production stalls associate with signalling, any worker
- * pushing a task on an empty queue invokes the weaker method
- * signalWork, that only releases idle workers until it detects
- * interference by other threads trying to release, and lets them
- * take over. The net effect is a tree-like diffusion of signals,
- * where released threads (and possibly others) help with unparks.
- * To further reduce contention effects a bit, failed CASes to
- * increment field eventCount are tolerated without retries.
+ * mechanism for configuration and termination actions that
+ * require wakeups of idle workers). Each worker maintains its
+ * last known event count, and blocks when a scan for work did not
+ * find a task AND its lastEventCount matches the current
+ * eventCount. Waiting idle workers are recorded in a variant of
+ * Treiber stack headed by field eventWaiters which, when nonzero,
+ * encodes the thread index and count awaited for by the worker
+ * thread most recently calling eventSync. This thread in turn has
+ * a record (field nextEventWaiter) for the next waiting worker.
+ * In addition to allowing simpler decisions about need for
+ * wakeup, the event count bits in eventWaiters serve the role of
+ * tags to avoid ABA errors in Treiber stacks. Upon any wakeup,
+ * released threads also try to release others (but give up upon
+ * contention to reduce useless flailing). The net effect is a
+ * tree-like diffusion of signals, where released threads (and
+ * possibly others) help with unparks. To further reduce
+ * contention effects a bit, failed CASes to increment field
+ * eventCount are tolerated without retries in signalWork.
* Conceptually they are merged into the same event, which is OK
* when their only purpose is to enable workers to scan for work.
*
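The event-count/Treiber-stack scheme the comment above describes can be sketched as a toy model (names like `EventQueueSketch` and `EVENT_SHIFT` are illustrative, not the pool's actual fields): idle workers push themselves onto a stack whose head word carries the event count they last saw as an ABA tag, and a signal bumps the count and pops/unparks every waiter recorded under a stale count.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the event-count / Treiber-stack idle-worker queue.
class EventQueueSketch {
    static final int  EVENT_SHIFT = 32;          // top 32 bits: count tag
    static final long ID_MASK = (1L << 16) - 1;  // low 16 bits: index + 1

    final AtomicInteger eventCount = new AtomicInteger();
    final AtomicLong head = new AtomicLong();    // plays the role of eventWaiters
    final Waiter[] waiters = new Waiter[16];

    static final class Waiter {
        final int index;                         // pool index
        long next;                               // plays the role of nextWaiter
        volatile boolean parked = true;
        Waiter(int index) { this.index = index; }
    }

    // A worker records itself together with the count it last saw (cf. eventSync).
    void await(Waiter w, int lastCount) {
        waiters[w.index] = w;
        long nh = (((long) lastCount) << EVENT_SHIFT) | (w.index + 1);
        long h;
        do { h = head.get(); w.next = h; } while (!head.compareAndSet(h, nh));
    }

    // Signal: bump the count, then pop and release waiters recorded under a
    // stale count (cf. releaseEventWaiters; the real code gives up on
    // contention instead of retrying as this toy does).
    void signal() {
        int ec = eventCount.incrementAndGet();
        long h;
        while ((h = head.get()) != 0L && (int) (h >>> EVENT_SHIFT) != ec) {
            Waiter w = waiters[(int) ((h & ID_MASK) - 1)];
            if (head.compareAndSet(h, w.next))
                w.parked = false;                // stands in for unpark
        }
    }
}
```

Because the awaited count is packed into the same word as the stack head, a CAS that succeeds against a stale head/count pair cannot silently reattach a popped node, which is the ABA protection the comment refers to.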
@@ -304,15 +299,19 @@ public class ForkJoinPool extends Abstra
* may become a spare at about the same time as another is
* needlessly being created. We counteract this and related slop
* in part by requiring resumed spares to immediately recheck (in
- * preStep) to see whether they they should re-suspend. To avoid
- * long-term build-up of spares, the oldest spare (see
- * ForkJoinWorkerThread.suspendAsSpare) occasionally wakes up if
- * not signalled and calls tryTrimSpare, which uses two different
- * thresholds: Always killing if the number of spares is greater
- * that 25% of total, and killing others only at a slower rate
- * (UNUSED_SPARE_TRIM_RATE_NANOS).
+ * preStep) to see whether they should re-suspend.
*
- * 6. Deciding when to create new workers. The main dynamic
+ * 6. Killing off unneeded workers. The Spare and Event queues use
+ * similar mechanisms to shed unused workers: The oldest (first)
+ * waiter uses a timed rather than hard wait. When this wait times
+ * out without a normal wakeup, it tries to shut down any one (for
+ * convenience the newest) other waiter via tryShutdownSpare or
+ * tryShutdownWaiter, respectively. The wakeup intervals for spares
+ * are much shorter than those for event waiters. Together, they will
+ * eventually reduce the number of worker threads to a minimum of
+ * one after a long enough period without use.
+ *
+ * 7. Deciding when to create new workers. The main dynamic
* control in this class is deciding when to create extra threads
* in method helpMaintainParallelism. We would like to keep
 * exactly #parallelism threads running, which is an impossible
@@ -326,8 +325,10 @@ public class ForkJoinPool extends Abstra
* impedes accuracy. Our main defense is to allow some slack in
* creation thresholds, using rules that reflect the fact that the
* more threads we have running, the more likely that we are
- * underestimating the number running threads. The rules also
- * better cope with the fact that some of the methods in this
+ * underestimating the number of running threads. (We also include
+ * some heuristic use of Thread.yield when all workers appear to
+ * be busy, to improve likelihood of counts settling.) The rules
+ * also better cope with the fact that some of the methods in this
* class tend to never become compiled (but are interpreted), so
* some components of the entire set of controls might execute 100
* times faster than others. And similarly for cases where the
@@ -419,6 +420,16 @@ public class ForkJoinPool extends Abstra
new AtomicInteger();
/**
+ * The wakeup interval (in nanoseconds) for the oldest worker
+ * waiting for an event; on timeout it invokes tryShutdownWaiter
+ * to shrink the number of workers. The exact value does not
+ * matter too much, but should be long enough to slowly release
+ * resources during long periods without use, without disrupting
+ * normal use.
+ */
+ private static final long SHRINK_RATE_NANOS =
+ 60L * 1000L * 1000L * 1000L; // one minute
+
+ /**
* Absolute bound for parallelism level. Twice this number plus
* one (i.e., 0xfff) must fit into a 16bit field to enable
* word-packing for some counts and indices.
@@ -463,17 +474,6 @@ public class ForkJoinPool extends Abstra
private volatile long stealCount;
/**
- * The last nanoTime that a spare thread was trimmed
- */
- private volatile long trimTime;
-
- /**
- * The rate at which to trim unused spares
- */
- static final long UNUSED_SPARE_TRIM_RATE_NANOS =
- 1000L * 1000L * 1000L; // 1 sec
-
- /**
* Encoded record of top of treiber stack of threads waiting for
* events. The top 32 bits contain the count being waited for. The
* bottom 16 bits contains one plus the pool index of waiting
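The packing this field comment describes can be sketched as plain encode/decode helpers (class and method names here are illustrative; the shift and mask values mirror the layout stated above):

```java
// Sketch of the eventWaiters word layout: the top 32 bits hold the
// awaited event count, the bottom 16 bits hold poolIndex + 1, so a
// zero id field means "no waiter".
final class WaiterWord {
    static final int  EVENT_COUNT_SHIFT = 32;
    static final long WAITER_ID_MASK    = (1L << 16) - 1;

    static long encode(int count, int poolIndex) {
        return (((long) count) << EVENT_COUNT_SHIFT) | (poolIndex + 1);
    }
    static int count(long w) { return (int) (w >>> EVENT_COUNT_SHIFT); }
    // Returns -1 when no waiter is recorded.
    static int index(long w) { return ((int) (w & WAITER_ID_MASK)) - 1; }
}
```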
@@ -514,8 +514,12 @@ public class ForkJoinPool extends Abstra
* These are bundled together to ensure consistent read for
* termination checks (i.e., that runLevel is at least SHUTDOWN
* and active threads is zero).
+ *
+ * Notes: Most direct CASes are dependent on these bitfield
+ * positions. Also, this field is non-private to enable direct
+ * performance-sensitive CASes in ForkJoinWorkerThread.
*/
- private volatile int runState;
+ volatile int runState;
// Note: The order among run level values matters.
private static final int RUNLEVEL_SHIFT = 16;
@@ -523,7 +527,6 @@ public class ForkJoinPool extends Abstra
private static final int TERMINATING = 1 << (RUNLEVEL_SHIFT + 1);
private static final int TERMINATED = 1 << (RUNLEVEL_SHIFT + 2);
private static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;
- private static final int ONE_ACTIVE = 1; // active update delta
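The runState packing above explains why the ONE_ACTIVE delta constant could be dropped in favor of a bare `+ 1`: the active count occupies the low 16 bits, below all run-level bits. A minimal sketch (helper names are assumptions, constants mirror the declarations above):

```java
// Sketch of the runState bitfield: run-level flags live at bit 16 and
// above, the active-thread count in the low 16 bits, so runState + 1
// bumps the active count without touching the level bits.
final class RunStateSketch {
    static final int RUNLEVEL_SHIFT    = 16;
    static final int SHUTDOWN          = 1 << RUNLEVEL_SHIFT;
    static final int TERMINATING       = 1 << (RUNLEVEL_SHIFT + 1);
    static final int ACTIVE_COUNT_MASK = (1 << RUNLEVEL_SHIFT) - 1;

    static int activeCount(int rs)           { return rs & ACTIVE_COUNT_MASK; }
    static boolean atLeastShutdown(int rs)   { return rs >= SHUTDOWN; }
}
```

This is also why the ordering note ("the order among run level values matters") holds: a simple `>=` comparison against a level constant classifies the state regardless of the active count in the low bits.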
/**
* Holds number of total (i.e., created and not yet terminated)
@@ -565,8 +568,8 @@ public class ForkJoinPool extends Abstra
private final int poolNumber;
- // Utilities for CASing fields. Note that several of these
- // are manually inlined by callers
+ // Utilities for CASing fields. Note that most of these
+ // are usually manually inlined by callers
/**
* Increments running count part of workerCounts
@@ -599,11 +602,12 @@ public class ForkJoinPool extends Abstra
private void decrementWorkerCounts(int dr, int dt) {
for (;;) {
int wc = workerCounts;
- if (wc == 0 && (runState & TERMINATED) != 0)
- return; // lagging termination on a backout
if ((wc & RUNNING_COUNT_MASK) - dr < 0 ||
- (wc >>> TOTAL_COUNT_SHIFT) - dt < 0)
+ (wc >>> TOTAL_COUNT_SHIFT) - dt < 0) {
+ if ((runState & TERMINATED) != 0)
+ return; // lagging termination on a backout
Thread.yield();
+ }
if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
wc, wc - (dr + dt)))
return;
@@ -628,7 +632,7 @@ public class ForkJoinPool extends Abstra
final boolean tryIncrementActiveCount() {
int c;
return UNSAFE.compareAndSwapInt(this, runStateOffset,
- c = runState, c + ONE_ACTIVE);
+ c = runState, c + 1);
}
/**
@@ -638,7 +642,7 @@ public class ForkJoinPool extends Abstra
final boolean tryDecrementActiveCount() {
int c;
return UNSAFE.compareAndSwapInt(this, runStateOffset,
- c = runState, c - ONE_ACTIVE);
+ c = runState, c - 1);
}
/**
@@ -705,8 +709,10 @@ public class ForkJoinPool extends Abstra
* Tries to create and add new worker. Assumes that worker counts
* are already updated to accommodate the worker, so adjusts on
* failure.
+ *
+ * @return the worker, or null on failure
*/
- private void addWorker() {
+ private ForkJoinWorkerThread addWorker() {
ForkJoinWorkerThread w = null;
try {
w = factory.newThread(this);
@@ -716,8 +722,11 @@ public class ForkJoinPool extends Abstra
tryTerminate(false); // in case of failure during shutdown
}
}
- if (w != null)
+ if (w != null) {
w.start(recordWorker(w), ueh);
+ advanceEventCount();
+ }
+ return w;
}
/**
@@ -740,24 +749,22 @@ public class ForkJoinPool extends Abstra
/**
* Releases workers blocked on a count not equal to current count.
* Normally called after precheck that eventWaiters isn't zero to
- * avoid wasted array checks.
- *
- * @param signalling true if caller is a signalling worker so can
- * exit upon (conservatively) detected contention by other threads
- * who will continue to release
+ * avoid wasted array checks. Gives up upon a change in count or
+ * contention, letting other workers take over.
*/
- private void releaseEventWaiters(boolean signalling) {
+ private void releaseEventWaiters() {
ForkJoinWorkerThread[] ws = workers;
int n = ws.length;
- long h; // head of stack
- ForkJoinWorkerThread w; int id, ec;
- while ((id = ((int)((h = eventWaiters) & WAITER_ID_MASK)) - 1) >= 0 &&
- (int)(h >>> EVENT_COUNT_SHIFT) != (ec = eventCount) &&
- id < n && (w = ws[id]) != null) {
- if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
- h, h = w.nextWaiter))
- LockSupport.unpark(w);
- if (signalling && (eventCount != ec || eventWaiters != h))
+ long h = eventWaiters;
+ int ec = eventCount;
+ ForkJoinWorkerThread w; int id;
+ while ((int)(h >>> EVENT_COUNT_SHIFT) != ec &&
+ (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
+ id < n && (w = ws[id]) != null &&
+ UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
+ h, h = w.nextWaiter)) {
+ LockSupport.unpark(w);
+ if (eventWaiters != h || eventCount != ec)
break;
}
}
@@ -770,37 +777,99 @@ public class ForkJoinPool extends Abstra
int c; // try to increment event count -- CAS failure OK
UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
if (eventWaiters != 0L)
- releaseEventWaiters(true);
+ releaseEventWaiters();
}
/**
- * Blocks worker until terminating or event count
- * advances from last value held by worker
+ * Adds the given worker to the event queue and blocks until
+ * terminating or the event count advances from the worker's
+ * lastEventCount value
*
* @param w the calling worker thread
*/
private void eventSync(ForkJoinWorkerThread w) {
- int wec = w.lastEventCount;
- long nh = (((long)wec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
+ int ec = w.lastEventCount;
+ long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
long h;
while ((runState < SHUTDOWN || !tryTerminate(false)) &&
- ((h = eventWaiters) == 0L ||
- (int)(h >>> EVENT_COUNT_SHIFT) == wec) &&
- eventCount == wec) {
+ (((int)((h = eventWaiters) & WAITER_ID_MASK)) == 0 ||
+ (int)(h >>> EVENT_COUNT_SHIFT) == ec) &&
+ eventCount == ec) {
if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
w.nextWaiter = h, nh)) {
- while (runState < TERMINATING && eventCount == wec) {
- if (!tryAccumulateStealCount(w)) // transfer while idle
- continue;
- Thread.interrupted(); // clear/ignore interrupt
- if (eventCount != wec)
- break;
+ awaitEvent(w, ec);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Blocks the given worker (that has already been entered as an
+ * event waiter) until terminating or event count advances from
+ * the given value. The oldest (first) waiter uses a timed wait to
+ * occasionally one-by-one shrink the number of workers (to a
+ * minimum of one) if the pool has not been used for extended
+ * periods.
+ *
+ * @param w the calling worker thread
+ * @param ec the count
+ */
+ private void awaitEvent(ForkJoinWorkerThread w, int ec) {
+ while (eventCount == ec) {
+ if (tryAccumulateStealCount(w)) { // transfer while idle
+ boolean untimed = (w.nextWaiter != 0L ||
+ (workerCounts & RUNNING_COUNT_MASK) <= 1);
+ long startTime = untimed? 0 : System.nanoTime();
+ Thread.interrupted(); // clear/ignore interrupt
+ if (eventCount != ec || !w.isRunning() ||
+ runState >= TERMINATING) // recheck after clear
+ break;
+ if (untimed)
LockSupport.park(w);
+ else {
+ LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
+ if (eventCount != ec || !w.isRunning() ||
+ runState >= TERMINATING)
+ break;
+ if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
+ tryShutdownWaiter(ec);
}
- break;
}
}
- w.lastEventCount = eventCount;
+ }
+
+ /**
+ * Callback from the oldest waiter in awaitEvent waking up after a
+ * period of non-use. Tries (once) to shutdown an event waiter (or
+ * a spare, if one exists). Note that we don't need CAS or locks
+ * here because the method is called only from one thread
+ * occasionally waking (and even misfires are OK). Note that
+ * until the shutdown worker fully terminates, workerCounts
+ * will overestimate total count, which is tolerable.
+ *
+ * @param ec the event count waited on by caller (to abort
+ * attempt if count has since changed).
+ */
+ private void tryShutdownWaiter(int ec) {
+ if (spareWaiters != 0) { // prefer killing spares
+ tryShutdownSpare();
+ return;
+ }
+ ForkJoinWorkerThread[] ws = workers;
+ int n = ws.length;
+ long h = eventWaiters;
+ ForkJoinWorkerThread w; int id; long nh;
+ if (runState == 0 &&
+ submissionQueue.isEmpty() &&
+ eventCount == ec &&
+ (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
+ id < n && (w = ws[id]) != null &&
+ (nh = w.nextWaiter) != 0L && // keep at least one worker
+ UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) {
+ w.shutdown();
+ LockSupport.unpark(w);
+ }
+ releaseEventWaiters();
}
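The net effect of this shedding scheme (item 6 of the overview: the oldest waiter's timed wait expires and kills the newest other waiter, one per timeout, keeping at least one worker) can be modeled as a toy simulation. This is purely illustrative; the real pool does it with parkNanos timeouts and CASes on the eventWaiters word:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model: each timeout of the oldest waiter sheds the newest
// waiter, never going below one worker.
final class ShrinkSketch {
    static int survivors(int idleWorkers, int timeouts) {
        Deque<Integer> stack = new ArrayDeque<>();
        for (int i = 0; i < idleWorkers; i++)
            stack.push(i);                  // newest waiter on top
        while (timeouts-- > 0 && stack.size() > 1)
            stack.pop();                    // oldest's timeout kills newest
        return stack.size();
    }
}
```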
// Maintaining spares
@@ -809,166 +878,206 @@ public class ForkJoinPool extends Abstra
* Pushes worker onto the spare stack
*/
final void pushSpare(ForkJoinWorkerThread w) {
- int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex+1);
+ int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
do {} while (!UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
w.nextSpare = spareWaiters,ns));
}
/**
- * Tries (once) to resume a spare if running count is less than
- * target parallelism. Fails on contention or stale workers.
+ * Callback from oldest spare occasionally waking up. Tries
+ * (once) to shutdown a spare. Same idea as tryShutdownWaiter.
*/
- private void tryResumeSpare() {
+ final void tryShutdownSpare() {
int sw, id;
ForkJoinWorkerThread w;
ForkJoinWorkerThread[] ws;
if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
id < (ws = workers).length && (w = ws[id]) != null &&
- (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
- eventWaiters == 0L &&
- spareWaiters == sw &&
+ (workerCounts & RUNNING_COUNT_MASK) >= parallelism &&
UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
- sw, w.nextSpare) &&
- w.tryUnsuspend()) {
- int c; // try increment; if contended, finish after unpark
- boolean inc = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- c = workerCounts,
- c + ONE_RUNNING);
+ sw, w.nextSpare)) {
+ w.shutdown();
LockSupport.unpark(w);
- if (!inc) {
- do {} while(!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- c = workerCounts,
- c + ONE_RUNNING));
+ advanceEventCount();
+ }
+ }
+
+ /**
+ * Tries (once) to resume a spare if worker counts match
+ * the given count.
+ *
+ * @param wc workerCounts value on invocation of this method
+ */
+ private void tryResumeSpare(int wc) {
+ ForkJoinWorkerThread[] ws = workers;
+ int n = ws.length;
+ int sw, id, rs; ForkJoinWorkerThread w;
+ if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
+ id < n && (w = ws[id]) != null &&
+ (rs = runState) < TERMINATING &&
+ eventWaiters == 0L && workerCounts == wc) {
+ // In case all workers busy, heuristically back off to let settle
+ Thread.yield();
+ if (eventWaiters == 0L && runState == rs && // recheck
+ workerCounts == wc && spareWaiters == sw &&
+ UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
+ sw, w.nextSpare)) {
+ int c; // increment running count before resume
+ do {} while(!UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ c = workerCounts, c + ONE_RUNNING));
+ if (w.tryUnsuspend())
+ LockSupport.unpark(w);
+ else // back out if w was shutdown
+ decrementWorkerCounts(ONE_RUNNING, 0);
}
}
}
+ // adding workers on demand
+
/**
- * Callback from oldest spare occasionally waking up. Tries
- * (once) to shutdown a spare if more than 25% spare overage, or
- * if UNUSED_SPARE_TRIM_RATE_NANOS have elapsed and there are at
- * least #parallelism running threads. Note that we don't need CAS
- * or locks here because the method is called only from the oldest
- * suspended spare occasionally waking (and even misfires are OK).
- *
- * @param now the wake up nanoTime of caller
- */
- final void tryTrimSpare(long now) {
- long lastTrim = trimTime;
- trimTime = now;
- helpMaintainParallelism(); // first, help wake up any needed spares
- int sw, id;
- ForkJoinWorkerThread w;
- ForkJoinWorkerThread[] ws;
+ * Adds one or more workers if needed to establish target parallelism.
+ * Retries upon contention.
+ */
+ private void addWorkerIfBelowTarget() {
int pc = parallelism;
- int wc = workerCounts;
- if ((wc & RUNNING_COUNT_MASK) >= pc &&
- (((wc >>> TOTAL_COUNT_SHIFT) - pc) > (pc >>> 2) + 1 ||// approx 25%
- now - lastTrim >= UNUSED_SPARE_TRIM_RATE_NANOS) &&
- (id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
- id < (ws = workers).length && (w = ws[id]) != null &&
- UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
- sw, w.nextSpare))
- w.shutdown(false);
+ int wc;
+ while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < pc &&
+ runState < TERMINATING) {
+ if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
+ wc + (ONE_RUNNING|ONE_TOTAL))) {
+ if (addWorker() == null)
+ break;
+ }
+ }
+ }
+
+ /**
+ * Tries (once) to add a new worker if all existing workers are
+ * busy, and there are either no running workers or the deficit is
+ * at least twice the surplus.
+ *
+ * @param wc workerCounts value on invocation of this method
+ */
+ private void tryAddWorkerIfBusy(int wc) {
+ int tc, rc, rs;
+ int pc = parallelism;
+ if ((tc = wc >>> TOTAL_COUNT_SHIFT) < MAX_WORKERS &&
+ ((rc = wc & RUNNING_COUNT_MASK) == 0 ||
+ rc < pc - ((tc - pc) << 1)) &&
+ (rs = runState) < TERMINATING &&
+ (rs & ACTIVE_COUNT_MASK) == tc) {
+ // Since all workers busy, heuristically back off to let settle
+ Thread.yield();
+ if (eventWaiters == 0L && spareWaiters == 0 && // recheck
+ runState == rs && workerCounts == wc &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
+ wc + (ONE_RUNNING|ONE_TOTAL)))
+ addWorker();
+ }
}
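The creation rule above, "no running workers or the deficit is at least twice the surplus", corresponds to the expression `rc == 0 || rc < pc - ((tc - pc) << 1)`. A small sketch makes the arithmetic explicit (class and method names are assumptions; whether the bound is strict or inclusive follows the code, not the prose):

```java
// Sketch of the tryAddWorkerIfBusy creation rule: with pc = target
// parallelism, tc = total workers, rc = running workers, add only when
// nothing is running or the running deficit (pc - rc) exceeds twice
// the total surplus (tc - pc).
final class AddWorkerRule {
    static boolean shouldAdd(int pc, int tc, int rc) {
        return rc == 0 || rc < pc - ((tc - pc) << 1);
    }
}
```

For example, with pc = 8 and tc = 10 (surplus 2), a worker is added only while fewer than 4 are running, i.e. while the deficit exceeds 4.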
/**
* Does at most one of:
*
* 1. Help wake up existing workers waiting for work via
- * releaseEventWaiters. (If any exist, then it probably doesn't
+ * releaseEventWaiters. (If any exist, then it doesn't
* matter right now if under target parallelism level.)
*
- * 2. If below parallelism level and a spare exists, try (once)
- * to resume it via tryResumeSpare.
+ * 2. If a spare exists, try (once) to resume it via tryResumeSpare.
*
- * 3. If neither of the above, tries (once) to add a new
- * worker if either there are not enough total, or if all
- * existing workers are busy, there are either no running
- * workers or the deficit is at least twice the surplus.
+ * 3. If there are not enough total workers, add some
+ * via addWorkerIfBelowTarget.
+ *
+ * 4. Try (once) to add a new worker if all existing workers
+ * are busy, via tryAddWorkerIfBusy.
*/
private void helpMaintainParallelism() {
- // uglified to work better when not compiled
- int pc, wc, rc, tc, rs; long h;
- if ((h = eventWaiters) != 0L) {
+ long h; int pc, wc;
+ if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0) {
if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
- releaseEventWaiters(false); // avoid useless call
+ releaseEventWaiters(); // avoid useless call
}
else if ((pc = parallelism) >
- (rc = ((wc = workerCounts) & RUNNING_COUNT_MASK))) {
+ ((wc = workerCounts) & RUNNING_COUNT_MASK)) {
if (spareWaiters != 0)
- tryResumeSpare();
- else if ((rs = runState) < TERMINATING &&
- ((tc = wc >>> TOTAL_COUNT_SHIFT) < pc ||
- (tc == (rs & ACTIVE_COUNT_MASK) && // all busy
- (rc == 0 || // must add
- rc < pc - ((tc - pc) << 1)) && // within slack
- tc < MAX_WORKERS && runState == rs)) && // recheck busy
- workerCounts == wc &&
- UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
- wc + (ONE_RUNNING|ONE_TOTAL)))
- addWorker();
+ tryResumeSpare(wc);
+ else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
+ addWorkerIfBelowTarget();
+ else
+ tryAddWorkerIfBusy(wc);
}
}
/**
* Callback from workers invoked upon each top-level action (i.e.,
- * stealing a task or taking a submission and running
- * it). Performs one or more of the following:
+ * stealing a task or taking a submission and running it).
+ * Performs one or more of the following:
*
- * 1. If the worker cannot find work (misses > 0), updates its
- * active status to inactive and updates activeCount unless
- * this is the first miss and there is contention, in which
- * case it may try again (either in this or a subsequent
- * call).
+ * 1. If the worker is active, try to set its active status to
+ * inactive and update activeCount. On contention, we may try
+ * again on this or subsequent call.
*
- * 2. If there are at least 2 misses, awaits the next task event
- * via eventSync
+ * 2. Release any existing event waiters that are now releasable
*
- * 3. If there are too many running threads, suspends this worker
- * (first forcing inactivation if necessary). If it is not
+ * 3. If there are too many running threads, suspend this worker
+ * (first forcing inactive if necessary). If it is not
* needed, it may be killed while suspended via
- * tryTrimSpare. Otherwise, upon resume it rechecks to make
+ * tryShutdownSpare. Otherwise, upon resume it rechecks to make
* sure that it is still needed.
*
- * 4. Helps release and/or reactivate other workers via
- * helpMaintainParallelism
+ * 4. If more than 1 miss, await the next task event via
+ * eventSync (first forcing inactivation if necessary), upon
+ * which the worker may also be killed, via tryShutdownWaiter.
+ *
+ * 5. Help reactivate other workers via helpMaintainParallelism
*
* @param w the worker
* @param misses the number of scans by caller failing to find work
- * (saturating at 2 just to avoid wraparound)
+ * (saturating at 2 to avoid wraparound)
*/
final void preStep(ForkJoinWorkerThread w, int misses) {
boolean active = w.active;
int pc = parallelism;
for (;;) {
- int wc = workerCounts;
- int rc = wc & RUNNING_COUNT_MASK;
- if (active && (misses > 0 || rc > pc)) {
- int rs; // try inactivate
- if (UNSAFE.compareAndSwapInt(this, runStateOffset,
- rs = runState, rs - ONE_ACTIVE))
- active = w.active = false;
- else if (misses > 1 || rc > pc ||
- (rs & ACTIVE_COUNT_MASK) >= pc)
- continue; // force inactivate
+ int rs, wc, rc, ec; long h;
+ if (active && UNSAFE.compareAndSwapInt(this, runStateOffset,
+ rs = runState, rs - 1))
+ active = w.active = false;
+ if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0 &&
+ (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) {
+ releaseEventWaiters();
+ if (misses > 1)
+ continue; // clear before sync below
}
- if (misses > 1) {
- misses = 0; // don't re-sync
- eventSync(w); // continue loop to recheck rc
- }
- else if (rc > pc) {
- if (workerCounts == wc && // try to suspend as spare
+ if ((rc = ((wc = workerCounts) & RUNNING_COUNT_MASK)) > pc) {
+ if (!active && // must inactivate to suspend
+ workerCounts == wc && // try to suspend as spare
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc, wc - ONE_RUNNING) &&
- !w.suspendAsSpare()) // false if killed
- break;
+ wc, wc - ONE_RUNNING)) {
+ w.suspendAsSpare();
+ if (!w.isRunning())
+ break; // was killed while spare
+ }
+ continue;
}
- else {
- if (rc < pc || eventWaiters != 0L)
- helpMaintainParallelism();
- break;
+ if (misses > 0) {
+ if ((ec = eventCount) == w.lastEventCount && misses > 1) {
+ if (!active) { // must inactivate to sync
+ eventSync(w);
+ if (w.isRunning())
+ misses = 1; // don't re-sync
+ else
+ break; // was killed while waiting
+ }
+ continue;
+ }
+ w.lastEventCount = ec;
}
+ if (rc < pc)
+ helpMaintainParallelism();
+ break;
}
}
@@ -1073,10 +1182,10 @@ public class ForkJoinPool extends Abstra
* Actions on transition to TERMINATING
*
* Runs up to four passes through workers: (0) shutting down each
- * quietly (without waking up if parked) to quickly spread
- * notifications without unnecessary bouncing around event queues
- * etc (1) wake up and help cancel tasks (2) interrupt (3) mop up
- * races with interrupted workers
+ * (without waking up if parked) to quickly spread notifications
+ * without unnecessary bouncing around event queues etc (1) wake
+ * up and help cancel tasks (2) interrupt (3) mop up races with
+ * interrupted workers
*/
private void startTerminating() {
cancelSubmissions();
@@ -1089,7 +1198,7 @@ public class ForkJoinPool extends Abstra
for (int i = 0; i < n; ++i) {
ForkJoinWorkerThread w = ws[i];
if (w != null) {
- w.shutdown(true);
+ w.shutdown();
if (passes > 0 && !w.isTerminated()) {
w.cancelTasks();
LockSupport.unpark(w);
@@ -1237,7 +1346,6 @@ public class ForkJoinPool extends Abstra
this.workerLock = new ReentrantLock();
this.termination = new Phaser(1);
this.poolNumber = poolNumberGenerator.incrementAndGet();
- this.trimTime = System.nanoTime();
}
/**
@@ -1266,14 +1374,14 @@ public class ForkJoinPool extends Abstra
throw new RejectedExecutionException();
submissionQueue.offer(task);
advanceEventCount();
- helpMaintainParallelism(); // start or wake up workers
+ if (eventWaiters != 0L)
+ releaseEventWaiters();
+ if ((workerCounts >>> TOTAL_COUNT_SHIFT) < parallelism)
+ addWorkerIfBelowTarget();
}
/**
* Performs the given task, returning its result upon completion.
- * If the caller is already engaged in a fork/join computation in
- * the current pool, this method is equivalent in effect to
- * {@link ForkJoinTask#invoke}.
*
* @param task the task
* @return the task's result
@@ -1288,9 +1396,6 @@ public class ForkJoinPool extends Abstra
/**
* Arranges for (asynchronous) execution of the given task.
- * If the caller is already engaged in a fork/join computation in
- * the current pool, this method is equivalent in effect to
- * {@link ForkJoinTask#fork}.
*
* @param task the task
* @throws NullPointerException if the task is null
@@ -1319,9 +1424,6 @@ public class ForkJoinPool extends Abstra
/**
* Submits a ForkJoinTask for execution.
- * If the caller is already engaged in a fork/join computation in
- * the current pool, this method is equivalent in effect to
- * {@link ForkJoinTask#fork}.
*
* @param task the task to submit
* @return the task
@@ -1753,11 +1855,11 @@ public class ForkJoinPool extends Abstra
* QueueTaker(BlockingQueue