--- jsr166/src/jsr166y/ForkJoinPool.java 2010/05/27 16:46:48 1.56 +++ jsr166/src/jsr166y/ForkJoinPool.java 2010/07/07 19:52:31 1.57 @@ -21,7 +21,7 @@ import java.util.concurrent.CountDownLat /** * An {@link ExecutorService} for running {@link ForkJoinTask}s. * A {@code ForkJoinPool} provides the entry point for submissions - * from non-{@code ForkJoinTask}s, as well as management and + * from non-{@code ForkJoinTask} clients, as well as management and * monitoring operations. * *

A {@code ForkJoinPool} differs from other kinds of {@link @@ -30,33 +30,19 @@ import java.util.concurrent.CountDownLat * execute subtasks created by other active tasks (eventually blocking * waiting for work if none exist). This enables efficient processing * when most tasks spawn other subtasks (as do most {@code - * ForkJoinTask}s). A {@code ForkJoinPool} may also be used for mixed - * execution of some plain {@code Runnable}- or {@code Callable}- - * based activities along with {@code ForkJoinTask}s. When setting - * {@linkplain #setAsyncMode async mode}, a {@code ForkJoinPool} may - * also be appropriate for use with fine-grained tasks of any form - * that are never joined. Otherwise, other {@code ExecutorService} - * implementations are typically more appropriate choices. + * ForkJoinTask}s). When setting asyncMode to true in + * constructors, {@code ForkJoinPool}s may also be appropriate for use + * with event-style tasks that are never joined. * *

A {@code ForkJoinPool} is constructed with a given target * parallelism level; by default, equal to the number of available - * processors. Unless configured otherwise via {@link - * #setMaintainsParallelism}, the pool attempts to maintain this - * number of active (or available) threads by dynamically adding, - * suspending, or resuming internal worker threads, even if some tasks - * are stalled waiting to join others. However, no such adjustments - * are performed in the face of blocked IO or other unmanaged - * synchronization. The nested {@link ManagedBlocker} interface - * enables extension of the kinds of synchronization accommodated. - * The target parallelism level may also be changed dynamically - * ({@link #setParallelism}). The total number of threads may be - * limited using method {@link #setMaximumPoolSize}, in which case it - * may become possible for the activities of a pool to stall due to - * the lack of available threads to process new tasks. When the pool - * is executing tasks, these and other configuration setting methods - * may only gradually affect actual pool sizes. It is normally best - * practice to invoke these methods only when the pool is known to be - * quiescent. + * processors. The pool attempts to maintain enough active (or + * available) threads by dynamically adding, suspending, or resuming + * internal worker threads, even if some tasks are stalled waiting to + * join others. However, no such adjustments are guaranteed in the + * face of blocked IO or other unmanaged synchronization. The nested + * {@link ManagedBlocker} interface enables extension of the kinds of + * synchronization accommodated. * *

In addition to execution and lifecycle control methods, this * class provides status check methods (for example @@ -65,6 +51,44 @@ import java.util.concurrent.CountDownLat * {@link #toString} returns indications of pool state in a * convenient form for informal monitoring. * + *

As is the case with other ExecutorServices, there are three + * main task execution methods summarized in the follwoing + * table. These are designed to be used by clients not already engaged + * in fork/join computations in the current pool. The main forms of + * these methods accept instances of {@code ForkJoinTask}, but + * overloaded forms also allow mixed execution of plain {@code + * Runnable}- or {@code Callable}- based activities as well. However, + * tasks that are already executing in a pool should normally + * NOT use these pool execution methods, but instead use the + * within-computation forms listed in the table. To avoid inadvertant + * cyclic task dependencies and to improve performance, task + * submissions to the current pool by an ongoing fork/join + * computations may be implicitly translated to the corresponding + * ForkJoinTask forms. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Call from non-fork/join clients Call from within fork/join computations
Arange async execution {@link #execute(ForkJoinTask)} {@link ForkJoinTask#fork}
Await and obtain result {@link #invoke(ForkJoinTask)} {@link ForkJoinTask#invoke}
Arrange exec and obtain Future {@link #submit(ForkJoinTask)} {@link ForkJoinTask#fork} (ForkJoinTasks are Futures)
+ * *

Sample Usage. Normally a single {@code ForkJoinPool} is * used for all parallel task execution in a program or subsystem. * Otherwise, use would not usually outweigh the construction and @@ -147,9 +171,8 @@ public class ForkJoinPool extends Abstra * work with other policies. * * 2. Bookkeeping for dynamically adding and removing workers. We - * maintain a given level of parallelism (or, if - * maintainsParallelism is false, at least avoid starvation). When - * some workers are known to be blocked (on joins or via + * aim to approximately maintain the given level of parallelism. + * When some workers are known to be blocked (on joins or via * ManagedBlocker), we may create or resume others to take their * place until they unblock (see below). Implementing this * requires counts of the number of "running" threads (i.e., those @@ -167,10 +190,8 @@ public class ForkJoinPool extends Abstra * threads Updates to the workerCounts field sometimes transiently * encounter a fair amount of contention when join dependencies * are such that many threads block or unblock at about the same - * time. We alleviate this by sometimes bundling updates (for - * example blocking one thread on join and resuming a spare cancel - * each other out), and in most other cases performing an - * alternative action like releasing waiters or locating spares. + * time. We alleviate this by sometimes performing an alternative + * action on contention like releasing waiters or locating spares. * * 3. Maintaining global run state. The run state of the pool * consists of a runLevel (SHUTDOWN, TERMINATING, etc) similar to @@ -258,49 +279,23 @@ public class ForkJoinPool extends Abstra * * 6. Deciding when to create new workers. The main dynamic * control in this class is deciding when to create extra threads, - * in methods awaitJoin and awaitBlocker. We always - * need to create one when the number of running threads becomes - * zero. But because blocked joins are typically dependent, we - * don't necessarily need or want one-to-one replacement. Using a - * one-to-one compensation rule often leads to enough useless - * overhead creating, suspending, resuming, and/or killing threads - * to signficantly degrade throughput. We use a rule reflecting - * the idea that, the more spare threads you already have, the - * more evidence you need to create another one. The "evidence" - * here takes two forms: (1) Using a creation threshold expressed - * in terms of the current deficit -- target minus running - * threads. To reduce flickering and drift around target values, - * the relation is quadratic: adding a spare if (dc*dc)>=(sc*pc) - * (where dc is deficit, sc is number of spare threads and pc is - * target parallelism.) (2) Using a form of adaptive - * spionning. requiring a number of threshold checks proportional - * to the number of spare threads. This effectively reduces churn - * at the price of systematically undershooting target parallelism - * when many threads are blocked. However, biasing toward - * undeshooting partially compensates for the above mechanics to - * suspend extra threads, that normally lead to overshoot because - * we can only suspend workers in-between top-level actions. It - * also better copes with the fact that some of the methods in - * this class tend to never become compiled (but are interpreted), - * so some components of the entire set of controls might execute - * many times faster than others. And similarly for cases where - * the apparent lack of work is just due to GC stalls and other - * transient system activity. - * - * 7. Maintaining other configuration parameters and monitoring - * statistics. Updates to fields controlling parallelism level, - * max size, etc can only meaningfully take effect for individual - * threads upon their next top-level actions; i.e., between - * stealing/running tasks/submission, which are separated by calls - * to preStep. Memory ordering for these (assumed infrequent) - * reconfiguration calls is ensured by using reads and writes to - * volatile field workerCounts (that must be read in preStep anyway) - * as "fences" -- user-level reads are preceded by reads of - * workCounts, and writes are followed by no-op CAS to - * workerCounts. The values reported by other management and - * monitoring methods are either computed on demand, or are kept - * in fields that are only updated when threads are otherwise - * idle. + * in methods awaitJoin and awaitBlocker. We always need to create + * one when the number of running threads becomes zero. But + * because blocked joins are typically dependent, we don't + * necessarily need or want one-to-one replacement. Instead, we + * use a combination of heuristics that adds threads only when the + * pool appears to be approaching starvation. These effectively + * reduce churn at the price of systematically undershooting + * target parallelism when many threads are blocked. However, + * biasing toward undeshooting partially compensates for the above + * mechanics to suspend extra threads, that normally lead to + * overshoot because we can only suspend workers in-between + * top-level actions. It also better copes with the fact that some + * of the methods in this class tend to never become compiled (but + * are interpreted), so some components of the entire set of + * controls might execute many times faster than others. And + * similarly for cases where the apparent lack of work is just due + * to GC stalls and other transient system activity. * * Beware that there is a lot of representation-level coupling * among classes ForkJoinPool, ForkJoinWorkerThread, and @@ -346,7 +341,7 @@ public class ForkJoinPool extends Abstra * Default ForkJoinWorkerThreadFactory implementation; creates a * new ForkJoinWorkerThread. */ - static class DefaultForkJoinWorkerThreadFactory + static class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); @@ -414,7 +409,7 @@ public class ForkJoinPool extends Abstra /** * Latch released upon termination. */ - private final CountDownLatch terminationLatch; + private final Phaser termination; /** * Creation factory for worker threads. @@ -485,38 +480,23 @@ public class ForkJoinPool extends Abstra private static final int ONE_RUNNING = 1; private static final int ONE_TOTAL = 1 << TOTAL_COUNT_SHIFT; - /* - * Fields parallelism. maxPoolSize, and maintainsParallelism are - * non-volatile, but external reads/writes use workerCount fences - * to ensure visability. - */ - /** * The target parallelism level. + * Accessed directly by ForkJoinWorkerThreads. */ - private int parallelism; - - /** - * The maximum allowed pool size. - */ - private int maxPoolSize; + final int parallelism; /** * True if use local fifo, not default lifo, for local polling - * Replicated by ForkJoinWorkerThreads + * Read by, and replicated by ForkJoinWorkerThreads */ - private volatile boolean locallyFifo; + final boolean locallyFifo; /** - * Controls whether to add spares to maintain parallelism + * The uncaught exception handler used when any worker abruptly + * terminates. */ - private boolean maintainsParallelism; - - /** - * The uncaught exception handler used when any worker - * abruptly terminates - */ - private volatile Thread.UncaughtExceptionHandler ueh; + private final Thread.UncaughtExceptionHandler ueh; /** * Pool number, just for assigning useful names to worker threads @@ -526,17 +506,17 @@ public class ForkJoinPool extends Abstra // utilities for updating fields /** - * Adds delta to running count. Used mainly by ForkJoinTask. + * Increments running count. Also used by ForkJoinTask. */ - final void updateRunningCount(int delta) { - int wc; + final void incrementRunningCount() { + int c; do {} while (!UNSAFE.compareAndSwapInt(this, workerCountsOffset, - wc = workerCounts, - wc + delta)); + c = workerCounts, + c + ONE_RUNNING)); } - + /** - * Decrements running count unless already zero + * Tries to decrement running count unless already zero */ final boolean tryDecrementRunningCount() { int wc = workerCounts; @@ -547,24 +527,6 @@ public class ForkJoinPool extends Abstra } /** - * Write fence for user modifications of pool parameters - * (parallelism. etc). Note that it doesn't matter if CAS fails. - */ - private void workerCountWriteFence() { - int wc; - UNSAFE.compareAndSwapInt(this, workerCountsOffset, - wc = workerCounts, wc); - } - - /** - * Read fence for external reads of pool parameters - * (parallelism. maxPoolSize, etc). - */ - private void workerCountReadFence() { - int ignore = workerCounts; - } - - /** * Tries incrementing active count; fails on contention. * Called by workers before executing tasks. * @@ -663,7 +625,7 @@ public class ForkJoinPool extends Abstra return null; } } - w.start(recordWorker(w), locallyFifo, ueh); + w.start(recordWorker(w), ueh); return w; } @@ -733,18 +695,9 @@ public class ForkJoinPool extends Abstra // Waiting for and signalling events /** - * Ensures eventCount on exit is different (mod 2^32) than on - * entry. CAS failures are OK -- any change in count suffices. - */ - private void advanceEventCount() { - int c; - UNSAFE.compareAndSwapInt(this, eventCountOffset, c = eventCount, c+1); - } - - /** * Releases workers blocked on a count not equal to current count. */ - final void releaseWaiters() { + private void releaseWaiters() { long top; int id; while ((id = (int)((top = eventWaiters) & WAITER_INDEX_MASK)) > 0 && @@ -759,10 +712,22 @@ public class ForkJoinPool extends Abstra } /** + * Ensures eventCount on exit is different (mod 2^32) than on + * entry and wakes up all waiters + */ + private void signalEvent() { + int c; + do {} while (!UNSAFE.compareAndSwapInt(this, eventCountOffset, + c = eventCount, c+1)); + releaseWaiters(); + } + + /** * Advances eventCount and releases waiters until interference by * other releasing threads is detected. */ final void signalWork() { + // EventCount CAS failures are OK -- any change in count suffices. int ec; UNSAFE.compareAndSwapInt(this, eventCountOffset, ec=eventCount, ec+1); outer:for (;;) { @@ -841,9 +806,9 @@ public class ForkJoinPool extends Abstra boolean inactivate = !worked & active; for (;;) { if (inactivate) { - int c = runState; + int rs = runState; if (UNSAFE.compareAndSwapInt(this, runStateOffset, - c, c - ONE_ACTIVE)) + rs, rs - ONE_ACTIVE)) inactivate = active = w.active = false; } int wc = workerCounts; @@ -861,27 +826,28 @@ public class ForkJoinPool extends Abstra } /** - * Adjusts counts and creates or resumes compensating threads for - * a worker blocking on task joinMe. First tries resuming an - * existing spare (which usually also avoids any count - * adjustment), but must then decrement running count to determine - * whether a new thread is needed. See above for fuller - * explanation. This code is sprawled out non-modularly mainly - * because adaptive spinning works best if the entire method is - * either interpreted or compiled vs having only some pieces of it - * compiled. + * Tries to decrement running count, and if so, possibly creates + * or resumes compensating threads before blocking on task joinMe. + * This code is sprawled out with manual inlining to evade some + * JIT oddities. * * @param joinMe the task to join - * @return task status on exit (to simplify usage by callers) + * @return task status on exit */ - final int awaitJoin(ForkJoinTask joinMe) { - int pc = parallelism; - boolean adj = false; // true when running count adjusted - int scans = 0; - - while (joinMe.status >= 0) { - ForkJoinWorkerThread spare = null; - if ((workerCounts & RUNNING_COUNT_MASK) < pc) { + final int tryAwaitJoin(ForkJoinTask joinMe) { + int cw = workerCounts; // read now to spoil CAS if counts change as ... + releaseWaiters(); // ... a byproduct of releaseWaiters + int stat = joinMe.status; + if (stat >= 0 && // inline variant of tryDecrementRunningCount + (cw & RUNNING_COUNT_MASK) > 0 && + UNSAFE.compareAndSwapInt(this, workerCountsOffset, + cw, cw - ONE_RUNNING)) { + int pc = parallelism; + int scans = 0; // to require confirming passes to add threads + outer: while ((workerCounts & RUNNING_COUNT_MASK) < pc) { + if ((stat = joinMe.status) < 0) + break; + ForkJoinWorkerThread spare = null; ForkJoinWorkerThread[] ws = workers; int nws = ws.length; for (int i = 0; i < nws; ++i) { @@ -891,183 +857,127 @@ public class ForkJoinPool extends Abstra break; } } - if (joinMe.status < 0) + if ((stat = joinMe.status) < 0) // recheck to narrow race break; - } - int wc = workerCounts; - int rc = wc & RUNNING_COUNT_MASK; - int dc = pc - rc; - if (dc > 0 && spare != null && spare.tryUnsuspend()) { - if (adj) { - int c; - do {} while (!UNSAFE.compareAndSwapInt - (this, workerCountsOffset, - c = workerCounts, c + ONE_RUNNING)); - } - adj = true; - LockSupport.unpark(spare); - } - else if (adj) { - if (dc <= 0) + int wc = workerCounts; + int rc = wc & RUNNING_COUNT_MASK; + if (rc >= pc) break; - int tc = wc >>> TOTAL_COUNT_SHIFT; - if (scans > tc) { - int ts = (tc - pc) * pc; - if (rc != 0 && (dc * dc < ts || !maintainsParallelism)) - break; - if (scans > ts && tc < maxPoolSize && - UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, - wc+(ONE_RUNNING|ONE_TOTAL))){ - addWorker(); + if (spare != null) { + if (spare.tryUnsuspend()) { + int c; // inline incrementRunningCount + do {} while (!UNSAFE.compareAndSwapInt + (this, workerCountsOffset, + c = workerCounts, c + ONE_RUNNING)); + LockSupport.unpark(spare); break; } + continue; + } + int tc = wc >>> TOTAL_COUNT_SHIFT; + int sc = tc - pc; + if (rc > 0) { + int p = pc; + int s = sc; + while (s-- >= 0) { // try keeping 3/4 live + if (rc > (p -= (p >>> 2) + 1)) + break outer; + } + } + if (scans++ > sc && tc < MAX_THREADS && + UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, + wc + (ONE_RUNNING|ONE_TOTAL))) { + addWorker(); + break; } } - else if (rc != 0) - adj = UNSAFE.compareAndSwapInt (this, workerCountsOffset, - wc, wc - ONE_RUNNING); - if ((scans++ & 1) == 0) - releaseWaiters(); // help others progress - else - Thread.yield(); // avoid starving productive threads - } - - if (adj) { - joinMe.internalAwaitDone(); - int c; + if (stat >= 0) + stat = joinMe.internalAwaitDone(); + int c; // inline incrementRunningCount do {} while (!UNSAFE.compareAndSwapInt (this, workerCountsOffset, c = workerCounts, c + ONE_RUNNING)); } - return joinMe.status; + return stat; } /** - * Same idea as awaitJoin + * Same idea as (and mostly pasted from) tryAwaitJoin, but + * self-contained */ - final void awaitBlocker(ManagedBlocker blocker, boolean maintainPar) + final void awaitBlocker(ManagedBlocker blocker) throws InterruptedException { - maintainPar &= maintainsParallelism; + for (;;) { + if (blocker.isReleasable()) + return; + int cw = workerCounts; + releaseWaiters(); + if ((cw & RUNNING_COUNT_MASK) > 0 && + UNSAFE.compareAndSwapInt(this, workerCountsOffset, + cw, cw - ONE_RUNNING)) + break; + } + boolean done = false; int pc = parallelism; - boolean adj = false; // true when running count adjusted int scans = 0; - boolean done; - - for (;;) { + outer: while ((workerCounts & RUNNING_COUNT_MASK) < pc) { if (done = blocker.isReleasable()) break; ForkJoinWorkerThread spare = null; - if ((workerCounts & RUNNING_COUNT_MASK) < pc) { - ForkJoinWorkerThread[] ws = workers; - int nws = ws.length; - for (int i = 0; i < nws; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null && w.isSuspended()) { - spare = w; - break; - } - } - if (done = blocker.isReleasable()) + ForkJoinWorkerThread[] ws = workers; + int nws = ws.length; + for (int i = 0; i < nws; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null && w.isSuspended()) { + spare = w; break; + } } + if (done = blocker.isReleasable()) + break; int wc = workerCounts; int rc = wc & RUNNING_COUNT_MASK; - int dc = pc - rc; - if (dc > 0 && spare != null && spare.tryUnsuspend()) { - if (adj) { + if (rc >= pc) + break; + if (spare != null) { + if (spare.tryUnsuspend()) { int c; do {} while (!UNSAFE.compareAndSwapInt (this, workerCountsOffset, c = workerCounts, c + ONE_RUNNING)); + LockSupport.unpark(spare); + break; } - adj = true; - LockSupport.unpark(spare); + continue; } - else if (adj) { - if (dc <= 0) - break; - int tc = wc >>> TOTAL_COUNT_SHIFT; - if (scans > tc) { - int ts = (tc - pc) * pc; - if (rc != 0 && (dc * dc < ts || !maintainPar)) - break; - if (scans > ts && tc < maxPoolSize && - UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, - wc+(ONE_RUNNING|ONE_TOTAL))){ - addWorker(); - break; - } + int tc = wc >>> TOTAL_COUNT_SHIFT; + int sc = tc - pc; + if (rc > 0) { + int p = pc; + int s = sc; + while (s-- >= 0) { + if (rc > (p -= (p >>> 2) + 1)) + break outer; } } - else if (rc != 0) - adj = UNSAFE.compareAndSwapInt (this, workerCountsOffset, - wc, wc - ONE_RUNNING); - if ((++scans & 1) == 0) - releaseWaiters(); // help others progress - else - Thread.yield(); // avoid starving productive threads + if (scans++ > sc && tc < MAX_THREADS && + UNSAFE.compareAndSwapInt(this, workerCountsOffset, wc, + wc + (ONE_RUNNING|ONE_TOTAL))) { + addWorker(); + break; + } } - try { if (!done) - do {} while (!blocker.isReleasable() && !blocker.block()); + do {} while (!blocker.isReleasable() && + !blocker.block()); } finally { - if (adj) { - int c; - do {} while (!UNSAFE.compareAndSwapInt - (this, workerCountsOffset, - c = workerCounts, c + ONE_RUNNING)); - } - } - } - - /** - * Unless there are not enough other running threads, adjusts - * counts and blocks a worker performing helpJoin that cannot find - * any work. - * - * @return true if joinMe now done - */ - final boolean tryAwaitBusyJoin(ForkJoinTask joinMe) { - int pc = parallelism; - outer:for (;;) { - releaseWaiters(); - if ((workerCounts & RUNNING_COUNT_MASK) < pc) { - ForkJoinWorkerThread[] ws = workers; - int nws = ws.length; - for (int i = 0; i < nws; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null && w.isSuspended()) { - if (joinMe.status < 0) - return true; - if ((workerCounts & RUNNING_COUNT_MASK) > pc) - break; - if (w.tryUnsuspend()) { - LockSupport.unpark(w); - break outer; - } - continue outer; - } - } - } - if (joinMe.status < 0) - return true; - int wc = workerCounts; - if ((wc & RUNNING_COUNT_MASK) <= 2 || - (wc >>> TOTAL_COUNT_SHIFT) < pc) - return false; // keep this thread alive - if (UNSAFE.compareAndSwapInt(this, workerCountsOffset, - wc, wc - ONE_RUNNING)) - break; + int c; + do {} while (!UNSAFE.compareAndSwapInt + (this, workerCountsOffset, + c = workerCounts, c + ONE_RUNNING)); } - - joinMe.internalAwaitDone(); - int c; - do {} while (!UNSAFE.compareAndSwapInt - (this, workerCountsOffset, - c = workerCounts, c + ONE_RUNNING)); - return true; - } + } /** * Possibly initiates and/or completes termination. @@ -1090,7 +1000,7 @@ public class ForkJoinPool extends Abstra // Finish now if all threads terminated; else in some subsequent call if ((workerCounts >>> TOTAL_COUNT_SHIFT) == 0) { advanceRunLevel(TERMINATED); - terminationLatch.countDown(); + termination.arrive(); } return true; } @@ -1103,8 +1013,7 @@ public class ForkJoinPool extends Abstra cancelSubmissions(); shutdownWorkers(); cancelWorkerTasks(); - advanceEventCount(); - releaseWaiters(); + signalEvent(); interruptWorkers(); } } @@ -1194,8 +1103,8 @@ public class ForkJoinPool extends Abstra * active thread. */ final int idlePerActive() { - int ac = runState; // no mask -- artifically boosts during shutdown int pc = parallelism; // use targeted parallelism, not rc + int ac = runState; // no mask -- artifically boosts during shutdown // Use exact results for small values, saturate past 4 return pc <= ac? 0 : pc >>> 1 <= ac? 1 : pc >>> 2 <= ac? 3 : pc >>> 3; } @@ -1206,8 +1115,9 @@ public class ForkJoinPool extends Abstra /** * Creates a {@code ForkJoinPool} with parallelism equal to {@link - * java.lang.Runtime#availableProcessors}, and using the {@linkplain - * #defaultForkJoinWorkerThreadFactory default thread factory}. + * java.lang.Runtime#availableProcessors}, using the {@linkplain + * #defaultForkJoinWorkerThreadFactory default thread factory}, + * no UncaughtExceptionHandler, and non-async LIFO processing mode. * * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads @@ -1216,13 +1126,14 @@ public class ForkJoinPool extends Abstra */ public ForkJoinPool() { this(Runtime.getRuntime().availableProcessors(), - defaultForkJoinWorkerThreadFactory); + defaultForkJoinWorkerThreadFactory, null, false); } /** * Creates a {@code ForkJoinPool} with the indicated parallelism - * level and using the {@linkplain - * #defaultForkJoinWorkerThreadFactory default thread factory}. + * level, the {@linkplain + * #defaultForkJoinWorkerThreadFactory default thread factory}, + * no UncaughtExceptionHandler, and non-async LIFO processing mode. * * @param parallelism the parallelism level * @throws IllegalArgumentException if parallelism less than or @@ -1233,31 +1144,25 @@ public class ForkJoinPool extends Abstra * java.lang.RuntimePermission}{@code ("modifyThread")} */ public ForkJoinPool(int parallelism) { - this(parallelism, defaultForkJoinWorkerThreadFactory); - } - - /** - * Creates a {@code ForkJoinPool} with parallelism equal to {@link - * java.lang.Runtime#availableProcessors}, and using the given - * thread factory. - * - * @param factory the factory for creating new threads - * @throws NullPointerException if the factory is null - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}{@code ("modifyThread")} - */ - public ForkJoinPool(ForkJoinWorkerThreadFactory factory) { - this(Runtime.getRuntime().availableProcessors(), factory); + this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } /** - * Creates a {@code ForkJoinPool} with the given parallelism and - * thread factory. + * Creates a {@code ForkJoinPool} with the given parameters. * - * @param parallelism the parallelism level - * @param factory the factory for creating new threads + * @param parallelism the parallelism level. For default value, + * use {@link java.lang.Runtime#availableProcessors}. + * @param factory the factory for creating new threads. For default value, + * use {@link #defaultForkJoinWorkerThreadFactory}. + * @param handler the handler for internal worker threads that + * terminate due to unrecoverable errors encountered while executing + * tasks. For default value, use null. + * @param asyncMode if true, + * establishes local first-in-first-out scheduling mode for forked + * tasks that are never joined. This mode may be more appropriate + * than default locally stack-based mode in applications in which + * worker threads only process event-style asynchronous tasks. + * For default value, use false. * @throws IllegalArgumentException if parallelism less than or * equal to zero, or greater than implementation limit * @throws NullPointerException if the factory is null @@ -1266,22 +1171,25 @@ public class ForkJoinPool extends Abstra * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")} */ - public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) { + public ForkJoinPool(int parallelism, + ForkJoinWorkerThreadFactory factory, + Thread.UncaughtExceptionHandler handler, + boolean asyncMode) { checkPermission(); if (factory == null) throw new NullPointerException(); if (parallelism <= 0 || parallelism > MAX_THREADS) throw new IllegalArgumentException(); - this.poolNumber = poolNumberGenerator.incrementAndGet(); - int arraySize = initialArraySizeFor(parallelism); this.parallelism = parallelism; this.factory = factory; - this.maxPoolSize = MAX_THREADS; - this.maintainsParallelism = true; + this.ueh = handler; + this.locallyFifo = asyncMode; + int arraySize = initialArraySizeFor(parallelism); this.workers = new ForkJoinWorkerThread[arraySize]; this.submissionQueue = new LinkedTransferQueue>(); this.workerLock = new ReentrantLock(); - this.terminationLatch = new CountDownLatch(1); + this.termination = new Phaser(1); + this.poolNumber = poolNumberGenerator.incrementAndGet(); } /** @@ -1308,14 +1216,24 @@ public class ForkJoinPool extends Abstra throw new NullPointerException(); if (runState >= SHUTDOWN) throw new RejectedExecutionException(); - submissionQueue.offer(task); - advanceEventCount(); - releaseWaiters(); - ensureEnoughTotalWorkers(); + // Convert submissions to current pool into forks + Thread t = Thread.currentThread(); + ForkJoinWorkerThread w; + if ((t instanceof ForkJoinWorkerThread) && + (w = (ForkJoinWorkerThread) t).pool == this) + w.pushTask(task); + else { + submissionQueue.offer(task); + signalEvent(); + ensureEnoughTotalWorkers(); + } } /** * Performs the given task, returning its result upon completion. + * If the caller is already engaged in a fork/join computation in + * the current pool, this method is equivalent in effect to + * {@link ForkJoinTask#invoke}. * * @param task the task * @return the task's result @@ -1330,6 +1248,9 @@ public class ForkJoinPool extends Abstra /** * Arranges for (asynchronous) execution of the given task. + * If the caller is already engaged in a fork/join computation in + * the current pool, this method is equivalent in effect to + * {@link ForkJoinTask#fork}. * * @param task the task * @throws NullPointerException if the task is null @@ -1357,6 +1278,23 @@ public class ForkJoinPool extends Abstra } /** + * Submits a ForkJoinTask for execution. + * If the caller is already engaged in a fork/join computation in + * the current pool, this method is equivalent in effect to + * {@link ForkJoinTask#fork}. + * + * @param task the task to submit + * @return the task + * @throws NullPointerException if the task is null + * @throws RejectedExecutionException if the task cannot be + * scheduled for execution + */ + public ForkJoinTask submit(ForkJoinTask task) { + doSubmit(task); + return task; + } + + /** * @throws NullPointerException if the task is null * @throws RejectedExecutionException if the task cannot be * scheduled for execution @@ -1394,20 +1332,6 @@ public class ForkJoinPool extends Abstra } /** - * Submits a ForkJoinTask for execution. - * - * @param task the task to submit - * @return the task - * @throws NullPointerException if the task is null - * @throws RejectedExecutionException if the task cannot be - * scheduled for execution - */ - public ForkJoinTask submit(ForkJoinTask task) { - doSubmit(task); - return task; - } - - /** * @throws NullPointerException {@inheritDoc} * @throws RejectedExecutionException {@inheritDoc} */ @@ -1449,87 +1373,15 @@ public class ForkJoinPool extends Abstra * @return the handler, or {@code null} if none */ public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { - workerCountReadFence(); return ueh; } /** - * Sets the handler for internal worker threads that terminate due - * to unrecoverable errors encountered while executing tasks. - * Unless set, the current default or ThreadGroup handler is used - * as handler. - * - * @param h the new handler - * @return the old handler, or {@code null} if none - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}{@code ("modifyThread")} - */ - public Thread.UncaughtExceptionHandler - setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) { - checkPermission(); - Thread.UncaughtExceptionHandler old = ueh; - if (h != old) { - ueh = h; - ForkJoinWorkerThread[] ws = workers; - int nws = ws.length; - for (int i = 0; i < nws; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null) - w.setUncaughtExceptionHandler(h); - } - } - return old; - } - - /** - * Sets the target parallelism level of this pool. - * - * @param parallelism the target parallelism - * @throws IllegalArgumentException if parallelism less than or - * equal to zero or greater than maximum size bounds - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}{@code ("modifyThread")} - */ - public void setParallelism(int parallelism) { - checkPermission(); - if (parallelism <= 0 || parallelism > maxPoolSize) - throw new IllegalArgumentException(); - workerCountReadFence(); - int pc = this.parallelism; - if (pc != parallelism) { - this.parallelism = parallelism; - workerCountWriteFence(); - // Release spares. If too many, some will die after re-suspend - ForkJoinWorkerThread[] ws = workers; - int nws = ws.length; - for (int i = 0; i < nws; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null && w.tryUnsuspend()) { - int c; - do {} while (!UNSAFE.compareAndSwapInt - (this, workerCountsOffset, - c = workerCounts, c + ONE_RUNNING)); - LockSupport.unpark(w); - } - } - ensureEnoughTotalWorkers(); - advanceEventCount(); - releaseWaiters(); // force config recheck by existing workers - } - } - - /** * Returns the targeted parallelism level of this pool. * * @return the targeted parallelism level of this pool */ public int getParallelism() { - // workerCountReadFence(); // inlined below - int ignore = workerCounts; return parallelism; } @@ -1546,102 +1398,12 @@ public class ForkJoinPool extends Abstra } /** - * Returns the maximum number of threads allowed to exist in the - * pool. Unless set using {@link #setMaximumPoolSize}, the - * maximum is an implementation-defined value designed only to - * prevent runaway growth. - * - * @return the maximum - */ - public int getMaximumPoolSize() { - workerCountReadFence(); - return maxPoolSize; - } - - /** - * Sets the maximum number of threads allowed to exist in the - * pool. The given value should normally be greater than or equal - * to the {@link #getParallelism parallelism} level. Setting this - * value has no effect on current pool size. It controls - * construction of new threads. The use of this method may cause - * tasks that intrinsically require extra threads for dependent - * computations to indefinitely stall. If you are instead trying - * to minimize internal thread creation, consider setting {@link - * #setMaintainsParallelism} as false. - * - * @throws IllegalArgumentException if negative or greater than - * internal implementation limit - */ - public void setMaximumPoolSize(int newMax) { - if (newMax < 0 || newMax > MAX_THREADS) - throw new IllegalArgumentException(); - maxPoolSize = newMax; - workerCountWriteFence(); - } - - /** - * Returns {@code true} if this pool dynamically maintains its - * target parallelism level. If false, new threads are added only - * to avoid possible starvation. This setting is by default true. - * - * @return {@code true} if maintains parallelism - */ - public boolean getMaintainsParallelism() { - workerCountReadFence(); - return maintainsParallelism; - } - - /** - * Sets whether this pool dynamically maintains its target - * parallelism level. If false, new threads are added only to - * avoid possible starvation. - * - * @param enable {@code true} to maintain parallelism - */ - public void setMaintainsParallelism(boolean enable) { - maintainsParallelism = enable; - workerCountWriteFence(); - } - - /** - * Establishes local first-in-first-out scheduling mode for forked - * tasks that are never joined. This mode may be more appropriate - * than default locally stack-based mode in applications in which - * worker threads only process asynchronous tasks. This method is - * designed to be invoked only when the pool is quiescent, and - * typically only before any tasks are submitted. The effects of - * invocations at other times may be unpredictable. - * - * @param async if {@code true}, use locally FIFO scheduling - * @return the previous mode - * @see #getAsyncMode - */ - public boolean setAsyncMode(boolean async) { - workerCountReadFence(); - boolean oldMode = locallyFifo; - if (oldMode != async) { - locallyFifo = async; - workerCountWriteFence(); - ForkJoinWorkerThread[] ws = workers; - int nws = ws.length; - for (int i = 0; i < nws; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null) - w.setAsyncMode(async); - } - } - return oldMode; - } - - /** * Returns {@code true} if this pool uses local first-in-first-out * scheduling mode for forked tasks that are never joined. * * @return {@code true} if this pool uses async mode - * @see #setAsyncMode */ public boolean getAsyncMode() { - workerCountReadFence(); return locallyFifo; } @@ -1782,6 +1544,22 @@ public class ForkJoinPool extends Abstra } /** + * Returns count of total parks by existing workers. + * Used during development only since not meaningful to users. + */ + private int collectParkCount() { + int count = 0; + ForkJoinWorkerThread[] ws = workers; + int nws = ws.length; + for (int i = 0; i < nws; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + count += w.parkCount; + } + return count; + } + + /** * Returns a string identifying this pool, as well as its state, * including indications of run state, parallelism level, and * worker and task counts. @@ -1798,6 +1576,7 @@ public class ForkJoinPool extends Abstra int pc = parallelism; int rs = runState; int ac = rs & ACTIVE_COUNT_MASK; + // int pk = collectParkCount(); return super.toString() + "[" + runLevelToString(rs) + ", parallelism = " + pc + @@ -1807,6 +1586,7 @@ public class ForkJoinPool extends Abstra ", steals = " + st + ", tasks = " + qt + ", submissions = " + qs + + // ", parks = " + pk + "]"; } @@ -1902,7 +1682,11 @@ public class ForkJoinPool extends Abstra */ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - return terminationLatch.await(timeout, unit); + try { + return termination.awaitAdvanceInterruptibly(0, timeout, unit) > 0; + } catch(TimeoutException ex) { + return false; + } } /** @@ -1954,14 +1738,7 @@ public class ForkJoinPool extends Abstra * Blocks in accord with the given blocker. If the current thread * is a {@link ForkJoinWorkerThread}, this method possibly * arranges for a spare thread to be activated if necessary to - * ensure parallelism while the current thread is blocked. - * - *

If {@code maintainParallelism} is {@code true} and the pool - * supports it ({@link #getMaintainsParallelism}), this method - * attempts to maintain the pool's nominal parallelism. Otherwise - * it activates a thread only if necessary to avoid complete - * starvation. This option may be preferable when blockages use - * timeouts, or are almost always brief. + * ensure sufficient parallelism while the current thread is blocked. * *

If the caller is not a {@link ForkJoinTask}, this method is * behaviorally equivalent to @@ -1975,29 +1752,16 @@ public class ForkJoinPool extends Abstra * first be expanded to ensure parallelism, and later adjusted. * * @param blocker the blocker - * @param maintainParallelism if {@code true} and supported by - * this pool, attempt to maintain the pool's nominal parallelism; - * otherwise activate a thread only if necessary to avoid - * complete starvation. * @throws InterruptedException if blocker.block did so */ - public static void managedBlock(ManagedBlocker blocker, - boolean maintainParallelism) + public static void managedBlock(ManagedBlocker blocker) throws InterruptedException { Thread t = Thread.currentThread(); if (t instanceof ForkJoinWorkerThread) - ((ForkJoinWorkerThread) t).pool. - awaitBlocker(blocker, maintainParallelism); - else - awaitBlocker(blocker); - } - - /** - * Performs Non-FJ blocking - */ - private static void awaitBlocker(ManagedBlocker blocker) - throws InterruptedException { - do {} while (!blocker.isReleasable() && !blocker.block()); + ((ForkJoinWorkerThread) t).pool.awaitBlocker(blocker); + else { + do {} while (!blocker.isReleasable() && !blocker.block()); + } } // AbstractExecutorService overrides. These rely on undocumented @@ -2026,7 +1790,6 @@ public class ForkJoinPool extends Abstra private static final long stealCountOffset = objectFieldOffset("stealCount",ForkJoinPool.class); - private static long objectFieldOffset(String field, Class klazz) { try { return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field));