ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.46
Committed: Thu Jul 18 17:13:42 2013 UTC (10 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.45: +2 -1 lines
Log Message:
doclint warning fixes

File Contents

# User Rev Content
1 tim 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.17 * Expert Group and released to the public domain, as explained at
4 jsr166 1.35 * http://creativecommons.org/publicdomain/zero/1.0/
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.3 import java.util.*;
9 tim 1.1
10     /**
11 jsr166 1.24 * Provides default implementations of {@link ExecutorService}
12 jsr166 1.40 * execution methods. This class implements the {@code submit},
13     * {@code invokeAny} and {@code invokeAll} methods using a
14     * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
15 peierls 1.21 * to the {@link FutureTask} class provided in this package. For example,
16 jsr166 1.40 * the implementation of {@code submit(Runnable)} creates an
17     * associated {@code RunnableFuture} that is executed and
18     * returned. Subclasses may override the {@code newTaskFor} methods
19     * to return {@code RunnableFuture} implementations other than
20     * {@code FutureTask}.
21 tim 1.1 *
22 jsr166 1.39 * <p><b>Extension example</b>. Here is a sketch of a class
23 dl 1.22 * that customizes {@link ThreadPoolExecutor} to use
24 jsr166 1.40 * a {@code CustomTask} class instead of the default {@code FutureTask}:
25 jsr166 1.32 * <pre> {@code
26 dl 1.22 * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
27     *
28 jsr166 1.32 * static class CustomTask<V> implements RunnableFuture<V> {...}
29 dl 1.22 *
30 jsr166 1.32 * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
31     * return new CustomTask<V>(c);
32 jsr166 1.25 * }
33 jsr166 1.32 * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
34     * return new CustomTask<V>(r, v);
35 jsr166 1.25 * }
36     * // ... add constructors, etc.
37 jsr166 1.32 * }}</pre>
38     *
39 tim 1.1 * @since 1.5
40     * @author Doug Lea
41     */
42     public abstract class AbstractExecutorService implements ExecutorService {
43    
44 peierls 1.21 /**
45 jsr166 1.40 * Returns a {@code RunnableFuture} for the given runnable and default
46 peierls 1.21 * value.
47 jsr166 1.24 *
48 peierls 1.21 * @param runnable the runnable task being wrapped
49     * @param value the default value for the returned future
50 jsr166 1.46 * @param <T> the type of the given value
51 jsr166 1.44 * @return a {@code RunnableFuture} which, when run, will run the
52 jsr166 1.40 * underlying runnable and which, as a {@code Future}, will yield
53 dl 1.22 * the given value as its result and provide for cancellation of
54 jsr166 1.43 * the underlying task
55 dl 1.22 * @since 1.6
56 peierls 1.21 */
57     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
58     return new FutureTask<T>(runnable, value);
59     }
60    
61     /**
62 jsr166 1.40 * Returns a {@code RunnableFuture} for the given callable task.
63 jsr166 1.24 *
64 peierls 1.21 * @param callable the callable task being wrapped
65 jsr166 1.46 * @param <T> the type of the callable's result
66 jsr166 1.44 * @return a {@code RunnableFuture} which, when run, will call the
67 jsr166 1.40 * underlying callable and which, as a {@code Future}, will yield
68 dl 1.22 * the callable's result as its result and provide for
69 jsr166 1.43 * cancellation of the underlying task
70 dl 1.22 * @since 1.6
71 peierls 1.21 */
72     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
73     return new FutureTask<T>(callable);
74     }
75    
76 jsr166 1.30 /**
77     * @throws RejectedExecutionException {@inheritDoc}
78     * @throws NullPointerException {@inheritDoc}
79     */
80 dl 1.11 public Future<?> submit(Runnable task) {
81 dl 1.13 if (task == null) throw new NullPointerException();
82 jsr166 1.33 RunnableFuture<Void> ftask = newTaskFor(task, null);
83 tim 1.1 execute(ftask);
84     return ftask;
85     }
86    
87 jsr166 1.30 /**
88     * @throws RejectedExecutionException {@inheritDoc}
89     * @throws NullPointerException {@inheritDoc}
90     */
91 dl 1.13 public <T> Future<T> submit(Runnable task, T result) {
92     if (task == null) throw new NullPointerException();
93 peierls 1.21 RunnableFuture<T> ftask = newTaskFor(task, result);
94 dl 1.13 execute(ftask);
95     return ftask;
96     }
97    
98 jsr166 1.30 /**
99     * @throws RejectedExecutionException {@inheritDoc}
100     * @throws NullPointerException {@inheritDoc}
101     */
102 tim 1.1 public <T> Future<T> submit(Callable<T> task) {
103 dl 1.13 if (task == null) throw new NullPointerException();
104 peierls 1.21 RunnableFuture<T> ftask = newTaskFor(task);
105 tim 1.1 execute(ftask);
106     return ftask;
107     }
108    
109 dl 1.15 /**
110     * the main mechanics of invokeAny.
111     */
112 jsr166 1.26 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
113 jsr166 1.36 boolean timed, long nanos)
114 dl 1.15 throws InterruptedException, ExecutionException, TimeoutException {
115 dl 1.3 if (tasks == null)
116     throw new NullPointerException();
117 dl 1.15 int ntasks = tasks.size();
118     if (ntasks == 0)
119 dl 1.7 throw new IllegalArgumentException();
120 jsr166 1.42 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
121 jsr166 1.20 ExecutorCompletionService<T> ecs =
122 dl 1.7 new ExecutorCompletionService<T>(this);
123 dl 1.15
124     // For efficiency, especially in executors with limited
125     // parallelism, check to see if previously submitted tasks are
126 dl 1.18 // done before submitting more of them. This interleaving
127 dl 1.15 // plus the exception mechanics account for messiness of main
128 dl 1.18 // loop.
129 dl 1.15
130 dl 1.3 try {
131 dl 1.15 // Record exceptions so that if we fail to obtain any
132     // result, we can throw the last exception we got.
133 dl 1.7 ExecutionException ee = null;
134 jsr166 1.37 final long deadline = timed ? System.nanoTime() + nanos : 0L;
135 jsr166 1.26 Iterator<? extends Callable<T>> it = tasks.iterator();
136 dl 1.15
137     // Start one task for sure; the rest incrementally
138     futures.add(ecs.submit(it.next()));
139     --ntasks;
140     int active = 1;
141    
142     for (;;) {
143 jsr166 1.20 Future<T> f = ecs.poll();
144 dl 1.15 if (f == null) {
145     if (ntasks > 0) {
146     --ntasks;
147     futures.add(ecs.submit(it.next()));
148     ++active;
149     }
150 jsr166 1.20 else if (active == 0)
151 dl 1.15 break;
152     else if (timed) {
153     f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
154     if (f == null)
155     throw new TimeoutException();
156 jsr166 1.37 nanos = deadline - System.nanoTime();
157 dl 1.15 }
158 jsr166 1.20 else
159 dl 1.15 f = ecs.take();
160     }
161     if (f != null) {
162     --active;
163     try {
164     return f.get();
165 jsr166 1.19 } catch (ExecutionException eex) {
166 dl 1.15 ee = eex;
167 jsr166 1.19 } catch (RuntimeException rex) {
168 dl 1.15 ee = new ExecutionException(rex);
169     }
170 dl 1.7 }
171 jsr166 1.20 }
172 dl 1.15
173     if (ee == null)
174     ee = new ExecutionException();
175     throw ee;
176    
177 dl 1.3 } finally {
178 jsr166 1.42 for (int i = 0, size = futures.size(); i < size; i++)
179     futures.get(i).cancel(true);
180 dl 1.3 }
181     }
182    
183 jsr166 1.26 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
184 dl 1.15 throws InterruptedException, ExecutionException {
185     try {
186     return doInvokeAny(tasks, false, 0);
187     } catch (TimeoutException cannotHappen) {
188     assert false;
189     return null;
190     }
191     }
192    
193 jsr166 1.26 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
194 jsr166 1.20 long timeout, TimeUnit unit)
195 dl 1.7 throws InterruptedException, ExecutionException, TimeoutException {
196 dl 1.15 return doInvokeAny(tasks, true, unit.toNanos(timeout));
197 dl 1.3 }
198    
199 jsr166 1.26 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
200 dl 1.3 throws InterruptedException {
201     if (tasks == null)
202     throw new NullPointerException();
203 jsr166 1.42 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
204 dl 1.6 boolean done = false;
205 dl 1.3 try {
206     for (Callable<T> t : tasks) {
207 peierls 1.21 RunnableFuture<T> f = newTaskFor(t);
208 dl 1.3 futures.add(f);
209     execute(f);
210     }
211 jsr166 1.42 for (int i = 0, size = futures.size(); i < size; i++) {
212     Future<T> f = futures.get(i);
213 dl 1.6 if (!f.isDone()) {
214 jsr166 1.20 try {
215     f.get();
216 jsr166 1.19 } catch (CancellationException ignore) {
217     } catch (ExecutionException ignore) {
218 dl 1.6 }
219     }
220     }
221     done = true;
222 dl 1.3 return futures;
223     } finally {
224 dl 1.6 if (!done)
225 jsr166 1.42 for (int i = 0, size = futures.size(); i < size; i++)
226     futures.get(i).cancel(true);
227 dl 1.3 }
228     }
229    
230 jsr166 1.26 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
231 jsr166 1.20 long timeout, TimeUnit unit)
232 dl 1.3 throws InterruptedException {
233 jsr166 1.38 if (tasks == null)
234 dl 1.3 throw new NullPointerException();
235     long nanos = unit.toNanos(timeout);
236 jsr166 1.42 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
237 dl 1.6 boolean done = false;
238 dl 1.3 try {
239 jsr166 1.20 for (Callable<T> t : tasks)
240 peierls 1.21 futures.add(newTaskFor(t));
241 dl 1.16
242 jsr166 1.37 final long deadline = System.nanoTime() + nanos;
243 jsr166 1.42 final int size = futures.size();
244 dl 1.16
245     // Interleave time checks and calls to execute in case
246     // executor doesn't have any/much parallelism.
247 jsr166 1.42 for (int i = 0; i < size; i++) {
248     execute((Runnable)futures.get(i));
249 jsr166 1.37 nanos = deadline - System.nanoTime();
250     if (nanos <= 0L)
251 jsr166 1.20 return futures;
252 dl 1.3 }
253 dl 1.16
254 jsr166 1.42 for (int i = 0; i < size; i++) {
255     Future<T> f = futures.get(i);
256 dl 1.6 if (!f.isDone()) {
257 jsr166 1.37 if (nanos <= 0L)
258 jsr166 1.20 return futures;
259     try {
260     f.get(nanos, TimeUnit.NANOSECONDS);
261 jsr166 1.19 } catch (CancellationException ignore) {
262     } catch (ExecutionException ignore) {
263     } catch (TimeoutException toe) {
264 dl 1.6 return futures;
265     }
266 jsr166 1.37 nanos = deadline - System.nanoTime();
267 dl 1.6 }
268     }
269     done = true;
270 dl 1.3 return futures;
271     } finally {
272 dl 1.6 if (!done)
273 jsr166 1.42 for (int i = 0, size = futures.size(); i < size; i++)
274     futures.get(i).cancel(true);
275 dl 1.3 }
276     }
277    
278 tim 1.1 }