/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain. Use, modify, and * redistribute this code in any way without acknowledgement. */ package java.util.concurrent; import java.util.*; /** * Factory and utility methods for the Executor classes * defined in java.util.concurrent. * * @since 1.5 * @see Executor * @see ExecutorService * @see Future * * @spec JSR-166 * @revised $Date: 2003/06/06 18:42:17 $ * @editor $Author: dl $ */ public class Executors { /** * A wrapper class that exposes only the ExecutorService methods * of an implementation. */ static private class DelegatedExecutorService implements ExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } } /** * Creates a thread pool that reuses a fixed set of threads * operating off a shared unbounded queue. * * @param nThreads the number of threads in the pool * @return the newly created thread pool */ public static ExecutorService newFixedThreadPool(int nThreads) { return new DelegatedExecutorService (new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } /** * Creates a thread pool that reuses a fixed set of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed. * * @param nThreads the number of threads in the pool * @param threadfactory the factory to use when creating new threads * @return the newly created thread pool */ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new DelegatedExecutorService (new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory, null)); } /** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. * * @return the newly-created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { return new DelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } /** * Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create new threads when needed. * @param threadfactory the factory to use when creating new * threads * * @return the newly-created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new DelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory, null)); } /** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to execute will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. * * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new DelegatedExecutorService (new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue())); } /** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadfactory the factory to use when creating new threads * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new DelegatedExecutorService (new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory, null)); } /** * Executes a Runnable task and returns a Future representing that * task. * * @param executor the Executor to which the task will be submitted * @param task the task to submit * @param value the value which will become the return value of * the task upon task completion * @return a Future representing pending completion of the task * @throws ExecutionException if the task cannot be scheduled * for execution */ public static Future execute(Executor executor, Runnable task, T value) { FutureTask ftask; if (executor instanceof ThreadPoolExecutor) { ftask = new ThreadPoolFutureTask( (ThreadPoolExecutor) executor, task, value); } else { ftask = new FutureTask(task, value); } executor.execute(ftask); return ftask; } /** * Executes a value-returning task and returns a Future * representing the pending results of the task. * * @param executor the Executor to which the task will be submitted * @param task the task to submit * @return a Future representing pending completion of the task * @throws ExecutionException if task cannot be scheduled for execution */ public static FutureTask execute(Executor executor, Callable task) { FutureTask ftask; if (executor instanceof ThreadPoolExecutor) { ftask = new ThreadPoolFutureTask( (ThreadPoolExecutor) executor, task); } else { ftask = new FutureTask(task); } executor.execute(ftask); return ftask; } /** * Executes a Runnable task and blocks until it completes normally * or throws an exception. * * @param executor the Executor to which the task will be submitted * @param task the task to submit * @throws ExecutionException if task cannot be scheduled for execution */ public static void invoke(Executor executor, Runnable task) throws ExecutionException, InterruptedException { FutureTask ftask = new FutureTask(task, Boolean.TRUE); executor.execute(ftask); ftask.get(); } /** * Executes a value-returning task and blocks until it returns a * value or throws an exception. * * @param executor the Executor to which the task will be submitted * @param task the task to submit * @return a Future representing pending completion of the task * @throws ExecutionException if task cannot be scheduled for execution */ public static T invoke(Executor executor, Callable task) throws ExecutionException, InterruptedException { FutureTask ftask = new FutureTask(task); executor.execute(ftask); return ftask.get(); } private static class ThreadPoolFutureTask extends FutureTask { ThreadPoolFutureTask(ThreadPoolExecutor tpe, Callable callable) { super(callable); this.tpe = tpe; } ThreadPoolFutureTask(ThreadPoolExecutor tpe, Runnable runnable, V result) { super(runnable, result); this.tpe = tpe; } public boolean cancel(boolean mayInterruptIfRunning) { tpe.remove(this); // ignore success/failure return super.cancel(mayInterruptIfRunning); } private final ThreadPoolExecutor tpe; } }