ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.28
Committed: Sun Jun 25 17:42:22 2006 UTC (17 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.27: +1 -1 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8 import java.util.*;
9
10 /**
11 * Provides default implementations of {@link ExecutorService}
12 * execution methods. This class implements the <tt>submit</tt>,
13 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
14 * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
15 * to the {@link FutureTask} class provided in this package. For example,
16 * the implementation of <tt>submit(Runnable)</tt> creates an
17 * associated <tt>RunnableFuture</tt> that is executed and
18 * returned. Subclasses may override the <tt>newTaskFor</tt> methods
19 * to return <tt>RunnableFuture</tt> implementations other than
20 * <tt>FutureTask</tt>.
21 *
22 * <p> <b>Extension example</b>. Here is a sketch of a class
23 * that customizes {@link ThreadPoolExecutor} to use
24 * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
25 * <pre>
26 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
27 *
28 * static class CustomTask&lt;V&gt; implements RunnableFuture&lt;V&gt; {...}
29 *
30 * protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Callable&lt;V&gt; c) {
31 * return new CustomTask&lt;V&gt;(c);
32 * }
33 * protected &lt;V&gt; RunnableFuture&lt;V&gt; newTaskFor(Runnable r, V v) {
34 * return new CustomTask&lt;V&gt;(r, v);
35 * }
36 * // ... add constructors, etc.
37 * }
38 * </pre>
39 * @since 1.5
40 * @author Doug Lea
41 */
public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * Returns a <tt>RunnableFuture</tt> for the given runnable and default
     * value.
     *
     * @param runnable the runnable task being wrapped
     * @param value the default value for the returned future
     * @return a <tt>RunnableFuture</tt> which when run will run the
     * underlying runnable and which, as a <tt>Future</tt>, will yield
     * the given value as its result and provide for cancellation of
     * the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    /**
     * Returns a <tt>RunnableFuture</tt> for the given callable task.
     *
     * @param callable the callable task being wrapped
     * @return a <tt>RunnableFuture</tt> which when run will call the
     * underlying callable and which, as a <tt>Future</tt>, will yield
     * the callable's result as its result and provide for
     * cancellation of the underlying task.
     * @since 1.6
     */
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * Wraps the task via <tt>newTaskFor(task, null)</tt>, hands the wrapper
     * to <tt>execute</tt>, and returns it.
     *
     * @param task the task to submit
     * @return the future created for the task
     * @throws NullPointerException if <tt>task</tt> is null
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Object> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the task and result via <tt>newTaskFor(task, result)</tt>,
     * hands the wrapper to <tt>execute</tt>, and returns it.
     *
     * @param task the task to submit
     * @param result the value passed to <tt>newTaskFor</tt> as the
     * future's default value
     * @return the future created for the task
     * @throws NullPointerException if <tt>task</tt> is null
     */
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

    /**
     * Wraps the task via <tt>newTaskFor(task)</tt>, hands the wrapper
     * to <tt>execute</tt>, and returns it.
     *
     * @param task the task to submit
     * @return the future created for the task
     * @throws NullPointerException if <tt>task</tt> is null
     */
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

    /**
     * The main mechanics of both <tt>invokeAny</tt> variants.
     *
     * @param tasks the candidate tasks
     * @param timed whether <tt>nanos</tt> bounds the blocking waits
     * @param nanos the time budget in nanoseconds; only consulted when
     * <tt>timed</tt> is true
     * @return the result of the first future whose <tt>get</tt> returns
     * normally
     * @throws NullPointerException if <tt>tasks</tt> is null
     * @throws IllegalArgumentException if <tt>tasks</tt> is empty
     * @throws ExecutionException if no task yielded a result; this is the
     * last failure that was recorded
     * @throws TimeoutException if a timed wait for a completed task
     * elapsed
     * @throws InterruptedException if interrupted while waiting
     */
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        List<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        // Nothing finished yet: start one more task
                        // before blocking.
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break; // every task was started and has failed
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    // An InterruptedException from get() is deliberately
                    // not caught here; it propagates to the caller.
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            // Defensive fallback: only reached if the loop ended without
            // any completed future having been examined. Carries neither
            // a message nor a cause.
            if (ee == null)
                ee = new ExecutionException((Throwable) null);
            throw ee;

        } finally {
            // Whatever the outcome, cancel every task that was started.
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }

    /**
     * Untimed <tt>invokeAny</tt>; delegates to <tt>doInvokeAny</tt>
     * with timing disabled.
     *
     * @param tasks the candidate tasks
     * @return the result of one task that completed normally
     * @throws NullPointerException if <tt>tasks</tt> is null
     * @throws IllegalArgumentException if <tt>tasks</tt> is empty
     * @throws ExecutionException if no task yielded a result
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            // doInvokeAny only throws TimeoutException when timed.
            assert false;
            return null;
        }
    }

    /**
     * Timed <tt>invokeAny</tt>; delegates to <tt>doInvokeAny</tt> with
     * the timeout converted to nanoseconds.
     *
     * @param tasks the candidate tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of <tt>timeout</tt>
     * @return the result of one task that completed normally
     * @throws NullPointerException if <tt>tasks</tt> or <tt>unit</tt>
     * is null
     * @throws IllegalArgumentException if <tt>tasks</tt> is empty
     * @throws ExecutionException if no task yielded a result
     * @throws TimeoutException if the wait elapsed before any task
     * yielded a result
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return doInvokeAny(tasks, true, unit.toNanos(timeout));
    }

    /**
     * Executes every task and waits until each resulting future is done.
     * If this method exits abnormally, all futures created so far are
     * cancelled.
     *
     * @param tasks the tasks to execute
     * @return the futures, in the iteration order of <tt>tasks</tt>
     * @throws NullPointerException if <tt>tasks</tt> is null
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    // Only completion matters here; the caller inspects
                    // each outcome through the returned futures.
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

    /**
     * Executes the tasks and waits, for at most the given time, until
     * each resulting future is done. The remaining time is checked
     * before each task is handed to <tt>execute</tt>, so no task is
     * started once the timeout has elapsed; in particular a timeout
     * that is zero or negative starts none. Whenever this method
     * returns before all futures are done, or exits abnormally, every
     * future is cancelled.
     *
     * @param tasks the tasks to execute
     * @param timeout the maximum time to wait
     * @param unit the time unit of <tt>timeout</tt>
     * @return the futures, in the iteration order of <tt>tasks</tt>
     * @throws NullPointerException if <tt>tasks</tt> or <tt>unit</tt>
     * is null
     * @throws InterruptedException if interrupted while waiting
     */
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
        if (tasks == null || unit == null)
            throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks)
                futures.add(newTaskFor(t));

            long lastTime = System.nanoTime();

            // Interleave time checks and calls to execute in case
            // executor doesn't have any/much parallelism.
            Iterator<Future<T>> it = futures.iterator();
            while (it.hasNext()) {
                // Check the budget BEFORE starting a task so that an
                // already-expired timeout never launches new work.
                if (nanos <= 0)
                    return futures;
                // Every element was produced by newTaskFor above and is
                // therefore a RunnableFuture.
                execute((Runnable)(it.next()));
                long now = System.nanoTime();
                nanos -= now - lastTime;
                lastTime = now;
            }

            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    if (nanos <= 0)
                        return futures;
                    try {
                        f.get(nanos, TimeUnit.NANOSECONDS);
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    } catch (TimeoutException toe) {
                        return futures;
                    }
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                }
            }
            done = true;
            return futures;
        } finally {
            // Early returns above leave done == false, so unfinished
            // tasks are cancelled here.
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }

}