ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.8
Committed: Tue Dec 16 12:58:20 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.7: +16 -20 lines
Log Message:
Fix timeout control

File Contents

# User Rev Content
1 tim 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8    
9     import java.security.AccessControlContext;
10     import java.security.AccessController;
11     import java.security.PrivilegedAction;
12     import java.security.PrivilegedExceptionAction;
13 dl 1.3 import java.util.*;
14     import java.util.concurrent.locks.*;
15 tim 1.1
16     /**
17 dl 1.2 * Provides default implementation of {@link ExecutorService}
18     * execution methods. This class implements the <tt>submit</tt> and
19     * <tt>invoke</tt> methods using the default {@link FutureTask} and
20     * {@link PrivilegedFutureTask} classes provided in this package. For
21     * example, the the implementation of <tt>submit(Runnable)</tt>
22     * creates an associated <tt>FutureTask</tt> that is executed and
23     * returned. Subclasses overriding these methods to use different
24     * {@link Future} implementations should do so consistently for each
25     * of these methods.
26 tim 1.1 *
27     * @since 1.5
28     * @author Doug Lea
29     */
30     public abstract class AbstractExecutorService implements ExecutorService {
31    
32 dl 1.6 public <T> Future<T> submit(Runnable task, T result) {
33     FutureTask<T> ftask = new FutureTask<T>(task, result);
34 tim 1.1 execute(ftask);
35     return ftask;
36     }
37    
38     public <T> Future<T> submit(Callable<T> task) {
39     FutureTask<T> ftask = new FutureTask<T>(task);
40     execute(ftask);
41     return ftask;
42     }
43    
44     public void invoke(Runnable task) throws ExecutionException, InterruptedException {
45     FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
46     execute(ftask);
47     ftask.get();
48     }
49    
50     public <T> T invoke(Callable<T> task) throws ExecutionException, InterruptedException {
51     FutureTask<T> ftask = new FutureTask<T>(task);
52     execute(ftask);
53     return ftask.get();
54     }
55    
56     public Future<Object> submit(PrivilegedAction action) {
57     Callable<Object> task = new PrivilegedActionAdapter(action);
58     FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
59     execute(future);
60     return future;
61     }
62    
63     public Future<Object> submit(PrivilegedExceptionAction action) {
64     Callable<Object> task = new PrivilegedExceptionActionAdapter(action);
65     FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
66     execute(future);
67     return future;
68     }
69    
70     private static class PrivilegedActionAdapter implements Callable<Object> {
71     PrivilegedActionAdapter(PrivilegedAction action) {
72     this.action = action;
73     }
74     public Object call () {
75     return action.run();
76     }
77     private final PrivilegedAction action;
78     }
79    
80     private static class PrivilegedExceptionActionAdapter implements Callable<Object> {
81     PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
82     this.action = action;
83     }
84     public Object call () throws Exception {
85     return action.run();
86     }
87     private final PrivilegedExceptionAction action;
88     }
89 dl 1.3
90 dl 1.7 // any/all methods, each a little bit different than the other
91 dl 1.3
92    
93 dl 1.7 public <T> T invokeAny(Collection<Callable<T>> tasks)
94     throws InterruptedException, ExecutionException {
95 dl 1.3 if (tasks == null)
96     throw new NullPointerException();
97     int n = tasks.size();
98     if (n == 0)
99 dl 1.7 throw new IllegalArgumentException();
100     List<Future<T>> futures= new ArrayList<Future<T>>(n);
101     ExecutorCompletionService<T> ecs =
102     new ExecutorCompletionService<T>(this);
103 dl 1.3 try {
104 dl 1.7 for (Callable<T> t : tasks)
105     futures.add(ecs.submit(t));
106     ExecutionException ee = null;
107     RuntimeException re = null;
108     while (n-- > 0) {
109     Future<T> f = ecs.take();
110     try {
111     return f.get();
112     } catch(ExecutionException eex) {
113     ee = eex;
114     } catch(RuntimeException rex) {
115     re = rex;
116     }
117     }
118     if (ee != null)
119     throw ee;
120     if (re != null)
121     throw new ExecutionException(re);
122     throw new ExecutionException();
123 dl 1.3 } finally {
124 dl 1.6 for (Future<T> f : futures)
125 dl 1.4 f.cancel(true);
126 dl 1.3 }
127     }
128    
129 dl 1.7 public <T> T invokeAny(Collection<Callable<T>> tasks,
130     long timeout, TimeUnit unit)
131     throws InterruptedException, ExecutionException, TimeoutException {
132 dl 1.3 if (tasks == null || unit == null)
133     throw new NullPointerException();
134 dl 1.7 long nanos = unit.toNanos(timeout);
135 dl 1.3 int n = tasks.size();
136     if (n == 0)
137 dl 1.7 throw new IllegalArgumentException();
138     List<Future<T>> futures= new ArrayList<Future<T>>(n);
139     ExecutorCompletionService<T> ecs =
140     new ExecutorCompletionService<T>(this);
141 dl 1.3 try {
142 dl 1.7 for (Callable<T> t : tasks)
143     futures.add(ecs.submit(t));
144     ExecutionException ee = null;
145     RuntimeException re = null;
146     long lastTime = System.nanoTime();
147     while (n-- > 0) {
148     Future<T> f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
149 dl 1.8 if (f == null)
150     throw new TimeoutException();
151 dl 1.7 try {
152     return f.get();
153     } catch(ExecutionException eex) {
154     ee = eex;
155     } catch(RuntimeException rex) {
156     re = rex;
157     }
158 dl 1.8 long now = System.nanoTime();
159     nanos -= now - lastTime;
160     lastTime = now;
161 dl 1.7 }
162     if (ee != null)
163     throw ee;
164     if (re != null)
165     throw new ExecutionException(re);
166     throw new ExecutionException();
167 dl 1.3 } finally {
168 dl 1.6 for (Future<T> f : futures)
169 dl 1.4 f.cancel(true);
170 dl 1.3 }
171     }
172    
173 dl 1.7
174     public <T> T invokeAny(Collection<Runnable> tasks, T result)
175     throws InterruptedException, ExecutionException {
176 dl 1.3 if (tasks == null)
177     throw new NullPointerException();
178     int n = tasks.size();
179     if (n == 0)
180 dl 1.7 throw new IllegalArgumentException();
181     List<Future<T>> futures= new ArrayList<Future<T>>(n);
182     ExecutorCompletionService<T> ecs =
183     new ExecutorCompletionService<T>(this);
184 dl 1.3 try {
185 dl 1.7 for (Runnable t : tasks)
186     futures.add(ecs.submit(t, result));
187     ExecutionException ee = null;
188     RuntimeException re = null;
189     while (n-- > 0) {
190     Future<T> f = ecs.take();
191     try {
192     return f.get();
193     } catch(ExecutionException eex) {
194     ee = eex;
195     } catch(RuntimeException rex) {
196     re = rex;
197     }
198     }
199     if (ee != null)
200     throw ee;
201     if (re != null)
202     throw new ExecutionException(re);
203     throw new ExecutionException();
204 dl 1.3 } finally {
205 dl 1.6 for (Future<T> f : futures)
206     f.cancel(true);
207 dl 1.3 }
208     }
209    
210 dl 1.7 public <T> T invokeAny(Collection<Runnable> tasks, T result,
211     long timeout, TimeUnit unit)
212     throws InterruptedException, ExecutionException, TimeoutException {
213 dl 1.3 if (tasks == null || unit == null)
214     throw new NullPointerException();
215 dl 1.7 long nanos = unit.toNanos(timeout);
216 dl 1.3 int n = tasks.size();
217 dl 1.7 if (n == 0)
218     throw new IllegalArgumentException();
219 dl 1.6 List<Future<T>> futures= new ArrayList<Future<T>>(n);
220 dl 1.7 ExecutorCompletionService<T> ecs =
221     new ExecutorCompletionService<T>(this);
222 dl 1.3 try {
223 dl 1.7 for (Runnable t : tasks)
224     futures.add(ecs.submit(t, result));
225     ExecutionException ee = null;
226     RuntimeException re = null;
227     long lastTime = System.nanoTime();
228     while (n-- > 0) {
229     Future<T> f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
230 dl 1.8 if (f == null)
231     throw new TimeoutException();
232 dl 1.7 try {
233     return f.get();
234     } catch(ExecutionException eex) {
235     ee = eex;
236     } catch(RuntimeException rex) {
237     re = rex;
238     }
239 dl 1.8 long now = System.nanoTime();
240     nanos -= now - lastTime;
241     lastTime = now;
242 dl 1.7 }
243     if (ee != null)
244     throw ee;
245     if (re != null)
246     throw new ExecutionException(re);
247     throw new ExecutionException();
248 dl 1.3 } finally {
249 dl 1.6 for (Future<T> f : futures)
250     f.cancel(true);
251 dl 1.3 }
252     }
253    
254 dl 1.6
255     public <T> List<Future<T>> invokeAll(List<Runnable> tasks, T result)
256 dl 1.3 throws InterruptedException {
257     if (tasks == null)
258     throw new NullPointerException();
259 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
260     boolean done = false;
261 dl 1.3 try {
262 dl 1.6 for (Runnable t : tasks) {
263     FutureTask<T> f = new FutureTask<T>(t, result);
264 dl 1.3 futures.add(f);
265 dl 1.6 execute(f);
266     }
267     for (Future<T> f : futures) {
268     if (!f.isDone()) {
269     try {
270     f.get();
271     } catch(CancellationException ignore) {
272     } catch(ExecutionException ignore) {
273     }
274     }
275 dl 1.3 }
276 dl 1.6 done = true;
277 dl 1.3 return futures;
278     } finally {
279 dl 1.6 if (!done)
280     for (Future<T> f : futures)
281     f.cancel(true);
282 dl 1.3 }
283     }
284    
285 dl 1.6 public <T> List<Future<T>> invokeAll(List<Runnable> tasks, T result,
286     long timeout, TimeUnit unit)
287 dl 1.3 throws InterruptedException {
288     if (tasks == null || unit == null)
289     throw new NullPointerException();
290     long nanos = unit.toNanos(timeout);
291 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
292     boolean done = false;
293 dl 1.3 try {
294 dl 1.6 for (Runnable t : tasks) {
295     FutureTask<T> f = new FutureTask<T>(t, result);
296 dl 1.3 futures.add(f);
297 dl 1.6 execute(f);
298     }
299     long lastTime = System.nanoTime();
300     for (Future<T> f : futures) {
301     if (!f.isDone()) {
302     if (nanos < 0)
303     return futures;
304     try {
305     f.get(nanos, TimeUnit.NANOSECONDS);
306     } catch(CancellationException ignore) {
307     } catch(ExecutionException ignore) {
308     } catch(TimeoutException toe) {
309     return futures;
310     }
311 dl 1.8 long now = System.nanoTime();
312     nanos -= now - lastTime;
313     lastTime = now;
314 dl 1.6 }
315 dl 1.3 }
316 dl 1.6 done = true;
317 dl 1.3 return futures;
318     } finally {
319 dl 1.6 if (!done)
320     for (Future<T> f : futures)
321     f.cancel(true);
322 dl 1.3 }
323     }
324    
325 dl 1.6 public <T> List<Future<T>> invokeAll(List<Callable<T>> tasks)
326 dl 1.3 throws InterruptedException {
327     if (tasks == null)
328     throw new NullPointerException();
329 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
330     boolean done = false;
331 dl 1.3 try {
332     for (Callable<T> t : tasks) {
333 dl 1.6 FutureTask<T> f = new FutureTask<T>(t);
334 dl 1.3 futures.add(f);
335     execute(f);
336     }
337 dl 1.6 for (Future<T> f : futures) {
338     if (!f.isDone()) {
339     try {
340     f.get();
341     } catch(CancellationException ignore) {
342     } catch(ExecutionException ignore) {
343     }
344     }
345     }
346     done = true;
347 dl 1.3 return futures;
348     } finally {
349 dl 1.6 if (!done)
350 dl 1.4 for (Future<T> f : futures)
351     f.cancel(true);
352 dl 1.3 }
353     }
354    
355 dl 1.6 public <T> List<Future<T>> invokeAll(List<Callable<T>> tasks,
356     long timeout, TimeUnit unit)
357 dl 1.3 throws InterruptedException {
358     if (tasks == null || unit == null)
359     throw new NullPointerException();
360     long nanos = unit.toNanos(timeout);
361 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
362     boolean done = false;
363 dl 1.3 try {
364     for (Callable<T> t : tasks) {
365 dl 1.6 FutureTask<T> f = new FutureTask<T>(t);
366 dl 1.3 futures.add(f);
367     execute(f);
368     }
369 dl 1.6 long lastTime = System.nanoTime();
370     for (Future<T> f : futures) {
371     if (!f.isDone()) {
372     if (nanos < 0)
373     return futures;
374     try {
375     f.get(nanos, TimeUnit.NANOSECONDS);
376     } catch(CancellationException ignore) {
377     } catch(ExecutionException ignore) {
378     } catch(TimeoutException toe) {
379     return futures;
380     }
381 dl 1.8 long now = System.nanoTime();
382     nanos -= now - lastTime;
383     lastTime = now;
384 dl 1.6 }
385     }
386     done = true;
387 dl 1.3 return futures;
388     } finally {
389 dl 1.6 if (!done)
390 dl 1.4 for (Future<T> f : futures)
391     f.cancel(true);
392 dl 1.3 }
393     }
394    
395 tim 1.1 }