ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.49
Committed: Wed Dec 3 23:53:45 2014 UTC (9 years, 6 months ago) by jsr166
Branch: MAIN
Changes since 1.48: +37 -37 lines
Log Message:
improve implementation of invokeAll

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import static java.util.concurrent.TimeUnit.NANOSECONDS;
10 import java.util.ArrayList;
11 import java.util.Collection;
12 import java.util.Iterator;
13 import java.util.List;
14
15 /**
16 * Provides default implementations of {@link ExecutorService}
17 * execution methods. This class implements the {@code submit},
18 * {@code invokeAny} and {@code invokeAll} methods using a
19 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
20 * to the {@link FutureTask} class provided in this package. For example,
21 * the implementation of {@code submit(Runnable)} creates an
22 * associated {@code RunnableFuture} that is executed and
23 * returned. Subclasses may override the {@code newTaskFor} methods
24 * to return {@code RunnableFuture} implementations other than
25 * {@code FutureTask}.
26 *
27 * <p><b>Extension example</b>. Here is a sketch of a class
28 * that customizes {@link ThreadPoolExecutor} to use
29 * a {@code CustomTask} class instead of the default {@code FutureTask}:
30 * <pre> {@code
31 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
32 *
33 * static class CustomTask<V> implements RunnableFuture<V> {...}
34 *
35 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
36 * return new CustomTask<V>(c);
37 * }
38 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
39 * return new CustomTask<V>(r, v);
40 * }
41 * // ... add constructors, etc.
42 * }}</pre>
43 *
44 * @since 1.5
45 * @author Doug Lea
46 */
47 public abstract class AbstractExecutorService implements ExecutorService {
48
49 /**
50 * Returns a {@code RunnableFuture} for the given runnable and default
51 * value.
52 *
53 * @param runnable the runnable task being wrapped
54 * @param value the default value for the returned future
55 * @param <T> the type of the given value
56 * @return a {@code RunnableFuture} which, when run, will run the
57 * underlying runnable and which, as a {@code Future}, will yield
58 * the given value as its result and provide for cancellation of
59 * the underlying task
60 * @since 1.6
61 */
62 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
63 return new FutureTask<T>(runnable, value);
64 }
65
66 /**
67 * Returns a {@code RunnableFuture} for the given callable task.
68 *
69 * @param callable the callable task being wrapped
70 * @param <T> the type of the callable's result
71 * @return a {@code RunnableFuture} which, when run, will call the
72 * underlying callable and which, as a {@code Future}, will yield
73 * the callable's result as its result and provide for
74 * cancellation of the underlying task
75 * @since 1.6
76 */
77 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
78 return new FutureTask<T>(callable);
79 }
80
81 /**
82 * @throws RejectedExecutionException {@inheritDoc}
83 * @throws NullPointerException {@inheritDoc}
84 */
85 public Future<?> submit(Runnable task) {
86 if (task == null) throw new NullPointerException();
87 RunnableFuture<Void> ftask = newTaskFor(task, null);
88 execute(ftask);
89 return ftask;
90 }
91
92 /**
93 * @throws RejectedExecutionException {@inheritDoc}
94 * @throws NullPointerException {@inheritDoc}
95 */
96 public <T> Future<T> submit(Runnable task, T result) {
97 if (task == null) throw new NullPointerException();
98 RunnableFuture<T> ftask = newTaskFor(task, result);
99 execute(ftask);
100 return ftask;
101 }
102
103 /**
104 * @throws RejectedExecutionException {@inheritDoc}
105 * @throws NullPointerException {@inheritDoc}
106 */
107 public <T> Future<T> submit(Callable<T> task) {
108 if (task == null) throw new NullPointerException();
109 RunnableFuture<T> ftask = newTaskFor(task);
110 execute(ftask);
111 return ftask;
112 }
113
114 /**
115 * the main mechanics of invokeAny.
116 */
117 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
118 boolean timed, long nanos)
119 throws InterruptedException, ExecutionException, TimeoutException {
120 if (tasks == null)
121 throw new NullPointerException();
122 int ntasks = tasks.size();
123 if (ntasks == 0)
124 throw new IllegalArgumentException();
125 ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
126 ExecutorCompletionService<T> ecs =
127 new ExecutorCompletionService<T>(this);
128
129 // For efficiency, especially in executors with limited
130 // parallelism, check to see if previously submitted tasks are
131 // done before submitting more of them. This interleaving
132 // plus the exception mechanics account for messiness of main
133 // loop.
134
135 try {
136 // Record exceptions so that if we fail to obtain any
137 // result, we can throw the last exception we got.
138 ExecutionException ee = null;
139 final long deadline = timed ? System.nanoTime() + nanos : 0L;
140 Iterator<? extends Callable<T>> it = tasks.iterator();
141
142 // Start one task for sure; the rest incrementally
143 futures.add(ecs.submit(it.next()));
144 --ntasks;
145 int active = 1;
146
147 for (;;) {
148 Future<T> f = ecs.poll();
149 if (f == null) {
150 if (ntasks > 0) {
151 --ntasks;
152 futures.add(ecs.submit(it.next()));
153 ++active;
154 }
155 else if (active == 0)
156 break;
157 else if (timed) {
158 f = ecs.poll(nanos, NANOSECONDS);
159 if (f == null)
160 throw new TimeoutException();
161 nanos = deadline - System.nanoTime();
162 }
163 else
164 f = ecs.take();
165 }
166 if (f != null) {
167 --active;
168 try {
169 return f.get();
170 } catch (ExecutionException eex) {
171 ee = eex;
172 } catch (RuntimeException rex) {
173 ee = new ExecutionException(rex);
174 }
175 }
176 }
177
178 if (ee == null)
179 ee = new ExecutionException();
180 throw ee;
181
182 } finally {
183 cancelAll(futures);
184 }
185 }
186
187 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
188 throws InterruptedException, ExecutionException {
189 try {
190 return doInvokeAny(tasks, false, 0);
191 } catch (TimeoutException cannotHappen) {
192 assert false;
193 return null;
194 }
195 }
196
197 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
198 long timeout, TimeUnit unit)
199 throws InterruptedException, ExecutionException, TimeoutException {
200 return doInvokeAny(tasks, true, unit.toNanos(timeout));
201 }
202
203 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
204 throws InterruptedException {
205 if (tasks == null)
206 throw new NullPointerException();
207 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
208 try {
209 for (Callable<T> t : tasks) {
210 RunnableFuture<T> f = newTaskFor(t);
211 futures.add(f);
212 execute(f);
213 }
214 for (int i = 0, size = futures.size(); i < size; i++) {
215 Future<T> f = futures.get(i);
216 if (!f.isDone()) {
217 try { f.get(); }
218 catch (CancellationException ignore) {}
219 catch (ExecutionException ignore) {}
220 }
221 }
222 return futures;
223 } catch (Throwable t) {
224 cancelAll(futures);
225 throw t;
226 }
227 }
228
229 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
230 long timeout, TimeUnit unit)
231 throws InterruptedException {
232 if (tasks == null)
233 throw new NullPointerException();
234 final long nanos = unit.toNanos(timeout);
235 final long deadline = System.nanoTime() + nanos;
236 ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
237 int j = 0;
238 timedOut: try {
239 for (Callable<T> t : tasks)
240 futures.add(newTaskFor(t));
241
242 final int size = futures.size();
243
244 // Interleave time checks and calls to execute in case
245 // executor doesn't have any/much parallelism.
246 for (int i = 0; i < size; i++) {
247 if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
248 break timedOut;
249 execute((Runnable)futures.get(i));
250 }
251
252 for (; j < size; j++) {
253 Future<T> f = futures.get(j);
254 if (!f.isDone()) {
255 try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
256 catch (CancellationException ignore) {}
257 catch (ExecutionException ignore) {}
258 catch (TimeoutException timedOut) {
259 break timedOut;
260 }
261 }
262 }
263 return futures;
264 } catch (Throwable t) {
265 cancelAll(futures);
266 throw t;
267 }
268 // Timed out before all the tasks could be completed; cancel remaining
269 cancelAll(futures, j);
270 return futures;
271 }
272
273 private static <T> void cancelAll(ArrayList<Future<T>> futures) {
274 cancelAll(futures, 0);
275 }
276
277 /** Cancels all futures with index at least j. */
278 private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
279 for (int size = futures.size(); j < size; j++)
280 futures.get(j).cancel(true);
281 }
282 }