ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.27
Committed: Tue Feb 7 20:54:24 2006 UTC (18 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.26: +0 -1 lines
Log Message:
6378729: Remove workaround for 6280605

File Contents

# User Rev Content
1 tim 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.17 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.3 import java.util.*;
9 tim 1.1
10     /**
11 jsr166 1.24 * Provides default implementations of {@link ExecutorService}
12 dl 1.12 * execution methods. This class implements the <tt>submit</tt>,
13 peierls 1.21 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
14     * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
15     * to the {@link FutureTask} class provided in this package. For example,
16 dl 1.18 * the implementation of <tt>submit(Runnable)</tt> creates an
17 peierls 1.21 * associated <tt>RunnableFuture</tt> that is executed and
18     * returned. Subclasses may override the <tt>newTaskFor</tt> methods
19 jsr166 1.24 * to return <tt>RunnableFuture</tt> implementations other than
20 peierls 1.21 * <tt>FutureTask</tt>.
21 tim 1.1 *
22 dl 1.22 * <p> <b>Extension example</b>. Here is a sketch of a class
23     * that customizes {@link ThreadPoolExecutor} to use
24     * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
25     * <pre>
26     * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
27     *
28 jsr166 1.25 * static class CustomTask&lt;V&gt; implements RunnableFuture&lt;V&gt; {...}
29 dl 1.22 *
30 jsr166 1.25 * protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Callable&lt;V&gt; c) {
31     * return new CustomTask&lt;V&gt;(c);
32     * }
33     * protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Runnable r, V v) {
34     * return new CustomTask&lt;V&gt;(r, v);
35     * }
36     * // ... add constructors, etc.
37 dl 1.22 * }
38     * </pre>
39 tim 1.1 * @since 1.5
40     * @author Doug Lea
41     */
42     public abstract class AbstractExecutorService implements ExecutorService {
43    
44 peierls 1.21 /**
45     * Returns a <tt>RunnableFuture</tt> for the given runnable and default
46     * value.
47 jsr166 1.24 *
48 peierls 1.21 * @param runnable the runnable task being wrapped
49     * @param value the default value for the returned future
50 dl 1.22 * @return a <tt>RunnableFuture</tt> which when run will run the
51     * underlying runnable and which, as a <tt>Future</tt>, will yield
52     * the given value as its result and provide for cancellation of
53     * the underlying task.
54     * @since 1.6
55 peierls 1.21 */
56     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
57     return new FutureTask<T>(runnable, value);
58     }
59    
60     /**
61     * Returns a <tt>RunnableFuture</tt> for the given callable task.
62 jsr166 1.24 *
63 peierls 1.21 * @param callable the callable task being wrapped
64 dl 1.22 * @return a <tt>RunnableFuture</tt> which when run will call the
65     * underlying callable and which, as a <tt>Future</tt>, will yield
66     * the callable's result as its result and provide for
67     * cancellation of the underlying task.
68     * @since 1.6
69 peierls 1.21 */
70     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
71     return new FutureTask<T>(callable);
72     }
73    
74 dl 1.11 public Future<?> submit(Runnable task) {
75 dl 1.13 if (task == null) throw new NullPointerException();
76 peierls 1.21 RunnableFuture<Object> ftask = newTaskFor(task, null);
77 tim 1.1 execute(ftask);
78     return ftask;
79     }
80    
81 dl 1.13 public <T> Future<T> submit(Runnable task, T result) {
82     if (task == null) throw new NullPointerException();
83 peierls 1.21 RunnableFuture<T> ftask = newTaskFor(task, result);
84 dl 1.13 execute(ftask);
85     return ftask;
86     }
87    
88 tim 1.1 public <T> Future<T> submit(Callable<T> task) {
89 dl 1.13 if (task == null) throw new NullPointerException();
90 peierls 1.21 RunnableFuture<T> ftask = newTaskFor(task);
91 tim 1.1 execute(ftask);
92     return ftask;
93     }
94    
95 dl 1.15 /**
96     * the main mechanics of invokeAny.
97     */
98 jsr166 1.26 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
99 dl 1.15 boolean timed, long nanos)
100     throws InterruptedException, ExecutionException, TimeoutException {
101 dl 1.3 if (tasks == null)
102     throw new NullPointerException();
103 dl 1.15 int ntasks = tasks.size();
104     if (ntasks == 0)
105 dl 1.7 throw new IllegalArgumentException();
106 dl 1.15 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
107 jsr166 1.20 ExecutorCompletionService<T> ecs =
108 dl 1.7 new ExecutorCompletionService<T>(this);
109 dl 1.15
110     // For efficiency, especially in executors with limited
111     // parallelism, check to see if previously submitted tasks are
112 dl 1.18 // done before submitting more of them. This interleaving
113 dl 1.15 // plus the exception mechanics account for messiness of main
114 dl 1.18 // loop.
115 dl 1.15
116 dl 1.3 try {
117 dl 1.15 // Record exceptions so that if we fail to obtain any
118     // result, we can throw the last exception we got.
119 dl 1.7 ExecutionException ee = null;
120 dl 1.15 long lastTime = (timed)? System.nanoTime() : 0;
121 jsr166 1.26 Iterator<? extends Callable<T>> it = tasks.iterator();
122 dl 1.15
123     // Start one task for sure; the rest incrementally
124     futures.add(ecs.submit(it.next()));
125     --ntasks;
126     int active = 1;
127    
128     for (;;) {
129 jsr166 1.20 Future<T> f = ecs.poll();
130 dl 1.15 if (f == null) {
131     if (ntasks > 0) {
132     --ntasks;
133     futures.add(ecs.submit(it.next()));
134     ++active;
135     }
136 jsr166 1.20 else if (active == 0)
137 dl 1.15 break;
138     else if (timed) {
139     f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
140     if (f == null)
141     throw new TimeoutException();
142     long now = System.nanoTime();
143     nanos -= now - lastTime;
144     lastTime = now;
145     }
146 jsr166 1.20 else
147 dl 1.15 f = ecs.take();
148     }
149     if (f != null) {
150     --active;
151     try {
152     return f.get();
153 jsr166 1.19 } catch (InterruptedException ie) {
154 dl 1.15 throw ie;
155 jsr166 1.19 } catch (ExecutionException eex) {
156 dl 1.15 ee = eex;
157 jsr166 1.19 } catch (RuntimeException rex) {
158 dl 1.15 ee = new ExecutionException(rex);
159     }
160 dl 1.7 }
161 jsr166 1.20 }
162 dl 1.15
163     if (ee == null)
164     ee = new ExecutionException();
165     throw ee;
166    
167 dl 1.3 } finally {
168 jsr166 1.20 for (Future<T> f : futures)
169 dl 1.4 f.cancel(true);
170 dl 1.3 }
171     }
172    
173 jsr166 1.26 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
174 dl 1.15 throws InterruptedException, ExecutionException {
175     try {
176     return doInvokeAny(tasks, false, 0);
177     } catch (TimeoutException cannotHappen) {
178     assert false;
179     return null;
180     }
181     }
182    
183 jsr166 1.26 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
184 jsr166 1.20 long timeout, TimeUnit unit)
185 dl 1.7 throws InterruptedException, ExecutionException, TimeoutException {
186 dl 1.15 return doInvokeAny(tasks, true, unit.toNanos(timeout));
187 dl 1.3 }
188    
189 jsr166 1.26 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
190 dl 1.3 throws InterruptedException {
191     if (tasks == null)
192     throw new NullPointerException();
193 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
194     boolean done = false;
195 dl 1.3 try {
196     for (Callable<T> t : tasks) {
197 peierls 1.21 RunnableFuture<T> f = newTaskFor(t);
198 dl 1.3 futures.add(f);
199     execute(f);
200     }
201 dl 1.6 for (Future<T> f : futures) {
202     if (!f.isDone()) {
203 jsr166 1.20 try {
204     f.get();
205 jsr166 1.19 } catch (CancellationException ignore) {
206     } catch (ExecutionException ignore) {
207 dl 1.6 }
208     }
209     }
210     done = true;
211 dl 1.3 return futures;
212     } finally {
213 dl 1.6 if (!done)
214 jsr166 1.20 for (Future<T> f : futures)
215 dl 1.4 f.cancel(true);
216 dl 1.3 }
217     }
218    
219 jsr166 1.26 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
220 jsr166 1.20 long timeout, TimeUnit unit)
221 dl 1.3 throws InterruptedException {
222     if (tasks == null || unit == null)
223     throw new NullPointerException();
224     long nanos = unit.toNanos(timeout);
225 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
226     boolean done = false;
227 dl 1.3 try {
228 jsr166 1.20 for (Callable<T> t : tasks)
229 peierls 1.21 futures.add(newTaskFor(t));
230 dl 1.16
231     long lastTime = System.nanoTime();
232    
233     // Interleave time checks and calls to execute in case
234     // executor doesn't have any/much parallelism.
235     Iterator<Future<T>> it = futures.iterator();
236     while (it.hasNext()) {
237     execute((Runnable)(it.next()));
238     long now = System.nanoTime();
239     nanos -= now - lastTime;
240     lastTime = now;
241     if (nanos <= 0)
242 jsr166 1.20 return futures;
243 dl 1.3 }
244 dl 1.16
245 dl 1.6 for (Future<T> f : futures) {
246     if (!f.isDone()) {
247 jsr166 1.20 if (nanos <= 0)
248     return futures;
249     try {
250     f.get(nanos, TimeUnit.NANOSECONDS);
251 jsr166 1.19 } catch (CancellationException ignore) {
252     } catch (ExecutionException ignore) {
253     } catch (TimeoutException toe) {
254 dl 1.6 return futures;
255     }
256 dl 1.8 long now = System.nanoTime();
257     nanos -= now - lastTime;
258     lastTime = now;
259 dl 1.6 }
260     }
261     done = true;
262 dl 1.3 return futures;
263     } finally {
264 dl 1.6 if (!done)
265 jsr166 1.20 for (Future<T> f : futures)
266 dl 1.4 f.cancel(true);
267 dl 1.3 }
268     }
269    
270 tim 1.1 }