ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.3
Committed: Sun Dec 14 15:50:13 2003 UTC (20 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.2: +313 -1 lines
Log Message:
Added any/all methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 import java.security.AccessControlContext;
10 import java.security.AccessController;
11 import java.security.PrivilegedAction;
12 import java.security.PrivilegedExceptionAction;
13 import java.util.*;
14 import java.util.concurrent.locks.*;
15
16 /**
17 * Provides default implementation of {@link ExecutorService}
18 * execution methods. This class implements the <tt>submit</tt> and
19 * <tt>invoke</tt> methods using the default {@link FutureTask} and
20 * {@link PrivilegedFutureTask} classes provided in this package. For
21 * example, the implementation of <tt>submit(Runnable)</tt>
22 * creates an associated <tt>FutureTask</tt> that is executed and
23 * returned. Subclasses overriding these methods to use different
24 * {@link Future} implementations should do so consistently for each
25 * of these methods.
26 *
27 * @since 1.5
28 * @author Doug Lea
29 */
30 public abstract class AbstractExecutorService implements ExecutorService {
31
32 public Future<?> submit(Runnable task) {
33 FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
34 execute(ftask);
35 return ftask;
36 }
37
38 public <T> Future<T> submit(Callable<T> task) {
39 FutureTask<T> ftask = new FutureTask<T>(task);
40 execute(ftask);
41 return ftask;
42 }
43
44 public void invoke(Runnable task) throws ExecutionException, InterruptedException {
45 FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
46 execute(ftask);
47 ftask.get();
48 }
49
50 public <T> T invoke(Callable<T> task) throws ExecutionException, InterruptedException {
51 FutureTask<T> ftask = new FutureTask<T>(task);
52 execute(ftask);
53 return ftask.get();
54 }
55
56 public Future<Object> submit(PrivilegedAction action) {
57 Callable<Object> task = new PrivilegedActionAdapter(action);
58 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
59 execute(future);
60 return future;
61 }
62
63 public Future<Object> submit(PrivilegedExceptionAction action) {
64 Callable<Object> task = new PrivilegedExceptionActionAdapter(action);
65 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
66 execute(future);
67 return future;
68 }
69
70 private static class PrivilegedActionAdapter implements Callable<Object> {
71 PrivilegedActionAdapter(PrivilegedAction action) {
72 this.action = action;
73 }
74 public Object call () {
75 return action.run();
76 }
77 private final PrivilegedAction action;
78 }
79
80 private static class PrivilegedExceptionActionAdapter implements Callable<Object> {
81 PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
82 this.action = action;
83 }
84 public Object call () throws Exception {
85 return action.run();
86 }
87 private final PrivilegedExceptionAction action;
88 }
89
90 /**
91 * Helper class to wait for tasks in bulk-execute methods
92 */
93 private static class TaskGroupWaiter {
94 private final ReentrantLock lock = new ReentrantLock();
95 private final Condition monitor = lock.newCondition();
96 private int firstIndex = -1;
97 private int countDown;
98 TaskGroupWaiter(int ntasks) { countDown = ntasks; }
99
100 void signalDone(int index) {
101 lock.lock();
102 try {
103 if (firstIndex < 0)
104 firstIndex = index;
105 if (--countDown == 0)
106 monitor.signalAll();
107 }
108 finally {
109 lock.unlock();
110 }
111 }
112
113 int await() throws InterruptedException {
114 lock.lock();
115 try {
116 while (countDown > 0)
117 monitor.await();
118 return firstIndex;
119 }
120 finally {
121 lock.unlock();
122 }
123 }
124
125 int awaitNanos(long nanos) throws InterruptedException {
126 lock.lock();
127 try {
128 while (countDown > 0 && nanos > 0)
129 nanos = monitor.awaitNanos(nanos);
130 return firstIndex;
131 }
132 finally {
133 lock.unlock();
134 }
135 }
136
137 boolean isDone() {
138 lock.lock();
139 try {
140 return countDown <= 0;
141 }
142 finally {
143 lock.unlock();
144 }
145 }
146 }
147
148 /**
149 * FutureTask extension to provide signal when task completes
150 */
151 private static class SignallingFuture<T> extends FutureTask<T> {
152 private final TaskGroupWaiter waiter;
153 private final int index;
154 SignallingFuture(Callable<T> c, TaskGroupWaiter w, int i) {
155 super(c); waiter = w; index = i;
156 }
157 SignallingFuture(Runnable t, T r, TaskGroupWaiter w, int i) {
158 super(t, r); waiter = w; index = i;
159 }
160 protected void done() {
161 waiter.signalDone(index);
162 }
163 }
164
165
166 /**
167 * Helper method to cancel unfinished tasks before return of
168 * bulk execute methods
169 */
170 private static void cancelUnfinishedTasks(List<Future<?>> futures) {
171 for (Future<?> f : futures)
172 f.cancel(true);
173 }
174
175 /**
176 * Same as cancelUnfinishedTasks; Workaround for compiler oddity
177 */
178 private static <T> void cancelUnfinishedTasks2(List<Future<T>> futures) {
179 for (Future<T> f : futures)
180 f.cancel(true);
181 }
182
183 // any/all methods, each a little bit different than each other
184
185 public List<Future<?>> runAny(Collection<Runnable> tasks)
186 throws InterruptedException {
187 if (tasks == null)
188 throw new NullPointerException();
189 int n = tasks.size();
190 List<Future<?>> futures = new ArrayList<Future<?>>(n);
191 if (n == 0)
192 return futures;
193 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
194 try {
195 int i = 0;
196 for (Runnable t : tasks) {
197 SignallingFuture<Boolean> f =
198 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
199 futures.add(f);
200 if (!waiter.isDone())
201 execute(f);
202 }
203 int first = waiter.await();
204 if (first > 0)
205 Collections.swap(futures, first, 0);
206 return futures;
207 } finally {
208 cancelUnfinishedTasks(futures);
209 }
210 }
211
212 public List<Future<?>> runAny(Collection<Runnable> tasks,
213 long timeout, TimeUnit unit)
214 throws InterruptedException {
215 if (tasks == null || unit == null)
216 throw new NullPointerException();
217 long nanos = unit.toNanos(timeout);
218 int n = tasks.size();
219 List<Future<?>> futures = new ArrayList<Future<?>>(n);
220 if (n == 0)
221 return futures;
222 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
223 try {
224 int i = 0;
225 for (Runnable t : tasks) {
226 SignallingFuture<Boolean> f =
227 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
228 futures.add(f);
229 if (!waiter.isDone())
230 execute(f);
231 }
232 int first = waiter.awaitNanos(nanos);
233 if (first > 0)
234 Collections.swap(futures, first, 0);
235 return futures;
236 } finally {
237 cancelUnfinishedTasks(futures);
238 }
239 }
240
241
242
243 public List<Future<?>> runAll(Collection<Runnable> tasks)
244 throws InterruptedException {
245 if (tasks == null)
246 throw new NullPointerException();
247 int n = tasks.size();
248 List<Future<?>> futures = new ArrayList<Future<?>>(n);
249 if (n == 0)
250 return futures;
251 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
252 int i = 0;
253 try {
254 for (Runnable t : tasks) {
255 SignallingFuture<Boolean> f =
256 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
257 futures.add(f);
258 execute(f);
259 }
260 waiter.await();
261 return futures;
262 } finally {
263 if (!waiter.isDone())
264 cancelUnfinishedTasks(futures);
265 }
266 }
267
268 public List<Future<?>> runAll(Collection<Runnable> tasks,
269 long timeout, TimeUnit unit)
270 throws InterruptedException {
271 if (tasks == null || unit == null)
272 throw new NullPointerException();
273 long nanos = unit.toNanos(timeout);
274 int n = tasks.size();
275 List<Future<?>> futures = new ArrayList<Future<?>>(n);
276 if (n == 0)
277 return futures;
278 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
279 try {
280 int i = 0;
281 for (Runnable t : tasks) {
282 SignallingFuture<Boolean> f =
283 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
284 futures.add(f);
285 execute(f);
286 }
287 waiter.awaitNanos(nanos);
288 return futures;
289 } finally {
290 if (!waiter.isDone())
291 cancelUnfinishedTasks(futures);
292 }
293 }
294
295 public <T> List<Future<T>> callAny(Collection<Callable<T>> tasks)
296 throws InterruptedException {
297 if (tasks == null)
298 throw new NullPointerException();
299 int n = tasks.size();
300 List<Future<T>> futures = new ArrayList<Future<T>>(n);
301 if (n == 0)
302 return futures;
303 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
304 int i = 0;
305 try {
306 for (Callable<T> t : tasks) {
307 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
308 futures.add(f);
309 if (!waiter.isDone())
310 execute(f);
311 }
312 int first = waiter.await();
313 if (first > 0)
314 Collections.swap(futures, first, 0);
315 return futures;
316 } finally {
317 cancelUnfinishedTasks2(futures);
318 }
319 }
320
321 public <T> List<Future<T>> callAny(Collection<Callable<T>> tasks,
322 long timeout, TimeUnit unit)
323 throws InterruptedException {
324 if (tasks == null || unit == null)
325 throw new NullPointerException();
326 long nanos = unit.toNanos(timeout);
327 int n = tasks.size();
328 List<Future<T>> futures= new ArrayList<Future<T>>(n);
329 if (n == 0)
330 return futures;
331 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
332 try {
333 int i = 0;
334 for (Callable<T> t : tasks) {
335 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
336 futures.add(f);
337 if (!waiter.isDone())
338 execute(f);
339 }
340 int first = waiter.awaitNanos(nanos);
341 if (first > 0)
342 Collections.swap(futures, first, 0);
343 return futures;
344 } finally {
345 cancelUnfinishedTasks2(futures);
346 }
347 }
348
349
350 public <T> List<Future<T>> callAll(Collection<Callable<T>> tasks)
351 throws InterruptedException {
352 if (tasks == null)
353 throw new NullPointerException();
354 int n = tasks.size();
355 List<Future<T>> futures = new ArrayList<Future<T>>(n);
356 if (n == 0)
357 return futures;
358 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
359 try {
360 int i = 0;
361 for (Callable<T> t : tasks) {
362 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
363 futures.add(f);
364 execute(f);
365 }
366 waiter.await();
367 return futures;
368 } finally {
369 if (!waiter.isDone())
370 cancelUnfinishedTasks2(futures);
371 }
372 }
373
374 public <T> List<Future<T>> callAll(Collection<Callable<T>> tasks,
375 long timeout, TimeUnit unit)
376 throws InterruptedException {
377 if (tasks == null || unit == null)
378 throw new NullPointerException();
379 long nanos = unit.toNanos(timeout);
380 int n = tasks.size();
381 List<Future<T>> futures = new ArrayList<Future<T>>(n);
382 if (n == 0)
383 return futures;
384 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
385 try {
386 int i = 0;
387 for (Callable<T> t : tasks) {
388 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
389 futures.add(f);
390 execute(f);
391 }
392 waiter.awaitNanos(nanos);
393 return futures;
394 } finally {
395 if (!waiter.isDone())
396 cancelUnfinishedTasks2(futures);
397 }
398 }
399
400 }