ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/AbstractExecutorService.java
Revision: 1.8
Committed: Sun Jan 18 20:17:32 2015 UTC (9 years, 4 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.7: +1 -0 lines
Log Message:
exactly one blank line before and after package statements

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8 jsr166 1.8
9 dl 1.1 import java.util.*;
10    
11     /**
12     * Provides default implementations of {@link ExecutorService}
13 jsr166 1.2 * execution methods. This class implements the {@code submit},
14     * {@code invokeAny} and {@code invokeAll} methods using a
15     * {@link RunnableFuture} returned by {@code newTaskFor}, which defaults
16 dl 1.1 * to the {@link FutureTask} class provided in this package. For example,
17 jsr166 1.2 * the implementation of {@code submit(Runnable)} creates an
18     * associated {@code RunnableFuture} that is executed and
19     * returned. Subclasses may override the {@code newTaskFor} methods
20     * to return {@code RunnableFuture} implementations other than
21     * {@code FutureTask}.
22 dl 1.1 *
23     * <p><b>Extension example</b>. Here is a sketch of a class
24     * that customizes {@link ThreadPoolExecutor} to use
25 jsr166 1.2 * a {@code CustomTask} class instead of the default {@code FutureTask}:
26 dl 1.1 * <pre> {@code
27     * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
28     *
29     * static class CustomTask<V> implements RunnableFuture<V> {...}
30     *
31     * protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
32     * return new CustomTask<V>(c);
33     * }
34     * protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
35     * return new CustomTask<V>(r, v);
36     * }
37     * // ... add constructors, etc.
38     * }}</pre>
39     *
40     * @since 1.5
41     * @author Doug Lea
42     */
43     public abstract class AbstractExecutorService implements ExecutorService {
44    
45     /**
46 jsr166 1.2 * Returns a {@code RunnableFuture} for the given runnable and default
47 dl 1.1 * value.
48     *
49     * @param runnable the runnable task being wrapped
50     * @param value the default value for the returned future
51 jsr166 1.7 * @param <T> the type of the given value
52 jsr166 1.6 * @return a {@code RunnableFuture} which, when run, will run the
53 jsr166 1.2 * underlying runnable and which, as a {@code Future}, will yield
54 dl 1.1 * the given value as its result and provide for cancellation of
55 jsr166 1.5 * the underlying task
56 dl 1.1 * @since 1.6
57     */
58     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
59     return new FutureTask<T>(runnable, value);
60     }
61    
62     /**
63 jsr166 1.2 * Returns a {@code RunnableFuture} for the given callable task.
64 dl 1.1 *
65     * @param callable the callable task being wrapped
66 jsr166 1.7 * @param <T> the type of the callable's result
67 jsr166 1.6 * @return a {@code RunnableFuture} which, when run, will call the
68 jsr166 1.2 * underlying callable and which, as a {@code Future}, will yield
69 dl 1.1 * the callable's result as its result and provide for
70 jsr166 1.5 * cancellation of the underlying task
71 dl 1.1 * @since 1.6
72     */
73     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
74     return new FutureTask<T>(callable);
75     }
76    
77     /**
78     * @throws RejectedExecutionException {@inheritDoc}
79     * @throws NullPointerException {@inheritDoc}
80     */
81     public Future<?> submit(Runnable task) {
82     if (task == null) throw new NullPointerException();
83     RunnableFuture<Void> ftask = newTaskFor(task, null);
84     execute(ftask);
85     return ftask;
86     }
87    
88     /**
89     * @throws RejectedExecutionException {@inheritDoc}
90     * @throws NullPointerException {@inheritDoc}
91     */
92     public <T> Future<T> submit(Runnable task, T result) {
93     if (task == null) throw new NullPointerException();
94     RunnableFuture<T> ftask = newTaskFor(task, result);
95     execute(ftask);
96     return ftask;
97     }
98    
99     /**
100     * @throws RejectedExecutionException {@inheritDoc}
101     * @throws NullPointerException {@inheritDoc}
102     */
103     public <T> Future<T> submit(Callable<T> task) {
104     if (task == null) throw new NullPointerException();
105     RunnableFuture<T> ftask = newTaskFor(task);
106     execute(ftask);
107     return ftask;
108     }
109    
110     /**
111     * the main mechanics of invokeAny.
112     */
113     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
114     boolean timed, long nanos)
115     throws InterruptedException, ExecutionException, TimeoutException {
116     if (tasks == null)
117     throw new NullPointerException();
118     int ntasks = tasks.size();
119     if (ntasks == 0)
120     throw new IllegalArgumentException();
121 jsr166 1.4 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
122 dl 1.1 ExecutorCompletionService<T> ecs =
123     new ExecutorCompletionService<T>(this);
124    
125     // For efficiency, especially in executors with limited
126     // parallelism, check to see if previously submitted tasks are
127     // done before submitting more of them. This interleaving
128     // plus the exception mechanics account for messiness of main
129     // loop.
130    
131     try {
132     // Record exceptions so that if we fail to obtain any
133     // result, we can throw the last exception we got.
134     ExecutionException ee = null;
135     final long deadline = timed ? System.nanoTime() + nanos : 0L;
136     Iterator<? extends Callable<T>> it = tasks.iterator();
137    
138     // Start one task for sure; the rest incrementally
139     futures.add(ecs.submit(it.next()));
140     --ntasks;
141     int active = 1;
142    
143     for (;;) {
144     Future<T> f = ecs.poll();
145     if (f == null) {
146     if (ntasks > 0) {
147     --ntasks;
148     futures.add(ecs.submit(it.next()));
149     ++active;
150     }
151     else if (active == 0)
152     break;
153     else if (timed) {
154     f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
155     if (f == null)
156     throw new TimeoutException();
157     nanos = deadline - System.nanoTime();
158     }
159     else
160     f = ecs.take();
161     }
162     if (f != null) {
163     --active;
164     try {
165     return f.get();
166     } catch (ExecutionException eex) {
167     ee = eex;
168     } catch (RuntimeException rex) {
169     ee = new ExecutionException(rex);
170     }
171     }
172     }
173    
174     if (ee == null)
175     ee = new ExecutionException();
176     throw ee;
177    
178     } finally {
179 jsr166 1.4 for (int i = 0, size = futures.size(); i < size; i++)
180     futures.get(i).cancel(true);
181 dl 1.1 }
182     }
183    
184     public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
185     throws InterruptedException, ExecutionException {
186     try {
187     return doInvokeAny(tasks, false, 0);
188     } catch (TimeoutException cannotHappen) {
189     assert false;
190     return null;
191     }
192     }
193    
194     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
195     long timeout, TimeUnit unit)
196     throws InterruptedException, ExecutionException, TimeoutException {
197     return doInvokeAny(tasks, true, unit.toNanos(timeout));
198     }
199    
200     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
201     throws InterruptedException {
202     if (tasks == null)
203     throw new NullPointerException();
204 jsr166 1.4 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
205 dl 1.1 boolean done = false;
206     try {
207     for (Callable<T> t : tasks) {
208     RunnableFuture<T> f = newTaskFor(t);
209     futures.add(f);
210     execute(f);
211     }
212 jsr166 1.4 for (int i = 0, size = futures.size(); i < size; i++) {
213     Future<T> f = futures.get(i);
214 dl 1.1 if (!f.isDone()) {
215     try {
216     f.get();
217     } catch (CancellationException ignore) {
218     } catch (ExecutionException ignore) {
219     }
220     }
221     }
222     done = true;
223     return futures;
224     } finally {
225     if (!done)
226 jsr166 1.4 for (int i = 0, size = futures.size(); i < size; i++)
227     futures.get(i).cancel(true);
228 dl 1.1 }
229     }
230    
231     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
232     long timeout, TimeUnit unit)
233     throws InterruptedException {
234     if (tasks == null)
235     throw new NullPointerException();
236     long nanos = unit.toNanos(timeout);
237 jsr166 1.4 ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
238 dl 1.1 boolean done = false;
239     try {
240     for (Callable<T> t : tasks)
241     futures.add(newTaskFor(t));
242    
243     final long deadline = System.nanoTime() + nanos;
244 jsr166 1.4 final int size = futures.size();
245 dl 1.1
246     // Interleave time checks and calls to execute in case
247     // executor doesn't have any/much parallelism.
248 jsr166 1.4 for (int i = 0; i < size; i++) {
249     execute((Runnable)futures.get(i));
250 dl 1.1 nanos = deadline - System.nanoTime();
251     if (nanos <= 0L)
252     return futures;
253     }
254    
255 jsr166 1.4 for (int i = 0; i < size; i++) {
256     Future<T> f = futures.get(i);
257 dl 1.1 if (!f.isDone()) {
258     if (nanos <= 0L)
259     return futures;
260     try {
261     f.get(nanos, TimeUnit.NANOSECONDS);
262     } catch (CancellationException ignore) {
263     } catch (ExecutionException ignore) {
264     } catch (TimeoutException toe) {
265     return futures;
266     }
267     nanos = deadline - System.nanoTime();
268     }
269     }
270     done = true;
271     return futures;
272     } finally {
273     if (!done)
274 jsr166 1.4 for (int i = 0, size = futures.size(); i < size; i++)
275     futures.get(i).cancel(true);
276 dl 1.1 }
277     }
278    
279     }