ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/AbstractExecutorService.java
Revision: 1.8
Committed: Sun Jan 18 20:17:32 2015 UTC (9 years, 3 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.7: +1 -0 lines
Log Message:
exactly one blank line before and after package statements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.*;
10
11 /**
12 * Provides default implementations of {@link ExecutorService}
13 * execution methods. This class implements the {@code submit},
14 * {@code invokeAny} and {@code invokeAll} methods using a
15 * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
16 * to the {@link FutureTask} class provided in this package. For example,
17 * the implementation of {@code submit(Runnable)} creates an
18 * associated {@code RunnableFuture} that is executed and
19 * returned. Subclasses may override the {@code newTaskFor} methods
20 * to return {@code RunnableFuture} implementations other than
21 * {@code FutureTask}.
22 *
23 * <p><b>Extension example</b>. Here is a sketch of a class
24 * that customizes {@link ThreadPoolExecutor} to use
25 * a {@code CustomTask} class instead of the default {@code FutureTask}:
26 * <pre> {@code
27 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
28 *
29 * static class CustomTask<V> implements RunnableFuture<V> {...}
30 *
31 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
32 * return new CustomTask<V>(c);
33 * }
34 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
35 * return new CustomTask<V>(r, v);
36 * }
37 * // ... add constructors, etc.
38 * }}</pre>
39 *
40 * @since 1.5
41 * @author Doug Lea
42 */
43 public abstract class AbstractExecutorService implements ExecutorService {
44
45 /**
46 * Returns a {@code RunnableFuture} for the given runnable and default
47 * value.
48 *
49 * @param runnable the runnable task being wrapped
50 * @param value the default value for the returned future
51 * @param <T> the type of the given value
52 * @return a {@code RunnableFuture} which, when run, will run the
53 * underlying runnable and which, as a {@code Future}, will yield
54 * the given value as its result and provide for cancellation of
55 * the underlying task
56 * @since 1.6
57 */
58 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
59 return new FutureTask<T>(runnable, value);
60 }
61
62 /**
63 * Returns a {@code RunnableFuture} for the given callable task.
64 *
65 * @param callable the callable task being wrapped
66 * @param <T> the type of the callable's result
67 * @return a {@code RunnableFuture} which, when run, will call the
68 * underlying callable and which, as a {@code Future}, will yield
69 * the callable's result as its result and provide for
70 * cancellation of the underlying task
71 * @since 1.6
72 */
73 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
74 return new FutureTask<T>(callable);
75 }
76
77 /**
78 * @throws RejectedExecutionException {@inheritDoc}
79 * @throws NullPointerException {@inheritDoc}
80 */
81 public Future<?> submit(Runnable task) {
82 if (task == null) throw new NullPointerException();
83 RunnableFuture<Void> ftask = newTaskFor(task, null);
84 execute(ftask);
85 return ftask;
86 }
87
88 /**
89 * @throws RejectedExecutionException {@inheritDoc}
90 * @throws NullPointerException {@inheritDoc}
91 */
92 public <T> Future<T> submit(Runnable task, T result) {
93 if (task == null) throw new NullPointerException();
94 RunnableFuture<T> ftask = newTaskFor(task, result);
95 execute(ftask);
96 return ftask;
97 }
98
99 /**
100 * @throws RejectedExecutionException {@inheritDoc}
101 * @throws NullPointerException {@inheritDoc}
102 */
103 public <T> Future<T> submit(Callable<T> task) {
104 if (task == null) throw new NullPointerException();
105 RunnableFuture<T> ftask = newTaskFor(task);
106 execute(ftask);
107 return ftask;
108 }
109
110 /**
111 * the main mechanics of invokeAny.
112 */
113 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
114 boolean timed, long nanos)
115 throws InterruptedException, ExecutionException, TimeoutException {
116 if (tasks == null)
117 throw new NullPointerException();
118 int ntasks = tasks.size();
119 if (ntasks == 0)
120 throw new IllegalArgumentException();
121 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
122 ExecutorCompletionService<T> ecs =
123 new ExecutorCompletionService<T>(this);
124
125 // For efficiency, especially in executors with limited
126 // parallelism, check to see if previously submitted tasks are
127 // done before submitting more of them. This interleaving
128 // plus the exception mechanics account for messiness of main
129 // loop.
130
131 try {
132 // Record exceptions so that if we fail to obtain any
133 // result, we can throw the last exception we got.
134 ExecutionException ee = null;
135 final long deadline = timed ? System.nanoTime() + nanos : 0L;
136 Iterator<? extends Callable<T>> it = tasks.iterator();
137
138 // Start one task for sure; the rest incrementally
139 futures.add(ecs.submit(it.next()));
140 --ntasks;
141 int active = 1;
142
143 for (;;) {
144 Future<T> f = ecs.poll();
145 if (f == null) {
146 if (ntasks > 0) {
147 --ntasks;
148 futures.add(ecs.submit(it.next()));
149 ++active;
150 }
151 else if (active == 0)
152 break;
153 else if (timed) {
154 f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
155 if (f == null)
156 throw new TimeoutException();
157 nanos = deadline - System.nanoTime();
158 }
159 else
160 f = ecs.take();
161 }
162 if (f != null) {
163 --active;
164 try {
165 return f.get();
166 } catch (ExecutionException eex) {
167 ee = eex;
168 } catch (RuntimeException rex) {
169 ee = new ExecutionException(rex);
170 }
171 }
172 }
173
174 if (ee == null)
175 ee = new ExecutionException();
176 throw ee;
177
178 } finally {
179 for (int i = 0, size = futures.size(); i < size; i++)
180 futures.get(i).cancel(true);
181 }
182 }
183
184 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
185 throws InterruptedException, ExecutionException {
186 try {
187 return doInvokeAny(tasks, false, 0);
188 } catch (TimeoutException cannotHappen) {
189 assert false;
190 return null;
191 }
192 }
193
194 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
195 long timeout, TimeUnit unit)
196 throws InterruptedException, ExecutionException, TimeoutException {
197 return doInvokeAny(tasks, true, unit.toNanos(timeout));
198 }
199
200 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
201 throws InterruptedException {
202 if (tasks == null)
203 throw new NullPointerException();
204 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
205 boolean done = false;
206 try {
207 for (Callable<T> t : tasks) {
208 RunnableFuture<T> f = newTaskFor(t);
209 futures.add(f);
210 execute(f);
211 }
212 for (int i = 0, size = futures.size(); i < size; i++) {
213 Future<T> f = futures.get(i);
214 if (!f.isDone()) {
215 try {
216 f.get();
217 } catch (CancellationException ignore) {
218 } catch (ExecutionException ignore) {
219 }
220 }
221 }
222 done = true;
223 return futures;
224 } finally {
225 if (!done)
226 for (int i = 0, size = futures.size(); i < size; i++)
227 futures.get(i).cancel(true);
228 }
229 }
230
231 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
232 long timeout, TimeUnit unit)
233 throws InterruptedException {
234 if (tasks == null)
235 throw new NullPointerException();
236 long nanos = unit.toNanos(timeout);
237 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
238 boolean done = false;
239 try {
240 for (Callable<T> t : tasks)
241 futures.add(newTaskFor(t));
242
243 final long deadline = System.nanoTime() + nanos;
244 final int size = futures.size();
245
246 // Interleave time checks and calls to execute in case
247 // executor doesn't have any/much parallelism.
248 for (int i = 0; i < size; i++) {
249 execute((Runnable)futures.get(i));
250 nanos = deadline - System.nanoTime();
251 if (nanos <= 0L)
252 return futures;
253 }
254
255 for (int i = 0; i < size; i++) {
256 Future<T> f = futures.get(i);
257 if (!f.isDone()) {
258 if (nanos <= 0L)
259 return futures;
260 try {
261 f.get(nanos, TimeUnit.NANOSECONDS);
262 } catch (CancellationException ignore) {
263 } catch (ExecutionException ignore) {
264 } catch (TimeoutException toe) {
265 return futures;
266 }
267 nanos = deadline - System.nanoTime();
268 }
269 }
270 done = true;
271 return futures;
272 } finally {
273 if (!done)
274 for (int i = 0, size = futures.size(); i < size; i++)
275 futures.get(i).cancel(true);
276 }
277 }
278
279 }