--- jsr166/src/jsr166y/ForkJoinPool.java 2009/01/06 14:30:31 1.1 +++ jsr166/src/jsr166y/ForkJoinPool.java 2009/01/07 16:07:37 1.2 @@ -13,48 +13,49 @@ import sun.misc.Unsafe; import java.lang.reflect.*; /** - * Host for a group of ForkJoinWorkerThreads. A ForkJoinPool provides - * the entry point for tasks submitted from non-ForkJoinTasks, as well - * as management and monitoring operations. Normally a single - * ForkJoinPool is used for a large number of submitted - * tasks. Otherwise, use would not usually outweigh the construction - * and bookkeeping overhead of creating a large set of threads. + * An {@link ExecutorService} for running {@link ForkJoinTask}s. A + * ForkJoinPool provides the entry point for submissions from + * non-ForkJoinTasks, as well as management and monitoring operations. + * Normally a single ForkJoinPool is used for a large number of + * submitted tasks. Otherwise, use would not usually outweigh the + * construction and bookkeeping overhead of creating a large set of + * threads. * - *

ForkJoinPools differ from other kinds of Executor mainly in that - * they provide work-stealing: all threads in the pool + *

ForkJoinPools differ from other kinds of Executors mainly in + * that they provide work-stealing: all threads in the pool * attempt to find and execute subtasks created by other active tasks * (eventually blocking if none exist). This makes them efficient when - * most tasks spawn other subtasks (as do most ForkJoinTasks) but - * possibly less so otherwise. It is however fine to combine execution - * of some plain Runnable- or Callable- based activities with that of - * ForkJoinTasks. + * most tasks spawn other subtasks (as do most ForkJoinTasks), as well + * as the mixed execution of some plain Runnable- or Callable- based + * activities along with ForkJoinTasks. Otherwise, other + * ExecutorService implementations are typically more appropriate + * choices. * *

A ForkJoinPool may be constructed with a given parallelism level * (target pool size), which it attempts to maintain by dynamically - * adding, suspending, or resuming threads, even if some tasks have - * blocked waiting to join others. However, no such adjustments are - * performed in the face of blocked IO or other unmanaged - * synchronization. The nested ManagedBlocker interface enables - * extension of the kinds of synchronization accommodated. - * - *

The target parallelism level may also be set dynamically. You - * can limit the number of threads dynamically constructed using - * method setMaximumPoolSize and/or - * setMaintainParallelism. + * adding, suspending, or resuming threads, even if some tasks are + * waiting to join others. However, no such adjustments are performed + * in the face of blocked IO or other unmanaged synchronization. The + * nested ManagedBlocker interface enables extension of + * the kinds of synchronization accommodated. The target parallelism + * level may also be changed dynamically (setParallelism) + * and dynamically thread construction can be limited using methods + * setMaximumPoolSize and/or + * setMaintainsParallelism. * *

In addition to execution and lifecycle control methods, this * class provides status check methods (for example - * getStealCount) that are intended to aid in developing, + * getStealCount) that are intended to aid in developing, * tuning, and monitoring fork/join applications. Also, method - * toString returns indications of pool state in a convenient - * form for informal monitoring. + * toString returns indications of pool state in a + * convenient form for informal monitoring. * *

Implementation notes: This implementation restricts the - * maximum parallelism to 32767. Attempts to create pools with greater - * than the maximum result in IllegalArgumentExceptions. + * maximum number of running threads to 32767. Attempts to create + * pools with greater than the maximum result in + * IllegalArgumentExceptions. */ -public class ForkJoinPool extends AbstractExecutorService - implements ExecutorService { +public class ForkJoinPool extends AbstractExecutorService { /* * See the extended comments interspersed below for design, @@ -87,7 +88,7 @@ public class ForkJoinPool extends Abstra * Default ForkJoinWorkerThreadFactory implementation, creates a * new ForkJoinWorkerThread. */ - public static class DefaultForkJoinWorkerThreadFactory + static class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public ForkJoinWorkerThread newThread(ForkJoinPool pool) { try { @@ -99,14 +100,13 @@ public class ForkJoinPool extends Abstra } /** - * The default ForkJoinWorkerThreadFactory, used unless overridden - * in ForkJoinPool constructors. + * Creates a new ForkJoinWorkerThread. This factory is used unless + * overridden in ForkJoinPool constructors. */ - private static final DefaultForkJoinWorkerThreadFactory + public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); - /** * Permission required for callers of methods that may start or * kill threads. @@ -320,7 +320,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}("modifyThread"), */ public ForkJoinPool() { this(Runtime.getRuntime().availableProcessors(), @@ -336,14 +336,14 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}("modifyThread"), */ public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory); } /** - * Creates a ForkJoinPool with a pool size equal to the number of + * Creates a ForkJoinPool with parallelism equal to the number of * processors available on the system and using the given * ForkJoinWorkerThreadFactory, * @param factory the factory for creating new threads @@ -351,15 +351,14 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}("modifyThread"), */ public ForkJoinPool(ForkJoinWorkerThreadFactory factory) { this(Runtime.getRuntime().availableProcessors(), factory); } /** - * Creates a ForkJoinPool with the indicated target number of - * worker threads and the given factory. + * Creates a ForkJoinPool with the given parallelism and factory. * * @param parallelism the targeted number of worker threads * @param factory the factory for creating new threads @@ -369,7 +368,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}("modifyThread"), */ public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) { if (parallelism <= 0 || parallelism > MAX_THREADS) @@ -500,57 +499,6 @@ public class ForkJoinPool extends Abstra } } - /** - * Sets the handler for internal worker threads that terminate due - * to unrecoverable errors encountered while executing tasks. - * Unless set, the current default or ThreadGroup handler is used - * as handler. - * - * @param h the new handler - * @return the old handler, or null if none - * @throws SecurityException if a security manager exists and - * the caller is not permitted to modify threads - * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), - */ - public Thread.UncaughtExceptionHandler - setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) { - checkPermission(); - Thread.UncaughtExceptionHandler old = null; - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - old = ueh; - ueh = h; - ForkJoinWorkerThread[] ws = workers; - for (int i = 0; i < ws.length; ++i) { - ForkJoinWorkerThread w = ws[i]; - if (w != null) - w.setUncaughtExceptionHandler(h); - } - } finally { - lock.unlock(); - } - return old; - } - - /** - * Returns the handler for internal worker threads that terminate - * due to unrecoverable errors encountered while executing tasks. - * @return the handler, or null if none - */ - public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { - Thread.UncaughtExceptionHandler h; - final ReentrantLock lock = this.workerLock; - lock.lock(); - try { - h = ueh; - } finally { - lock.unlock(); - } - return h; - } - // Execution methods /** @@ -609,14 +557,6 @@ public class ForkJoinPool extends Abstra return job; } - protected RunnableFuture newTaskFor(Runnable runnable, T value) { - return new AdaptedRunnable(runnable, value); - } - - protected RunnableFuture newTaskFor(Callable callable) { - return new AdaptedCallable(callable); - } - /** * Adaptor for Runnables. This implements RunnableFuture * to be compliant with AbstractExecutorService constraints @@ -698,6 +638,58 @@ public class ForkJoinPool extends Abstra } /** + * Returns the handler for internal worker threads that terminate + * due to unrecoverable errors encountered while executing tasks. + * @return the handler, or null if none + */ + public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { + Thread.UncaughtExceptionHandler h; + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + h = ueh; + } finally { + lock.unlock(); + } + return h; + } + + /** + * Sets the handler for internal worker threads that terminate due + * to unrecoverable errors encountered while executing tasks. + * Unless set, the current default or ThreadGroup handler is used + * as handler. + * + * @param h the new handler + * @return the old handler, or null if none + * @throws SecurityException if a security manager exists and + * the caller is not permitted to modify threads + * because it does not hold {@link + * java.lang.RuntimePermission}("modifyThread"), + */ + public Thread.UncaughtExceptionHandler + setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) { + checkPermission(); + Thread.UncaughtExceptionHandler old = null; + final ReentrantLock lock = this.workerLock; + lock.lock(); + try { + old = ueh; + ueh = h; + ForkJoinWorkerThread[] ws = workers; + for (int i = 0; i < ws.length; ++i) { + ForkJoinWorkerThread w = ws[i]; + if (w != null) + w.setUncaughtExceptionHandler(h); + } + } finally { + lock.unlock(); + } + return old; + } + + + /** * Sets the target paralleism level of this pool. * @param parallelism the target parallelism * @throws IllegalArgumentException if parallelism less than or @@ -705,7 +697,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}("modifyThread"), */ public void setParallelism(int parallelism) { checkPermission(); @@ -730,8 +722,6 @@ public class ForkJoinPool extends Abstra /** * Returns the targeted number of worker threads in this pool. - * This value does not necessarily reflect transient changes as - * threads are added, removed, or abruptly terminate. * * @return the targeted number of worker threads in this pool */ @@ -742,7 +732,7 @@ public class ForkJoinPool extends Abstra /** * Returns the number of worker threads that have started but not * yet terminated. This result returned by this method may differ - * from getParallelism when threads are created to + * from getParallelism when threads are created to * maintain parallelism when others are cooperatively blocked. * * @return the number of worker threads @@ -797,8 +787,8 @@ public class ForkJoinPool extends Abstra } /** - * Returns the approximate number of worker threads that are not - * blocked waiting to join tasks or for other managed + * Returns an estimate of the number of worker threads that are + * not blocked waiting to join tasks or for other managed * synchronization. * * @return the number of worker threads @@ -808,7 +798,7 @@ public class ForkJoinPool extends Abstra } /** - * Returns the approximate number of threads that are currently + * Returns an estimate of the number of threads that are currently * stealing or executing tasks. This method may overestimate the * number of active threads. * @return the number of active threads. @@ -818,7 +808,7 @@ public class ForkJoinPool extends Abstra } /** - * Returns the approximate number of threads that are currently + * Returns an estimate of the number of threads that are currently * idle waiting for tasks. This method may underestimate the * number of idle threads. * @return the number of idle threads. @@ -867,11 +857,12 @@ public class ForkJoinPool extends Abstra } /** - * Returns the total number of tasks currently held in queues by - * worker threads (but not including tasks submitted to the pool - * that have not begun executing). This value is only an - * approximation, obtained by iterating across all threads in the - * pool. This method may be useful for tuning task granularities. + * Returns an estimate of the total number of tasks currently held + * in queues by worker threads (but not including tasks submitted + * to the pool that have not begun executing). This value is only + * an approximation, obtained by iterating across all threads in + * the pool. This method may be useful for tuning task + * granularities. * @return the number of queued tasks. */ public long getQueuedTaskCount() { @@ -886,7 +877,7 @@ public class ForkJoinPool extends Abstra } /** - * Returns the approximate number tasks submitted to this pool + * Returns an estimate of the number tasks submitted to this pool * that have not yet begun executing. This method takes time * proportional to the number of submissions. * @return the number of queued submissions. @@ -898,7 +889,7 @@ public class ForkJoinPool extends Abstra /** * Returns true if there are any tasks submitted to this pool * that have not yet begun executing. - * @return true if there are any queued submissions. + * @return true if there are any queued submissions. */ public boolean hasQueuedSubmissions() { return !submissionQueue.isEmpty(); @@ -961,7 +952,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}("modifyThread"), */ public void shutdown() { checkPermission(); @@ -981,7 +972,7 @@ public class ForkJoinPool extends Abstra * @throws SecurityException if a security manager exists and * the caller is not permitted to modify threads * because it does not hold {@link - * java.lang.RuntimePermission}("modifyThread"), + * java.lang.RuntimePermission}("modifyThread"), */ public List shutdownNow() { checkPermission(); @@ -990,28 +981,28 @@ public class ForkJoinPool extends Abstra } /** - * Returns true if all tasks have completed following shut down. + * Returns true if all tasks have completed following shut down. * - * @return true if all tasks have completed following shut down + * @return true if all tasks have completed following shut down */ public boolean isTerminated() { return runStateOf(runControl) == TERMINATED; } /** - * Returns true if the process of termination has + * Returns true if the process of termination has * commenced but possibly not yet completed. * - * @return true if terminating + * @return true if terminating */ public boolean isTerminating() { return runStateOf(runControl) >= TERMINATING; } /** - * Returns true if this pool has been shut down. + * Returns true if this pool has been shut down. * - * @return true if this pool has been shut down + * @return true if this pool has been shut down */ public boolean isShutdown() { return runStateOf(runControl) >= SHUTDOWN; @@ -1024,8 +1015,8 @@ public class ForkJoinPool extends Abstra * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument - * @return true if this executor terminated and - * false if the timeout elapsed before termination + * @return true if this executor terminated and + * false if the timeout elapsed before termination * @throws InterruptedException if interrupted while waiting */ public boolean awaitTermination(long timeout, TimeUnit unit) @@ -1598,8 +1589,8 @@ public class ForkJoinPool extends Abstra /** * Interface for extending managed parallelism for tasks running * in ForkJoinPools. A ManagedBlocker provides two methods. - * Method isReleasable must return true if blocking is not - * necessary. Method block blocks the current thread + * Method isReleasable must return true if blocking is not + * necessary. Method block blocks the current thread * if necessary (perhaps internally invoking isReleasable before * actually blocking.). *

For example, here is a ManagedBlocker based on a @@ -1642,8 +1633,8 @@ public class ForkJoinPool extends Abstra * is a ForkJoinWorkerThread, this method possibly arranges for a * spare thread to be activated if necessary to ensure parallelism * while the current thread is blocked. If - * maintainParallelism is true and the pool supports it - * (see getMaintainsParallelism), this method attempts to + * maintainParallelism is true and the pool supports + * it ({@link #getMaintainsParallelism}), this method attempts to * maintain the pool's nominal parallelism. Otherwise if activates * a thread only if necessary to avoid complete starvation. This * option may be preferable when blockages use timeouts, or are @@ -1689,6 +1680,16 @@ public class ForkJoinPool extends Abstra do;while (!blocker.isReleasable() && !blocker.block()); } + // AbstractExecutorService overrides + + protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return new AdaptedRunnable(runnable, value); + } + + protected RunnableFuture newTaskFor(Callable callable) { + return new AdaptedCallable(callable); + } + // Temporary Unsafe mechanics for preliminary release