--- jsr166/src/jsr166y/ForkJoinPool.java 2009/07/29 12:05:55 1.30 +++ jsr166/src/jsr166y/ForkJoinPool.java 2009/08/03 00:53:15 1.40 @@ -21,36 +21,37 @@ import java.util.concurrent.atomic.Atomi /** * An {@link ExecutorService} for running {@link ForkJoinTask}s. - * A ForkJoinPool provides the entry point for submissions from - * non-ForkJoinTasks, as well as management and monitoring operations. - * Normally a single ForkJoinPool is used for a large number of - * submitted tasks. Otherwise, use would not usually outweigh the - * construction and bookkeeping overhead of creating a large set of - * threads. + * A {@code ForkJoinPool} provides the entry point for submissions + * from non-{@code ForkJoinTask}s, as well as management and + * monitoring operations. Normally a single {@code ForkJoinPool} is + * used for a large number of submitted tasks. Otherwise, use would + * not usually outweigh the construction and bookkeeping overhead of + * creating a large set of threads. * - *

ForkJoinPools differ from other kinds of Executors mainly in - * that they provide work-stealing: all threads in the pool - * attempt to find and execute subtasks created by other active tasks - * (eventually blocking if none exist). This makes them efficient when - * most tasks spawn other subtasks (as do most ForkJoinTasks), as well - * as the mixed execution of some plain Runnable- or Callable- based - * activities along with ForkJoinTasks. When setting {@linkplain - * #setAsyncMode async mode}, a ForkJoinPool may also be appropriate - * for use with fine-grained tasks that are never joined. Otherwise, - * other ExecutorService implementations are typically more + *

{@code ForkJoinPool}s differ from other kinds of {@link + * Executor}s mainly in that they provide work-stealing: all + * threads in the pool attempt to find and execute subtasks created by + * other active tasks (eventually blocking if none exist). This makes + * them efficient when most tasks spawn other subtasks (as do most + * {@code ForkJoinTask}s), as well as the mixed execution of some + * plain {@code Runnable}- or {@code Callable}- based activities along + * with {@code ForkJoinTask}s. When setting {@linkplain #setAsyncMode + * async mode}, a {@code ForkJoinPool} may also be appropriate for use + * with fine-grained tasks that are never joined. Otherwise, other + * {@code ExecutorService} implementations are typically more * appropriate choices. * - *

A ForkJoinPool may be constructed with a given parallelism level - * (target pool size), which it attempts to maintain by dynamically - * adding, suspending, or resuming threads, even if some tasks are - * waiting to join others. However, no such adjustments are performed - * in the face of blocked IO or other unmanaged synchronization. The - * nested {@link ManagedBlocker} interface enables extension of - * the kinds of synchronization accommodated. The target parallelism - * level may also be changed dynamically ({@link #setParallelism}) - * and thread construction can be limited using methods - * {@link #setMaximumPoolSize} and/or - * {@link #setMaintainsParallelism}. + *

A {@code ForkJoinPool} may be constructed with a given + * parallelism level (target pool size), which it attempts to maintain + * by dynamically adding, suspending, or resuming threads, even if + * some tasks are waiting to join others. However, no such adjustments + * are performed in the face of blocked IO or other unmanaged + * synchronization. The nested {@link ManagedBlocker} interface + * enables extension of the kinds of synchronization accommodated. + * The target parallelism level may also be changed dynamically + * ({@link #setParallelism}) and thread construction can be limited + * using methods {@link #setMaximumPoolSize} and/or {@link + * #setMaintainsParallelism}. * *

In addition to execution and lifecycle control methods, this * class provides status check methods (for example @@ -62,7 +63,7 @@ import java.util.concurrent.atomic.Atomi *

Implementation notes: This implementation restricts the * maximum number of running threads to 32767. Attempts to create * pools with greater than the maximum result in - * IllegalArgumentExceptions. + * {@code IllegalArgumentException}. * * @since 1.7 * @author Doug Lea @@ -81,10 +82,10 @@ public class ForkJoinPool extends Abstra private static final int MAX_THREADS = 0x7FFF; /** - * Factory for creating new ForkJoinWorkerThreads. A - * ForkJoinWorkerThreadFactory must be defined and used for - * ForkJoinWorkerThread subclasses that extend base functionality - * or initialize threads with different contexts. + * Factory for creating new {@link ForkJoinWorkerThread}s. + * A {@code ForkJoinWorkerThreadFactory} must be defined and used + * for {@code ForkJoinWorkerThread} subclasses that extend base + * functionality or initialize threads with different contexts. */ public static interface ForkJoinWorkerThreadFactory { /** @@ -578,7 +579,7 @@ public class ForkJoinPool extends Abstra * @throws NullPointerException if task is null * @throws RejectedExecutionException if pool is shut down */ - public void execute(ForkJoinTask task) { + public void execute(ForkJoinTask task) { doSubmit(task); } @@ -589,18 +590,18 @@ public class ForkJoinPool extends Abstra if (task instanceof ForkJoinTask) // avoid re-wrap job = (ForkJoinTask) task; else - job = new AdaptedRunnable(task, null); + job = ForkJoinTask.adapt(task, null); doSubmit(job); } public ForkJoinTask submit(Callable task) { - ForkJoinTask job = new AdaptedCallable(task); + ForkJoinTask job = ForkJoinTask.adapt(task); doSubmit(job); return job; } public ForkJoinTask submit(Runnable task, T result) { - ForkJoinTask job = new AdaptedRunnable(task, result); + ForkJoinTask job = ForkJoinTask.adapt(task, result); doSubmit(job); return job; } @@ -610,7 +611,7 @@ public class ForkJoinPool extends Abstra if (task instanceof ForkJoinTask) // avoid re-wrap job = (ForkJoinTask) task; else - job = new AdaptedRunnable(task, null); + job = ForkJoinTask.adapt(task, null); doSubmit(job); return job; } @@ -629,65 +630,12 @@ public class ForkJoinPool extends Abstra return task; } - /** - * Adaptor for Runnables. This implements RunnableFuture - * to be compliant with AbstractExecutorService constraints. - */ - static final class AdaptedRunnable extends ForkJoinTask - implements RunnableFuture { - final Runnable runnable; - final T resultOnCompletion; - T result; - AdaptedRunnable(Runnable runnable, T result) { - if (runnable == null) throw new NullPointerException(); - this.runnable = runnable; - this.resultOnCompletion = result; - } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { - runnable.run(); - result = resultOnCompletion; - return true; - } - public void run() { invoke(); } - private static final long serialVersionUID = 5232453952276885070L; - } - - /** - * Adaptor for Callables - */ - static final class AdaptedCallable extends ForkJoinTask - implements RunnableFuture { - final Callable callable; - T result; - AdaptedCallable(Callable callable) { - if (callable == null) throw new NullPointerException(); - this.callable = callable; - } - public T getRawResult() { return result; } - public void setRawResult(T v) { result = v; } - public boolean exec() { - try { - result = callable.call(); - return true; - } catch (Error err) { - throw err; - } catch (RuntimeException rex) { - throw rex; - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - public void run() { invoke(); } - private static final long serialVersionUID = 2838392045355241008L; - } public List> invokeAll(Collection> tasks) { ArrayList> forkJoinTasks = new ArrayList>(tasks.size()); for (Callable task : tasks) - forkJoinTasks.add(new AdaptedCallable(task)); + forkJoinTasks.add(ForkJoinTask.adapt(task)); invoke(new InvokeAll(forkJoinTasks)); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -840,7 +788,7 @@ public class ForkJoinPool extends Abstra * Setting this value has no effect on current pool size. It * controls construction of new threads. * - * @throws IllegalArgumentException if negative or greater then + * @throws IllegalArgumentException if negative or greater than * internal implementation limit */ public void setMaximumPoolSize(int newMax) { @@ -1008,8 +956,8 @@ public class ForkJoinPool extends Abstra } /** - * Returns an estimate of the number tasks submitted to this pool - * that have not yet begun executing. This method takes time + * Returns an estimate of the number of tasks submitted to this + * pool that have not yet begun executing. This method takes time * proportional to the number of submissions. * * @return the number of queued submissions @@ -1122,8 +1070,22 @@ public class ForkJoinPool extends Abstra public void shutdown() { checkPermission(); transitionRunStateTo(SHUTDOWN); - if (canTerminateOnShutdown(runControl)) + if (canTerminateOnShutdown(runControl)) { + if (workers == null) { // shutting down before workers created + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + if (workers == null) { + terminate(); + transitionRunStateTo(TERMINATED); + termination.signalAll(); + } + } finally { + lock.unlock(); + } + } terminateOnShutdown(); + } } /** @@ -1774,11 +1736,13 @@ public class ForkJoinPool extends Abstra /** * Interface for extending managed parallelism for tasks running - * in ForkJoinPools. A ManagedBlocker provides two methods. + * in {@link ForkJoinPool}s. + * + *

A {@code ManagedBlocker} provides two methods. * Method {@code isReleasable} must return {@code true} if * blocking is not necessary. Method {@code block} blocks the * current thread if necessary (perhaps internally invoking - * {@code isReleasable} before actually blocking.). + * {@code isReleasable} before actually blocking). * *

For example, here is a ManagedBlocker based on a * ReentrantLock: @@ -1817,25 +1781,27 @@ public class ForkJoinPool extends Abstra /** * Blocks in accord with the given blocker. If the current thread - * is a ForkJoinWorkerThread, this method possibly arranges for a - * spare thread to be activated if necessary to ensure parallelism - * while the current thread is blocked. If - * {@code maintainParallelism} is {@code true} and the pool supports - * it ({@link #getMaintainsParallelism}), this method attempts to - * maintain the pool's nominal parallelism. Otherwise it activates - * a thread only if necessary to avoid complete starvation. This - * option may be preferable when blockages use timeouts, or are - * almost always brief. + * is a {@link ForkJoinWorkerThread}, this method possibly + * arranges for a spare thread to be activated if necessary to + * ensure parallelism while the current thread is blocked. + * + *

If {@code maintainParallelism} is {@code true} and the pool + * supports it ({@link #getMaintainsParallelism}), this method + * attempts to maintain the pool's nominal parallelism. Otherwise + * it activates a thread only if necessary to avoid complete + * starvation. This option may be preferable when blockages use + * timeouts, or are almost always brief. * - *

If the caller is not a ForkJoinTask, this method is behaviorally - * equivalent to + *

If the caller is not a {@link ForkJoinTask}, this method is + * behaviorally equivalent to *

 {@code
      * while (!blocker.isReleasable())
      *   if (blocker.block())
      *     return;
      * }
- * If the caller is a ForkJoinTask, then the pool may first - * be expanded to ensure parallelism, and later adjusted. + * + * If the caller is a {@code ForkJoinTask}, then the pool may + * first be expanded to ensure parallelism, and later adjusted. * * @param blocker the blocker * @param maintainParallelism if {@code true} and supported by @@ -1867,14 +1833,16 @@ public class ForkJoinPool extends Abstra do {} while (!blocker.isReleasable() && !blocker.block()); } - // AbstractExecutorService overrides + // AbstractExecutorService overrides. These rely on undocumented + // fact that ForkJoinTask.adapt returns ForkJoinTasks that also + // implement RunnableFuture. protected RunnableFuture newTaskFor(Runnable runnable, T value) { - return new AdaptedRunnable(runnable, value); + return (RunnableFuture) ForkJoinTask.adapt(runnable, value); } protected RunnableFuture newTaskFor(Callable callable) { - return new AdaptedCallable(callable); + return (RunnableFuture) ForkJoinTask.adapt(callable); } // Unsafe mechanics