--- jsr166/src/jsr166y/ForkJoinPool.java	2010/05/27 16:46:48	1.56
+++ jsr166/src/jsr166y/ForkJoinPool.java	2010/07/07 19:52:31	1.57
@@ -21,7 +21,7 @@ import java.util.concurrent.CountDownLat
 /**
  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
  * A {@code ForkJoinPool} provides the entry point for submissions
- * from non-{@code ForkJoinTask}s, as well as management and
+ * from non-{@code ForkJoinTask} clients, as well as management and
  * monitoring operations.
  *
 * <p>A {@code ForkJoinPool} differs from other kinds of {@link
@@ -30,33 +30,19 @@ import java.util.concurrent.CountDownLat
 * execute subtasks created by other active tasks (eventually blocking
 * waiting for work if none exist). This enables efficient processing
 * when most tasks spawn other subtasks (as do most {@code
- * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed
- * execution of some plain {@code Runnable}- or {@code Callable}-
- * based activities along with {@code ForkJoinTask}s. When setting
- * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may
- * also be appropriate for use with fine-grained tasks of any form
- * that are never joined. Otherwise, other {@code ExecutorService}
- * implementations are typically more appropriate choices.
+ * ForkJoinTask}s). When setting asyncMode to true in
+ * constructors, {@code ForkJoinPool}s may also be appropriate for use
+ * with event-style tasks that are never joined.
 *
 * <p>A {@code ForkJoinPool} is constructed with a given target
 * parallelism level; by default, equal to the number of available
- * processors. Unless configured otherwise via {@link
- * #setMaintainsParallelism}, the pool attempts to maintain this
- * number of active (or available) threads by dynamically adding,
- * suspending, or resuming internal worker threads, even if some tasks
- * are stalled waiting to join others. However, no such adjustments
- * are performed in the face of blocked IO or other unmanaged
- * synchronization. The nested {@link ManagedBlocker} interface
- * enables extension of the kinds of synchronization accommodated.
- * The target parallelism level may also be changed dynamically
- * ({@link #setParallelism}). The total number of threads may be
- * limited using method {@link #setMaximumPoolSize}, in which case it
- * may become possible for the activities of a pool to stall due to
- * the lack of available threads to process new tasks. When the pool
- * is executing tasks, these and other configuration setting methods
- * may only gradually affect actual pool sizes. It is normally best
- * practice to invoke these methods only when the pool is known to be
- * quiescent.
+ * processors. The pool attempts to maintain enough active (or
+ * available) threads by dynamically adding, suspending, or resuming
+ * internal worker threads, even if some tasks are stalled waiting to
+ * join others. However, no such adjustments are guaranteed in the
+ * face of blocked IO or other unmanaged synchronization. The nested
+ * {@link ManagedBlocker} interface enables extension of the kinds of
+ * synchronization accommodated.
 *
 * <p>In addition to execution and lifecycle control methods, this
 * class provides status check methods (for example
@@ -65,6 +51,44 @@ import java.util.concurrent.CountDownLat
 * {@link #toString} returns indications of pool state in a
 * convenient form for informal monitoring.
 *
+ * <p>As is the case with other ExecutorServices, there are three
+ * main task execution methods summarized in the following
+ * table. These are designed to be used by clients not already engaged
+ * in fork/join computations in the current pool. The main forms of
+ * these methods accept instances of {@code ForkJoinTask}, but
+ * overloaded forms also allow mixed execution of plain {@code
+ * Runnable}- or {@code Callable}- based activities as well. However,
+ * tasks that are already executing in a pool should normally
+ * NOT use these pool execution methods, but instead use the
+ * within-computation forms listed in the table. To avoid inadvertent
+ * cyclic task dependencies and to improve performance, task
+ * submissions to the current pool by an ongoing fork/join
+ * computation may be implicitly translated to the corresponding
+ * ForkJoinTask forms.
+ *
+ * <table BORDER CELLPADDING=3 CELLSPACING=1>
+ *  <tr>
+ *    <td></td>
+ *    <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
+ *    <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
+ *  </tr>
+ *  <tr>
+ *    <td> <b>Arrange async execution</b></td>
+ *    <td> {@link #execute(ForkJoinTask)}</td>
+ *    <td> {@link ForkJoinTask#fork}</td>
+ *  </tr>
+ *  <tr>
+ *    <td> <b>Await and obtain result</b></td>
+ *    <td> {@link #invoke(ForkJoinTask)}</td>
+ *    <td> {@link ForkJoinTask#invoke}</td>
+ *  </tr>
+ *  <tr>
+ *    <td> <b>Arrange exec and obtain Future</b></td>
+ *    <td> {@link #submit(ForkJoinTask)}</td>
+ *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks are Futures)</td>
+ *  </tr>
+ * </table>
+ *
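The table's division of labor can be illustrated with a small sketch (class and field names here are hypothetical, and it uses the java.util.concurrent copy of the API so it runs on a stock JDK): the client thread uses a pool-level method, while code already running in the pool uses the within-computation fork/join forms.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Illustrative sketch, not part of the patch: a client calls pool.invoke(...),
// while the task body itself uses fork()/join() rather than pool methods.
public class SumDemo {
    static class Sum extends RecursiveTask<Long> {
        final long[] a;
        final int lo, hi;
        Sum(long[] a, int lo, int hi) { this.a = a; this.lo = lo; this.hi = hi; }
        protected Long compute() {
            if (hi - lo <= 1000) {                // small enough: sum directly
                long s = 0;
                for (int i = lo; i < hi; ++i)
                    s += a[i];
                return s;
            }
            int mid = (lo + hi) >>> 1;
            Sum left = new Sum(a, lo, mid);
            left.fork();                          // within-computation form
            long right = new Sum(a, mid, hi).compute();
            return right + left.join();
        }
    }

    public static void main(String[] args) {
        long[] a = new long[10000];
        for (int i = 0; i < a.length; ++i)
            a[i] = i;
        ForkJoinPool pool = new ForkJoinPool();   // default parallelism
        System.out.println(pool.invoke(new Sum(a, 0, a.length))); // client form
    }
}
```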
 * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
* used for all parallel task execution in a program or subsystem.
* Otherwise, use would not usually outweigh the construction and
@@ -147,9 +171,8 @@ public class ForkJoinPool extends Abstra
* work with other policies.
*
* 2. Bookkeeping for dynamically adding and removing workers. We
- * maintain a given level of parallelism (or, if
- * maintainsParallelism is false, at least avoid starvation). When
- * some workers are known to be blocked (on joins or via
+ * aim to approximately maintain the given level of parallelism.
+ * When some workers are known to be blocked (on joins or via
* ManagedBlocker), we may create or resume others to take their
* place until they unblock (see below). Implementing this
* requires counts of the number of "running" threads (i.e., those
@@ -167,10 +190,8 @@ public class ForkJoinPool extends Abstra
* threads Updates to the workerCounts field sometimes transiently
* encounter a fair amount of contention when join dependencies
* are such that many threads block or unblock at about the same
- * time. We alleviate this by sometimes bundling updates (for
- * example blocking one thread on join and resuming a spare cancel
- * each other out), and in most other cases performing an
- * alternative action like releasing waiters or locating spares.
+ * time. We alleviate this by sometimes performing an alternative
+ * action on contention like releasing waiters or locating spares.
*
* 3. Maintaining global run state. The run state of the pool
* consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to
@@ -258,49 +279,23 @@ public class ForkJoinPool extends Abstra
*
* 6. Deciding when to create new workers. The main dynamic
* control in this class is deciding when to create extra threads,
- * in methods awaitJoin and awaitBlocker. We always
- * need to create one when the number of running threads becomes
- * zero. But because blocked joins are typically dependent, we
- * don't necessarily need or want one-to-one replacement. Using a
- * one-to-one compensation rule often leads to enough useless
- * overhead creating, suspending, resuming, and/or killing threads
- * to signficantly degrade throughput. We use a rule reflecting
- * the idea that, the more spare threads you already have, the
- * more evidence you need to create another one. The "evidence"
- * here takes two forms: (1) Using a creation threshold expressed
- * in terms of the current deficit -- target minus running
- * threads. To reduce flickering and drift around target values,
- * the relation is quadratic: adding a spare if (dc*dc)>=(sc*pc)
- * (where dc is deficit, sc is number of spare threads and pc is
- * target parallelism.) (2) Using a form of adaptive
- * spionning. requiring a number of threshold checks proportional
- * to the number of spare threads. This effectively reduces churn
- * at the price of systematically undershooting target parallelism
- * when many threads are blocked. However, biasing toward
- * undeshooting partially compensates for the above mechanics to
- * suspend extra threads, that normally lead to overshoot because
- * we can only suspend workers in-between top-level actions. It
- * also better copes with the fact that some of the methods in
- * this class tend to never become compiled (but are interpreted),
- * so some components of the entire set of controls might execute
- * many times faster than others. And similarly for cases where
- * the apparent lack of work is just due to GC stalls and other
- * transient system activity.
- *
- * 7. Maintaining other configuration parameters and monitoring
- * statistics. Updates to fields controlling parallelism level,
- * max size, etc can only meaningfully take effect for individual
- * threads upon their next top-level actions; i.e., between
- * stealing/running tasks/submission, which are separated by calls
- * to preStep. Memory ordering for these (assumed infrequent)
- * reconfiguration calls is ensured by using reads and writes to
- * volatile field workerCounts (that must be read in preStep anyway)
- * as "fences" -- user-level reads are preceded by reads of
- * workCounts, and writes are followed by no-op CAS to
- * workerCounts. The values reported by other management and
- * monitoring methods are either computed on demand, or are kept
- * in fields that are only updated when threads are otherwise
- * idle.
+ * in methods awaitJoin and awaitBlocker. We always need to create
+ * one when the number of running threads becomes zero. But
+ * because blocked joins are typically dependent, we don't
+ * necessarily need or want one-to-one replacement. Instead, we
+ * use a combination of heuristics that adds threads only when the
+ * pool appears to be approaching starvation. These effectively
+ * reduce churn at the price of systematically undershooting
+ * target parallelism when many threads are blocked. However,
+ * biasing toward undershooting partially compensates for the above
+ * mechanics to suspend extra threads, which normally lead to
+ * overshoot because we can only suspend workers in-between
+ * top-level actions. It also better copes with the fact that some
+ * of the methods in this class tend to never become compiled (but
+ * are interpreted), so some components of the entire set of
+ * controls might execute many times faster than others. And
+ * similarly for cases where the apparent lack of work is just due
+ * to GC stalls and other transient system activity.
*
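The heuristic described above can be modeled in isolation. This is a simplified stand-alone sketch (the method name and parameters are illustrative), mirroring the confirming-pass loop used in tryAwaitJoin and awaitBlocker: each existing spare thread lowers the running-count bar by roughly a quarter before a new worker is added.

```java
// Illustrative model, not pool code: decide whether enough threads are
// already running before creating a compensating spare.
public class SpareHeuristic {
    // rc: running threads, pc: target parallelism, sc: spares (total - pc)
    static boolean enoughRunning(int rc, int pc, int sc) {
        int p = pc;
        int s = sc;
        while (s-- >= 0) {                 // one pass per existing spare
            if (rc > (p -= (p >>> 2) + 1)) // shrink target by ~1/4 each pass
                return true;               // enough live; skip creating a spare
        }
        return false;                      // approaching starvation; add one
    }
}
```

With a target of 8 and no spares, 8 running threads clear the shrunken bar but 3 do not, so the caller would move toward adding a worker in the second case.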
* Beware that there is a lot of representation-level coupling
* among classes ForkJoinPool, ForkJoinWorkerThread, and
@@ -346,7 +341,7 @@ public class ForkJoinPool extends Abstra
* Default ForkJoinWorkerThreadFactory implementation; creates a
* new ForkJoinWorkerThread.
*/
- static class DefaultForkJoinWorkerThreadFactory
+ static class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
@@ -414,7 +409,7 @@ public class ForkJoinPool extends Abstra
/**
* Latch released upon termination.
*/
- private final CountDownLatch terminationLatch;
+ private final Phaser termination;
/**
* Creation factory for worker threads.
@@ -485,38 +480,23 @@ public class ForkJoinPool extends Abstra
private static final int ONE_RUNNING = 1;
private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;
- /*
- * Fields parallelism. maxPoolSize, and maintainsParallelism are
- * non-volatile, but external reads/writes use workerCount fences
- * to ensure visability.
- */
-
/**
* The target parallelism level.
+ * Accessed directly by ForkJoinWorkerThreads.
*/
- private int parallelism;
-
- /**
- * The maximum allowed pool size.
- */
- private int maxPoolSize;
+ final int parallelism;
/**
* True if use local fifo, not default lifo, for local polling
- * Replicated by ForkJoinWorkerThreads
+ * Read by, and replicated by, ForkJoinWorkerThreads.
*/
- private volatile boolean locallyFifo;
+ final boolean locallyFifo;
/**
- * Controls whether to add spares to maintain parallelism
+ * The uncaught exception handler used when any worker abruptly
+ * terminates.
*/
- private boolean maintainsParallelism;
-
- /**
- * The uncaught exception handler used when any worker
- * abruptly terminates
- */
- private volatile Thread.UncaughtExceptionHandler ueh;
+ private final Thread.UncaughtExceptionHandler ueh;
/**
* Pool number, just for assigning useful names to worker threads
@@ -526,17 +506,17 @@ public class ForkJoinPool extends Abstra
// utilities for updating fields
/**
- * Adds delta to running count. Used mainly by ForkJoinTask.
+ * Increments running count. Also used by ForkJoinTask.
*/
- final void updateRunningCount(int delta) {
- int wc;
+ final void incrementRunningCount() {
+ int c;
do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc = workerCounts,
- wc + delta));
+ c = workerCounts,
+ c + ONE_RUNNING));
}
-
+
/**
- * Decrements running count unless already zero
+ * Tries to decrement running count unless already zero
*/
final boolean tryDecrementRunningCount() {
int wc = workerCounts;
@@ -547,24 +527,6 @@ public class ForkJoinPool extends Abstra
}
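The bookkeeping these helpers manipulate packs the running count and the total count into one int so both can move in a single CAS. A simplified stand-alone model (the shift value and class name are illustrative, not the pool's actual layout; AtomicInteger stands in for Unsafe):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch of the packed workerCounts representation:
// low bits hold the running count, high bits the total count.
public class WorkerCounts {
    static final int TOTAL_COUNT_SHIFT = 16;   // illustrative split point
    static final int RUNNING_COUNT_MASK = (1 << TOTAL_COUNT_SHIFT) - 1;
    static final int ONE_RUNNING = 1;
    static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT;

    final AtomicInteger counts = new AtomicInteger();

    void addWorker() {                         // one CAS bumps both fields
        counts.addAndGet(ONE_RUNNING | ONE_TOTAL);
    }
    void incrementRunningCount() {             // must succeed: retry CAS
        int c;
        do { c = counts.get(); } while (!counts.compareAndSet(c, c + ONE_RUNNING));
    }
    boolean tryDecrementRunningCount() {       // fails if already zero
        int c = counts.get();
        return (c & RUNNING_COUNT_MASK) != 0 &&
               counts.compareAndSet(c, c - ONE_RUNNING);
    }
    int running() { return counts.get() & RUNNING_COUNT_MASK; }
    int total()   { return counts.get() >>> TOTAL_COUNT_SHIFT; }
}
```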
/**
- * Write fence for user modifications of pool parameters
- * (parallelism. etc). Note that it doesn't matter if CAS fails.
- */
- private void workerCountWriteFence() {
- int wc;
- UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc = workerCounts, wc);
- }
-
- /**
- * Read fence for external reads of pool parameters
- * (parallelism. maxPoolSize, etc).
- */
- private void workerCountReadFence() {
- int ignore = workerCounts;
- }
-
- /**
* Tries incrementing active count; fails on contention.
* Called by workers before executing tasks.
*
@@ -663,7 +625,7 @@ public class ForkJoinPool extends Abstra
return null;
}
}
- w.start(recordWorker(w), locallyFifo, ueh);
+ w.start(recordWorker(w), ueh);
return w;
}
@@ -733,18 +695,9 @@ public class ForkJoinPool extends Abstra
// Waiting for and signalling events
/**
- * Ensures eventCount on exit is different (mod 2^32) than on
- * entry. CAS failures are OK -- any change in count suffices.
- */
- private void advanceEventCount() {
- int c;
- UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1);
- }
-
- /**
* Releases workers blocked on a count not equal to current count.
*/
- final void releaseWaiters() {
+ private void releaseWaiters() {
long top;
int id;
while ((id = (int)((top = eventWaiters) & WAITER_INDEX_MASK)) > 0 &&
@@ -759,10 +712,22 @@ public class ForkJoinPool extends Abstra
}
/**
+ * Ensures eventCount on exit is different (mod 2^32) than on
+ * entry and wakes up all waiters.
+ */
+ private void signalEvent() {
+ int c;
+ do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset,
+ c = eventCount, c+1));
+ releaseWaiters();
+ }
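The contrast between the two signalling paths can be modeled with a small sketch (illustrative class; AtomicInteger stands in for the Unsafe CAS): signalEvent must advance the count, so it retries until its own CAS wins, while signalWork may settle for a single attempt because a CAS failure means some other thread already advanced the count.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative model of the two event-count advance styles in this patch.
public class EventCount {
    final AtomicInteger count = new AtomicInteger();

    void signalEvent() {            // must advance: loop until our CAS succeeds
        int c;
        do { c = count.get(); } while (!count.compareAndSet(c, c + 1));
    }
    void signalWork() {             // advance-or-observe: one attempt suffices
        int c = count.get();
        count.compareAndSet(c, c + 1);   // failure is OK; count still changed
    }
}
```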
+
+ /**
* Advances eventCount and releases waiters until interference by
* other releasing threads is detected.
*/
final void signalWork() {
+ // EventCount CAS failures are OK -- any change in count suffices.
int ec;
UNSAFE.compareAndSwapInt(this, eventCountOffset, ec=eventCount, ec+1);
outer:for (;;) {
@@ -841,9 +806,9 @@ public class ForkJoinPool extends Abstra
boolean inactivate = !worked & active;
for (;;) {
if (inactivate) {
- int c = runState;
+ int rs = runState;
if (UNSAFE.compareAndSwapInt(this, runStateOffset,
- c, c - ONE_ACTIVE))
+ rs, rs - ONE_ACTIVE))
inactivate = active = w.active = false;
}
int wc = workerCounts;
@@ -861,27 +826,28 @@ public class ForkJoinPool extends Abstra
}
/**
- * Adjusts counts and creates or resumes compensating threads for
- * a worker blocking on task joinMe. First tries resuming an
- * existing spare (which usually also avoids any count
- * adjustment), but must then decrement running count to determine
- * whether a new thread is needed. See above for fuller
- * explanation. This code is sprawled out non-modularly mainly
- * because adaptive spinning works best if the entire method is
- * either interpreted or compiled vs having only some pieces of it
- * compiled.
+ * Tries to decrement running count, and if so, possibly creates
+ * or resumes compensating threads before blocking on task joinMe.
+ * This code is sprawled out with manual inlining to evade some
+ * JIT oddities.
*
* @param joinMe the task to join
- * @return task status on exit (to simplify usage by callers)
+ * @return task status on exit
*/
- final int awaitJoin(ForkJoinTask> joinMe) {
- int pc = parallelism;
- boolean adj = false; // true when running count adjusted
- int scans = 0;
-
- while (joinMe.status >= 0) {
- ForkJoinWorkerThread spare = null;
- if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
+ final int tryAwaitJoin(ForkJoinTask> joinMe) {
+ int cw = workerCounts; // read now to spoil CAS if counts change as ...
+ releaseWaiters(); // ... a byproduct of releaseWaiters
+ int stat = joinMe.status;
+ if (stat >= 0 && // inline variant of tryDecrementRunningCount
+ (cw & RUNNING_COUNT_MASK) > 0 &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ cw, cw - ONE_RUNNING)) {
+ int pc = parallelism;
+ int scans = 0; // to require confirming passes to add threads
+ outer: while ((workerCounts & RUNNING_COUNT_MASK) < pc) {
+ if ((stat = joinMe.status) < 0)
+ break;
+ ForkJoinWorkerThread spare = null;
ForkJoinWorkerThread[] ws = workers;
int nws = ws.length;
for (int i = 0; i < nws; ++i) {
@@ -891,183 +857,127 @@ public class ForkJoinPool extends Abstra
break;
}
}
- if (joinMe.status < 0)
+ if ((stat = joinMe.status) < 0) // recheck to narrow race
break;
- }
- int wc = workerCounts;
- int rc = wc & RUNNING_COUNT_MASK;
- int dc = pc - rc;
- if (dc > 0 && spare != null && spare.tryUnsuspend()) {
- if (adj) {
- int c;
- do {} while (!UNSAFE.compareAndSwapInt
- (this, workerCountsOffset,
- c = workerCounts, c + ONE_RUNNING));
- }
- adj = true;
- LockSupport.unpark(spare);
- }
- else if (adj) {
- if (dc <= 0)
+ int wc = workerCounts;
+ int rc = wc & RUNNING_COUNT_MASK;
+ if (rc >= pc)
break;
- int tc = wc >>> TOTAL_COUNT_SHIFT;
- if (scans > tc) {
- int ts = (tc - pc) * pc;
- if (rc != 0 && (dc * dc < ts || !maintainsParallelism))
- break;
- if (scans > ts && tc < maxPoolSize &&
- UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
- wc+(ONE_RUNNING|ONE_TOTAL))){
- addWorker();
+ if (spare != null) {
+ if (spare.tryUnsuspend()) {
+ int c; // inline incrementRunningCount
+ do {} while (!UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ c = workerCounts, c + ONE_RUNNING));
+ LockSupport.unpark(spare);
break;
}
+ continue;
+ }
+ int tc = wc >>> TOTAL_COUNT_SHIFT;
+ int sc = tc - pc;
+ if (rc > 0) {
+ int p = pc;
+ int s = sc;
+ while (s-- >= 0) { // try keeping 3/4 live
+ if (rc > (p -= (p >>> 2) + 1))
+ break outer;
+ }
+ }
+ if (scans++ > sc && tc < MAX_THREADS &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
+ wc + (ONE_RUNNING|ONE_TOTAL))) {
+ addWorker();
+ break;
}
}
- else if (rc != 0)
- adj = UNSAFE.compareAndSwapInt (this, workerCountsOffset,
- wc, wc - ONE_RUNNING);
- if ((scans++ & 1) == 0)
- releaseWaiters(); // help others progress
- else
- Thread.yield(); // avoid starving productive threads
- }
-
- if (adj) {
- joinMe.internalAwaitDone();
- int c;
+ if (stat >= 0)
+ stat = joinMe.internalAwaitDone();
+ int c; // inline incrementRunningCount
do {} while (!UNSAFE.compareAndSwapInt
(this, workerCountsOffset,
c = workerCounts, c + ONE_RUNNING));
}
- return joinMe.status;
+ return stat;
}
/**
- * Same idea as awaitJoin
+ * Same idea as (and mostly pasted from) tryAwaitJoin, but
+ * self-contained.
*/
- final void awaitBlocker(ManagedBlocker blocker, boolean maintainPar)
+ final void awaitBlocker(ManagedBlocker blocker)
throws InterruptedException {
- maintainPar &= maintainsParallelism;
+ for (;;) {
+ if (blocker.isReleasable())
+ return;
+ int cw = workerCounts;
+ releaseWaiters();
+ if ((cw & RUNNING_COUNT_MASK) > 0 &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset,
+ cw, cw - ONE_RUNNING))
+ break;
+ }
+ boolean done = false;
int pc = parallelism;
- boolean adj = false; // true when running count adjusted
int scans = 0;
- boolean done;
-
- for (;;) {
+ outer: while ((workerCounts & RUNNING_COUNT_MASK) < pc) {
if (done = blocker.isReleasable())
break;
ForkJoinWorkerThread spare = null;
- if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
- ForkJoinWorkerThread[] ws = workers;
- int nws = ws.length;
- for (int i = 0; i < nws; ++i) {
- ForkJoinWorkerThread w = ws[i];
- if (w != null && w.isSuspended()) {
- spare = w;
- break;
- }
- }
- if (done = blocker.isReleasable())
+ ForkJoinWorkerThread[] ws = workers;
+ int nws = ws.length;
+ for (int i = 0; i < nws; ++i) {
+ ForkJoinWorkerThread w = ws[i];
+ if (w != null && w.isSuspended()) {
+ spare = w;
break;
+ }
}
+ if (done = blocker.isReleasable())
+ break;
int wc = workerCounts;
int rc = wc & RUNNING_COUNT_MASK;
- int dc = pc - rc;
- if (dc > 0 && spare != null && spare.tryUnsuspend()) {
- if (adj) {
+ if (rc >= pc)
+ break;
+ if (spare != null) {
+ if (spare.tryUnsuspend()) {
int c;
do {} while (!UNSAFE.compareAndSwapInt
(this, workerCountsOffset,
c = workerCounts, c + ONE_RUNNING));
+ LockSupport.unpark(spare);
+ break;
}
- adj = true;
- LockSupport.unpark(spare);
+ continue;
}
- else if (adj) {
- if (dc <= 0)
- break;
- int tc = wc >>> TOTAL_COUNT_SHIFT;
- if (scans > tc) {
- int ts = (tc - pc) * pc;
- if (rc != 0 && (dc * dc < ts || !maintainPar))
- break;
- if (scans > ts && tc < maxPoolSize &&
- UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
- wc+(ONE_RUNNING|ONE_TOTAL))){
- addWorker();
- break;
- }
+ int tc = wc >>> TOTAL_COUNT_SHIFT;
+ int sc = tc - pc;
+ if (rc > 0) {
+ int p = pc;
+ int s = sc;
+ while (s-- >= 0) {
+ if (rc > (p -= (p >>> 2) + 1))
+ break outer;
}
}
- else if (rc != 0)
- adj = UNSAFE.compareAndSwapInt (this, workerCountsOffset,
- wc, wc - ONE_RUNNING);
- if ((++scans & 1) == 0)
- releaseWaiters(); // help others progress
- else
- Thread.yield(); // avoid starving productive threads
+ if (scans++ > sc && tc < MAX_THREADS &&
+ UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc,
+ wc + (ONE_RUNNING|ONE_TOTAL))) {
+ addWorker();
+ break;
+ }
}
-
try {
if (!done)
- do {} while (!blocker.isReleasable() && !blocker.block());
+ do {} while (!blocker.isReleasable() &&
+ !blocker.block());
} finally {
- if (adj) {
- int c;
- do {} while (!UNSAFE.compareAndSwapInt
- (this, workerCountsOffset,
- c = workerCounts, c + ONE_RUNNING));
- }
- }
- }
-
- /**
- * Unless there are not enough other running threads, adjusts
- * counts and blocks a worker performing helpJoin that cannot find
- * any work.
- *
- * @return true if joinMe now done
- */
- final boolean tryAwaitBusyJoin(ForkJoinTask> joinMe) {
- int pc = parallelism;
- outer:for (;;) {
- releaseWaiters();
- if ((workerCounts & RUNNING_COUNT_MASK) < pc) {
- ForkJoinWorkerThread[] ws = workers;
- int nws = ws.length;
- for (int i = 0; i < nws; ++i) {
- ForkJoinWorkerThread w = ws[i];
- if (w != null && w.isSuspended()) {
- if (joinMe.status < 0)
- return true;
- if ((workerCounts & RUNNING_COUNT_MASK) > pc)
- break;
- if (w.tryUnsuspend()) {
- LockSupport.unpark(w);
- break outer;
- }
- continue outer;
- }
- }
- }
- if (joinMe.status < 0)
- return true;
- int wc = workerCounts;
- if ((wc & RUNNING_COUNT_MASK) <= 2 ||
- (wc >>> TOTAL_COUNT_SHIFT) < pc)
- return false; // keep this thread alive
- if (UNSAFE.compareAndSwapInt(this, workerCountsOffset,
- wc, wc - ONE_RUNNING))
- break;
+ int c;
+ do {} while (!UNSAFE.compareAndSwapInt
+ (this, workerCountsOffset,
+ c = workerCounts, c + ONE_RUNNING));
}
-
- joinMe.internalAwaitDone();
- int c;
- do {} while (!UNSAFE.compareAndSwapInt
- (this, workerCountsOffset,
- c = workerCounts, c + ONE_RUNNING));
- return true;
- }
+ }
/**
* Possibly initiates and/or completes termination.
@@ -1090,7 +1000,7 @@ public class ForkJoinPool extends Abstra
// Finish now if all threads terminated; else in some subsequent call
if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) {
advanceRunLevel(TERMINATED);
- terminationLatch.countDown();
+ termination.arrive();
}
return true;
}
@@ -1103,8 +1013,7 @@ public class ForkJoinPool extends Abstra
cancelSubmissions();
shutdownWorkers();
cancelWorkerTasks();
- advanceEventCount();
- releaseWaiters();
+ signalEvent();
interruptWorkers();
}
}
@@ -1194,8 +1103,8 @@ public class ForkJoinPool extends Abstra
* active thread.
*/
final int idlePerActive() {
- int ac = runState; // no mask -- artifically boosts during shutdown
int pc = parallelism; // use targeted parallelism, not rc
+ int ac = runState; // no mask -- artificially boosts during shutdown
// Use exact results for small values, saturate past 4
return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3;
}
@@ -1206,8 +1115,9 @@ public class ForkJoinPool extends Abstra
/**
* Creates a {@code ForkJoinPool} with parallelism equal to {@link
- * java.lang.Runtime#availableProcessors}, and using the {@linkplain
- * #defaultForkJoinWorkerThreadFactory default thread factory}.
+ * java.lang.Runtime#availableProcessors}, using the {@linkplain
+ * #defaultForkJoinWorkerThreadFactory default thread factory},
+ * no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
@@ -1216,13 +1126,14 @@ public class ForkJoinPool extends Abstra
*/
public ForkJoinPool() {
this(Runtime.getRuntime().availableProcessors(),
- defaultForkJoinWorkerThreadFactory);
+ defaultForkJoinWorkerThreadFactory, null, false);
}
/**
* Creates a {@code ForkJoinPool} with the indicated parallelism
- * level and using the {@linkplain
- * #defaultForkJoinWorkerThreadFactory default thread factory}.
+ * level, the {@linkplain
+ * #defaultForkJoinWorkerThreadFactory default thread factory},
+ * no UncaughtExceptionHandler, and non-async LIFO processing mode.
*
* @param parallelism the parallelism level
* @throws IllegalArgumentException if parallelism less than or
@@ -1233,31 +1144,25 @@ public class ForkJoinPool extends Abstra
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool(int parallelism) {
- this(parallelism, defaultForkJoinWorkerThreadFactory);
- }
-
- /**
- * Creates a {@code ForkJoinPool} with parallelism equal to {@link
- * java.lang.Runtime#availableProcessors}, and using the given
- * thread factory.
- *
- * @param factory the factory for creating new threads
- * @throws NullPointerException if the factory is null
- * @throws SecurityException if a security manager exists and
- * the caller is not permitted to modify threads
- * because it does not hold {@link
- * java.lang.RuntimePermission}{@code ("modifyThread")}
- */
- public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
- this(Runtime.getRuntime().availableProcessors(), factory);
+ this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
/**
- * Creates a {@code ForkJoinPool} with the given parallelism and
- * thread factory.
+ * Creates a {@code ForkJoinPool} with the given parameters.
*
- * @param parallelism the parallelism level
- * @param factory the factory for creating new threads
+ * @param parallelism the parallelism level. For default value,
+ * use {@link java.lang.Runtime#availableProcessors}.
+ * @param factory the factory for creating new threads. For default value,
+ * use {@link #defaultForkJoinWorkerThreadFactory}.
+ * @param handler the handler for internal worker threads that
+ * terminate due to unrecoverable errors encountered while executing
+ * tasks. For default value, use <code>null</code>.
- * <p>If {@code maintainParallelism} is {@code true} and the pool
- * supports it ({@link #getMaintainsParallelism}), this method
- * attempts to maintain the pool's nominal parallelism. Otherwise
- * it activates a thread only if necessary to avoid complete
- * starvation. This option may be preferable when blockages use
- * timeouts, or are almost always brief.
+ * ensure sufficient parallelism while the current thread is blocked.
*
* If the caller is not a {@link ForkJoinTask}, this method is
* behaviorally equivalent to
@@ -1975,29 +1752,16 @@ public class ForkJoinPool extends Abstra
* first be expanded to ensure parallelism, and later adjusted.
*
* @param blocker the blocker
- * @param maintainParallelism if {@code true} and supported by
- * this pool, attempt to maintain the pool's nominal parallelism;
- * otherwise activate a thread only if necessary to avoid
- * complete starvation.
* @throws InterruptedException if blocker.block did so
*/
- public static void managedBlock(ManagedBlocker blocker,
- boolean maintainParallelism)
+ public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread)
- ((ForkJoinWorkerThread) t).pool.
- awaitBlocker(blocker, maintainParallelism);
- else
- awaitBlocker(blocker);
- }
-
- /**
- * Performs Non-FJ blocking
- */
- private static void awaitBlocker(ManagedBlocker blocker)
- throws InterruptedException {
- do {} while (!blocker.isReleasable() && !blocker.block());
+ ((ForkJoinWorkerThread) t).pool.awaitBlocker(blocker);
+ else {
+ do {} while (!blocker.isReleasable() && !blocker.block());
+ }
}
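The new single-argument managedBlock form can be exercised with a minimal ManagedBlocker. This is an illustrative sketch (the OneShotBlocker name is made up, shown against the java.util.concurrent copy of the API): a blocker wrapping a simple one-shot condition.

```java
import java.util.concurrent.ForkJoinPool;

// Illustrative ManagedBlocker: releasable once release() has been called.
public class OneShotBlocker implements ForkJoinPool.ManagedBlocker {
    private boolean ready;

    public synchronized boolean isReleasable() { return ready; }

    public synchronized boolean block() throws InterruptedException {
        while (!ready)
            wait();          // a worker pool may add a spare while we sleep
        return true;
    }

    public synchronized void release() {
        ready = true;
        notifyAll();
    }
}
```

Calling ForkJoinPool.managedBlock(blocker) from a worker thread routes through awaitBlocker, which may resume or create a compensating thread; from any other thread it simply spins on isReleasable/block, as in the else branch above.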
// AbstractExecutorService overrides. These rely on undocumented
@@ -2026,7 +1790,6 @@ public class ForkJoinPool extends Abstra
private static final long stealCountOffset =
objectFieldOffset("stealCount",ForkJoinPool.class);
-
private static long objectFieldOffset(String field, Class> klazz) {
try {
return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));
+ * @param asyncMode if true,
+ * establishes local first-in-first-out scheduling mode for forked
+ * tasks that are never joined. This mode may be more appropriate
+ * than default locally stack-based mode in applications in which
+ * worker threads only process event-style asynchronous tasks.
+ * For default value, use <code>false</code>.
* @throws IllegalArgumentException if parallelism less than or
* equal to zero, or greater than implementation limit
* @throws NullPointerException if the factory is null
@@ -1266,22 +1171,25 @@ public class ForkJoinPool extends Abstra
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
- public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
+ public ForkJoinPool(int parallelism,
+ ForkJoinWorkerThreadFactory factory,
+ Thread.UncaughtExceptionHandler handler,
+ boolean asyncMode) {
checkPermission();
if (factory == null)
throw new NullPointerException();
if (parallelism <= 0 || parallelism > MAX_THREADS)
throw new IllegalArgumentException();
- this.poolNumber = poolNumberGenerator.incrementAndGet();
- int arraySize = initialArraySizeFor(parallelism);
this.parallelism = parallelism;
this.factory = factory;
- this.maxPoolSize = MAX_THREADS;
- this.maintainsParallelism = true;
+ this.ueh = handler;
+ this.locallyFifo = asyncMode;
+ int arraySize = initialArraySizeFor(parallelism);
this.workers = new ForkJoinWorkerThread[arraySize];
this.submissionQueue = new LinkedTransferQueue
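The consolidated four-argument constructor, which replaces the removed runtime setters (setParallelism, setMaximumPoolSize, setMaintainsParallelism), can be exercised as follows. A sketch against the java.util.concurrent copy of the API; the PoolConfig name is illustrative.

```java
import java.util.concurrent.ForkJoinPool;

// Illustrative sketch: all configuration is now fixed at construction time.
public class PoolConfig {
    static ForkJoinPool newAsyncPool() {
        return new ForkJoinPool(
            Runtime.getRuntime().availableProcessors(),      // parallelism
            ForkJoinPool.defaultForkJoinWorkerThreadFactory, // thread factory
            null,                                            // no UncaughtExceptionHandler
            true);                                           // asyncMode: local FIFO
    }
}
```

With asyncMode true, forked tasks that are never joined are processed in local first-in-first-out order, which suits event-style workloads.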