ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Executors.java
Revision: 1.26
Committed: Fri Dec 5 17:56:24 2003 UTC (20 years, 6 months ago) by tim
Branch: MAIN
Changes since 1.25: +111 -7 lines
Log Message:
added newScheduledThreadPool factory methods and related support class

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.2 import java.util.*;
9 dl 1.22 import java.util.concurrent.atomic.AtomicInteger;
10 tim 1.20 import java.security.AccessControlContext;
11     import java.security.AccessController;
12     import java.security.PrivilegedAction;
13     import java.security.PrivilegedExceptionAction;
14 tim 1.1
15     /**
16 dl 1.18 * Factory and utility methods for {@link Executor}, {@link
17 dl 1.25 * ExecutorService}, {@link ThreadFactory}, and {@link Future}
18     * classes defined in this package.
19 tim 1.1 *
20     * @since 1.5
21 dl 1.12 * @author Doug Lea
22 tim 1.1 */
23     public class Executors {
24    
25     /**
26 dl 1.2 * A wrapper class that exposes only the ExecutorService methods
27     * of an implementation.
28 tim 1.6 */
29 jozart 1.11 private static class DelegatedExecutorService implements ExecutorService {
30 dl 1.2 private final ExecutorService e;
31     DelegatedExecutorService(ExecutorService executor) { e = executor; }
32     public void execute(Runnable command) { e.execute(command); }
33     public void shutdown() { e.shutdown(); }
34     public List shutdownNow() { return e.shutdownNow(); }
35     public boolean isShutdown() { return e.isShutdown(); }
36     public boolean isTerminated() { return e.isTerminated(); }
37     public boolean awaitTermination(long timeout, TimeUnit unit)
38     throws InterruptedException {
39     return e.awaitTermination(timeout, unit);
40     }
41     }
42 tim 1.26
43     /**
44     * A wrapper class that exposes only the ExecutorService and
45     * ScheduleExecutor methods of a ScheduledThreadPoolExecutor.
46     */
47     private static class DelegatedScheduledExecutorService
48     extends DelegatedExecutorService
49     implements ScheduledExecutor {
50    
51     private final ScheduledExecutor e;
52     DelegatedScheduledExecutorService(ScheduledThreadPoolExecutor executor) {
53     super(executor);
54     e = executor;
55     }
56     public ScheduledFuture<Boolean> schedule(Runnable command, long delay, TimeUnit unit) {
57     return e.schedule(command, delay, unit);
58     }
59     public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
60     return e.schedule(callable, delay, unit);
61     }
62     public ScheduledFuture<Boolean> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
63     return e.scheduleAtFixedRate(command, initialDelay, period, unit);
64     }
65     public ScheduledFuture<Boolean> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
66     return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
67     }
68     }
69 dl 1.2
70     /**
71 tim 1.1 * Creates a thread pool that reuses a fixed set of threads
72 dl 1.16 * operating off a shared unbounded queue. If any thread
73     * terminates due to a failure during execution prior to shutdown,
74     * a new one will take its place if needed to execute subsequent
75     * tasks.
76 tim 1.1 *
77     * @param nThreads the number of threads in the pool
78     * @return the newly created thread pool
79     */
80 dl 1.2 public static ExecutorService newFixedThreadPool(int nThreads) {
81     return new DelegatedExecutorService
82     (new ThreadPoolExecutor(nThreads, nThreads,
83     0L, TimeUnit.MILLISECONDS,
84 dl 1.3 new LinkedBlockingQueue<Runnable>()));
85 dl 1.2 }
86    
87     /**
88     * Creates a thread pool that reuses a fixed set of threads
89     * operating off a shared unbounded queue, using the provided
90     * ThreadFactory to create new threads when needed.
91     *
92     * @param nThreads the number of threads in the pool
93 dl 1.12 * @param threadFactory the factory to use when creating new threads
94 dl 1.2 * @return the newly created thread pool
95     */
96     public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
97     return new DelegatedExecutorService
98     (new ThreadPoolExecutor(nThreads, nThreads,
99     0L, TimeUnit.MILLISECONDS,
100 dl 1.3 new LinkedBlockingQueue<Runnable>(),
101 dl 1.14 threadFactory));
102 dl 1.2 }
103    
104     /**
105     * Creates an Executor that uses a single worker thread operating
106     * off an unbounded queue. (Note however that if this single
107     * thread terminates due to a failure during execution prior to
108     * shutdown, a new one will take its place if needed to execute
109     * subsequent tasks.) Tasks are guaranteed to execute
110     * sequentially, and no more than one task will be active at any
111 dl 1.16 * given time. This method is equivalent in effect to
112     *<tt>new FixedThreadPool(1)</tt>.
113 dl 1.2 *
114     * @return the newly-created single-threaded Executor
115     */
116     public static ExecutorService newSingleThreadExecutor() {
117     return new DelegatedExecutorService
118 tim 1.6 (new ThreadPoolExecutor(1, 1,
119 dl 1.2 0L, TimeUnit.MILLISECONDS,
120 dl 1.3 new LinkedBlockingQueue<Runnable>()));
121 dl 1.2 }
122    
123     /**
124     * Creates an Executor that uses a single worker thread operating
125     * off an unbounded queue, and uses the provided ThreadFactory to
126     * create new threads when needed.
127 dl 1.12 * @param threadFactory the factory to use when creating new
128 dl 1.2 * threads
129     *
130     * @return the newly-created single-threaded Executor
131     */
132     public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
133     return new DelegatedExecutorService
134 tim 1.6 (new ThreadPoolExecutor(1, 1,
135 dl 1.2 0L, TimeUnit.MILLISECONDS,
136 dl 1.3 new LinkedBlockingQueue<Runnable>(),
137 dl 1.14 threadFactory));
138 tim 1.1 }
139    
140     /**
141     * Creates a thread pool that creates new threads as needed, but
142     * will reuse previously constructed threads when they are
143     * available. These pools will typically improve the performance
144     * of programs that execute many short-lived asynchronous tasks.
145     * Calls to <tt>execute</tt> will reuse previously constructed
146     * threads if available. If no existing thread is available, a new
147     * thread will be created and added to the pool. Threads that have
148     * not been used for sixty seconds are terminated and removed from
149     * the cache. Thus, a pool that remains idle for long enough will
150 dl 1.16 * not consume any resources. Note that pools with similar
151     * properties but different details (for example, timeout parameters)
152     * may be created using {@link ThreadPoolExecutor} constructors.
153 tim 1.1 *
154     * @return the newly created thread pool
155     */
156 dl 1.2 public static ExecutorService newCachedThreadPool() {
157     return new DelegatedExecutorService
158     (new ThreadPoolExecutor(0, Integer.MAX_VALUE,
159     60, TimeUnit.SECONDS,
160 dl 1.3 new SynchronousQueue<Runnable>()));
161 tim 1.1 }
162    
163     /**
164 dl 1.2 * Creates a thread pool that creates new threads as needed, but
165     * will reuse previously constructed threads when they are
166 tim 1.6 * available, and uses the provided
167 dl 1.2 * ThreadFactory to create new threads when needed.
168 dl 1.12 * @param threadFactory the factory to use when creating new threads
169 tim 1.1 * @return the newly created thread pool
170     */
171 dl 1.2 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
172     return new DelegatedExecutorService
173     (new ThreadPoolExecutor(0, Integer.MAX_VALUE,
174     60, TimeUnit.SECONDS,
175 dl 1.3 new SynchronousQueue<Runnable>(),
176 dl 1.14 threadFactory));
177 tim 1.1 }
178 tim 1.26
179    
180     /**
181     * Creates a thread pool that can schedule commands to run after a
182     * given delay, or to execute periodically.
183     * @return a <tt>ScheduledExecutor</tt> that may safely be cast to
184     * an <tt>ExecutorService</tt>.
185     */
186     public static ScheduledExecutor newScheduledThreadPool() {
187     return newScheduledThreadPool(0);
188     }
189    
190    
191     /**
192     * Creates a thread pool that can schedule commands to run after a
193     * given delay, or to execute periodically.
194     * @param corePoolSize the number of threads to keep in the pool,
195     * even if they are idle.
196     * @return a <tt>ScheduledExecutor</tt> that may safely be cast to
197     * an <tt>ExecutorService</tt>.
198     */
199     public static ScheduledExecutor newScheduledThreadPool(int corePoolSize) {
200     return newScheduledThreadPool(corePoolSize, null);
201     }
202    
203    
204     /**
205     * Creates a thread pool that can schedule commands to run after a
206     * given delay, or to execute periodically.
207     * @param corePoolSize the number of threads to keep in the pool,
208     * even if they are idle.
209     * @param threadFactory the factory to use when the executor
210     * creates a new thread.
211     * @return a <tt>ScheduledExecutor</tt> that may safely be cast to
212     * an <tt>ExecutorService</tt>.
213     */
214     public static ScheduledExecutor newScheduledThreadPool(int corePoolSize,
215     ThreadFactory threadFactory) {
216     return newScheduledThreadPool(corePoolSize, threadFactory, false, false);
217     }
218    
219    
220     /**
221     * Creates a thread pool that can schedule commands to run after a
222     * given delay, or to execute periodically.
223     * @param corePoolSize the number of threads to keep in the pool,
224     * even if they are idle.
225     * @param threadFactory the factory to use when the executor
226     * creates a new thread.
227     * @param continueExistingPeriodicTasksAfterShutdown whether to
228     * continue executing existing periodic tasks even when the returned
229     * executor has been <tt>shutdown</tt>.
230     * @param executeExistingDelayedTasksAfterShutdown whether to
231     * continue executing existing delayed tasks even when the returned
232     * executor has been <tt>shutdown</tt>.
233     * @return a <tt>ScheduledExecutor</tt> that may safely be cast to
234     * an <tt>ExecutorService</tt>.
235     */
236     public static ScheduledExecutor newScheduledThreadPool(
237     int corePoolSize,
238     ThreadFactory threadFactory,
239     boolean continueExistingPeriodicTasksAfterShutdown,
240     boolean executeExistingDelayedTasksAfterShutdown) {
241    
242     ScheduledThreadPoolExecutor stpe = threadFactory == null ?
243     new ScheduledThreadPoolExecutor(corePoolSize) :
244     new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
245    
246     stpe.setContinueExistingPeriodicTasksAfterShutdownPolicy(
247     continueExistingPeriodicTasksAfterShutdown);
248    
249     stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(
250     executeExistingDelayedTasksAfterShutdown);
251    
252     return new DelegatedScheduledExecutorService(stpe);
253     }
254    
255 tim 1.1
256     /**
257 dl 1.25 * Executes a Runnable task and returns a Future representing that
258 tim 1.15 * task.
259     *
260     * @param executor the Executor to which the task will be submitted
261     * @param task the task to submit
262 dl 1.25 * @return a Future representing pending completion of the task,
263     * and whose <tt>get()</tt> method will return <tt>Boolean.TRUE</tt>
264     * upon completion
265 tim 1.15 * @throws RejectedExecutionException if task cannot be scheduled
266     * for execution
267     */
268 dl 1.25 public static Future<Boolean> execute(Executor executor, Runnable task) {
269 tim 1.15 FutureTask<Boolean> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
270     executor.execute(ftask);
271     return ftask;
272     }
273    
274     /**
275 tim 1.1 * Executes a Runnable task and returns a Future representing that
276     * task.
277     *
278     * @param executor the Executor to which the task will be submitted
279     * @param task the task to submit
280     * @param value the value which will become the return value of
281     * the task upon task completion
282 dl 1.5 * @return a Future representing pending completion of the task
283 jozart 1.9 * @throws RejectedExecutionException if task cannot be scheduled
284 tim 1.1 * for execution
285     */
286 dl 1.5 public static <T> Future<T> execute(Executor executor, Runnable task, T value) {
287 dl 1.10 FutureTask<T> ftask = new FutureTask<T>(task, value);
288 tim 1.1 executor.execute(ftask);
289     return ftask;
290     }
291    
292     /**
293     * Executes a value-returning task and returns a Future
294     * representing the pending results of the task.
295     *
296     * @param executor the Executor to which the task will be submitted
297     * @param task the task to submit
298     * @return a Future representing pending completion of the task
299 jozart 1.9 * @throws RejectedExecutionException if task cannot be scheduled
300     * for execution
301 tim 1.1 */
302 tim 1.15 public static <T> Future<T> execute(Executor executor, Callable<T> task) {
303 dl 1.10 FutureTask<T> ftask = new FutureTask<T>(task);
304 tim 1.1 executor.execute(ftask);
305     return ftask;
306     }
307    
308     /**
309     * Executes a Runnable task and blocks until it completes normally
310     * or throws an exception.
311     *
312     * @param executor the Executor to which the task will be submitted
313     * @param task the task to submit
314 jozart 1.9 * @throws RejectedExecutionException if task cannot be scheduled
315     * for execution
316 dl 1.19 * @throws ExecutionException if the task encountered an exception
317     * while executing
318 tim 1.1 */
319     public static void invoke(Executor executor, Runnable task)
320     throws ExecutionException, InterruptedException {
321 tim 1.13 FutureTask<Boolean> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
322 tim 1.1 executor.execute(ftask);
323     ftask.get();
324     }
325    
326     /**
327     * Executes a value-returning task and blocks until it returns a
328     * value or throws an exception.
329     *
330     * @param executor the Executor to which the task will be submitted
331     * @param task the task to submit
332     * @return a Future representing pending completion of the task
333 jozart 1.9 * @throws RejectedExecutionException if task cannot be scheduled
334     * for execution
335 dl 1.12 * @throws InterruptedException if interrupted while waiting for
336     * completion
337 dl 1.19 * @throws ExecutionException if the task encountered an exception
338     * while executing
339 tim 1.1 */
340     public static <T> T invoke(Executor executor, Callable<T> task)
341     throws ExecutionException, InterruptedException {
342     FutureTask<T> ftask = new FutureTask<T>(task);
343     executor.execute(ftask);
344     return ftask.get();
345 tim 1.6 }
346    
347 tim 1.20
348     /**
349     * Executes a privileged action under the current access control
350     * context and returns a Future representing the pending result
351     * object of that action.
352     *
353     * @param executor the Executor to which the task will be submitted
354     * @param action the action to submit
355     * @return a Future representing pending completion of the action
356     * @throws RejectedExecutionException if action cannot be scheduled
357     * for execution
358     */
359     public static Future<Object> execute(Executor executor, PrivilegedAction action) {
360     Callable<Object> task = new PrivilegedActionAdapter(action);
361 tim 1.21 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
362 tim 1.20 executor.execute(future);
363     return future;
364     }
365    
366     /**
367     * Executes a privileged exception action under the current access control
368     * context and returns a Future representing the pending result
369     * object of that action.
370     *
371     * @param executor the Executor to which the task will be submitted
372     * @param action the action to submit
373     * @return a Future representing pending completion of the action
374     * @throws RejectedExecutionException if action cannot be scheduled
375     * for execution
376     */
377     public static Future<Object> execute(Executor executor, PrivilegedExceptionAction action) {
378     Callable<Object> task = new PrivilegedExceptionActionAdapter(action);
379 tim 1.21 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
380 tim 1.20 executor.execute(future);
381     return future;
382     }
383    
384     private static class PrivilegedActionAdapter implements Callable<Object> {
385     PrivilegedActionAdapter(PrivilegedAction action) {
386     this.action = action;
387     }
388     public Object call () {
389     return action.run();
390     }
391     private final PrivilegedAction action;
392     }
393    
394     private static class PrivilegedExceptionActionAdapter implements Callable<Object> {
395     PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
396     this.action = action;
397     }
398     public Object call () throws Exception {
399     return action.run();
400     }
401     private final PrivilegedExceptionAction action;
402     }
403    
404 dl 1.22 /**
405     * Return a default thread factory used to create new threads.
406     * This factory creates all new threads used by an Executor in the
407     * same {@link ThreadGroup}. If there is a {@link
408     * java.lang.SecurityManager}, it uses the group of {@link
409     * System#getSecurityManager}, else the group of the thread
410     * invoking this <tt>defaultThreadFactory</tt> method. Each new
411     * thread is created as a non-daemon thread with priority
412     * <tt>Thread.NORM_PRIORITY</tt>. New threads have names
413     * accessible via {@link Thread#getName} of
414     * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
415     * number of this factory, and <em>M</em> is the sequence number
416     * of the thread created by this factory.
417     * @return the thread factory
418     */
419     public static ThreadFactory defaultThreadFactory() {
420 tim 1.26 return new DefaultThreadFactory();
421 dl 1.22 }
422    
423     /**
424 dl 1.24 * Return a thread factory used to create new threads that
425     * have the same permissions as the current thread.
426 dl 1.22 * This factory creates threads with the same settings as {@link
427     * Executors#defaultThreadFactory}, additionally setting the
428     * AccessControlContext and contextClassLoader of new threads to
429     * be the same as the thread invoking this
430     * <tt>privilegedThreadFactory</tt> method. A new
431     * <tt>privilegedThreadFactory</tt> can be created within an
432 dl 1.23 * {@link AccessController#doPrivileged} action setting the
433 dl 1.24 * current thread's access control context to create threads with
434 dl 1.23 * the selected permission settings holding within that action.
435 dl 1.22 *
436     * <p> Note that while tasks running within such threads will have
437     * the same access control and class loader settings as the
438     * current thread, they need not have the same {@link
439     * java.lang.ThreadLocal} or {@link
440     * java.lang.InheritableThreadLocal} values. If necessary,
441     * particular values of thread locals can be set or reset before
442     * any task runs in {@link ThreadPoolExecutor} subclasses using
443     * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is
444     * necessary to initialize worker threads to have the same
445     * InheritableThreadLocal settings as some other designated
446     * thread, you can create a custom ThreadFactory in which that
447     * thread waits for and services requests to create others that
448     * will inherit its values.
449     *
450     * @return the thread factory
451     * @throws AccessControlException if the current access control
452     * context does not have permission to both get and set context
453     * class loader.
454     * @see PrivilegedFutureTask
455     */
456     public static ThreadFactory privilegedThreadFactory() {
457 tim 1.26 return new PrivilegedThreadFactory();
458 dl 1.22 }
459    
460     static class DefaultThreadFactory implements ThreadFactory {
461 tim 1.26 static final AtomicInteger poolNumber = new AtomicInteger(1);
462     final ThreadGroup group;
463     final AtomicInteger threadNumber = new AtomicInteger(1);
464     final String namePrefix;
465 dl 1.22
466 tim 1.26 DefaultThreadFactory() {
467 dl 1.22 SecurityManager s = System.getSecurityManager();
468     group = (s != null)? s.getThreadGroup() :
469     Thread.currentThread().getThreadGroup();
470     namePrefix = "pool-" +
471     poolNumber.getAndIncrement() +
472     "-thread-";
473     }
474    
475     public Thread newThread(Runnable r) {
476     Thread t = new Thread(group, r,
477     namePrefix + threadNumber.getAndIncrement(),
478     0);
479     if (t.isDaemon())
480     t.setDaemon(false);
481     if (t.getPriority() != Thread.NORM_PRIORITY)
482     t.setPriority(Thread.NORM_PRIORITY);
483     return t;
484     }
485     }
486    
487     static class PrivilegedThreadFactory extends DefaultThreadFactory {
488     private final ClassLoader ccl;
489     private final AccessControlContext acc;
490    
491     PrivilegedThreadFactory() {
492     super();
493     this.ccl = Thread.currentThread().getContextClassLoader();
494     this.acc = AccessController.getContext();
495     acc.checkPermission(new RuntimePermission("setContextClassLoader"));
496     }
497    
498     public Thread newThread(final Runnable r) {
499     return super.newThread(new Runnable() {
500     public void run() {
501     AccessController.doPrivileged(new PrivilegedAction() {
502     public Object run() {
503     Thread.currentThread().setContextClassLoader(ccl);
504     r.run();
505     return null;
506     }
507     }, acc);
508     }
509     });
510     }
511    
512     }
513    
514 tim 1.20
515 tim 1.15 /** Cannot instantiate. */
516     private Executors() {}
517 tim 1.1 }