--- jsr166/src/jsr166y/ForkJoinPool.java 2010/08/18 14:05:27 1.65
+++ jsr166/src/jsr166y/ForkJoinPool.java 2010/10/24 19:37:26 1.83
@@ -6,17 +6,22 @@
package jsr166y;
-import java.util.concurrent.*;
-
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.CountDownLatch;
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
@@ -69,7 +74,7 @@ import java.util.concurrent.CountDownLat
*
Call from within fork/join computations |
*
*
- * Arange async execution |
+ * Arrange async execution |
* {@link #execute(ForkJoinTask)} |
* {@link ForkJoinTask#fork} |
*
@@ -140,7 +145,7 @@ public class ForkJoinPool extends Abstra
* Beyond work-stealing support and essential bookkeeping, the
* main responsibility of this framework is to take actions when
* one worker is waiting to join a task stolen (or always held by)
- * another. Becauae we are multiplexing many tasks on to a pool
+ * another. Because we are multiplexing many tasks on to a pool
* of workers, we can't just let them block (as in Thread.join).
* We also cannot just reassign the joiner's run-time stack with
* another and replace it later, which would be a form of
@@ -157,19 +162,21 @@ public class ForkJoinPool extends Abstra
* links to try to find such a task.
*
* Compensating: Unless there are already enough live threads,
- * method helpMaintainParallelism() may create or or
+ * method helpMaintainParallelism() may create or
* re-activate a spare thread to compensate for blocked
* joiners until they unblock.
*
- * Because the determining existence of conservatively safe
- * helping targets, the availability of already-created spares,
- * and the apparent need to create new spares are all racy and
- * require heuristic guidance, we rely on multiple retries of
- * each. Further, because it is impossible to keep exactly the
- * target (parallelism) number of threads running at any given
- * time, we allow compensation during joins to fail, and enlist
- * all other threads to help out whenever they are not otherwise
- * occupied (i.e., mainly in method preStep).
+ * It is impossible to keep exactly the target (parallelism)
+ * number of threads running at any given time. Determining
+ * existence of conservatively safe helping targets, the
+ * availability of already-created spares, and the apparent need
+ * to create new spares are all racy and require heuristic
+ * guidance, so we rely on multiple retries of each. Compensation
+ * occurs in slow-motion. It is triggered only upon timeouts of
+ * Object.wait used for joins. This reduces poor decisions that
+ * would otherwise be made when threads are waiting for others
+ * that are stalled because of unrelated activities such as
+ * garbage collection.
*
* The ManagedBlocker extension API can't use helping so relies
* only on compensation in method awaitBlocker.
@@ -224,11 +231,11 @@ public class ForkJoinPool extends Abstra
* ManagedBlocker), we may create or resume others to take their
* place until they unblock (see below). Implementing this
* requires counts of the number of "running" threads (i.e., those
- * that are neither blocked nor artifically suspended) as well as
+ * that are neither blocked nor artificially suspended) as well as
* the total number. These two values are packed into one field,
* "workerCounts" because we need accurate snapshots when deciding
* to create, resume or suspend. Note however that the
- * correspondance of these counts to reality is not guaranteed. In
+ * correspondence of these counts to reality is not guaranteed. In
* particular updates for unblocked threads may lag until they
* actually wake up.
*
@@ -271,69 +278,65 @@ public class ForkJoinPool extends Abstra
* In addition to allowing simpler decisions about need for
* wakeup, the event count bits in eventWaiters serve the role of
* tags to avoid ABA errors in Treiber stacks. Upon any wakeup,
- * released threads also try to release others (but give up upon
- * contention to reduce useless flailing). The net effect is a
- * tree-like diffusion of signals, where released threads (and
- * possibly others) help with unparks. To further reduce
- * contention effects a bit, failed CASes to increment field
- * eventCount are tolerated without retries in signalWork.
+ * released threads also try to release at most two others. The
+ * net effect is a tree-like diffusion of signals, where released
+ * threads (and possibly others) help with unparks. To further
+ * reduce contention effects a bit, failed CASes to increment
+ * field eventCount are tolerated without retries in signalWork.
* Conceptually they are merged into the same event, which is OK
* when their only purpose is to enable workers to scan for work.
*
- * 5. Managing suspension of extra workers. When a worker is about
- * to block waiting for a join (or via ManagedBlockers), we may
- * create a new thread to maintain parallelism level, or at least
- * avoid starvation. Usually, extra threads are needed for only
- * very short periods, yet join dependencies are such that we
- * sometimes need them in bursts. Rather than create new threads
- * each time this happens, we suspend no-longer-needed extra ones
- * as "spares". For most purposes, we don't distinguish "extra"
- * spare threads from normal "core" threads: On each call to
- * preStep (the only point at which we can do this) a worker
- * checks to see if there are now too many running workers, and if
- * so, suspends itself. Method helpMaintainParallelism looks for
- * suspended threads to resume before considering creating a new
- * replacement. The spares themselves are encoded on another
- * variant of a Treiber Stack, headed at field "spareWaiters".
- * Note that the use of spares is intrinsically racy. One thread
- * may become a spare at about the same time as another is
- * needlessly being created. We counteract this and related slop
- * in part by requiring resumed spares to immediately recheck (in
- * preStep) to see whether they they should re-suspend.
- *
- * 6. Killing off unneeded workers. The Spare and Event queues use
- * similar mechanisms to shed unused workers: The oldest (first)
- * waiter uses a timed rather than hard wait. When this wait times
- * out without a normal wakeup, it tries to shutdown any one (for
- * convenience the newest) other waiter via tryShutdownSpare or
- * tryShutdownWaiter, respectively. The wakeup rates for spares
- * are much shorter than for waiters. Together, they will
- * eventually reduce the number of worker threads to a minimum of
- * one after a long enough period without use.
+ * 5. Managing suspension of extra workers. When a worker notices
+ * (usually upon timeout of a wait()) that there are too few
+ * running threads, we may create a new thread to maintain
+ * parallelism level, or at least avoid starvation. Usually, extra
+ * threads are needed for only very short periods, yet join
+ * dependencies are such that we sometimes need them in
+ * bursts. Rather than create new threads each time this happens,
+ * we suspend no-longer-needed extra ones as "spares". For most
+ * purposes, we don't distinguish "extra" spare threads from
+ * normal "core" threads: On each call to preStep (the only point
+ * at which we can do this) a worker checks to see if there are
+ * now too many running workers, and if so, suspends itself.
+ * Method helpMaintainParallelism looks for suspended threads to
+ * resume before considering creating a new replacement. The
+ * spares themselves are encoded on another variant of a Treiber
+ * Stack, headed at field "spareWaiters". Note that the use of
+ * spares is intrinsically racy. One thread may become a spare at
+ * about the same time as another is needlessly being created. We
+ * counteract this and related slop in part by requiring resumed
+ * spares to immediately recheck (in preStep) to see whether they
+ * should re-suspend.
+ *
+ * 6. Killing off unneeded workers. A timeout mechanism is used to
+ * shed unused workers: The oldest (first) event queue waiter uses
+ * a timed rather than hard wait. When this wait times out without
+ * a normal wakeup, it tries to shutdown any one (for convenience
+ * the newest) other spare or event waiter via
+ * tryShutdownUnusedWorker. This eventually reduces the number of
+ * worker threads to a minimum of one after a long enough period
+ * without use.
*
* 7. Deciding when to create new workers. The main dynamic
* control in this class is deciding when to create extra threads
* in method helpMaintainParallelism. We would like to keep
- * exactly #parallelism threads running, which is an impossble
+ * exactly #parallelism threads running, which is an impossible
* task. We always need to create one when the number of running
* threads would become zero and all workers are busy. Beyond
- * this, we must rely on heuristics that work well in the the
- * presence of transients phenomena such as GC stalls, dynamic
+ * this, we must rely on heuristics that work well in the
+ * presence of transient phenomena such as GC stalls, dynamic
* compilation, and wake-up lags. These transients are extremely
* common -- we are normally trying to fully saturate the CPUs on
* a machine, so almost any activity other than running tasks
- * impedes accuracy. Our main defense is to allow some slack in
- * creation thresholds, using rules that reflect the fact that the
- * more threads we have running, the more likely that we are
- * underestimating the number running threads. (We also include
- * some heuristic use of Thread.yield when all workers appear to
- * be busy, to improve likelihood of counts settling.) The rules
- * also better cope with the fact that some of the methods in this
- * class tend to never become compiled (but are interpreted), so
- * some components of the entire set of controls might execute 100
- * times faster than others. And similarly for cases where the
- * apparent lack of work is just due to GC stalls and other
- * transient system activity.
+ * impedes accuracy. Our main defense is to allow parallelism to
+ * lapse for a while during joins, and use a timeout to see if,
+ * after the resulting settling, there is still a need for
+ * additional workers. This also better copes with the fact that
+ * some of the methods in this class tend to never become compiled
+ * (but are interpreted), so some components of the entire set of
+ * controls might execute 100 times faster than others. And
+ * similarly for cases where the apparent lack of work is just due
+ * to GC stalls and other transient system activity.
*
* Beware that there is a lot of representation-level coupling
* among classes ForkJoinPool, ForkJoinWorkerThread, and
@@ -348,7 +351,7 @@ public class ForkJoinPool extends Abstra
* "while ((local = field) != 0)") which are usually the simplest
* way to ensure the required read orderings (which are sometimes
* critical). Also several occurrences of the unusual "do {}
- * while(!cas...)" which is the simplest way to force an update of
+ * while (!cas...)" which is the simplest way to force an update of
* a CAS'ed variable. There are also other coding oddities that
* help some methods perform reasonably even when interpreted (not
* compiled), at the expense of some messy constructions that
@@ -420,14 +423,26 @@ public class ForkJoinPool extends Abstra
new AtomicInteger();
/**
+ * The time to block in a join (see awaitJoin) before checking if
+ * a new worker should be (re)started to maintain parallelism
+ * level. The value should be short enough to maintain global
+ * responsiveness and progress but long enough to avoid
+ * counterproductive firings during GC stalls or unrelated system
+ * activity, and to not bog down systems with continual re-firings
+ * on GCs or legitimately long waits.
+ */
+ private static final long JOIN_TIMEOUT_MILLIS = 250L; // 4 per second
+
+ /**
* The wakeup interval (in nanoseconds) for the oldest worker
- * worker waiting for an event invokes tryShutdownWaiter to shrink
- * the number of workers. The exact value does not matter too
- * much, but should be long enough to slowly release resources
- * during long periods without use without disrupting normal use.
+ * waiting for an event to invoke tryShutdownUnusedWorker to
+ * shrink the number of workers. The exact value does not matter
+ * too much. It must be short enough to release resources during
+ * sustained periods of idleness, but not so short that threads
+ * are continually re-created.
*/
private static final long SHRINK_RATE_NANOS =
- 60L * 1000L * 1000L * 1000L; // one minute
+ 30L * 1000L * 1000L * 1000L; // 2 per minute
/**
* Absolute bound for parallelism level. Twice this number plus
@@ -474,7 +489,7 @@ public class ForkJoinPool extends Abstra
private volatile long stealCount;
/**
- * Encoded record of top of treiber stack of threads waiting for
+ * Encoded record of top of Treiber stack of threads waiting for
* events. The top 32 bits contain the count being waited for. The
* bottom 16 bits contains one plus the pool index of waiting
* worker thread. (Bits 16-31 are unused.)
@@ -493,7 +508,7 @@ public class ForkJoinPool extends Abstra
private volatile int eventCount;
/**
- * Encoded record of top of treiber stack of spare threads waiting
+ * Encoded record of top of Treiber stack of spare threads waiting
* for resumption. The top 16 bits contain an arbitrary count to
* avoid ABA effects. The bottom 16bits contains one plus the pool
* index of waiting worker thread.
@@ -507,7 +522,7 @@ public class ForkJoinPool extends Abstra
* Lifecycle control. The low word contains the number of workers
* that are (probably) executing tasks. This value is atomically
* incremented before a worker gets a task to run, and decremented
- * when worker has no tasks and cannot find any. Bits 16-18
+ * when a worker has no tasks and cannot find any. Bits 16-18
* contain runLevel value. When all are zero, the pool is
* running. Level transitions are monotonic (running -> shutdown
* -> terminating -> terminated) so each transition adds a bit.
@@ -567,7 +582,6 @@ public class ForkJoinPool extends Abstra
*/
private final int poolNumber;
-
// Utilities for CASing fields. Note that most of these
// are usually manually inlined by callers
@@ -597,7 +611,7 @@ public class ForkJoinPool extends Abstra
* (rarely) necessary when other count updates lag.
*
* @param dr -- either zero or ONE_RUNNING
- * @param dt == either zero or ONE_TOTAL
+ * @param dt -- either zero or ONE_TOTAL
*/
private void decrementWorkerCounts(int dr, int dt) {
for (;;) {
@@ -615,27 +629,6 @@ public class ForkJoinPool extends Abstra
}
/**
- * Increments event count
- */
- private void advanceEventCount() {
- int c;
- do {} while(!UNSAFE.compareAndSwapInt(this, eventCountOffset,
- c = eventCount, c+1));
- }
-
- /**
- * Tries incrementing active count; fails on contention.
- * Called by workers before executing tasks.
- *
- * @return true on success
- */
- final boolean tryIncrementActiveCount() {
- int c;
- return UNSAFE.compareAndSwapInt(this, runStateOffset,
- c = runState, c + 1);
- }
-
- /**
* Tries decrementing active count; fails on contention.
* Called when workers cannot find tasks to run.
*/
@@ -687,11 +680,11 @@ public class ForkJoinPool extends Abstra
}
/**
- * Nulls out record of worker in workers array
+ * Nulls out record of worker in workers array.
*/
private void forgetWorker(ForkJoinWorkerThread w) {
int idx = w.poolIndex;
- // Locking helps method recordWorker avoid unecessary expansion
+ // Locking helps method recordWorker avoid unnecessary expansion
final ReentrantLock lock = this.workerLock;
lock.lock();
try {
@@ -703,42 +696,16 @@ public class ForkJoinPool extends Abstra
}
}
- // adding and removing workers
-
- /**
- * Tries to create and add new worker. Assumes that worker counts
- * are already updated to accommodate the worker, so adjusts on
- * failure.
- *
- * @return the worker, or null on failure
- */
- private ForkJoinWorkerThread addWorker() {
- ForkJoinWorkerThread w = null;
- try {
- w = factory.newThread(this);
- } finally { // Adjust on either null or exceptional factory return
- if (w == null) {
- decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
- tryTerminate(false); // in case of failure during shutdown
- }
- }
- if (w != null) {
- w.start(recordWorker(w), ueh);
- advanceEventCount();
- }
- return w;
- }
-
/**
* Final callback from terminating worker. Removes record of
* worker from array, and adjusts counts. If pool is shutting
- * down, tries to complete terminatation.
+ * down, tries to complete termination.
*
* @param w the worker
*/
final void workerTerminated(ForkJoinWorkerThread w) {
forgetWorker(w);
- decrementWorkerCounts(w.isTrimmed()? 0 : ONE_RUNNING, ONE_TOTAL);
+ decrementWorkerCounts(w.isTrimmed() ? 0 : ONE_RUNNING, ONE_TOTAL);
while (w.stealCount != 0) // collect final count
tryAccumulateStealCount(w);
tryTerminate(false);
@@ -750,22 +717,28 @@ public class ForkJoinPool extends Abstra
* Releases workers blocked on a count not equal to current count.
* Normally called after precheck that eventWaiters isn't zero to
* avoid wasted array checks. Gives up upon a change in count or
- * contention, letting other workers take over.
+ * upon releasing two workers, letting others take over.
*/
private void releaseEventWaiters() {
ForkJoinWorkerThread[] ws = workers;
int n = ws.length;
long h = eventWaiters;
int ec = eventCount;
+ boolean releasedOne = false;
ForkJoinWorkerThread w; int id;
- while ((int)(h >>> EVENT_COUNT_SHIFT) != ec &&
- (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
- id < n && (w = ws[id]) != null &&
- UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
- h, h = w.nextWaiter)) {
- LockSupport.unpark(w);
- if (eventWaiters != h || eventCount != ec)
+ while ((id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
+ (int)(h >>> EVENT_COUNT_SHIFT) != ec &&
+ id < n && (w = ws[id]) != null) {
+ if (UNSAFE.compareAndSwapLong(this, eventWaitersOffset,
+ h, w.nextWaiter)) {
+ LockSupport.unpark(w);
+ if (releasedOne) // exit on second release
+ break;
+ releasedOne = true;
+ }
+ if (eventCount != ec)
break;
+ h = eventWaiters;
}
}
@@ -782,13 +755,12 @@ public class ForkJoinPool extends Abstra
/**
* Adds the given worker to event queue and blocks until
- * terminating or event count advances from the workers
- * lastEventCount value
+ * terminating or event count advances from the given value
*
* @param w the calling worker thread
+ * @param ec the count
*/
- private void eventSync(ForkJoinWorkerThread w) {
- int ec = w.lastEventCount;
+ private void eventSync(ForkJoinWorkerThread w, int ec) {
long nh = (((long)ec) << EVENT_COUNT_SHIFT) | ((long)(w.poolIndex+1));
long h;
while ((runState < SHUTDOWN || !tryTerminate(false)) &&
@@ -808,7 +780,7 @@ public class ForkJoinPool extends Abstra
* event waiter) until terminating or event count advances from
* the given value. The oldest (first) waiter uses a timed wait to
* occasionally one-by-one shrink the number of workers (to a
- * minumum of one) if the pool has not been used for extended
+ * minimum of one) if the pool has not been used for extended
* periods.
*
* @param w the calling worker thread
@@ -819,63 +791,27 @@ public class ForkJoinPool extends Abstra
if (tryAccumulateStealCount(w)) { // transfer while idle
boolean untimed = (w.nextWaiter != 0L ||
(workerCounts & RUNNING_COUNT_MASK) <= 1);
- long startTime = untimed? 0 : System.nanoTime();
+ long startTime = untimed ? 0 : System.nanoTime();
Thread.interrupted(); // clear/ignore interrupt
- if (eventCount != ec || !w.isRunning() ||
- runState >= TERMINATING) // recheck after clear
- break;
+ if (eventCount != ec || w.isTerminating())
+ break; // recheck after clear
if (untimed)
LockSupport.park(w);
else {
LockSupport.parkNanos(w, SHRINK_RATE_NANOS);
- if (eventCount != ec || !w.isRunning() ||
- runState >= TERMINATING)
+ if (eventCount != ec || w.isTerminating())
break;
if (System.nanoTime() - startTime >= SHRINK_RATE_NANOS)
- tryShutdownWaiter(ec);
+ tryShutdownUnusedWorker(ec);
}
}
}
}
- /**
- * Callback from the oldest waiter in awaitEvent waking up after a
- * period of non-use. Tries (once) to shutdown an event waiter (or
- * a spare, if one exists). Note that we don't need CAS or locks
- * here because the method is called only from one thread
- * occasionally waking (and even misfires are OK). Note that
- * until the shutdown worker fully terminates, workerCounts
- * will overestimate total count, which is tolerable.
- *
- * @param ec the event count waited on by caller (to abort
- * attempt if count has since changed).
- */
- private void tryShutdownWaiter(int ec) {
- if (spareWaiters != 0) { // prefer killing spares
- tryShutdownSpare();
- return;
- }
- ForkJoinWorkerThread[] ws = workers;
- int n = ws.length;
- long h = eventWaiters;
- ForkJoinWorkerThread w; int id; long nh;
- if (runState == 0 &&
- submissionQueue.isEmpty() &&
- eventCount == ec &&
- (id = ((int)(h & WAITER_ID_MASK)) - 1) >= 0 &&
- id < n && (w = ws[id]) != null &&
- (nh = w.nextWaiter) != 0L && // keep at least one worker
- UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh)) {
- w.shutdown();
- LockSupport.unpark(w);
- }
- releaseEventWaiters();
- }
-
- // Maintaining spares
+ // Maintaining parallelism
/**
- * Pushes worker onto the spare stack
+ * Pushes worker onto the spare stack.
*/
final void pushSpare(ForkJoinWorkerThread w) {
int ns = (++w.spareCount << SPARE_COUNT_SHIFT) | (w.poolIndex + 1);
@@ -884,130 +820,124 @@ public class ForkJoinPool extends Abstra
}
/**
- * Callback from oldest spare occasionally waking up. Tries
- * (once) to shutdown a spare. Same idea as tryShutdownWaiter.
+ * Tries (once) to resume a spare if the number of running
+ * threads is less than target.
*/
- final void tryShutdownSpare() {
+ private void tryResumeSpare() {
int sw, id;
- ForkJoinWorkerThread w;
- ForkJoinWorkerThread[] ws;
- if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
- id < (ws = workers).length && (w = ws[id]) != null &&
- (workerCounts & RUNNING_COUNT_MASK) >= parallelism &&
- UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
- sw, w.nextSpare)) {
- w.shutdown();
- LockSupport.unpark(w);
- advanceEventCount();
- }
- }
-
- /**
- * Tries (once) to resume a spare if worker counts match
- * the given count.
- *
- * @param wc workerCounts value on invocation of this method
- */
- private void tryResumeSpare(int wc) {
ForkJoinWorkerThread[] ws = workers;
int n = ws.length;
- int sw, id, rs; ForkJoinWorkerThread w;
- if ((id = ((sw = spareWaiters) & SPARE_ID_MASK) - 1) >= 0 &&
+ ForkJoinWorkerThread w;
+ if ((sw = spareWaiters) != 0 &&
+ (id = (sw & SPARE_ID_MASK) - 1) >= 0 &&
id < n && (w = ws[id]) != null &&
- (rs = runState) < TERMINATING &&
- eventWaiters == 0L && workerCounts == wc) {
- // In case all workers busy, heuristically back off to let settle
- Thread.yield();
- if (eventWaiters == 0L && runState == rs && // recheck
- workerCounts == wc && spareWaiters == sw &&
- UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
- sw, w.nextSpare)) {
- int c; // increment running count before resume
- do {} while(!UNSAFE.compareAndSwapInt
- (this, workerCountsOffset,
- c = workerCounts, c + ONE_RUNNING));
- if (w.tryUnsuspend())
- LockSupport.unpark(w);
- else // back out if w was shutdown
- decrementWorkerCounts(ONE_RUNNING, 0);
- }
+ (workerCounts & RUNNING_COUNT_MASK) < parallelism &&
+ spareWaiters == sw &&
+ UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
+ sw, w.nextSpare)) {
+ int c; // increment running count before resume
+ do {} while (!UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ c = workerCounts, c + ONE_RUNNING));
+ if (w.tryUnsuspend())
+ LockSupport.unpark(w);
+ else // back out if w was shutdown
+ decrementWorkerCounts(ONE_RUNNING, 0);
}
}
- // adding workers on demand
-
/**
- * Adds one or more workers if needed to establish target parallelism.
- * Retries upon contention.
+ * Tries to increase the number of running workers if below target
+ * parallelism: If a spare exists tries to resume it via
+ * tryResumeSpare. Otherwise, if not enough total workers or all
+ * existing workers are busy, adds a new worker. In all cases also
+ * helps wake up releasable workers waiting for work.
*/
- private void addWorkerIfBelowTarget() {
+ private void helpMaintainParallelism() {
int pc = parallelism;
- int wc;
- while (((wc = workerCounts) >>> TOTAL_COUNT_SHIFT) < pc &&
- runState < TERMINATING) {
- if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
- wc + (ONE_RUNNING|ONE_TOTAL))) {
- if (addWorker() == null)
+ int wc, rs, tc;
+ while (((wc = workerCounts) & RUNNING_COUNT_MASK) < pc &&
+ (rs = runState) < TERMINATING) {
+ if (spareWaiters != 0)
+ tryResumeSpare();
+ else if ((tc = wc >>> TOTAL_COUNT_SHIFT) >= MAX_WORKERS ||
+ (tc >= pc && (rs & ACTIVE_COUNT_MASK) != tc))
+ break; // enough total
+ else if (runState == rs && workerCounts == wc &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
+ wc + (ONE_RUNNING|ONE_TOTAL))) {
+ ForkJoinWorkerThread w = null;
+ Throwable fail = null;
+ try {
+ w = factory.newThread(this);
+ } catch (Throwable ex) {
+ fail = ex;
+ }
+ if (w == null) { // null or exceptional factory return
+ decrementWorkerCounts(ONE_RUNNING, ONE_TOTAL);
+ tryTerminate(false); // handle failure during shutdown
+ // If originating from an external caller,
+ // propagate exception, else ignore
+ if (fail != null && runState < TERMINATING &&
+ !(Thread.currentThread() instanceof
+ ForkJoinWorkerThread))
+ UNSAFE.throwException(fail);
break;
+ }
+ w.start(recordWorker(w), ueh);
+ if ((workerCounts >>> TOTAL_COUNT_SHIFT) >= pc) {
+ int c; // advance event count
+ UNSAFE.compareAndSwapInt(this, eventCountOffset,
+ c = eventCount, c+1);
+ break; // add at most one unless total below target
+ }
}
}
+ if (eventWaiters != 0L)
+ releaseEventWaiters();
}
/**
- * Tries (once) to add a new worker if all existing workers are
- * busy, and there are either no running workers or the deficit is
- * at least twice the surplus.
- *
- * @param wc workerCounts value on invocation of this method
- */
- private void tryAddWorkerIfBusy(int wc) {
- int tc, rc, rs;
- int pc = parallelism;
- if ((tc = wc >>> TOTAL_COUNT_SHIFT) < MAX_WORKERS &&
- ((rc = wc & RUNNING_COUNT_MASK) == 0 ||
- rc < pc - ((tc - pc) << 1)) &&
- (rs = runState) < TERMINATING &&
- (rs & ACTIVE_COUNT_MASK) == tc) {
- // Since all workers busy, heuristically back off to let settle
- Thread.yield();
- if (eventWaiters == 0L && spareWaiters == 0 && // recheck
- runState == rs && workerCounts == wc &&
- UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
- wc + (ONE_RUNNING|ONE_TOTAL)))
- addWorker();
- }
- }
-
- /**
- * Does at most one of:
- *
- * 1. Help wake up existing workers waiting for work via
- * releaseEventWaiters. (If any exist, then it doesn't
- * matter right now if under target parallelism level.)
- *
- * 2. If a spare exists, try (once) to resume it via tryResumeSpare.
- *
- * 3. If there are not enough total workers, add some
- * via addWorkerIfBelowTarget;
+ * Callback from the oldest waiter in awaitEvent waking up after a
+ * period of non-use. If all workers are idle, tries (once) to
+ * shut down an event waiter or a spare, if one exists. Note that
+ * we don't need CAS or locks here because the method is called
+ * only from one thread occasionally waking (and even misfires are
+ * OK). Note that until the shutdown worker fully terminates,
+ * workerCounts will overestimate total count, which is tolerable.
*
- * 4. Try (once) to add a new worker if all existing workers
- * are busy, via tryAddWorkerIfBusy
+ * @param ec the event count waited on by caller (to abort
+ * attempt if count has since changed).
*/
- private void helpMaintainParallelism() {
- long h; int pc, wc;
- if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0) {
- if ((int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
- releaseEventWaiters(); // avoid useless call
- }
- else if ((pc = parallelism) >
- ((wc = workerCounts) & RUNNING_COUNT_MASK)) {
- if (spareWaiters != 0)
- tryResumeSpare(wc);
- else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
- addWorkerIfBelowTarget();
- else
- tryAddWorkerIfBusy(wc);
+ private void tryShutdownUnusedWorker(int ec) {
+ if (runState == 0 && eventCount == ec) { // only trigger if all idle
+ ForkJoinWorkerThread[] ws = workers;
+ int n = ws.length;
+ ForkJoinWorkerThread w = null;
+ boolean shutdown = false;
+ int sw;
+ long h;
+ if ((sw = spareWaiters) != 0) { // prefer killing spares
+ int id = (sw & SPARE_ID_MASK) - 1;
+ if (id >= 0 && id < n && (w = ws[id]) != null &&
+ UNSAFE.compareAndSwapInt(this, spareWaitersOffset,
+ sw, w.nextSpare))
+ shutdown = true;
+ }
+ else if ((h = eventWaiters) != 0L) {
+ long nh;
+ int id = ((int)(h & WAITER_ID_MASK)) - 1;
+ if (id >= 0 && id < n && (w = ws[id]) != null &&
+ (nh = w.nextWaiter) != 0L && // keep at least one worker
+ UNSAFE.compareAndSwapLong(this, eventWaitersOffset, h, nh))
+ shutdown = true;
+ }
+ if (w != null && shutdown) {
+ w.shutdown();
+ LockSupport.unpark(w);
+ }
}
+ releaseEventWaiters(); // in case of interference
}
/**
@@ -1015,132 +945,157 @@ public class ForkJoinPool extends Abstra
* stealing a task or taking a submission and running it).
* Performs one or more of the following:
*
- * 1. If the worker is active, try to set its active status to
- * inactive and update activeCount. On contention, we may try
- * again on this or subsequent call.
- *
- * 2. Release any existing event waiters that are now relesable
- *
- * 3. If there are too many running threads, suspend this worker
- * (first forcing inactive if necessary). If it is not
- * needed, it may be killed while suspended via
- * tryShutdownSpare. Otherwise, upon resume it rechecks to make
- * sure that it is still needed.
- *
- * 4. If more than 1 miss, await the next task event via
- * eventSync (first forcing inactivation if necessary), upon
- * which worker may also be killed, via tryShutdownWaiter.
- *
- * 5. Help reactivate other workers via helpMaintainParallelism
+ * 1. If the worker is active and either did not run a task
+ * or there are too many workers, try to set its active status
+ * to inactive and update activeCount. On contention, we may
+ * try again in this or a subsequent call.
+ *
+ * 2. If not enough total workers, help create some.
+ *
+ * 3. If there are too many running workers, suspend this worker
+ * (first forcing inactive if necessary). If it is not needed,
+ * it may be shut down while suspended (via
+ * tryShutdownUnusedWorker). Otherwise, upon resume it
+ * rechecks running thread count and need for event sync.
+ *
+ * 4. If worker did not run a task, await the next task event via
+ * eventSync if necessary (first forcing inactivation), upon
+ * which the worker may be shut down via
+ * tryShutdownUnusedWorker. Otherwise, help release any
+ * existing event waiters that are now releasable.
*
* @param w the worker
- * @param misses the number of scans by caller failing to find work
- * (saturating at 2 to avoid wraparound)
+ * @param ran true if worker ran a task since last call to this method
*/
- final void preStep(ForkJoinWorkerThread w, int misses) {
+ final void preStep(ForkJoinWorkerThread w, boolean ran) {
+ int wec = w.lastEventCount;
boolean active = w.active;
+ boolean inactivate = false;
int pc = parallelism;
- for (;;) {
- int rs, wc, rc, ec; long h;
- if (active && UNSAFE.compareAndSwapInt(this, runStateOffset,
- rs = runState, rs - 1))
- active = w.active = false;
- if (((int)((h = eventWaiters) & WAITER_ID_MASK)) != 0 &&
- (int)(h >>> EVENT_COUNT_SHIFT) != eventCount) {
- releaseEventWaiters();
- if (misses > 1)
- continue; // clear before sync below
+ while (w.runState == 0) {
+ int rs = runState;
+ if (rs >= TERMINATING) { // propagate shutdown
+ w.shutdown();
+ break;
}
- if ((rc = ((wc = workerCounts) & RUNNING_COUNT_MASK)) > pc) {
- if (!active && // must inactivate to suspend
+ if ((inactivate || (active && (rs & ACTIVE_COUNT_MASK) >= pc)) &&
+ UNSAFE.compareAndSwapInt(this, runStateOffset, rs, rs - 1))
+ inactivate = active = w.active = false;
+ int wc = workerCounts;
+ if ((wc & RUNNING_COUNT_MASK) > pc) {
+ if (!(inactivate |= active) && // must inactivate to suspend
workerCounts == wc && // try to suspend as spare
UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc, wc - ONE_RUNNING)) {
+ wc, wc - ONE_RUNNING))
w.suspendAsSpare();
- if (!w.isRunning())
- break; // was killed while spare
- }
- continue;
}
- if (misses > 0) {
- if ((ec = eventCount) == w.lastEventCount && misses > 1) {
- if (!active) { // must inactivate to sync
- eventSync(w);
- if (w.isRunning())
- misses = 1; // don't re-sync
- else
- break; // was killed while waiting
- }
- continue;
+ else if ((wc >>> TOTAL_COUNT_SHIFT) < pc)
+ helpMaintainParallelism(); // not enough workers
+ else if (!ran) {
+ long h = eventWaiters;
+ int ec = eventCount;
+ if (h != 0L && (int)(h >>> EVENT_COUNT_SHIFT) != ec)
+ releaseEventWaiters(); // release others before waiting
+ else if (ec != wec) {
+ w.lastEventCount = ec; // no need to wait
+ break;
}
- w.lastEventCount = ec;
+ else if (!(inactivate |= active))
+ eventSync(w, wec); // must inactivate before sync
}
- if (rc < pc)
- helpMaintainParallelism();
- break;
+ else
+ break;
}
}
/**
* Helps and/or blocks awaiting join of the given task.
- * Alternates between helpJoinTask() and helpMaintainParallelism()
- * as many times as there is a deficit in running count (or longer
- * if running count would become zero), then blocks if task still
- * not done.
+ * See above for explanation.
*
* @param joinMe the task to join
- */
- final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker) {
- int threshold = parallelism; // descend blocking thresholds
+ * @param worker the current worker thread
+ * @param timed true if wait should time out
+ * @param nanos timeout value if timed
+ */
+ final void awaitJoin(ForkJoinTask<?> joinMe, ForkJoinWorkerThread worker,
+ boolean timed, long nanos) {
+ long startTime = timed? System.nanoTime() : 0L;
+ int retries = 2 + (parallelism >> 2); // #helpJoins before blocking
while (joinMe.status >= 0) {
- boolean block; int wc;
+ int wc;
+ long nt = 0L;
+ if (runState >= TERMINATING) {
+ joinMe.cancelIgnoringExceptions();
+ break;
+ }
worker.helpJoinTask(joinMe);
if (joinMe.status < 0)
break;
- if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
- if (threshold > 0)
- --threshold;
- else
- advanceEventCount(); // force release
- block = false;
- }
- else
- block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc, wc - ONE_RUNNING);
- helpMaintainParallelism();
- if (block) {
- int c;
- joinMe.internalAwaitDone();
+ else if (retries > 0)
+ --retries;
+ else if (timed &&
+ (nt = nanos - (System.nanoTime() - startTime)) <= 0L)
+ break;
+ else if (((wc = workerCounts) & RUNNING_COUNT_MASK) != 0 &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING)) {
+ int stat, c; long h;
+ while ((stat = joinMe.status) >= 0 &&
+ (h = eventWaiters) != 0L && // help release others
+ (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
+ releaseEventWaiters();
+ if (stat >= 0) {
+ if ((workerCounts & RUNNING_COUNT_MASK) != 0) {
+ long ms; int ns;
+ if (!timed) {
+ ms = JOIN_TIMEOUT_MILLIS;
+ ns = 0;
+ }
+ else { // at most JOIN_TIMEOUT_MILLIS per wait
+ ms = nt / 1000000;
+ if (ms > JOIN_TIMEOUT_MILLIS) {
+ ms = JOIN_TIMEOUT_MILLIS;
+ ns = 0;
+ }
+ else
+ ns = (int) (nt % 1000000);
+ }
+ stat = joinMe.internalAwaitDone(ms, ns);
+ }
+ if (stat >= 0) // timeout or no running workers
+ helpMaintainParallelism();
+ }
do {} while (!UNSAFE.compareAndSwapInt
(this, workerCountsOffset,
c = workerCounts, c + ONE_RUNNING));
- break;
+ if (stat < 0)
+ break; // else restart
}
}
}
/**
- * Same idea as awaitJoin, but no helping
+ * Same idea as awaitJoin, but no helping, retries, or timeouts.
*/
final void awaitBlocker(ManagedBlocker blocker)
throws InterruptedException {
- int threshold = parallelism;
while (!blocker.isReleasable()) {
- boolean block; int wc;
- if (((wc = workerCounts) & RUNNING_COUNT_MASK) <= threshold) {
- if (threshold > 0)
- --threshold;
- else
- advanceEventCount();
- block = false;
- }
- else
- block = UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc, wc - ONE_RUNNING);
- helpMaintainParallelism();
- if (block) {
+ int wc = workerCounts;
+ if ((wc & RUNNING_COUNT_MASK) != 0 &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ wc, wc - ONE_RUNNING)) {
try {
- do {} while (!blocker.isReleasable() && !blocker.block());
+ while (!blocker.isReleasable()) {
+ long h = eventWaiters;
+ if (h != 0L &&
+ (int)(h >>> EVENT_COUNT_SHIFT) != eventCount)
+ releaseEventWaiters();
+ else if ((workerCounts & RUNNING_COUNT_MASK) == 0 &&
+ runState < TERMINATING)
+ helpMaintainParallelism();
+ else if (blocker.block())
+ break;
+ }
} finally {
int c;
do {} while (!UNSAFE.compareAndSwapInt
@@ -1178,6 +1133,7 @@ public class ForkJoinPool extends Abstra
return true;
}
+
/**
* Actions on transition to TERMINATING
*
@@ -1190,19 +1146,18 @@ public class ForkJoinPool extends Abstra
private void startTerminating() {
cancelSubmissions();
for (int passes = 0; passes < 4 && workerCounts != 0; ++passes) {
- advanceEventCount();
+ int c; // advance event count
+ UNSAFE.compareAndSwapInt(this, eventCountOffset,
+ c = eventCount, c+1);
eventWaiters = 0L; // clobber lists
spareWaiters = 0;
- ForkJoinWorkerThread[] ws = workers;
- int n = ws.length;
- for (int i = 0; i < n; ++i) {
- ForkJoinWorkerThread w = ws[i];
+ for (ForkJoinWorkerThread w : workers) {
if (w != null) {
w.shutdown();
if (passes > 0 && !w.isTerminated()) {
w.cancelTasks();
LockSupport.unpark(w);
- if (passes > 1) {
+ if (passes > 1 && !w.isInterrupted()) {
try {
w.interrupt();
} catch (SecurityException ignore) {
@@ -1215,7 +1170,7 @@ public class ForkJoinPool extends Abstra
}
/**
- * Clear out and cancel submissions, ignoring exceptions
+ * Clears out and cancels submissions, ignoring exceptions.
*/
private void cancelSubmissions() {
ForkJoinTask<?> task;
@@ -1230,15 +1185,15 @@ public class ForkJoinPool extends Abstra
// misc support for ForkJoinWorkerThread
/**
- * Returns pool number
+ * Returns pool number.
*/
final int getPoolNumber() {
return poolNumber;
}
/**
- * Tries to accumulates steal count from a worker, clearing
- * the worker's value.
+ * Tries to accumulate steal count from a worker, clearing
+ * the worker's value if successful.
*
* @return true if worker steal count now zero
*/
@@ -1260,9 +1215,12 @@ public class ForkJoinPool extends Abstra
*/
final int idlePerActive() {
int pc = parallelism; // use parallelism, not rc
- int ac = runState; // no mask -- artifically boosts during shutdown
+ int ac = runState; // no mask -- artificially boosts during shutdown
// Use exact results for small values, saturate past 4
- return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
+ return ((pc <= ac) ? 0 :
+ (pc >>> 1 <= ac) ? 1 :
+ (pc >>> 2 <= ac) ? 3 :
+ pc >>> 3);
}
// Public and protected methods
@@ -1312,13 +1270,13 @@ public class ForkJoinPool extends Abstra
* use {@link #defaultForkJoinWorkerThreadFactory}.
* @param handler the handler for internal worker threads that
* terminate due to unrecoverable errors encountered while executing
- * tasks. For default value, use <tt>null</tt>.
+ * tasks. For default value, use {@code null}.
* @param asyncMode if true,
* establishes local first-in-first-out scheduling mode for forked
* tasks that are never joined. This mode may be more appropriate
* than default locally stack-based mode in applications in which
* worker threads only process event-style asynchronous tasks.
- * For default value, use <tt>false</tt>.
+ * For default value, use {@code false}.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
@@ -1353,8 +1311,9 @@ public class ForkJoinPool extends Abstra
* @param pc the initial parallelism level
*/
private static int initialArraySizeFor(int pc) {
- // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 >>> 16)
+ // If possible, initially allocate enough space for one spare
int size = pc < MAX_WORKERS ? pc + 1 : MAX_WORKERS;
+ // See Hackers Delight, sec 3.2. We know MAX_WORKERS < (1 << 16)
size |= size >>> 1;
size |= size >>> 2;
size |= size >>> 4;
@@ -1365,19 +1324,13 @@ public class ForkJoinPool extends Abstra
// Execution methods
/**
- * Common code for execute, invoke and submit
+ * Submits task and creates, starts, or resumes some workers if necessary
*/
private void doSubmit(ForkJoinTask<?> task) {
- if (task == null)
- throw new NullPointerException();
- if (runState >= SHUTDOWN)
- throw new RejectedExecutionException();
submissionQueue.offer(task);
- advanceEventCount();
- if (eventWaiters != 0L)
- releaseEventWaiters();
- if ((workerCounts >>> TOTAL_COUNT_SHIFT) < parallelism)
- addWorkerIfBelowTarget();
+ int c; // try to increment event count -- CAS failure OK
+ UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
+ helpMaintainParallelism();
}
/**
@@ -1390,8 +1343,33 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public <T> T invoke(ForkJoinTask<T> task) {
- doSubmit(task);
- return task.join();
+ if (task == null)
+ throw new NullPointerException();
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException();
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ return task.invoke(); // bypass submit if in same pool
+ else {
+ doSubmit(task);
+ return task.join();
+ }
+ }
+
+ /**
+ * Unless terminating, forks task if within an ongoing FJ
+ * computation in the current pool, else submits as external task.
+ */
+ private void forkOrSubmit(ForkJoinTask<?> task) {
+ if (runState >= SHUTDOWN)
+ throw new RejectedExecutionException();
+ Thread t = Thread.currentThread();
+ if ((t instanceof ForkJoinWorkerThread) &&
+ ((ForkJoinWorkerThread)t).pool == this)
+ task.fork();
+ else
+ doSubmit(task);
}
/**
@@ -1403,7 +1381,9 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public void execute(ForkJoinTask<?> task) {
- doSubmit(task);
+ if (task == null)
+ throw new NullPointerException();
+ forkOrSubmit(task);
}
// AbstractExecutorService methods
@@ -1414,12 +1394,14 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public void execute(Runnable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
- doSubmit(job);
+ forkOrSubmit(job);
}
/**
@@ -1432,7 +1414,9 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
- doSubmit(task);
+ if (task == null)
+ throw new NullPointerException();
+ forkOrSubmit(task);
return task;
}
@@ -1442,8 +1426,10 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Callable<T> task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1453,8 +1439,10 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1464,12 +1452,14 @@ public class ForkJoinPool extends Abstra
* scheduled for execution
*/
public ForkJoinTask<?> submit(Runnable task) {
+ if (task == null)
+ throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
- doSubmit(job);
+ forkOrSubmit(job);
return job;
}
@@ -1529,7 +1519,7 @@ public class ForkJoinPool extends Abstra
/**
* Returns the number of worker threads that have started but not
- * yet terminated. This result returned by this method may differ
+ * yet terminated. The result returned by this method may differ
* from {@link #getParallelism} when threads are created to
* maintain parallelism when others are cooperatively blocked.
*
@@ -1614,13 +1604,9 @@ public class ForkJoinPool extends Abstra
*/
public long getQueuedTaskCount() {
long count = 0;
- ForkJoinWorkerThread[] ws = workers;
- int n = ws.length;
- for (int i = 0; i < n; ++i) {
- ForkJoinWorkerThread w = ws[i];
+ for (ForkJoinWorkerThread w : workers)
if (w != null)
count += w.getQueueSize();
- }
return count;
}
@@ -1675,13 +1661,9 @@ public class ForkJoinPool extends Abstra
*/
protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
int count = submissionQueue.drainTo(c);
- ForkJoinWorkerThread[] ws = workers;
- int n = ws.length;
- for (int i = 0; i < n; ++i) {
- ForkJoinWorkerThread w = ws[i];
+ for (ForkJoinWorkerThread w : workers)
if (w != null)
count += w.drainTasksTo(c);
- }
return count;
}
@@ -1785,6 +1767,13 @@ public class ForkJoinPool extends Abstra
}
/**
+ * Returns true if terminating or terminated. Used by ForkJoinWorkerThread.
+ */
+ final boolean isAtLeastTerminating() {
+ return runState >= TERMINATING;
+ }
+
+ /**
* Returns {@code true} if this pool has been shut down.
*
* @return {@code true} if this pool has been shut down
@@ -1808,7 +1797,7 @@ public class ForkJoinPool extends Abstra
throws InterruptedException {
try {
return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0;
- } catch(TimeoutException ex) {
+ } catch (TimeoutException ex) {
return false;
}
}
@@ -1938,11 +1927,11 @@ public class ForkJoinPool extends Abstra
private static final long eventCountOffset =
objectFieldOffset("eventCount", ForkJoinPool.class);
private static final long eventWaitersOffset =
- objectFieldOffset("eventWaiters",ForkJoinPool.class);
+ objectFieldOffset("eventWaiters", ForkJoinPool.class);
private static final long stealCountOffset =
- objectFieldOffset("stealCount",ForkJoinPool.class);
+ objectFieldOffset("stealCount", ForkJoinPool.class);
private static final long spareWaitersOffset =
- objectFieldOffset("spareWaiters",ForkJoinPool.class);
+ objectFieldOffset("spareWaiters", ForkJoinPool.class);
private static long objectFieldOffset(String field, Class<?> klazz) {
try {