ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/Executors.java
Revision: 1.25
Committed: Thu Dec 4 20:54:29 2003 UTC (20 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.24: +7 -5 lines
Log Message:
Revised tests for revised Future classes

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.2 import java.util.*;
9 dl 1.22 import java.util.concurrent.atomic.AtomicInteger;
10 tim 1.20 import java.security.AccessControlContext;
11     import java.security.AccessController;
12     import java.security.PrivilegedAction;
13     import java.security.PrivilegedExceptionAction;
14 tim 1.1
15     /**
16 dl 1.18 * Factory and utility methods for {@link Executor}, {@link
17 dl 1.25 * ExecutorService}, {@link ThreadFactory}, and {@link Future}
18     * classes defined in this package.
19 tim 1.1 *
20     * @since 1.5
21 dl 1.12 * @author Doug Lea
22 tim 1.1 */
23     public class Executors {
24    
25     /**
26 dl 1.2 * A wrapper class that exposes only the ExecutorService methods
27     * of an implementation.
28 tim 1.6 */
29 jozart 1.11 private static class DelegatedExecutorService implements ExecutorService {
30 dl 1.2 private final ExecutorService e;
31     DelegatedExecutorService(ExecutorService executor) { e = executor; }
32     public void execute(Runnable command) { e.execute(command); }
33     public void shutdown() { e.shutdown(); }
34     public List shutdownNow() { return e.shutdownNow(); }
35     public boolean isShutdown() { return e.isShutdown(); }
36     public boolean isTerminated() { return e.isTerminated(); }
37     public boolean awaitTermination(long timeout, TimeUnit unit)
38     throws InterruptedException {
39     return e.awaitTermination(timeout, unit);
40     }
41     }
42    
43     /**
44 tim 1.1 * Creates a thread pool that reuses a fixed set of threads
45 dl 1.16 * operating off a shared unbounded queue. If any thread
46     * terminates due to a failure during execution prior to shutdown,
47     * a new one will take its place if needed to execute subsequent
48     * tasks.
49 tim 1.1 *
50     * @param nThreads the number of threads in the pool
51     * @return the newly created thread pool
52     */
53 dl 1.2 public static ExecutorService newFixedThreadPool(int nThreads) {
54     return new DelegatedExecutorService
55     (new ThreadPoolExecutor(nThreads, nThreads,
56     0L, TimeUnit.MILLISECONDS,
57 dl 1.3 new LinkedBlockingQueue<Runnable>()));
58 dl 1.2 }
59    
60     /**
61     * Creates a thread pool that reuses a fixed set of threads
62     * operating off a shared unbounded queue, using the provided
63     * ThreadFactory to create new threads when needed.
64     *
65     * @param nThreads the number of threads in the pool
66 dl 1.12 * @param threadFactory the factory to use when creating new threads
67 dl 1.2 * @return the newly created thread pool
68     */
69     public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
70     return new DelegatedExecutorService
71     (new ThreadPoolExecutor(nThreads, nThreads,
72     0L, TimeUnit.MILLISECONDS,
73 dl 1.3 new LinkedBlockingQueue<Runnable>(),
74 dl 1.14 threadFactory));
75 dl 1.2 }
76    
77     /**
78     * Creates an Executor that uses a single worker thread operating
79     * off an unbounded queue. (Note however that if this single
80     * thread terminates due to a failure during execution prior to
81     * shutdown, a new one will take its place if needed to execute
82     * subsequent tasks.) Tasks are guaranteed to execute
83     * sequentially, and no more than one task will be active at any
84 dl 1.16 * given time. This method is equivalent in effect to
85     *<tt>new FixedThreadPool(1)</tt>.
86 dl 1.2 *
87     * @return the newly-created single-threaded Executor
88     */
89     public static ExecutorService newSingleThreadExecutor() {
90     return new DelegatedExecutorService
91 tim 1.6 (new ThreadPoolExecutor(1, 1,
92 dl 1.2 0L, TimeUnit.MILLISECONDS,
93 dl 1.3 new LinkedBlockingQueue<Runnable>()));
94 dl 1.2 }
95    
96     /**
97     * Creates an Executor that uses a single worker thread operating
98     * off an unbounded queue, and uses the provided ThreadFactory to
99     * create new threads when needed.
100 dl 1.12 * @param threadFactory the factory to use when creating new
101 dl 1.2 * threads
102     *
103     * @return the newly-created single-threaded Executor
104     */
105     public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
106     return new DelegatedExecutorService
107 tim 1.6 (new ThreadPoolExecutor(1, 1,
108 dl 1.2 0L, TimeUnit.MILLISECONDS,
109 dl 1.3 new LinkedBlockingQueue<Runnable>(),
110 dl 1.14 threadFactory));
111 tim 1.1 }
112    
113     /**
114     * Creates a thread pool that creates new threads as needed, but
115     * will reuse previously constructed threads when they are
116     * available. These pools will typically improve the performance
117     * of programs that execute many short-lived asynchronous tasks.
118     * Calls to <tt>execute</tt> will reuse previously constructed
119     * threads if available. If no existing thread is available, a new
120     * thread will be created and added to the pool. Threads that have
121     * not been used for sixty seconds are terminated and removed from
122     * the cache. Thus, a pool that remains idle for long enough will
123 dl 1.16 * not consume any resources. Note that pools with similar
124     * properties but different details (for example, timeout parameters)
125     * may be created using {@link ThreadPoolExecutor} constructors.
126 tim 1.1 *
127     * @return the newly created thread pool
128     */
129 dl 1.2 public static ExecutorService newCachedThreadPool() {
130     return new DelegatedExecutorService
131     (new ThreadPoolExecutor(0, Integer.MAX_VALUE,
132     60, TimeUnit.SECONDS,
133 dl 1.3 new SynchronousQueue<Runnable>()));
134 tim 1.1 }
135    
136     /**
137 dl 1.2 * Creates a thread pool that creates new threads as needed, but
138     * will reuse previously constructed threads when they are
139 tim 1.6 * available, and uses the provided
140 dl 1.2 * ThreadFactory to create new threads when needed.
141 dl 1.12 * @param threadFactory the factory to use when creating new threads
142 tim 1.1 * @return the newly created thread pool
143     */
144 dl 1.2 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
145     return new DelegatedExecutorService
146     (new ThreadPoolExecutor(0, Integer.MAX_VALUE,
147     60, TimeUnit.SECONDS,
148 dl 1.3 new SynchronousQueue<Runnable>(),
149 dl 1.14 threadFactory));
150 tim 1.1 }
151    
152     /**
153 dl 1.25 * Executes a Runnable task and returns a Future representing that
154 tim 1.15 * task.
155     *
156     * @param executor the Executor to which the task will be submitted
157     * @param task the task to submit
158 dl 1.25 * @return a Future representing pending completion of the task,
159     * and whose <tt>get()</tt> method will return <tt>Boolean.TRUE</tt>
160     * upon completion
161 tim 1.15 * @throws RejectedExecutionException if task cannot be scheduled
162     * for execution
163     */
164 dl 1.25 public static Future<Boolean> execute(Executor executor, Runnable task) {
165 tim 1.15 FutureTask<Boolean> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
166     executor.execute(ftask);
167     return ftask;
168     }
169    
170     /**
171 tim 1.1 * Executes a Runnable task and returns a Future representing that
172     * task.
173     *
174     * @param executor the Executor to which the task will be submitted
175     * @param task the task to submit
176     * @param value the value which will become the return value of
177     * the task upon task completion
178 dl 1.5 * @return a Future representing pending completion of the task
179 jozart 1.9 * @throws RejectedExecutionException if task cannot be scheduled
180 tim 1.1 * for execution
181     */
182 dl 1.5 public static <T> Future<T> execute(Executor executor, Runnable task, T value) {
183 dl 1.10 FutureTask<T> ftask = new FutureTask<T>(task, value);
184 tim 1.1 executor.execute(ftask);
185     return ftask;
186     }
187    
188     /**
189     * Executes a value-returning task and returns a Future
190     * representing the pending results of the task.
191     *
192     * @param executor the Executor to which the task will be submitted
193     * @param task the task to submit
194     * @return a Future representing pending completion of the task
195 jozart 1.9 * @throws RejectedExecutionException if task cannot be scheduled
196     * for execution
197 tim 1.1 */
198 tim 1.15 public static <T> Future<T> execute(Executor executor, Callable<T> task) {
199 dl 1.10 FutureTask<T> ftask = new FutureTask<T>(task);
200 tim 1.1 executor.execute(ftask);
201     return ftask;
202     }
203    
204     /**
205     * Executes a Runnable task and blocks until it completes normally
206     * or throws an exception.
207     *
208     * @param executor the Executor to which the task will be submitted
209     * @param task the task to submit
210 jozart 1.9 * @throws RejectedExecutionException if task cannot be scheduled
211     * for execution
212 dl 1.19 * @throws ExecutionException if the task encountered an exception
213     * while executing
214 tim 1.1 */
215     public static void invoke(Executor executor, Runnable task)
216     throws ExecutionException, InterruptedException {
217 tim 1.13 FutureTask<Boolean> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
218 tim 1.1 executor.execute(ftask);
219     ftask.get();
220     }
221    
222     /**
223     * Executes a value-returning task and blocks until it returns a
224     * value or throws an exception.
225     *
226     * @param executor the Executor to which the task will be submitted
227     * @param task the task to submit
228     * @return a Future representing pending completion of the task
229 jozart 1.9 * @throws RejectedExecutionException if task cannot be scheduled
230     * for execution
231 dl 1.12 * @throws InterruptedException if interrupted while waiting for
232     * completion
233 dl 1.19 * @throws ExecutionException if the task encountered an exception
234     * while executing
235 tim 1.1 */
236     public static <T> T invoke(Executor executor, Callable<T> task)
237     throws ExecutionException, InterruptedException {
238     FutureTask<T> ftask = new FutureTask<T>(task);
239     executor.execute(ftask);
240     return ftask.get();
241 tim 1.6 }
242    
243 tim 1.20
244     /**
245     * Executes a privileged action under the current access control
246     * context and returns a Future representing the pending result
247     * object of that action.
248     *
249     * @param executor the Executor to which the task will be submitted
250     * @param action the action to submit
251     * @return a Future representing pending completion of the action
252     * @throws RejectedExecutionException if action cannot be scheduled
253     * for execution
254     */
255     public static Future<Object> execute(Executor executor, PrivilegedAction action) {
256     Callable<Object> task = new PrivilegedActionAdapter(action);
257 tim 1.21 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
258 tim 1.20 executor.execute(future);
259     return future;
260     }
261    
262     /**
263     * Executes a privileged exception action under the current access control
264     * context and returns a Future representing the pending result
265     * object of that action.
266     *
267     * @param executor the Executor to which the task will be submitted
268     * @param action the action to submit
269     * @return a Future representing pending completion of the action
270     * @throws RejectedExecutionException if action cannot be scheduled
271     * for execution
272     */
273     public static Future<Object> execute(Executor executor, PrivilegedExceptionAction action) {
274     Callable<Object> task = new PrivilegedExceptionActionAdapter(action);
275 tim 1.21 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
276 tim 1.20 executor.execute(future);
277     return future;
278     }
279    
280     private static class PrivilegedActionAdapter implements Callable<Object> {
281     PrivilegedActionAdapter(PrivilegedAction action) {
282     this.action = action;
283     }
284     public Object call () {
285     return action.run();
286     }
287     private final PrivilegedAction action;
288     }
289    
290     private static class PrivilegedExceptionActionAdapter implements Callable<Object> {
291     PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
292     this.action = action;
293     }
294     public Object call () throws Exception {
295     return action.run();
296     }
297     private final PrivilegedExceptionAction action;
298     }
299    
300 dl 1.22 /**
301     * Return a default thread factory used to create new threads.
302     * This factory creates all new threads used by an Executor in the
303     * same {@link ThreadGroup}. If there is a {@link
304     * java.lang.SecurityManager}, it uses the group of {@link
305     * System#getSecurityManager}, else the group of the thread
306     * invoking this <tt>defaultThreadFactory</tt> method. Each new
307     * thread is created as a non-daemon thread with priority
308     * <tt>Thread.NORM_PRIORITY</tt>. New threads have names
309     * accessible via {@link Thread#getName} of
310     * <em>pool-N-thread-M</em>, where <em>N</em> is the sequence
311     * number of this factory, and <em>M</em> is the sequence number
312     * of the thread created by this factory.
313     * @return the thread factory
314     */
315     public static ThreadFactory defaultThreadFactory() {
316     return new DefaultThreadFactory();
317     }
318    
319     /**
320 dl 1.24 * Return a thread factory used to create new threads that
321     * have the same permissions as the current thread.
322 dl 1.22 * This factory creates threads with the same settings as {@link
323     * Executors#defaultThreadFactory}, additionally setting the
324     * AccessControlContext and contextClassLoader of new threads to
325     * be the same as the thread invoking this
326     * <tt>privilegedThreadFactory</tt> method. A new
327     * <tt>privilegedThreadFactory</tt> can be created within an
328 dl 1.23 * {@link AccessController#doPrivileged} action setting the
329 dl 1.24 * current thread's access control context to create threads with
330 dl 1.23 * the selected permission settings holding within that action.
331 dl 1.22 *
332     * <p> Note that while tasks running within such threads will have
333     * the same access control and class loader settings as the
334     * current thread, they need not have the same {@link
335     * java.lang.ThreadLocal} or {@link
336     * java.lang.InheritableThreadLocal} values. If necessary,
337     * particular values of thread locals can be set or reset before
338     * any task runs in {@link ThreadPoolExecutor} subclasses using
339     * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is
340     * necessary to initialize worker threads to have the same
341     * InheritableThreadLocal settings as some other designated
342     * thread, you can create a custom ThreadFactory in which that
343     * thread waits for and services requests to create others that
344     * will inherit its values.
345     *
346     * @return the thread factory
347     * @throws AccessControlException if the current access control
348     * context does not have permission to both get and set context
349     * class loader.
350     * @see PrivilegedFutureTask
351     */
352     public static ThreadFactory privilegedThreadFactory() {
353     return new PrivilegedThreadFactory();
354     }
355    
356     static class DefaultThreadFactory implements ThreadFactory {
357     static final AtomicInteger poolNumber = new AtomicInteger(1);
358     final ThreadGroup group;
359     final AtomicInteger threadNumber = new AtomicInteger(1);
360     final String namePrefix;
361    
362     DefaultThreadFactory() {
363     SecurityManager s = System.getSecurityManager();
364     group = (s != null)? s.getThreadGroup() :
365     Thread.currentThread().getThreadGroup();
366     namePrefix = "pool-" +
367     poolNumber.getAndIncrement() +
368     "-thread-";
369     }
370    
371     public Thread newThread(Runnable r) {
372     Thread t = new Thread(group, r,
373     namePrefix + threadNumber.getAndIncrement(),
374     0);
375     if (t.isDaemon())
376     t.setDaemon(false);
377     if (t.getPriority() != Thread.NORM_PRIORITY)
378     t.setPriority(Thread.NORM_PRIORITY);
379     return t;
380     }
381     }
382    
383     static class PrivilegedThreadFactory extends DefaultThreadFactory {
384     private final ClassLoader ccl;
385     private final AccessControlContext acc;
386    
387     PrivilegedThreadFactory() {
388     super();
389     this.ccl = Thread.currentThread().getContextClassLoader();
390     this.acc = AccessController.getContext();
391     acc.checkPermission(new RuntimePermission("setContextClassLoader"));
392     }
393    
394     public Thread newThread(final Runnable r) {
395     return super.newThread(new Runnable() {
396     public void run() {
397     AccessController.doPrivileged(new PrivilegedAction() {
398     public Object run() {
399     Thread.currentThread().setContextClassLoader(ccl);
400     r.run();
401     return null;
402     }
403     }, acc);
404     }
405     });
406     }
407    
408     }
409    
410 tim 1.20
411 tim 1.15 /** Cannot instantiate. */
412     private Executors() {}
413 tim 1.1 }