ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.17
Committed: Sat Dec 27 19:26:25 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.16: +2 -2 lines
Log Message:
Headers reference Creative Commons

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/licenses/publicdomain
5 */
6
7 package java.util.concurrent;
8
9 import java.util.*;
10
11 /**
12 * Provides default implementation of {@link ExecutorService}
13 * execution methods. This class implements the <tt>submit</tt>,
14 * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using the default
15 * {@link FutureTask} class provided in this package. For example,
16 * the implementation of <tt>submit(Runnable)</tt> creates an
17 * associated <tt>FutureTask</tt> that is executed and
18 * returned. Subclasses overriding these methods to use different
19 * {@link Future} implementations should do so consistently for each
20 * of these methods.
21 *
22 * @since 1.5
23 * @author Doug Lea
24 */
25 public abstract class AbstractExecutorService implements ExecutorService {
26
27 public Future<?> submit(Runnable task) {
28 if (task == null) throw new NullPointerException();
29 FutureTask<Object> ftask = new FutureTask<Object>(task, null);
30 execute(ftask);
31 return ftask;
32 }
33
34 public <T> Future<T> submit(Runnable task, T result) {
35 if (task == null) throw new NullPointerException();
36 FutureTask<T> ftask = new FutureTask<T>(task, result);
37 execute(ftask);
38 return ftask;
39 }
40
41 public <T> Future<T> submit(Callable<T> task) {
42 if (task == null) throw new NullPointerException();
43 FutureTask<T> ftask = new FutureTask<T>(task);
44 execute(ftask);
45 return ftask;
46 }
47
48 /**
49 * the main mechanics of invokeAny.
50 */
51 private <T> T doInvokeAny(Collection<Callable<T>> tasks,
52 boolean timed, long nanos)
53 throws InterruptedException, ExecutionException, TimeoutException {
54 if (tasks == null)
55 throw new NullPointerException();
56 int ntasks = tasks.size();
57 if (ntasks == 0)
58 throw new IllegalArgumentException();
59 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
60 ExecutorCompletionService<T> ecs =
61 new ExecutorCompletionService<T>(this);
62
63 // For efficiency, especially in executors with limited
64 // parallelism, check to see if previously submitted tasks are
65 // done before submitting more of them. This interleaving,
66 // plus the exception mechanics account for messiness of main
67 // loop
68
69 try {
70 // Record exceptions so that if we fail to obtain any
71 // result, we can throw the last exception we got.
72 ExecutionException ee = null;
73 long lastTime = (timed)? System.nanoTime() : 0;
74 Iterator<Callable<T>> it = tasks.iterator();
75
76 // Start one task for sure; the rest incrementally
77 futures.add(ecs.submit(it.next()));
78 --ntasks;
79 int active = 1;
80
81 for (;;) {
82 Future<T> f = ecs.poll();
83 if (f == null) {
84 if (ntasks > 0) {
85 --ntasks;
86 futures.add(ecs.submit(it.next()));
87 ++active;
88 }
89 else if (active == 0)
90 break;
91 else if (timed) {
92 f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
93 if (f == null)
94 throw new TimeoutException();
95 long now = System.nanoTime();
96 nanos -= now - lastTime;
97 lastTime = now;
98 }
99 else
100 f = ecs.take();
101 }
102 if (f != null) {
103 --active;
104 try {
105 return f.get();
106 } catch(InterruptedException ie) {
107 throw ie;
108 } catch(ExecutionException eex) {
109 ee = eex;
110 } catch(RuntimeException rex) {
111 ee = new ExecutionException(rex);
112 }
113 }
114 }
115
116 if (ee == null)
117 ee = new ExecutionException();
118 throw ee;
119
120 } finally {
121 for (Future<T> f : futures)
122 f.cancel(true);
123 }
124 }
125
126 public <T> T invokeAny(Collection<Callable<T>> tasks)
127 throws InterruptedException, ExecutionException {
128 try {
129 return doInvokeAny(tasks, false, 0);
130 } catch (TimeoutException cannotHappen) {
131 assert false;
132 return null;
133 }
134 }
135
136 public <T> T invokeAny(Collection<Callable<T>> tasks,
137 long timeout, TimeUnit unit)
138 throws InterruptedException, ExecutionException, TimeoutException {
139 return doInvokeAny(tasks, true, unit.toNanos(timeout));
140 }
141
142 public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
143 throws InterruptedException {
144 if (tasks == null)
145 throw new NullPointerException();
146 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
147 boolean done = false;
148 try {
149 for (Callable<T> t : tasks) {
150 FutureTask<T> f = new FutureTask<T>(t);
151 futures.add(f);
152 execute(f);
153 }
154 for (Future<T> f : futures) {
155 if (!f.isDone()) {
156 try {
157 f.get();
158 } catch(CancellationException ignore) {
159 } catch(ExecutionException ignore) {
160 }
161 }
162 }
163 done = true;
164 return futures;
165 } finally {
166 if (!done)
167 for (Future<T> f : futures)
168 f.cancel(true);
169 }
170 }
171
172 public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
173 long timeout, TimeUnit unit)
174 throws InterruptedException {
175 if (tasks == null || unit == null)
176 throw new NullPointerException();
177 long nanos = unit.toNanos(timeout);
178 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
179 boolean done = false;
180 try {
181 for (Callable<T> t : tasks)
182 futures.add(new FutureTask<T>(t));
183
184 long lastTime = System.nanoTime();
185
186 // Interleave time checks and calls to execute in case
187 // executor doesn't have any/much parallelism.
188 Iterator<Future<T>> it = futures.iterator();
189 while (it.hasNext()) {
190 execute((Runnable)(it.next()));
191 long now = System.nanoTime();
192 nanos -= now - lastTime;
193 lastTime = now;
194 if (nanos <= 0)
195 return futures;
196 }
197
198 for (Future<T> f : futures) {
199 if (!f.isDone()) {
200 if (nanos <= 0)
201 return futures;
202 try {
203 f.get(nanos, TimeUnit.NANOSECONDS);
204 } catch(CancellationException ignore) {
205 } catch(ExecutionException ignore) {
206 } catch(TimeoutException toe) {
207 return futures;
208 }
209 long now = System.nanoTime();
210 nanos -= now - lastTime;
211 lastTime = now;
212 }
213 }
214 done = true;
215 return futures;
216 } finally {
217 if (!done)
218 for (Future<T> f : futures)
219 f.cancel(true);
220 }
221 }
222
223 }