ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Executors.java
Revision: 1.26
Committed: Fri Dec 5 17:56:24 2003 UTC (20 years, 6 months ago) by tim
Branch: MAIN
Changes since 1.25: +111 -7 lines
Log Message:
added newScheduledThreadPool factory methods and related support class

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9 import java.util.concurrent.atomic.AtomicInteger;
10 import java.security.AccessControlContext;
11 import java.security.AccessController;
12 import java.security.PrivilegedAction;
13 import java.security.PrivilegedExceptionAction;
14
15 /**
16 * Factory and utility methods for {@link Executor}, {@link
17 * ExecutorService}, {@link ThreadFactory}, and {@link Future}
18 * classes defined in this package.
19 *
20 * @since 1.5
21 * @author Doug Lea
22 */
23 public class Executors {
24
25 /**
26 * A wrapper class that exposes only the ExecutorService methods
27 * of an implementation.
28 */
29 private static class DelegatedExecutorService implements ExecutorService {
30 private final ExecutorService e;
31 DelegatedExecutorService(ExecutorService executor) { e = executor; }
32 public void execute(Runnable command) { e.execute(command); }
33 public void shutdown() { e.shutdown(); }
34 public List shutdownNow() { return e.shutdownNow(); }
35 public boolean isShutdown() { return e.isShutdown(); }
36 public boolean isTerminated() { return e.isTerminated(); }
37 public boolean awaitTermination(long timeout, TimeUnit unit)
38 throws InterruptedException {
39 return e.awaitTermination(timeout, unit);
40 }
41 }
42
43 /**
44 * A wrapper class that exposes only the ExecutorService and
45 * ScheduleExecutor methods of a ScheduledThreadPoolExecutor.
46 */
47 private static class DelegatedScheduledExecutorService
48 extends DelegatedExecutorService
49 implements ScheduledExecutor {
50
51 private final ScheduledExecutor e;
52 DelegatedScheduledExecutorService(ScheduledThreadPoolExecutor executor) {
53 super(executor);
54 e = executor;
55 }
56 public ScheduledFuture<Boolean> schedule(Runnable command, long delay, TimeUnit unit) {
57 return e.schedule(command, delay, unit);
58 }
59 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
60 return e.schedule(callable, delay, unit);
61 }
62 public ScheduledFuture<Boolean> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
63 return e.scheduleAtFixedRate(command, initialDelay, period, unit);
64 }
65 public ScheduledFuture<Boolean> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
66 return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
67 }
68 }
69
70 /**
71 * Creates a thread pool that reuses a fixed set of threads
72 * operating off a shared unbounded queue. If any thread
73 * terminates due to a failure during execution prior to shutdown,
74 * a new one will take its place if needed to execute subsequent
75 * tasks.
76 *
77 * @param nThreads the number of threads in the pool
78 * @return the newly created thread pool
79 */
80 public static ExecutorService newFixedThreadPool(int nThreads) {
81 return new DelegatedExecutorService
82 (new ThreadPoolExecutor(nThreads, nThreads,
83 0L, TimeUnit.MILLISECONDS,
84 new LinkedBlockingQueue<Runnable>()));
85 }
86
87 /**
88 * Creates a thread pool that reuses a fixed set of threads
89 * operating off a shared unbounded queue, using the provided
90 * ThreadFactory to create new threads when needed.
91 *
92 * @param nThreads the number of threads in the pool
93 * @param threadFactory the factory to use when creating new threads
94 * @return the newly created thread pool
95 */
96 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
97 return new DelegatedExecutorService
98 (new ThreadPoolExecutor(nThreads, nThreads,
99 0L, TimeUnit.MILLISECONDS,
100 new LinkedBlockingQueue<Runnable>(),
101 threadFactory));
102 }
103
104 /**
105 * Creates an Executor that uses a single worker thread operating
106 * off an unbounded queue. (Note however that if this single
107 * thread terminates due to a failure during execution prior to
108 * shutdown, a new one will take its place if needed to execute
109 * subsequent tasks.) Tasks are guaranteed to execute
110 * sequentially, and no more than one task will be active at any
111 * given time. This method is equivalent in effect to
112 *<tt>new FixedThreadPool(1)</tt>.
113 *
114 * @return the newly-created single-threaded Executor
115 */
116 public static ExecutorService newSingleThreadExecutor() {
117 return new DelegatedExecutorService
118 (new ThreadPoolExecutor(1, 1,
119 0L, TimeUnit.MILLISECONDS,
120 new LinkedBlockingQueue<Runnable>()));
121 }
122
123 /**
124 * Creates an Executor that uses a single worker thread operating
125 * off an unbounded queue, and uses the provided ThreadFactory to
126 * create new threads when needed.
127 * @param threadFactory the factory to use when creating new
128 * threads
129 *
130 * @return the newly-created single-threaded Executor
131 */
132 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
133 return new DelegatedExecutorService
134 (new ThreadPoolExecutor(1, 1,
135 0L, TimeUnit.MILLISECONDS,
136 new LinkedBlockingQueue<Runnable>(),
137 threadFactory));
138 }
139
140 /**
141 * Creates a thread pool that creates new threads as needed, but
142 * will reuse previously constructed threads when they are
143 * available. These pools will typically improve the performance
144 * of programs that execute many short-lived asynchronous tasks.
145 * Calls to <tt>execute</tt> will reuse previously constructed
146 * threads if available. If no existing thread is available, a new
147 * thread will be created and added to the pool. Threads that have
148 * not been used for sixty seconds are terminated and removed from
149 * the cache. Thus, a pool that remains idle for long enough will
150 * not consume any resources. Note that pools with similar
151 * properties but different details (for example, timeout parameters)
152 * may be created using {@link ThreadPoolExecutor} constructors.
153 *
154 * @return the newly created thread pool
155 */
156 public static ExecutorService newCachedThreadPool() {
157 return new DelegatedExecutorService
158 (new ThreadPoolExecutor(0, Integer.MAX_VALUE,
159 60, TimeUnit.SECONDS,
160 new SynchronousQueue<Runnable>()));
161 }
162
163 /**
164 * Creates a thread pool that creates new threads as needed, but
165 * will reuse previously constructed threads when they are
166 * available, and uses the provided
167 * ThreadFactory to create new threads when needed.
168 * @param threadFactory the factory to use when creating new threads
169 * @return the newly created thread pool
170 */
171 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
172 return new DelegatedExecutorService
173 (new ThreadPoolExecutor(0, Integer.MAX_VALUE,
174 60, TimeUnit.SECONDS,
175 new SynchronousQueue<Runnable>(),
176 threadFactory));
177 }
178
179
180 /**
181 * Creates a thread pool that can schedule commands to run after a
182 * given delay, or to execute periodically.
183 * @return a <tt>ScheduledExecutor</tt> that may safely be cast to
184 * an <tt>ExecutorService</tt>.
185 */
186 public static ScheduledExecutor newScheduledThreadPool() {
187 return newScheduledThreadPool(0);
188 }
189
190
191 /**
192 * Creates a thread pool that can schedule commands to run after a
193 * given delay, or to execute periodically.
194 * @param corePoolSize the number of threads to keep in the pool,
195 * even if they are idle.
196 * @return a <tt>ScheduledExecutor</tt> that may safely be cast to
197 * an <tt>ExecutorService</tt>.
198 */
199 public static ScheduledExecutor newScheduledThreadPool(int corePoolSize) {
200 return newScheduledThreadPool(corePoolSize, null);
201 }
202
203
204 /**
205 * Creates a thread pool that can schedule commands to run after a
206 * given delay, or to execute periodically.
207 * @param corePoolSize the number of threads to keep in the pool,
208 * even if they are idle.
209 * @param threadFactory the factory to use when the executor
210 * creates a new thread.
211 * @return a <tt>ScheduledExecutor</tt> that may safely be cast to
212 * an <tt>ExecutorService</tt>.
213 */
214 public static ScheduledExecutor newScheduledThreadPool(int corePoolSize,
215 ThreadFactory threadFactory) {
216 return newScheduledThreadPool(corePoolSize, threadFactory, false, false);
217 }
218
219
220 /**
221 * Creates a thread pool that can schedule commands to run after a
222 * given delay, or to execute periodically.
223 * @param corePoolSize the number of threads to keep in the pool,
224 * even if they are idle.
225 * @param threadFactory the factory to use when the executor
226 * creates a new thread.
227 * @param continueExistingPeriodicTasksAfterShutdown whether to
228 * continue executing existing periodic tasks even when the returned
229 * executor has been <tt>shutdown</tt>.
230 * @param executeExistingDelayedTasksAfterShutdown whether to
231 * continue executing existing delayed tasks even when the returned
232 * executor has been <tt>shutdown</tt>.
233 * @return a <tt>ScheduledExecutor</tt> that may safely be cast to
234 * an <tt>ExecutorService</tt>.
235 */
236 public static ScheduledExecutor newScheduledThreadPool(
237 int corePoolSize,
238 ThreadFactory threadFactory,
239 boolean continueExistingPeriodicTasksAfterShutdown,
240 boolean executeExistingDelayedTasksAfterShutdown) {
241
242 ScheduledThreadPoolExecutor stpe = threadFactory == null ?
243 new ScheduledThreadPoolExecutor(corePoolSize) :
244 new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
245
246 stpe.setContinueExistingPeriodicTasksAfterShutdownPolicy(
247 continueExistingPeriodicTasksAfterShutdown);
248
249 stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(
250 executeExistingDelayedTasksAfterShutdown);
251
252 return new DelegatedScheduledExecutorService(stpe);
253 }
254
255
256 /**
257 * Executes a Runnable task and returns a Future representing that
258 * task.
259 *
260 * @param executor the Executor to which the task will be submitted
261 * @param task the task to submit
262 * @return a Future representing pending completion of the task,
263 * and whose <tt>get()</tt> method will return <tt>Boolean.TRUE</tt>
264 * upon completion
265 * @throws RejectedExecutionException if task cannot be scheduled
266 * for execution
267 */
268 public static Future<Boolean> execute(Executor executor, Runnable task) {
269 FutureTask<Boolean> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
270 executor.execute(ftask);
271 return ftask;
272 }
273
274 /**
275 * Executes a Runnable task and returns a Future representing that
276 * task.
277 *
278 * @param executor the Executor to which the task will be submitted
279 * @param task the task to submit
280 * @param value the value which will become the return value of
281 * the task upon task completion
282 * @return a Future representing pending completion of the task
283 * @throws RejectedExecutionException if task cannot be scheduled
284 * for execution
285 */
286 public static <T> Future<T> execute(Executor executor, Runnable task, T value) {
287 FutureTask<T> ftask = new FutureTask<T>(task, value);
288 executor.execute(ftask);
289 return ftask;
290 }
291
292 /**
293 * Executes a value-returning task and returns a Future
294 * representing the pending results of the task.
295 *
296 * @param executor the Executor to which the task will be submitted
297 * @param task the task to submit
298 * @return a Future representing pending completion of the task
299 * @throws RejectedExecutionException if task cannot be scheduled
300 * for execution
301 */
302 public static <T> Future<T> execute(Executor executor, Callable<T> task) {
303 FutureTask<T> ftask = new FutureTask<T>(task);
304 executor.execute(ftask);
305 return ftask;
306 }
307
308 /**
309 * Executes a Runnable task and blocks until it completes normally
310 * or throws an exception.
311 *
312 * @param executor the Executor to which the task will be submitted
313 * @param task the task to submit
314 * @throws RejectedExecutionException if task cannot be scheduled
315 * for execution
316 * @throws ExecutionException if the task encountered an exception
317 * while executing
318 */
319 public static void invoke(Executor executor, Runnable task)
320 throws ExecutionException, InterruptedException {
321 FutureTask<Boolean> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
322 executor.execute(ftask);
323 ftask.get();
324 }
325
326 /**
327 * Executes a value-returning task and blocks until it returns a
328 * value or throws an exception.
329 *
330 * @param executor the Executor to which the task will be submitted
331 * @param task the task to submit
332 * @return a Future representing pending completion of the task
333 * @throws RejectedExecutionException if task cannot be scheduled
334 * for execution
335 * @throws InterruptedException if interrupted while waiting for
336 * completion
337 * @throws ExecutionException if the task encountered an exception
338 * while executing
339 */
340 public static <T> T invoke(Executor executor, Callable<T> task)
341 throws ExecutionException, InterruptedException {
342 FutureTask<T> ftask = new FutureTask<T>(task);
343 executor.execute(ftask);
344 return ftask.get();
345 }
346
347
348 /**
349 * Executes a privileged action under the current access control
350 * context and returns a Future representing the pending result
351 * object of that action.
352 *
353 * @param executor the Executor to which the task will be submitted
354 * @param action the action to submit
355 * @return a Future representing pending completion of the action
356 * @throws RejectedExecutionException if action cannot be scheduled
357 * for execution
358 */
359 public static Future<Object> execute(Executor executor, PrivilegedAction action) {
360 Callable<Object> task = new PrivilegedActionAdapter(action);
361 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
362 executor.execute(future);
363 return future;
364 }
365
366 /**
367 * Executes a privileged exception action under the current access control
368 * context and returns a Future representing the pending result
369 * object of that action.
370 *
371 * @param executor the Executor to which the task will be submitted
372 * @param action the action to submit
373 * @return a Future representing pending completion of the action
374 * @throws RejectedExecutionException if action cannot be scheduled
375 * for execution
376 */
377 public static Future<Object> execute(Executor executor, PrivilegedExceptionAction action) {
378 Callable<Object> task = new PrivilegedExceptionActionAdapter(action);
379 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
380 executor.execute(future);
381 return future;
382 }
383
384 private static class PrivilegedActionAdapter implements Callable<Object> {
385 PrivilegedActionAdapter(PrivilegedAction action) {
386 this.action = action;
387 }
388 public Object call () {
389 return action.run();
390 }
391 private final PrivilegedAction action;
392 }
393
394 private static class PrivilegedExceptionActionAdapter implements Callable<Object> {
395 PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
396 this.action = action;
397 }
398 public Object call () throws Exception {
399 return action.run();
400 }
401 private final PrivilegedExceptionAction action;
402 }
403
404 /**
405 * Return a default thread factory used to create new threads.
406 * This factory creates all new threads used by an Executor in the
407 * same {@link ThreadGroup}. If there is a {@link
408 * java.lang.SecurityManager}, it uses the group of {@link
409 * System#getSecurityManager}, else the group of the thread
410 * invoking this <tt>defaultThreadFactory</tt> method. Each new
411 * thread is created as a non-daemon thread with priority
412 * <tt>Thread.NORM_PRIORITY</tt>. New threads have names
413 * accessible via {@link Thread#getName} of
414 * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
415 * number of this factory, and <em>M</em> is the sequence number
416 * of the thread created by this factory.
417 * @return the thread factory
418 */
419 public static ThreadFactory defaultThreadFactory() {
420 return new DefaultThreadFactory();
421 }
422
423 /**
424 * Return a thread factory used to create new threads that
425 * have the same permissions as the current thread.
426 * This factory creates threads with the same settings as {@link
427 * Executors#defaultThreadFactory}, additionally setting the
428 * AccessControlContext and contextClassLoader of new threads to
429 * be the same as the thread invoking this
430 * <tt>privilegedThreadFactory</tt> method. A new
431 * <tt>privilegedThreadFactory</tt> can be created within an
432 * {@link AccessController#doPrivileged} action setting the
433 * current thread's access control context to create threads with
434 * the selected permission settings holding within that action.
435 *
436 * <p> Note that while tasks running within such threads will have
437 * the same access control and class loader settings as the
438 * current thread, they need not have the same {@link
439 * java.lang.ThreadLocal} or {@link
440 * java.lang.InheritableThreadLocal} values. If necessary,
441 * particular values of thread locals can be set or reset before
442 * any task runs in {@link ThreadPoolExecutor} subclasses using
443 * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is
444 * necessary to initialize worker threads to have the same
445 * InheritableThreadLocal settings as some other designated
446 * thread, you can create a custom ThreadFactory in which that
447 * thread waits for and services requests to create others that
448 * will inherit its values.
449 *
450 * @return the thread factory
451 * @throws AccessControlException if the current access control
452 * context does not have permission to both get and set context
453 * class loader.
454 * @see PrivilegedFutureTask
455 */
456 public static ThreadFactory privilegedThreadFactory() {
457 return new PrivilegedThreadFactory();
458 }
459
460 static class DefaultThreadFactory implements ThreadFactory {
461 static final AtomicInteger poolNumber = new AtomicInteger(1);
462 final ThreadGroup group;
463 final AtomicInteger threadNumber = new AtomicInteger(1);
464 final String namePrefix;
465
466 DefaultThreadFactory() {
467 SecurityManager s = System.getSecurityManager();
468 group = (s != null)? s.getThreadGroup() :
469 Thread.currentThread().getThreadGroup();
470 namePrefix = "pool-" +
471 poolNumber.getAndIncrement() +
472 "-thread-";
473 }
474
475 public Thread newThread(Runnable r) {
476 Thread t = new Thread(group, r,
477 namePrefix + threadNumber.getAndIncrement(),
478 0);
479 if (t.isDaemon())
480 t.setDaemon(false);
481 if (t.getPriority() != Thread.NORM_PRIORITY)
482 t.setPriority(Thread.NORM_PRIORITY);
483 return t;
484 }
485 }
486
487 static class PrivilegedThreadFactory extends DefaultThreadFactory {
488 private final ClassLoader ccl;
489 private final AccessControlContext acc;
490
491 PrivilegedThreadFactory() {
492 super();
493 this.ccl = Thread.currentThread().getContextClassLoader();
494 this.acc = AccessController.getContext();
495 acc.checkPermission(new RuntimePermission("setContextClassLoader"));
496 }
497
498 public Thread newThread(final Runnable r) {
499 return super.newThread(new Runnable() {
500 public void run() {
501 AccessController.doPrivileged(new PrivilegedAction() {
502 public Object run() {
503 Thread.currentThread().setContextClassLoader(ccl);
504 r.run();
505 return null;
506 }
507 }, acc);
508 }
509 });
510 }
511
512 }
513
514
/** Not instantiable: every member of this class is static. */
private Executors() {
}
517 }