/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/licenses/publicdomain */ package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; /** * An {@link ExecutorService} that executes each submitted task using * one of possibly several pooled threads, normally configured * using {@link Executors} factory methods. * *
Thread pools address two different problems: they usually * provide improved performance when executing large numbers of * asynchronous tasks, due to reduced per-task invocation overhead, * and they provide a means of bounding and managing the resources, * including threads, consumed when executing a collection of tasks. * Each ThreadPoolExecutor also maintains some basic * statistics, such as the number of completed tasks. * *
To be useful across a wide range of contexts, this class * provides many adjustable parameters and extensibility * hooks. However, programmers are urged to use the more convenient * {@link Executors} factory methods {@link * Executors#newCachedThreadPool} (unbounded thread pool, with * automatic thread reclamation), {@link Executors#newFixedThreadPool} * (fixed size thread pool) and {@link * Executors#newSingleThreadExecutor} (single background thread), that * preconfigure settings for the most common usage * scenarios. Otherwise, use the following guide when manually * configuring and tuning this class: * *
If hook or callback methods throw * exceptions, internal worker threads may in turn fail and * abruptly terminate.
Extension example. Most extensions of this class * override one or more of the protected hook methods. For example, * here is a subclass that adds a simple pause/resume feature: * *
* class PausableThreadPoolExecutor extends ThreadPoolExecutor { * private boolean isPaused; * private ReentrantLock pauseLock = new ReentrantLock(); * private Condition unpaused = pauseLock.newCondition(); * * public PausableThreadPoolExecutor(...) { super(...); } * * protected void beforeExecute(Thread t, Runnable r) { * super.beforeExecute(t, r); * pauseLock.lock(); * try { * while (isPaused) unpaused.await(); * } catch (InterruptedException ie) { * t.interrupt(); * } finally { * pauseLock.unlock(); * } * } * * public void pause() { * pauseLock.lock(); * try { * isPaused = true; * } finally { * pauseLock.unlock(); * } * } * * public void resume() { * pauseLock.lock(); * try { * isPaused = false; * unpaused.signalAll(); * } finally { * pauseLock.unlock(); * } * } * } ** @since 1.5 * @author Doug Lea */ public class ThreadPoolExecutor extends AbstractExecutorService { /* * A TPE manages a largish set of control fields, mainly runState, * poolSize, corePoolSize, maximumPoolSize. In general, state * changes only occur within mainLock regions, but nearly all * fields are volatile, so can be read outside of locked * regions. This enables the most performance-critical actions, * such as enqueuing and dequeing tasks in workQueue, to normally * proceed without holding this lock when they see that the state * allows actions. This sometimes requires a form of double-check. * For example when it appears that poolSize is less than * corePoolSize, addIfUnderCorePoolSize is called, which checks * sizes and runState under the lock before actually creating a * new thread. 
* * The main lifecycle control is via runState, taking on values: * RUNNING: Accept new tasks and process queued tasks * SHUTDOWN: Don't accept new tasks, but process queued tasks * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * TERMINATED: Same as STOP, plus all threads have terminated * with transitions: * * RUNNING -> SHUTDOWN * On invocation of shutdown() when pool or queue nonempty * {RUNNING or SHUTDOWN} -> STOP * On invocation of shutdownNow() when pool or queue nonempty * {SHUTDOWN or STOP} -> TERMINATED * When both queue and pool become empty * RUNNING -> TERMINATED * On invocation of shutdown when both queue and pool empty * (This bypasses creating a new thread just to cause termination) * */ /** * Permission for checking shutdown */ private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); /** * Queue used for holding tasks and handing off to worker threads. */ private final BlockingQueue
There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate threads that
* the caller is not permitted to modify because it does not hold
* {@link java.lang.RuntimePermission}("modifyThread"),
* or the security manager's checkAccess method denies access.
*/
public List This method may be useful as one part of a cancellation
* scheme. It may fail to remove tasks that have been converted
* into other forms before being placed on the internal queue. For
* example, a task entered using submit might be
* converted into a form that maintains Future status.
* However, in such cases, method {@link ThreadPoolExecutor#purge}
* may be used to remove those Futures that have been cancelled.
*
* @param task the task to remove
* @return true if the task was removed
*/
public boolean remove(Runnable task) {
    // Removal is delegated entirely to the work queue returned by getQueue();
    // the queue's own remove() result is reported back to the caller.
    final boolean removed = getQueue().remove(task);
    return removed;
}
/**
* Tries to remove from the work queue all {@link Future}
* tasks that have been cancelled. This method can be useful as a
* storage reclamation operation, that has no other impact on
* functionality. Cancelled tasks are never executed, but may
* accumulate in work queues until worker threads can actively
* remove them. Invoking this method instead tries to remove them now.
* However, this method may fail to remove tasks in
* the presence of interference by other threads.
*/
public void purge() {
// Fail if we encounter interference during traversal
try {
Iterator This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke super.beforeExecute at the end of
* this method.
*
* @param t the thread that will run task r.
* @param r the task that will be executed.
*/
protected void beforeExecute(Thread t, Runnable r) {
    // Deliberately empty: this is an extension hook that subclasses may override.
}
/**
 * Hook called once the given Runnable has finished executing. It is
 * invoked by the same thread that ran the task. A non-null Throwable
 * is the uncaught RuntimeException or Error that made execution
 * terminate abruptly.
 *
 * Note: tasks that wrap their actions (for example
 * {@link FutureTask}, whether created explicitly or through methods
 * such as submit) catch and retain computational exceptions
 * themselves. Such exceptions therefore do not cause abrupt
 * termination and are not handed to this method.
 *
 * The default implementation is a no-op intended to be customized in
 * subclasses. Note: for multiple overridings to nest properly, a
 * subclass should generally call super.afterExecute at the
 * start of its own implementation.
 *
 * @param r the runnable that has completed.
 * @param t the exception that caused termination, or null if
 * execution completed normally.
 */
protected void afterExecute(Runnable r, Throwable t) {
    // Deliberately empty: extension hook for subclasses.
}
/**
 * Hook called when the Executor has terminated. The default
 * implementation is a no-op. Note: for multiple overridings to nest
 * properly, a subclass should generally call
 * super.terminated from within its own implementation.
 */
protected void terminated() {
    // Deliberately empty: extension hook for subclasses.
}
/**
 * Rejection handler that runs the rejected task synchronously on the
 * thread that called execute. If the executor has already
 * been shut down, the task is dropped instead.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Constructs a CallerRunsPolicy.
     */
    public CallerRunsPolicy() { }

    /**
     * Runs task r on the calling thread, or drops it when the
     * executor reports that it has been shut down.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            // Executor no longer accepts work: discard the task.
            return;
        }
        r.run();
    }
}
/**
 * A handler for rejected tasks that throws a
 * RejectedExecutionException.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an AbortPolicy.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException. The exception's detail
     * message names both the rejected task and the rejecting executor
     * so that the failure can be diagnosed from a stack trace alone.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always.
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // String concatenation (rather than explicit toString() calls)
        // keeps this null-safe for both arguments.
        throw new RejectedExecutionException(
            "Task " + r + " rejected from " + e);
    }
}
/**
 * Rejection handler that drops the rejected task without any
 * notification.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Constructs a DiscardPolicy.
     */
    public DiscardPolicy() { }

    /**
     * Takes no action, so task r is simply discarded.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Deliberately empty: doing nothing is what discards the task.
    }
}
/**
 * Rejection handler that drops the oldest unhandled request and then
 * retries execute. If the executor has been shut down, the
 * rejected task is discarded instead.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Constructs a DiscardOldestPolicy.
     */
    public DiscardOldestPolicy() { }

    /**
     * Polls and ignores the next task the executor would otherwise
     * run (if one is immediately available), then resubmits task r.
     * When the executor is shut down, task r is discarded and the
     * queue is left untouched.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (e.isShutdown()) {
            // Executor no longer accepts work: discard the task.
            return;
        }
        // Make room by dropping the head of the queue; the polled
        // task is intentionally ignored.
        e.getQueue().poll();
        e.execute(r);
    }
}
}