ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.24
Committed: Sun Jun 12 00:35:38 2005 UTC (19 years ago) by jsr166
Branch: MAIN
Changes since 1.23: +5 -3 lines
Log Message:
doc fixes

File Contents

# User Rev Content
1 tim 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.17 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 tim 1.1 */
6    
7     package java.util.concurrent;
8    
9 dl 1.3 import java.util.*;
10 tim 1.1
11     /**
12 jsr166 1.24 * Provides default implementations of {@link ExecutorService}
13 dl 1.12 * execution methods. This class implements the <tt>submit</tt>,
14 peierls 1.21 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
15     * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
16     * to the {@link FutureTask} class provided in this package. For example,
17 dl 1.18 * the implementation of <tt>submit(Runnable)</tt> creates an
18 peierls 1.21 * associated <tt>RunnableFuture</tt> that is executed and
19     * returned. Subclasses may override the <tt>newTaskFor</tt> methods
20 jsr166 1.24 * to return <tt>RunnableFuture</tt> implementations other than
21 peierls 1.21 * <tt>FutureTask</tt>.
22 tim 1.1 *
23 dl 1.22 * <p> <b>Extension example</b>. Here is a sketch of a class
24     * that customizes {@link ThreadPoolExecutor} to use
25     * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
26     * <pre>
27     * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
28     *
29 dl 1.23 * static class CustomTask&lt;V&gt; implements RunnableFuture&lt;V&gt; {...}
30 dl 1.22 *
31 dl 1.23 * protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Callable&lt;V&gt; c) {
32 dl 1.22 * return new CustomTask&lt;V&gt;(c);
33     * }
34 dl 1.23 * protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Runnable r, V v) {
35 dl 1.22 * return new CustomTask&lt;V&gt;(r, v);
36 jsr166 1.24 * }
37 dl 1.22 * // ... add constructors, etc.
38     * }
39     * </pre>
40 tim 1.1 * @since 1.5
41     * @author Doug Lea
42     */
43     public abstract class AbstractExecutorService implements ExecutorService {
44    
45 peierls 1.21 /**
46     * Returns a <tt>RunnableFuture</tt> for the given runnable and default
47     * value.
48 jsr166 1.24 *
49 peierls 1.21 * @param runnable the runnable task being wrapped
50     * @param value the default value for the returned future
51 dl 1.22 * @return a <tt>RunnableFuture</tt> which when run will run the
52     * underlying runnable and which, as a <tt>Future</tt>, will yield
53     * the given value as its result and provide for cancellation of
54     * the underlying task.
55     * @since 1.6
56 peierls 1.21 */
57     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
58     return new FutureTask<T>(runnable, value);
59     }
60    
61     /**
62     * Returns a <tt>RunnableFuture</tt> for the given callable task.
63 jsr166 1.24 *
64 peierls 1.21 * @param callable the callable task being wrapped
65 dl 1.22 * @return a <tt>RunnableFuture</tt> which when run will call the
66     * underlying callable and which, as a <tt>Future</tt>, will yield
67     * the callable's result as its result and provide for
68     * cancellation of the underlying task.
69     * @since 1.6
70 peierls 1.21 */
71     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
72     return new FutureTask<T>(callable);
73     }
74    
75 dl 1.11 public Future<?> submit(Runnable task) {
76 dl 1.13 if (task == null) throw new NullPointerException();
77 peierls 1.21 RunnableFuture<Object> ftask = newTaskFor(task, null);
78 tim 1.1 execute(ftask);
79     return ftask;
80     }
81    
82 dl 1.13 public <T> Future<T> submit(Runnable task, T result) {
83     if (task == null) throw new NullPointerException();
84 peierls 1.21 RunnableFuture<T> ftask = newTaskFor(task, result);
85 dl 1.13 execute(ftask);
86     return ftask;
87     }
88    
89 tim 1.1 public <T> Future<T> submit(Callable<T> task) {
90 dl 1.13 if (task == null) throw new NullPointerException();
91 peierls 1.21 RunnableFuture<T> ftask = newTaskFor(task);
92 tim 1.1 execute(ftask);
93     return ftask;
94     }
95    
96 dl 1.15 /**
97     * the main mechanics of invokeAny.
98     */
99     private <T> T doInvokeAny(Collection<Callable<T>> tasks,
100     boolean timed, long nanos)
101     throws InterruptedException, ExecutionException, TimeoutException {
102 dl 1.3 if (tasks == null)
103     throw new NullPointerException();
104 dl 1.15 int ntasks = tasks.size();
105     if (ntasks == 0)
106 dl 1.7 throw new IllegalArgumentException();
107 dl 1.15 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
108 jsr166 1.20 ExecutorCompletionService<T> ecs =
109 dl 1.7 new ExecutorCompletionService<T>(this);
110 dl 1.15
111     // For efficiency, especially in executors with limited
112     // parallelism, check to see if previously submitted tasks are
113 dl 1.18 // done before submitting more of them. This interleaving
114 dl 1.15 // plus the exception mechanics account for messiness of main
115 dl 1.18 // loop.
116 dl 1.15
117 dl 1.3 try {
118 dl 1.15 // Record exceptions so that if we fail to obtain any
119     // result, we can throw the last exception we got.
120 dl 1.7 ExecutionException ee = null;
121 dl 1.15 long lastTime = (timed)? System.nanoTime() : 0;
122     Iterator<Callable<T>> it = tasks.iterator();
123    
124     // Start one task for sure; the rest incrementally
125     futures.add(ecs.submit(it.next()));
126     --ntasks;
127     int active = 1;
128    
129     for (;;) {
130 jsr166 1.20 Future<T> f = ecs.poll();
131 dl 1.15 if (f == null) {
132     if (ntasks > 0) {
133     --ntasks;
134     futures.add(ecs.submit(it.next()));
135     ++active;
136     }
137 jsr166 1.20 else if (active == 0)
138 dl 1.15 break;
139     else if (timed) {
140     f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
141     if (f == null)
142     throw new TimeoutException();
143     long now = System.nanoTime();
144     nanos -= now - lastTime;
145     lastTime = now;
146     }
147 jsr166 1.20 else
148 dl 1.15 f = ecs.take();
149     }
150     if (f != null) {
151     --active;
152     try {
153     return f.get();
154 jsr166 1.19 } catch (InterruptedException ie) {
155 dl 1.15 throw ie;
156 jsr166 1.19 } catch (ExecutionException eex) {
157 dl 1.15 ee = eex;
158 jsr166 1.19 } catch (RuntimeException rex) {
159 dl 1.15 ee = new ExecutionException(rex);
160     }
161 dl 1.7 }
162 jsr166 1.20 }
163 dl 1.15
164     if (ee == null)
165     ee = new ExecutionException();
166     throw ee;
167    
168 dl 1.3 } finally {
169 jsr166 1.20 for (Future<T> f : futures)
170 dl 1.4 f.cancel(true);
171 dl 1.3 }
172     }
173    
174 dl 1.15 public <T> T invokeAny(Collection<Callable<T>> tasks)
175     throws InterruptedException, ExecutionException {
176     try {
177     return doInvokeAny(tasks, false, 0);
178     } catch (TimeoutException cannotHappen) {
179     assert false;
180     return null;
181     }
182     }
183    
184 jsr166 1.20 public <T> T invokeAny(Collection<Callable<T>> tasks,
185     long timeout, TimeUnit unit)
186 dl 1.7 throws InterruptedException, ExecutionException, TimeoutException {
187 dl 1.15 return doInvokeAny(tasks, true, unit.toNanos(timeout));
188 dl 1.3 }
189    
190 dl 1.9 public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
191 dl 1.3 throws InterruptedException {
192     if (tasks == null)
193     throw new NullPointerException();
194 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
195     boolean done = false;
196 dl 1.3 try {
197     for (Callable<T> t : tasks) {
198 peierls 1.21 RunnableFuture<T> f = newTaskFor(t);
199 dl 1.3 futures.add(f);
200     execute(f);
201     }
202 dl 1.6 for (Future<T> f : futures) {
203     if (!f.isDone()) {
204 jsr166 1.20 try {
205     f.get();
206 jsr166 1.19 } catch (CancellationException ignore) {
207     } catch (ExecutionException ignore) {
208 dl 1.6 }
209     }
210     }
211     done = true;
212 dl 1.3 return futures;
213     } finally {
214 dl 1.6 if (!done)
215 jsr166 1.20 for (Future<T> f : futures)
216 dl 1.4 f.cancel(true);
217 dl 1.3 }
218     }
219    
220 jsr166 1.20 public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
221     long timeout, TimeUnit unit)
222 dl 1.3 throws InterruptedException {
223     if (tasks == null || unit == null)
224     throw new NullPointerException();
225     long nanos = unit.toNanos(timeout);
226 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
227     boolean done = false;
228 dl 1.3 try {
229 jsr166 1.20 for (Callable<T> t : tasks)
230 peierls 1.21 futures.add(newTaskFor(t));
231 dl 1.16
232     long lastTime = System.nanoTime();
233    
234     // Interleave time checks and calls to execute in case
235     // executor doesn't have any/much parallelism.
236     Iterator<Future<T>> it = futures.iterator();
237     while (it.hasNext()) {
238     execute((Runnable)(it.next()));
239     long now = System.nanoTime();
240     nanos -= now - lastTime;
241     lastTime = now;
242     if (nanos <= 0)
243 jsr166 1.20 return futures;
244 dl 1.3 }
245 dl 1.16
246 dl 1.6 for (Future<T> f : futures) {
247     if (!f.isDone()) {
248 jsr166 1.20 if (nanos <= 0)
249     return futures;
250     try {
251     f.get(nanos, TimeUnit.NANOSECONDS);
252 jsr166 1.19 } catch (CancellationException ignore) {
253     } catch (ExecutionException ignore) {
254     } catch (TimeoutException toe) {
255 dl 1.6 return futures;
256     }
257 dl 1.8 long now = System.nanoTime();
258     nanos -= now - lastTime;
259     lastTime = now;
260 dl 1.6 }
261     }
262     done = true;
263 dl 1.3 return futures;
264     } finally {
265 dl 1.6 if (!done)
266 jsr166 1.20 for (Future<T> f : futures)
267 dl 1.4 f.cancel(true);
268 dl 1.3 }
269     }
270    
271 tim 1.1 }