ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.7
Committed: Tue Dec 16 12:45:06 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.6: +122 -72 lines
Log Message:
invokeAny returns value, not list

File Contents

# User Rev Content
1 tim 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8    
9     import java.security.AccessControlContext;
10     import java.security.AccessController;
11     import java.security.PrivilegedAction;
12     import java.security.PrivilegedExceptionAction;
13 dl 1.3 import java.util.*;
14     import java.util.concurrent.locks.*;
15 tim 1.1
16     /**
17 dl 1.2 * Provides default implementation of {@link ExecutorService}
18     * execution methods. This class implements the <tt>submit</tt> and
19     * <tt>invoke</tt> methods using the default {@link FutureTask} and
20     * {@link PrivilegedFutureTask} classes provided in this package. For
21     * example, the the implementation of <tt>submit(Runnable)</tt>
22     * creates an associated <tt>FutureTask</tt> that is executed and
23     * returned. Subclasses overriding these methods to use different
24     * {@link Future} implementations should do so consistently for each
25     * of these methods.
26 tim 1.1 *
27     * @since 1.5
28     * @author Doug Lea
29     */
30     public abstract class AbstractExecutorService implements ExecutorService {
31    
32 dl 1.6 public <T> Future<T> submit(Runnable task, T result) {
33     FutureTask<T> ftask = new FutureTask<T>(task, result);
34 tim 1.1 execute(ftask);
35     return ftask;
36     }
37    
38     public <T> Future<T> submit(Callable<T> task) {
39     FutureTask<T> ftask = new FutureTask<T>(task);
40     execute(ftask);
41     return ftask;
42     }
43    
44     public void invoke(Runnable task) throws ExecutionException, InterruptedException {
45     FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
46     execute(ftask);
47     ftask.get();
48     }
49    
50     public <T> T invoke(Callable<T> task) throws ExecutionException, InterruptedException {
51     FutureTask<T> ftask = new FutureTask<T>(task);
52     execute(ftask);
53     return ftask.get();
54     }
55    
56     public Future<Object> submit(PrivilegedAction action) {
57     Callable<Object> task = new PrivilegedActionAdapter(action);
58     FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
59     execute(future);
60     return future;
61     }
62    
63     public Future<Object> submit(PrivilegedExceptionAction action) {
64     Callable<Object> task = new PrivilegedExceptionActionAdapter(action);
65     FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
66     execute(future);
67     return future;
68     }
69    
70     private static class PrivilegedActionAdapter implements Callable<Object> {
71     PrivilegedActionAdapter(PrivilegedAction action) {
72     this.action = action;
73     }
74     public Object call () {
75     return action.run();
76     }
77     private final PrivilegedAction action;
78     }
79    
80     private static class PrivilegedExceptionActionAdapter implements Callable<Object> {
81     PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
82     this.action = action;
83     }
84     public Object call () throws Exception {
85     return action.run();
86     }
87     private final PrivilegedExceptionAction action;
88     }
89 dl 1.3
90 dl 1.7 // any/all methods, each a little bit different than the other
91 dl 1.3
92    
93 dl 1.7 public <T> T invokeAny(Collection<Callable<T>> tasks)
94     throws InterruptedException, ExecutionException {
95 dl 1.3 if (tasks == null)
96     throw new NullPointerException();
97     int n = tasks.size();
98     if (n == 0)
99 dl 1.7 throw new IllegalArgumentException();
100     List<Future<T>> futures= new ArrayList<Future<T>>(n);
101     ExecutorCompletionService<T> ecs =
102     new ExecutorCompletionService<T>(this);
103 dl 1.3 try {
104 dl 1.7 for (Callable<T> t : tasks)
105     futures.add(ecs.submit(t));
106     ExecutionException ee = null;
107     RuntimeException re = null;
108     while (n-- > 0) {
109     Future<T> f = ecs.take();
110     try {
111     return f.get();
112     } catch(ExecutionException eex) {
113     ee = eex;
114     } catch(RuntimeException rex) {
115     re = rex;
116     }
117     }
118     if (ee != null)
119     throw ee;
120     if (re != null)
121     throw new ExecutionException(re);
122     throw new ExecutionException();
123 dl 1.3 } finally {
124 dl 1.6 for (Future<T> f : futures)
125 dl 1.4 f.cancel(true);
126 dl 1.3 }
127     }
128    
129 dl 1.7 public <T> T invokeAny(Collection<Callable<T>> tasks,
130     long timeout, TimeUnit unit)
131     throws InterruptedException, ExecutionException, TimeoutException {
132 dl 1.3 if (tasks == null || unit == null)
133     throw new NullPointerException();
134 dl 1.7 long nanos = unit.toNanos(timeout);
135 dl 1.3 int n = tasks.size();
136     if (n == 0)
137 dl 1.7 throw new IllegalArgumentException();
138     List<Future<T>> futures= new ArrayList<Future<T>>(n);
139     ExecutorCompletionService<T> ecs =
140     new ExecutorCompletionService<T>(this);
141 dl 1.3 try {
142 dl 1.7 for (Callable<T> t : tasks)
143     futures.add(ecs.submit(t));
144     ExecutionException ee = null;
145     RuntimeException re = null;
146     long lastTime = System.nanoTime();
147     while (n-- > 0) {
148     Future<T> f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
149     if (f == null) {
150     if (nanos <= 0)
151     throw new TimeoutException();
152     long now = System.nanoTime();
153     nanos -= now - lastTime;
154     lastTime = now;
155     }
156     try {
157     return f.get();
158     } catch(ExecutionException eex) {
159     ee = eex;
160     } catch(RuntimeException rex) {
161     re = rex;
162     }
163     }
164     if (ee != null)
165     throw ee;
166     if (re != null)
167     throw new ExecutionException(re);
168     throw new ExecutionException();
169 dl 1.3 } finally {
170 dl 1.6 for (Future<T> f : futures)
171 dl 1.4 f.cancel(true);
172 dl 1.3 }
173     }
174    
175 dl 1.7
176     public <T> T invokeAny(Collection<Runnable> tasks, T result)
177     throws InterruptedException, ExecutionException {
178 dl 1.3 if (tasks == null)
179     throw new NullPointerException();
180     int n = tasks.size();
181     if (n == 0)
182 dl 1.7 throw new IllegalArgumentException();
183     List<Future<T>> futures= new ArrayList<Future<T>>(n);
184     ExecutorCompletionService<T> ecs =
185     new ExecutorCompletionService<T>(this);
186 dl 1.3 try {
187 dl 1.7 for (Runnable t : tasks)
188     futures.add(ecs.submit(t, result));
189     ExecutionException ee = null;
190     RuntimeException re = null;
191     while (n-- > 0) {
192     Future<T> f = ecs.take();
193     try {
194     return f.get();
195     } catch(ExecutionException eex) {
196     ee = eex;
197     } catch(RuntimeException rex) {
198     re = rex;
199     }
200     }
201     if (ee != null)
202     throw ee;
203     if (re != null)
204     throw new ExecutionException(re);
205     throw new ExecutionException();
206 dl 1.3 } finally {
207 dl 1.6 for (Future<T> f : futures)
208     f.cancel(true);
209 dl 1.3 }
210     }
211    
212 dl 1.7 public <T> T invokeAny(Collection<Runnable> tasks, T result,
213     long timeout, TimeUnit unit)
214     throws InterruptedException, ExecutionException, TimeoutException {
215 dl 1.3 if (tasks == null || unit == null)
216     throw new NullPointerException();
217 dl 1.7 long nanos = unit.toNanos(timeout);
218 dl 1.3 int n = tasks.size();
219 dl 1.7 if (n == 0)
220     throw new IllegalArgumentException();
221 dl 1.6 List<Future<T>> futures= new ArrayList<Future<T>>(n);
222 dl 1.7 ExecutorCompletionService<T> ecs =
223     new ExecutorCompletionService<T>(this);
224 dl 1.3 try {
225 dl 1.7 for (Runnable t : tasks)
226     futures.add(ecs.submit(t, result));
227     ExecutionException ee = null;
228     RuntimeException re = null;
229     long lastTime = System.nanoTime();
230     while (n-- > 0) {
231     Future<T> f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
232     if (f == null) {
233     if (nanos <= 0)
234     throw new TimeoutException();
235     long now = System.nanoTime();
236     nanos -= now - lastTime;
237     lastTime = now;
238     }
239     try {
240     return f.get();
241     } catch(ExecutionException eex) {
242     ee = eex;
243     } catch(RuntimeException rex) {
244     re = rex;
245     }
246     }
247     if (ee != null)
248     throw ee;
249     if (re != null)
250     throw new ExecutionException(re);
251     throw new ExecutionException();
252 dl 1.3 } finally {
253 dl 1.6 for (Future<T> f : futures)
254     f.cancel(true);
255 dl 1.3 }
256     }
257    
258 dl 1.6
259     public <T> List<Future<T>> invokeAll(List<Runnable> tasks, T result)
260 dl 1.3 throws InterruptedException {
261     if (tasks == null)
262     throw new NullPointerException();
263 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
264     boolean done = false;
265 dl 1.3 try {
266 dl 1.6 for (Runnable t : tasks) {
267     FutureTask<T> f = new FutureTask<T>(t, result);
268 dl 1.3 futures.add(f);
269 dl 1.6 execute(f);
270     }
271     for (Future<T> f : futures) {
272     if (!f.isDone()) {
273     try {
274     f.get();
275     } catch(CancellationException ignore) {
276     } catch(ExecutionException ignore) {
277     }
278     }
279 dl 1.3 }
280 dl 1.6 done = true;
281 dl 1.3 return futures;
282     } finally {
283 dl 1.6 if (!done)
284     for (Future<T> f : futures)
285     f.cancel(true);
286 dl 1.3 }
287     }
288    
289 dl 1.6 public <T> List<Future<T>> invokeAll(List<Runnable> tasks, T result,
290     long timeout, TimeUnit unit)
291 dl 1.3 throws InterruptedException {
292     if (tasks == null || unit == null)
293     throw new NullPointerException();
294     long nanos = unit.toNanos(timeout);
295 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
296     boolean done = false;
297 dl 1.3 try {
298 dl 1.6 for (Runnable t : tasks) {
299     FutureTask<T> f = new FutureTask<T>(t, result);
300 dl 1.3 futures.add(f);
301 dl 1.6 execute(f);
302     }
303     long lastTime = System.nanoTime();
304     for (Future<T> f : futures) {
305     if (!f.isDone()) {
306     if (nanos < 0)
307     return futures;
308     try {
309     f.get(nanos, TimeUnit.NANOSECONDS);
310     long now = System.nanoTime();
311     nanos -= now - lastTime;
312     lastTime = now;
313     } catch(CancellationException ignore) {
314     } catch(ExecutionException ignore) {
315     } catch(TimeoutException toe) {
316     return futures;
317     }
318     }
319 dl 1.3 }
320 dl 1.6 done = true;
321 dl 1.3 return futures;
322     } finally {
323 dl 1.6 if (!done)
324     for (Future<T> f : futures)
325     f.cancel(true);
326 dl 1.3 }
327     }
328    
329 dl 1.6 public <T> List<Future<T>> invokeAll(List<Callable<T>> tasks)
330 dl 1.3 throws InterruptedException {
331     if (tasks == null)
332     throw new NullPointerException();
333 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
334     boolean done = false;
335 dl 1.3 try {
336     for (Callable<T> t : tasks) {
337 dl 1.6 FutureTask<T> f = new FutureTask<T>(t);
338 dl 1.3 futures.add(f);
339     execute(f);
340     }
341 dl 1.6 for (Future<T> f : futures) {
342     if (!f.isDone()) {
343     try {
344     f.get();
345     } catch(CancellationException ignore) {
346     } catch(ExecutionException ignore) {
347     }
348     }
349     }
350     done = true;
351 dl 1.3 return futures;
352     } finally {
353 dl 1.6 if (!done)
354 dl 1.4 for (Future<T> f : futures)
355     f.cancel(true);
356 dl 1.3 }
357     }
358    
359 dl 1.6 public <T> List<Future<T>> invokeAll(List<Callable<T>> tasks,
360     long timeout, TimeUnit unit)
361 dl 1.3 throws InterruptedException {
362     if (tasks == null || unit == null)
363     throw new NullPointerException();
364     long nanos = unit.toNanos(timeout);
365 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
366     boolean done = false;
367 dl 1.3 try {
368     for (Callable<T> t : tasks) {
369 dl 1.6 FutureTask<T> f = new FutureTask<T>(t);
370 dl 1.3 futures.add(f);
371     execute(f);
372     }
373 dl 1.6 long lastTime = System.nanoTime();
374     for (Future<T> f : futures) {
375     if (!f.isDone()) {
376     if (nanos < 0)
377     return futures;
378     try {
379     f.get(nanos, TimeUnit.NANOSECONDS);
380     long now = System.nanoTime();
381     nanos -= now - lastTime;
382     lastTime = now;
383     } catch(CancellationException ignore) {
384     } catch(ExecutionException ignore) {
385     } catch(TimeoutException toe) {
386     return futures;
387     }
388     }
389     }
390     done = true;
391 dl 1.3 return futures;
392     } finally {
393 dl 1.6 if (!done)
394 dl 1.4 for (Future<T> f : futures)
395     f.cancel(true);
396 dl 1.3 }
397     }
398    
399 tim 1.1 }