ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.5
Committed: Mon Dec 15 00:29:49 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.4: +8 -16 lines
Log Message:
Added CompletionService; Executor any/all methods now require lists

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 import java.security.AccessControlContext;
10 import java.security.AccessController;
11 import java.security.PrivilegedAction;
12 import java.security.PrivilegedExceptionAction;
13 import java.util.*;
14 import java.util.concurrent.locks.*;
15
16 /**
17 * Provides default implementation of {@link ExecutorService}
18 * execution methods. This class implements the <tt>submit</tt> and
19 * <tt>invoke</tt> methods using the default {@link FutureTask} and
20 * {@link PrivilegedFutureTask} classes provided in this package. For
21 * example, the the implementation of <tt>submit(Runnable)</tt>
22 * creates an associated <tt>FutureTask</tt> that is executed and
23 * returned. Subclasses overriding these methods to use different
24 * {@link Future} implementations should do so consistently for each
25 * of these methods.
26 *
27 * @since 1.5
28 * @author Doug Lea
29 */
30 public abstract class AbstractExecutorService implements ExecutorService {
31
32 public Future<?> submit(Runnable task) {
33 FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
34 execute(ftask);
35 return ftask;
36 }
37
38 public <T> Future<T> submit(Callable<T> task) {
39 FutureTask<T> ftask = new FutureTask<T>(task);
40 execute(ftask);
41 return ftask;
42 }
43
44 public void invoke(Runnable task) throws ExecutionException, InterruptedException {
45 FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
46 execute(ftask);
47 ftask.get();
48 }
49
50 public <T> T invoke(Callable<T> task) throws ExecutionException, InterruptedException {
51 FutureTask<T> ftask = new FutureTask<T>(task);
52 execute(ftask);
53 return ftask.get();
54 }
55
56 public Future<Object> submit(PrivilegedAction action) {
57 Callable<Object> task = new PrivilegedActionAdapter(action);
58 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
59 execute(future);
60 return future;
61 }
62
63 public Future<Object> submit(PrivilegedExceptionAction action) {
64 Callable<Object> task = new PrivilegedExceptionActionAdapter(action);
65 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
66 execute(future);
67 return future;
68 }
69
70 private static class PrivilegedActionAdapter implements Callable<Object> {
71 PrivilegedActionAdapter(PrivilegedAction action) {
72 this.action = action;
73 }
74 public Object call () {
75 return action.run();
76 }
77 private final PrivilegedAction action;
78 }
79
80 private static class PrivilegedExceptionActionAdapter implements Callable<Object> {
81 PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
82 this.action = action;
83 }
84 public Object call () throws Exception {
85 return action.run();
86 }
87 private final PrivilegedExceptionAction action;
88 }
89
90 /**
91 * Helper class to wait for tasks in bulk-execute methods
92 */
93 private static class TaskGroupWaiter {
94 private final ReentrantLock lock = new ReentrantLock();
95 private final Condition done = lock.newCondition();
96 private int firstIndex = -1;
97 private int countDown;
98 TaskGroupWaiter(int ntasks) { countDown = ntasks; }
99
100 void signalDone(int index) {
101 lock.lock();
102 try {
103 if (firstIndex < 0)
104 firstIndex = index;
105 if (--countDown == 0)
106 done.signalAll();
107 } finally {
108 lock.unlock();
109 }
110 }
111
112 int await() throws InterruptedException {
113 lock.lock();
114 try {
115 while (countDown > 0)
116 done.await();
117 return firstIndex;
118 } finally {
119 lock.unlock();
120 }
121 }
122
123 int awaitNanos(long nanos) throws InterruptedException {
124 lock.lock();
125 try {
126 while (countDown > 0 && nanos > 0)
127 nanos = done.awaitNanos(nanos);
128 return firstIndex;
129 } finally {
130 lock.unlock();
131 }
132 }
133
134 boolean isDone() {
135 lock.lock();
136 try {
137 return countDown <= 0;
138 } finally {
139 lock.unlock();
140 }
141 }
142 }
143
144 /**
145 * FutureTask extension to provide signal when task completes
146 */
147 private static class SignallingFuture<T> extends FutureTask<T> {
148 private final TaskGroupWaiter waiter;
149 private final int index;
150 SignallingFuture(Callable<T> c, TaskGroupWaiter w, int i) {
151 super(c); waiter = w; index = i;
152 }
153 SignallingFuture(Runnable t, T r, TaskGroupWaiter w, int i) {
154 super(t, r); waiter = w; index = i;
155 }
156 protected void done() {
157 waiter.signalDone(index);
158 }
159 }
160
161 // any/all methods, each a little bit different than the other
162
163 public List<Future<?>> runAny(List<Runnable> tasks)
164 throws InterruptedException {
165 if (tasks == null)
166 throw new NullPointerException();
167 int n = tasks.size();
168 List<Future<?>> futures = new ArrayList<Future<?>>(n);
169 if (n == 0)
170 return futures;
171 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
172 try {
173 int i = 0;
174 for (Runnable t : tasks) {
175 SignallingFuture<Boolean> f =
176 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
177 futures.add(f);
178 if (!waiter.isDone())
179 execute(f);
180 }
181 int first = waiter.await();
182 return futures;
183 } finally {
184 for (Future<?> f : futures)
185 f.cancel(true);
186 }
187 }
188
189 public List<Future<?>> runAny(List<Runnable> tasks,
190 long timeout, TimeUnit unit)
191 throws InterruptedException {
192 if (tasks == null || unit == null)
193 throw new NullPointerException();
194 long nanos = unit.toNanos(timeout);
195 int n = tasks.size();
196 List<Future<?>> futures = new ArrayList<Future<?>>(n);
197 if (n == 0)
198 return futures;
199 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
200 try {
201 int i = 0;
202 for (Runnable t : tasks) {
203 SignallingFuture<Boolean> f =
204 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
205 futures.add(f);
206 if (!waiter.isDone())
207 execute(f);
208 }
209 int first = waiter.awaitNanos(nanos);
210 return futures;
211 } finally {
212 for (Future<?> f : futures)
213 f.cancel(true);
214 }
215 }
216
217
218
219 public List<Future<?>> runAll(List<Runnable> tasks)
220 throws InterruptedException {
221 if (tasks == null)
222 throw new NullPointerException();
223 int n = tasks.size();
224 List<Future<?>> futures = new ArrayList<Future<?>>(n);
225 if (n == 0)
226 return futures;
227 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
228 int i = 0;
229 try {
230 for (Runnable t : tasks) {
231 SignallingFuture<Boolean> f =
232 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
233 futures.add(f);
234 execute(f);
235 }
236 waiter.await();
237 return futures;
238 } finally {
239 if (!waiter.isDone())
240 for (Future<?> f : futures)
241 f.cancel(true);
242 }
243 }
244
245 public List<Future<?>> runAll(List<Runnable> tasks,
246 long timeout, TimeUnit unit)
247 throws InterruptedException {
248 if (tasks == null || unit == null)
249 throw new NullPointerException();
250 long nanos = unit.toNanos(timeout);
251 int n = tasks.size();
252 List<Future<?>> futures = new ArrayList<Future<?>>(n);
253 if (n == 0)
254 return futures;
255 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
256 try {
257 int i = 0;
258 for (Runnable t : tasks) {
259 SignallingFuture<Boolean> f =
260 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
261 futures.add(f);
262 execute(f);
263 }
264 waiter.awaitNanos(nanos);
265 return futures;
266 } finally {
267 if (!waiter.isDone())
268 for (Future<?> f : futures)
269 f.cancel(true);
270 }
271 }
272
273 public <T> List<Future<T>> callAny(List<Callable<T>> tasks)
274 throws InterruptedException {
275 if (tasks == null)
276 throw new NullPointerException();
277 int n = tasks.size();
278 List<Future<T>> futures = new ArrayList<Future<T>>(n);
279 if (n == 0)
280 return futures;
281 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
282 int i = 0;
283 try {
284 for (Callable<T> t : tasks) {
285 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
286 futures.add(f);
287 if (!waiter.isDone())
288 execute(f);
289 }
290 int first = waiter.await();
291 return futures;
292 } finally {
293 for (Future<T> f : futures)
294 f.cancel(true);
295 }
296 }
297
298 public <T> List<Future<T>> callAny(List<Callable<T>> tasks,
299 long timeout, TimeUnit unit)
300 throws InterruptedException {
301 if (tasks == null || unit == null)
302 throw new NullPointerException();
303 long nanos = unit.toNanos(timeout);
304 int n = tasks.size();
305 List<Future<T>> futures= new ArrayList<Future<T>>(n);
306 if (n == 0)
307 return futures;
308 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
309 try {
310 int i = 0;
311 for (Callable<T> t : tasks) {
312 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
313 futures.add(f);
314 if (!waiter.isDone())
315 execute(f);
316 }
317 int first = waiter.awaitNanos(nanos);
318 return futures;
319 } finally {
320 for (Future<T> f : futures)
321 f.cancel(true);
322 }
323 }
324
325
326 public <T> List<Future<T>> callAll(List<Callable<T>> tasks)
327 throws InterruptedException {
328 if (tasks == null)
329 throw new NullPointerException();
330 int n = tasks.size();
331 List<Future<T>> futures = new ArrayList<Future<T>>(n);
332 if (n == 0)
333 return futures;
334 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
335 try {
336 int i = 0;
337 for (Callable<T> t : tasks) {
338 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
339 futures.add(f);
340 execute(f);
341 }
342 waiter.await();
343 return futures;
344 } finally {
345 if (!waiter.isDone())
346 for (Future<T> f : futures)
347 f.cancel(true);
348 }
349 }
350
351 public <T> List<Future<T>> callAll(List<Callable<T>> tasks,
352 long timeout, TimeUnit unit)
353 throws InterruptedException {
354 if (tasks == null || unit == null)
355 throw new NullPointerException();
356 long nanos = unit.toNanos(timeout);
357 int n = tasks.size();
358 List<Future<T>> futures = new ArrayList<Future<T>>(n);
359 if (n == 0)
360 return futures;
361 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
362 try {
363 int i = 0;
364 for (Callable<T> t : tasks) {
365 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
366 futures.add(f);
367 execute(f);
368 }
369 waiter.awaitNanos(nanos);
370 return futures;
371 } finally {
372 if (!waiter.isDone())
373 for (Future<T> f : futures)
374 f.cancel(true);
375 }
376 }
377
378 }