ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Executors.java
Revision: 1.38
Committed: Fri Dec 19 20:38:31 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.37: +84 -30 lines
Log Message:
Define and use Executors.callable instead of submit/invoke variants

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9 import java.util.concurrent.atomic.AtomicInteger;
10 import java.security.AccessControlContext;
11 import java.security.AccessController;
12 import java.security.PrivilegedAction;
13 import java.security.PrivilegedExceptionAction;
14
15 /**
16 * Factory and utility methods for {@link Executor}, {@link
17 * ExecutorService}, and {@link ThreadFactory} classes defined in this
18 * package.
19 *
20 * @since 1.5
21 * @author Doug Lea
22 */
23 public class Executors {
24
25 /**
26 * Creates a thread pool that reuses a fixed set of threads
27 * operating off a shared unbounded queue. If any thread
28 * terminates due to a failure during execution prior to shutdown,
29 * a new one will take its place if needed to execute subsequent
30 * tasks.
31 *
32 * @param nThreads the number of threads in the pool
33 * @return the newly created thread pool
34 */
35 public static ExecutorService newFixedThreadPool(int nThreads) {
36 return new ThreadPoolExecutor(nThreads, nThreads,
37 0L, TimeUnit.MILLISECONDS,
38 new LinkedBlockingQueue<Runnable>());
39 }
40
41 /**
42 * Creates a thread pool that reuses a fixed set of threads
43 * operating off a shared unbounded queue, using the provided
44 * ThreadFactory to create new threads when needed.
45 *
46 * @param nThreads the number of threads in the pool
47 * @param threadFactory the factory to use when creating new threads
48 * @return the newly created thread pool
49 */
50 public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
51 return new ThreadPoolExecutor(nThreads, nThreads,
52 0L, TimeUnit.MILLISECONDS,
53 new LinkedBlockingQueue<Runnable>(),
54 threadFactory);
55 }
56
57 /**
58 * Creates an Executor that uses a single worker thread operating
59 * off an unbounded queue. (Note however that if this single
60 * thread terminates due to a failure during execution prior to
61 * shutdown, a new one will take its place if needed to execute
62 * subsequent tasks.) Tasks are guaranteed to execute
63 * sequentially, and no more than one task will be active at any
64 * given time. The returned executor cannot be reconfigured
65 * to use additional threads.
66 *
67 * @return the newly-created single-threaded Executor
68 */
69 public static ExecutorService newSingleThreadExecutor() {
70 return unconfigurableExecutorService
71 (new ThreadPoolExecutor(1, 1,
72 0L, TimeUnit.MILLISECONDS,
73 new LinkedBlockingQueue<Runnable>()));
74 }
75
76 /**
77 * Creates an Executor that uses a single worker thread operating
78 * off an unbounded queue, and uses the provided ThreadFactory to
79 * create a new thread when needed. Unlike the otherwise
80 * equivalent <tt>newFixedThreadPool(1)</tt> the returned executor
81 * is guaranteed not to be reconfigurable to use additional
82 * threads.
83 *
84 * @param threadFactory the factory to use when creating new
85 * threads
86 *
87 * @return the newly-created single-threaded Executor
88 */
89 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
90 return unconfigurableExecutorService
91 (new ThreadPoolExecutor(1, 1,
92 0L, TimeUnit.MILLISECONDS,
93 new LinkedBlockingQueue<Runnable>(),
94 threadFactory));
95 }
96
97 /**
98 * Creates a thread pool that creates new threads as needed, but
99 * will reuse previously constructed threads when they are
100 * available. These pools will typically improve the performance
101 * of programs that execute many short-lived asynchronous tasks.
102 * Calls to <tt>execute</tt> will reuse previously constructed
103 * threads if available. If no existing thread is available, a new
104 * thread will be created and added to the pool. Threads that have
105 * not been used for sixty seconds are terminated and removed from
106 * the cache. Thus, a pool that remains idle for long enough will
107 * not consume any resources. Note that pools with similar
108 * properties but different details (for example, timeout parameters)
109 * may be created using {@link ThreadPoolExecutor} constructors.
110 *
111 * @return the newly created thread pool
112 */
113 public static ExecutorService newCachedThreadPool() {
114 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
115 60, TimeUnit.SECONDS,
116 new SynchronousQueue<Runnable>());
117 }
118
119 /**
120 * Creates a thread pool that creates new threads as needed, but
121 * will reuse previously constructed threads when they are
122 * available, and uses the provided
123 * ThreadFactory to create new threads when needed.
124 * @param threadFactory the factory to use when creating new threads
125 * @return the newly created thread pool
126 */
127 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
128 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
129 60, TimeUnit.SECONDS,
130 new SynchronousQueue<Runnable>(),
131 threadFactory);
132 }
133
134 /**
135 * Creates a thread pool that can schedule commands to run after a
136 * given delay, or to execute periodically.
137 * @return a newly created scheduled thread pool with termination management
138 */
139 public static ScheduledExecutorService newScheduledThreadPool() {
140 return newScheduledThreadPool(1);
141 }
142
143 /**
144 * Creates a thread pool that can schedule commands to run after a
145 * given delay, or to execute periodically.
146 * @param corePoolSize the number of threads to keep in the pool,
147 * even if they are idle.
148 * @return a newly created scheduled thread pool with termination management
149 */
150 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
151 return new ScheduledThreadPoolExecutor(corePoolSize);
152 }
153
154 /**
155 * Creates a thread pool that can schedule commands to run after a
156 * given delay, or to execute periodically.
157 * @param corePoolSize the number of threads to keep in the pool,
158 * even if they are idle.
159 * @param threadFactory the factory to use when the executor
160 * creates a new thread.
161 * @return a newly created scheduled thread pool with termination management
162 */
163 public static ScheduledExecutorService newScheduledThreadPool(
164 int corePoolSize, ThreadFactory threadFactory) {
165 return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
166 }
167
168
169 /**
170 * Creates and returns an object that delegates all defined {@link
171 * ExecutorService} methods to the given executor, but not any
172 * other methods that might otherwise be accessible using
173 * casts. This provides a way to safely "freeze" configuration and
174 * disallow tuning of a given concrete implementation.
175 * @param executor the underlying implementation
176 * @return an <tt>ExecutorService</tt> instance
177 * @throws NullPointerException if executor null
178 */
179 public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
180 if (executor == null)
181 throw new NullPointerException();
182 return new DelegatedExecutorService(executor);
183 }
184
185 /**
186 * Creates and returns an object that delegates all defined {@link
187 * ScheduledExecutorService} methods to the given executor, but
188 * not any other methods that might otherwise be accessible using
189 * casts. This provides a way to safely "freeze" configuration and
190 * disallow tuning of a given concrete implementation.
191 * @param executor the underlying implementation
192 * @return a <tt>ScheduledExecutorService</tt> instance
193 * @throws NullPointerException if executor null
194 */
195 public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
196 if (executor == null)
197 throw new NullPointerException();
198 return new DelegatedScheduledExecutorService(executor);
199 }
200
201 /**
202 * Return a default thread factory used to create new threads.
203 * This factory creates all new threads used by an Executor in the
204 * same {@link ThreadGroup}. If there is a {@link
205 * java.lang.SecurityManager}, it uses the group of {@link
206 * System#getSecurityManager}, else the group of the thread
207 * invoking this <tt>defaultThreadFactory</tt> method. Each new
208 * thread is created as a non-daemon thread with priority
209 * <tt>Thread.NORM_PRIORITY</tt>. New threads have names
210 * accessible via {@link Thread#getName} of
211 * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
212 * number of this factory, and <em>M</em> is the sequence number
213 * of the thread created by this factory.
214 * @return the thread factory
215 */
216 public static ThreadFactory defaultThreadFactory() {
217 return new DefaultThreadFactory();
218 }
219
220 /**
221 * Return a thread factory used to create new threads that
222 * have the same permissions as the current thread.
223 * This factory creates threads with the same settings as {@link
224 * Executors#defaultThreadFactory}, additionally setting the
225 * AccessControlContext and contextClassLoader of new threads to
226 * be the same as the thread invoking this
227 * <tt>privilegedThreadFactory</tt> method. A new
228 * <tt>privilegedThreadFactory</tt> can be created within an
229 * {@link AccessController#doPrivileged} action setting the
230 * current thread's access control context to create threads with
231 * the selected permission settings holding within that action.
232 *
233 * <p> Note that while tasks running within such threads will have
234 * the same access control and class loader settings as the
235 * current thread, they need not have the same {@link
236 * java.lang.ThreadLocal} or {@link
237 * java.lang.InheritableThreadLocal} values. If necessary,
238 * particular values of thread locals can be set or reset before
239 * any task runs in {@link ThreadPoolExecutor} subclasses using
240 * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is
241 * necessary to initialize worker threads to have the same
242 * InheritableThreadLocal settings as some other designated
243 * thread, you can create a custom ThreadFactory in which that
244 * thread waits for and services requests to create others that
245 * will inherit its values.
246 *
247 * @return the thread factory
248 * @throws AccessControlException if the current access control
249 * context does not have permission to both get and set context
250 * class loader.
251 * @see PrivilegedFutureTask
252 */
253 public static ThreadFactory privilegedThreadFactory() {
254 return new PrivilegedThreadFactory();
255 }
256
257
258 /**
259 * Creates and returns a {@link Callable} object that, when
260 * called, runs the given task and returns the given result. This
261 * can be useful when applying methods requiring a
262 * <tt>Callable</tt> to an otherwise resultless action.
263 * @param task the task to run
264 * @param result the result to return
265 */
266 public static <T> Callable<T> callable(Runnable task, T result) {
267 return new RunnableAdapter<T>(task, result);
268 }
269
270 /**
271 * Creates and returns a {@link Callable} object that, when
272 * called, runs the given task and returns <tt>null</tt>
273 * @param task the task to run
274 */
275 public static Callable<Object> callable(Runnable task) {
276 return new RunnableAdapter<Object>(task, null);
277 }
278
279 /**
280 * Creates and returns a {@link Callable} object that, when
281 * called, runs the given privileged action and returns its result
282 * @param action the privileged action to run
283 */
284 public static Callable<Object> callable(PrivilegedAction action) {
285 return new PrivilegedActionAdapter(action);
286 }
287
288 /**
289 * Creates and returns a {@link Callable} object that, when
290 * called, runs the given privileged exception action and returns its result
291 * @param action the privileged exception action to run
292 */
293 public static Callable<Object> callable(PrivilegedExceptionAction action) {
294 return new PrivilegedExceptionActionAdapter(action);
295 }
296
297 /**
298 * A callable that runs given task and returns given result
299 */
300 static class RunnableAdapter<T> implements Callable<T> {
301 private final Runnable task;
302 private final T result;
303 RunnableAdapter(Runnable task, T result) {
304 this.task = task;
305 this.result = result;
306 }
307 public T call() {
308 task.run();
309 return result;
310 }
311 }
312
313 /**
314 * A callable that runs given privileged action and returns its result
315 */
316 static class PrivilegedActionAdapter implements Callable<Object> {
317 PrivilegedActionAdapter(PrivilegedAction action) {
318 this.action = action;
319 }
320 public Object call () {
321 return action.run();
322 }
323 private final PrivilegedAction action;
324 }
325
326 /**
327 * A callable that runs given privileged exception action and returns its result
328 */
329 static class PrivilegedExceptionActionAdapter implements Callable<Object> {
330 PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
331 this.action = action;
332 }
333 public Object call () throws Exception {
334 return action.run();
335 }
336 private final PrivilegedExceptionAction action;
337 }
338
339 static class DefaultThreadFactory implements ThreadFactory {
340 static final AtomicInteger poolNumber = new AtomicInteger(1);
341 final ThreadGroup group;
342 final AtomicInteger threadNumber = new AtomicInteger(1);
343 final String namePrefix;
344
345 DefaultThreadFactory() {
346 SecurityManager s = System.getSecurityManager();
347 group = (s != null)? s.getThreadGroup() :
348 Thread.currentThread().getThreadGroup();
349 namePrefix = "pool-" +
350 poolNumber.getAndIncrement() +
351 "-thread-";
352 }
353
354 public Thread newThread(Runnable r) {
355 Thread t = new Thread(group, r,
356 namePrefix + threadNumber.getAndIncrement(),
357 0);
358 if (t.isDaemon())
359 t.setDaemon(false);
360 if (t.getPriority() != Thread.NORM_PRIORITY)
361 t.setPriority(Thread.NORM_PRIORITY);
362 return t;
363 }
364 }
365
366 static class PrivilegedThreadFactory extends DefaultThreadFactory {
367 private final ClassLoader ccl;
368 private final AccessControlContext acc;
369
370 PrivilegedThreadFactory() {
371 super();
372 this.ccl = Thread.currentThread().getContextClassLoader();
373 this.acc = AccessController.getContext();
374 acc.checkPermission(new RuntimePermission("setContextClassLoader"));
375 }
376
377 public Thread newThread(final Runnable r) {
378 return super.newThread(new Runnable() {
379 public void run() {
380 AccessController.doPrivileged(new PrivilegedAction() {
381 public Object run() {
382 Thread.currentThread().setContextClassLoader(ccl);
383 r.run();
384 return null;
385 }
386 }, acc);
387 }
388 });
389 }
390
391 }
392
393 /**
394 * A wrapper class that exposes only the ExecutorService methods
395 * of an implementation.
396 */
397 static class DelegatedExecutorService extends AbstractExecutorService {
398 private final ExecutorService e;
399 DelegatedExecutorService(ExecutorService executor) { e = executor; }
400 public void execute(Runnable command) { e.execute(command); }
401 public void shutdown() { e.shutdown(); }
402 public List<Runnable> shutdownNow() { return e.shutdownNow(); }
403 public boolean isShutdown() { return e.isShutdown(); }
404 public boolean isTerminated() { return e.isTerminated(); }
405 public boolean awaitTermination(long timeout, TimeUnit unit)
406 throws InterruptedException {
407 return e.awaitTermination(timeout, unit);
408 }
409 public Future<?> submit(Runnable task) {
410 return e.submit(task);
411 }
412 public <T> Future<T> submit(Callable<T> task) {
413 return e.submit(task);
414 }
415
416 public <T> T invoke(Callable<T> task) throws ExecutionException, InterruptedException {
417 return e.invoke(task);
418 }
419 public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
420 throws InterruptedException {
421 return e.invokeAll(tasks);
422 }
423 public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
424 long timeout, TimeUnit unit)
425 throws InterruptedException {
426 return e.invokeAll(tasks, timeout, unit);
427 }
428 public <T> T invokeAny(Collection<Callable<T>> tasks)
429 throws InterruptedException, ExecutionException {
430 return e.invokeAny(tasks);
431 }
432 public <T> T invokeAny(Collection<Callable<T>> tasks,
433 long timeout, TimeUnit unit)
434 throws InterruptedException, ExecutionException, TimeoutException {
435 return e.invokeAny(tasks, timeout, unit);
436 }
437 }
438
439 /**
440 * A wrapper class that exposes only the ExecutorService and
441 * ScheduleExecutor methods of a ScheduledThreadPoolExecutor.
442 */
443 static class DelegatedScheduledExecutorService
444 extends DelegatedExecutorService
445 implements ScheduledExecutorService {
446 private final ScheduledExecutorService e;
447 DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
448 super(executor);
449 e = executor;
450 }
451 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
452 return e.schedule(command, delay, unit);
453 }
454 public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
455 return e.schedule(callable, delay, unit);
456 }
457 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
458 return e.scheduleAtFixedRate(command, initialDelay, period, unit);
459 }
460 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
461 return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
462 }
463 }
464
465
/**
 * Private so that this class, which offers only static members,
 * cannot be instantiated from outside.
 */
private Executors() {
}
468 }