--- jsr166/src/jsr166y/ForkJoinPool.java 2009/01/06 14:30:31 1.1 +++ jsr166/src/jsr166y/ForkJoinPool.java 2009/01/07 16:07:37 1.2 @@ -13,48 +13,49 @@ import sun.misc.Unsafe; import java.lang.reflect.*; /** - * Host for a group of ForkJoinWorkerThreads. A ForkJoinPool provides - * the entry point for tasks submitted from non-ForkJoinTasks, as well - * as management and monitoring operations. Normally a single - * ForkJoinPool is used for a large number of submitted - * tasks. Otherwise, use would not usually outweigh the construction - * and bookkeeping overhead of creating a large set of threads. + * An {@link ExecutorService} for running {@link ForkJoinTask}s. A + * ForkJoinPool provides the entry point for submissions from + * non-ForkJoinTasks, as well as management and monitoring operations. + * Normally a single ForkJoinPool is used for a large number of + * submitted tasks. Otherwise, use would not usually outweigh the + * construction and bookkeeping overhead of creating a large set of + * threads. * - *
ForkJoinPools differ from other kinds of Executor mainly in that - * they provide work-stealing: all threads in the pool + *
ForkJoinPools differ from other kinds of Executors mainly in + * that they provide work-stealing: all threads in the pool * attempt to find and execute subtasks created by other active tasks * (eventually blocking if none exist). This makes them efficient when - * most tasks spawn other subtasks (as do most ForkJoinTasks) but - * possibly less so otherwise. It is however fine to combine execution - * of some plain Runnable- or Callable- based activities with that of - * ForkJoinTasks. + * most tasks spawn other subtasks (as do most ForkJoinTasks), as well + * as the mixed execution of some plain Runnable- or Callable- based + * activities along with ForkJoinTasks. Otherwise, other + * ExecutorService implementations are typically more appropriate + * choices. * *
A ForkJoinPool may be constructed with a given parallelism level * (target pool size), which it attempts to maintain by dynamically - * adding, suspending, or resuming threads, even if some tasks have - * blocked waiting to join others. However, no such adjustments are - * performed in the face of blocked IO or other unmanaged - * synchronization. The nested ManagedBlocker interface enables - * extension of the kinds of synchronization accommodated. - * - *
The target parallelism level may also be set dynamically. You
- * can limit the number of threads dynamically constructed using
- * method setMaximumPoolSize and/or
- * setMaintainParallelism.
+ * adding, suspending, or resuming threads, even if some tasks are
+ * waiting to join others. However, no such adjustments are performed
+ * in the face of blocked IO or other unmanaged synchronization. The
+ * nested ManagedBlocker
interface enables extension of
+ * the kinds of synchronization accommodated. The target parallelism
+ * level may also be changed dynamically (setParallelism
)
+ * and dynamically thread construction can be limited using methods
+ * setMaximumPoolSize
and/or
+ * setMaintainsParallelism
.
*
*
In addition to execution and lifecycle control methods, this
* class provides status check methods (for example
- * getStealCount) that are intended to aid in developing,
+ * getStealCount
) that are intended to aid in developing,
* tuning, and monitoring fork/join applications. Also, method
- * toString returns indications of pool state in a convenient
- * form for informal monitoring.
+ * toString
returns indications of pool state in a
+ * convenient form for informal monitoring.
*
*
Implementation notes: This implementation restricts the
- * maximum parallelism to 32767. Attempts to create pools with greater
- * than the maximum result in IllegalArgumentExceptions.
+ * maximum number of running threads to 32767. Attempts to create
+ * pools with greater than the maximum result in
+ * IllegalArgumentExceptions.
*/
-public class ForkJoinPool extends AbstractExecutorService
- implements ExecutorService {
+public class ForkJoinPool extends AbstractExecutorService {
/*
* See the extended comments interspersed below for design,
@@ -87,7 +88,7 @@ public class ForkJoinPool extends Abstra
* Default ForkJoinWorkerThreadFactory implementation, creates a
* new ForkJoinWorkerThread.
*/
- public static class DefaultForkJoinWorkerThreadFactory
+ static class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
try {
@@ -99,14 +100,13 @@ public class ForkJoinPool extends Abstra
}
/**
- * The default ForkJoinWorkerThreadFactory, used unless overridden
- * in ForkJoinPool constructors.
+ * Creates a new ForkJoinWorkerThread. This factory is used unless
+ * overridden in ForkJoinPool constructors.
*/
- private static final DefaultForkJoinWorkerThreadFactory
+ public static final ForkJoinWorkerThreadFactory
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
-
/**
* Permission required for callers of methods that may start or
* kill threads.
@@ -320,7 +320,7 @@ public class ForkJoinPool extends Abstra
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
- * java.lang.RuntimePermission}("modifyThread"),
+ * java.lang.RuntimePermission} For example, here is a ManagedBlocker based on a
@@ -1642,8 +1633,8 @@ public class ForkJoinPool extends Abstra
* is a ForkJoinWorkerThread, this method possibly arranges for a
* spare thread to be activated if necessary to ensure parallelism
* while the current thread is blocked. If
- * maintainParallelism is true and the pool supports it
- * (see getMaintainsParallelism), this method attempts to
+ * ("modifyThread")
,
*/
public ForkJoinPool() {
this(Runtime.getRuntime().availableProcessors(),
@@ -336,14 +336,14 @@ public class ForkJoinPool extends Abstra
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
- * java.lang.RuntimePermission}("modifyThread"),
+ * java.lang.RuntimePermission}("modifyThread")
,
*/
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory);
}
/**
- * Creates a ForkJoinPool with a pool size equal to the number of
+ * Creates a ForkJoinPool with parallelism equal to the number of
* processors available on the system and using the given
* ForkJoinWorkerThreadFactory,
* @param factory the factory for creating new threads
@@ -351,15 +351,14 @@ public class ForkJoinPool extends Abstra
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
- * java.lang.RuntimePermission}("modifyThread"),
+ * java.lang.RuntimePermission}("modifyThread")
,
*/
public ForkJoinPool(ForkJoinWorkerThreadFactory factory) {
this(Runtime.getRuntime().availableProcessors(), factory);
}
/**
- * Creates a ForkJoinPool with the indicated target number of
- * worker threads and the given factory.
+ * Creates a ForkJoinPool with the given parallelism and factory.
*
* @param parallelism the targeted number of worker threads
* @param factory the factory for creating new threads
@@ -369,7 +368,7 @@ public class ForkJoinPool extends Abstra
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
- * java.lang.RuntimePermission}("modifyThread"),
+ * java.lang.RuntimePermission}("modifyThread")
,
*/
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory) {
if (parallelism <= 0 || parallelism > MAX_THREADS)
@@ -500,57 +499,6 @@ public class ForkJoinPool extends Abstra
}
}
- /**
- * Sets the handler for internal worker threads that terminate due
- * to unrecoverable errors encountered while executing tasks.
- * Unless set, the current default or ThreadGroup handler is used
- * as handler.
- *
- * @param h the new handler
- * @return the old handler, or null if none
- * @throws SecurityException if a security manager exists and
- * the caller is not permitted to modify threads
- * because it does not hold {@link
- * java.lang.RuntimePermission}("modifyThread"),
- */
- public Thread.UncaughtExceptionHandler
- setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
- checkPermission();
- Thread.UncaughtExceptionHandler old = null;
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- old = ueh;
- ueh = h;
- ForkJoinWorkerThread[] ws = workers;
- for (int i = 0; i < ws.length; ++i) {
- ForkJoinWorkerThread w = ws[i];
- if (w != null)
- w.setUncaughtExceptionHandler(h);
- }
- } finally {
- lock.unlock();
- }
- return old;
- }
-
- /**
- * Returns the handler for internal worker threads that terminate
- * due to unrecoverable errors encountered while executing tasks.
- * @return the handler, or null if none
- */
- public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
- Thread.UncaughtExceptionHandler h;
- final ReentrantLock lock = this.workerLock;
- lock.lock();
- try {
- h = ueh;
- } finally {
- lock.unlock();
- }
- return h;
- }
-
// Execution methods
/**
@@ -609,14 +557,6 @@ public class ForkJoinPool extends Abstra
return job;
}
- protected ("modifyThread")
,
+ */
+ public Thread.UncaughtExceptionHandler
+ setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler h) {
+ checkPermission();
+ Thread.UncaughtExceptionHandler old = null;
+ final ReentrantLock lock = this.workerLock;
+ lock.lock();
+ try {
+ old = ueh;
+ ueh = h;
+ ForkJoinWorkerThread[] ws = workers;
+ for (int i = 0; i < ws.length; ++i) {
+ ForkJoinWorkerThread w = ws[i];
+ if (w != null)
+ w.setUncaughtExceptionHandler(h);
+ }
+ } finally {
+ lock.unlock();
+ }
+ return old;
+ }
+
+
+ /**
* Sets the target paralleism level of this pool.
* @param parallelism the target parallelism
* @throws IllegalArgumentException if parallelism less than or
@@ -705,7 +697,7 @@ public class ForkJoinPool extends Abstra
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
- * java.lang.RuntimePermission}("modifyThread"),
+ * java.lang.RuntimePermission}("modifyThread")
,
*/
public void setParallelism(int parallelism) {
checkPermission();
@@ -730,8 +722,6 @@ public class ForkJoinPool extends Abstra
/**
* Returns the targeted number of worker threads in this pool.
- * This value does not necessarily reflect transient changes as
- * threads are added, removed, or abruptly terminate.
*
* @return the targeted number of worker threads in this pool
*/
@@ -742,7 +732,7 @@ public class ForkJoinPool extends Abstra
/**
* Returns the number of worker threads that have started but not
* yet terminated. This result returned by this method may differ
- * from getParallelism when threads are created to
+ * from getParallelism
when threads are created to
* maintain parallelism when others are cooperatively blocked.
*
* @return the number of worker threads
@@ -797,8 +787,8 @@ public class ForkJoinPool extends Abstra
}
/**
- * Returns the approximate number of worker threads that are not
- * blocked waiting to join tasks or for other managed
+ * Returns an estimate of the number of worker threads that are
+ * not blocked waiting to join tasks or for other managed
* synchronization.
*
* @return the number of worker threads
@@ -808,7 +798,7 @@ public class ForkJoinPool extends Abstra
}
/**
- * Returns the approximate number of threads that are currently
+ * Returns an estimate of the number of threads that are currently
* stealing or executing tasks. This method may overestimate the
* number of active threads.
* @return the number of active threads.
@@ -818,7 +808,7 @@ public class ForkJoinPool extends Abstra
}
/**
- * Returns the approximate number of threads that are currently
+ * Returns an estimate of the number of threads that are currently
* idle waiting for tasks. This method may underestimate the
* number of idle threads.
* @return the number of idle threads.
@@ -867,11 +857,12 @@ public class ForkJoinPool extends Abstra
}
/**
- * Returns the total number of tasks currently held in queues by
- * worker threads (but not including tasks submitted to the pool
- * that have not begun executing). This value is only an
- * approximation, obtained by iterating across all threads in the
- * pool. This method may be useful for tuning task granularities.
+ * Returns an estimate of the total number of tasks currently held
+ * in queues by worker threads (but not including tasks submitted
+ * to the pool that have not begun executing). This value is only
+ * an approximation, obtained by iterating across all threads in
+ * the pool. This method may be useful for tuning task
+ * granularities.
* @return the number of queued tasks.
*/
public long getQueuedTaskCount() {
@@ -886,7 +877,7 @@ public class ForkJoinPool extends Abstra
}
/**
- * Returns the approximate number tasks submitted to this pool
+ * Returns an estimate of the number tasks submitted to this pool
* that have not yet begun executing. This method takes time
* proportional to the number of submissions.
* @return the number of queued submissions.
@@ -898,7 +889,7 @@ public class ForkJoinPool extends Abstra
/**
* Returns true if there are any tasks submitted to this pool
* that have not yet begun executing.
- * @return true if there are any queued submissions.
+ * @return true
if there are any queued submissions.
*/
public boolean hasQueuedSubmissions() {
return !submissionQueue.isEmpty();
@@ -961,7 +952,7 @@ public class ForkJoinPool extends Abstra
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
- * java.lang.RuntimePermission}("modifyThread"),
+ * java.lang.RuntimePermission}("modifyThread")
,
*/
public void shutdown() {
checkPermission();
@@ -981,7 +972,7 @@ public class ForkJoinPool extends Abstra
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
* because it does not hold {@link
- * java.lang.RuntimePermission}("modifyThread"),
+ * java.lang.RuntimePermission}("modifyThread")
,
*/
public Listtrue
if all tasks have completed following shut down.
*
- * @return true if all tasks have completed following shut down
+ * @return true
if all tasks have completed following shut down
*/
public boolean isTerminated() {
return runStateOf(runControl) == TERMINATED;
}
/**
- * Returns true if the process of termination has
+ * Returns true
if the process of termination has
* commenced but possibly not yet completed.
*
- * @return true if terminating
+ * @return true
if terminating
*/
public boolean isTerminating() {
return runStateOf(runControl) >= TERMINATING;
}
/**
- * Returns true if this pool has been shut down.
+ * Returns true
if this pool has been shut down.
*
- * @return true if this pool has been shut down
+ * @return true
if this pool has been shut down
*/
public boolean isShutdown() {
return runStateOf(runControl) >= SHUTDOWN;
@@ -1024,8 +1015,8 @@ public class ForkJoinPool extends Abstra
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
- * @return true if this executor terminated and
- * false if the timeout elapsed before termination
+ * @return true
if this executor terminated and
+ * false
if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
@@ -1598,8 +1589,8 @@ public class ForkJoinPool extends Abstra
/**
* Interface for extending managed parallelism for tasks running
* in ForkJoinPools. A ManagedBlocker provides two methods.
- * Method isReleasable must return true if blocking is not
- * necessary. Method block blocks the current thread
+ * Method isReleasable
must return true if blocking is not
+ * necessary. Method block
blocks the current thread
* if necessary (perhaps internally invoking isReleasable before
* actually blocking.).
* maintainParallelism
is true and the pool supports
+ * it ({@link #getMaintainsParallelism}), this method attempts to
* maintain the pool's nominal parallelism. Otherwise if activates
* a thread only if necessary to avoid complete starvation. This
* option may be preferable when blockages use timeouts, or are
@@ -1689,6 +1680,16 @@ public class ForkJoinPool extends Abstra
do;while (!blocker.isReleasable() && !blocker.block());
}
+ // AbstractExecutorService overrides
+
+ protected