ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.4
Committed: Sun Dec 14 22:36:36 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.3: +26 -40 lines
Log Message:
Minor improvements to any/all methods

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 import java.security.AccessControlContext;
10 import java.security.AccessController;
11 import java.security.PrivilegedAction;
12 import java.security.PrivilegedExceptionAction;
13 import java.util.*;
14 import java.util.concurrent.locks.*;
15
16 /**
17 * Provides default implementation of {@link ExecutorService}
18 * execution methods. This class implements the <tt>submit</tt> and
19 * <tt>invoke</tt> methods using the default {@link FutureTask} and
20 * {@link PrivilegedFutureTask} classes provided in this package. For
21 * example, the implementation of <tt>submit(Runnable)</tt>
22 * creates an associated <tt>FutureTask</tt> that is executed and
23 * returned. Subclasses overriding these methods to use different
24 * {@link Future} implementations should do so consistently for each
25 * of these methods.
26 *
27 * @since 1.5
28 * @author Doug Lea
29 */
30 public abstract class AbstractExecutorService implements ExecutorService {
31
32 public Future<?> submit(Runnable task) {
33 FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
34 execute(ftask);
35 return ftask;
36 }
37
38 public <T> Future<T> submit(Callable<T> task) {
39 FutureTask<T> ftask = new FutureTask<T>(task);
40 execute(ftask);
41 return ftask;
42 }
43
44 public void invoke(Runnable task) throws ExecutionException, InterruptedException {
45 FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
46 execute(ftask);
47 ftask.get();
48 }
49
50 public <T> T invoke(Callable<T> task) throws ExecutionException, InterruptedException {
51 FutureTask<T> ftask = new FutureTask<T>(task);
52 execute(ftask);
53 return ftask.get();
54 }
55
56 public Future<Object> submit(PrivilegedAction action) {
57 Callable<Object> task = new PrivilegedActionAdapter(action);
58 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
59 execute(future);
60 return future;
61 }
62
63 public Future<Object> submit(PrivilegedExceptionAction action) {
64 Callable<Object> task = new PrivilegedExceptionActionAdapter(action);
65 FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
66 execute(future);
67 return future;
68 }
69
70 private static class PrivilegedActionAdapter implements Callable<Object> {
71 PrivilegedActionAdapter(PrivilegedAction action) {
72 this.action = action;
73 }
74 public Object call () {
75 return action.run();
76 }
77 private final PrivilegedAction action;
78 }
79
80 private static class PrivilegedExceptionActionAdapter implements Callable<Object> {
81 PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
82 this.action = action;
83 }
84 public Object call () throws Exception {
85 return action.run();
86 }
87 private final PrivilegedExceptionAction action;
88 }
89
90 /**
91 * Helper class to wait for tasks in bulk-execute methods
92 */
93 private static class TaskGroupWaiter {
94 private final ReentrantLock lock = new ReentrantLock();
95 private final Condition done = lock.newCondition();
96 private int firstIndex = -1;
97 private int countDown;
98 TaskGroupWaiter(int ntasks) { countDown = ntasks; }
99
100 void signalDone(int index) {
101 lock.lock();
102 try {
103 if (firstIndex < 0)
104 firstIndex = index;
105 if (--countDown == 0)
106 done.signalAll();
107 } finally {
108 lock.unlock();
109 }
110 }
111
112 int await() throws InterruptedException {
113 lock.lock();
114 try {
115 while (countDown > 0)
116 done.await();
117 return firstIndex;
118 } finally {
119 lock.unlock();
120 }
121 }
122
123 int awaitNanos(long nanos) throws InterruptedException {
124 lock.lock();
125 try {
126 while (countDown > 0 && nanos > 0)
127 nanos = done.awaitNanos(nanos);
128 return firstIndex;
129 } finally {
130 lock.unlock();
131 }
132 }
133
134 boolean isDone() {
135 lock.lock();
136 try {
137 return countDown <= 0;
138 } finally {
139 lock.unlock();
140 }
141 }
142 }
143
144 /**
145 * FutureTask extension to provide signal when task completes
146 */
147 private static class SignallingFuture<T> extends FutureTask<T> {
148 private final TaskGroupWaiter waiter;
149 private final int index;
150 SignallingFuture(Callable<T> c, TaskGroupWaiter w, int i) {
151 super(c); waiter = w; index = i;
152 }
153 SignallingFuture(Runnable t, T r, TaskGroupWaiter w, int i) {
154 super(t, r); waiter = w; index = i;
155 }
156 protected void done() {
157 waiter.signalDone(index);
158 }
159 }
160
161 // any/all methods, each a little bit different than the other
162
163 public List<Future<?>> runAny(Collection<Runnable> tasks)
164 throws InterruptedException {
165 if (tasks == null)
166 throw new NullPointerException();
167 int n = tasks.size();
168 List<Future<?>> futures = new ArrayList<Future<?>>(n);
169 if (n == 0)
170 return futures;
171 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
172 try {
173 int i = 0;
174 for (Runnable t : tasks) {
175 SignallingFuture<Boolean> f =
176 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
177 futures.add(f);
178 if (!waiter.isDone())
179 execute(f);
180 }
181 int first = waiter.await();
182 if (first > 0)
183 Collections.swap(futures, first, 0);
184 return futures;
185 } finally {
186 for (Future<?> f : futures)
187 f.cancel(true);
188 }
189 }
190
191 public List<Future<?>> runAny(Collection<Runnable> tasks,
192 long timeout, TimeUnit unit)
193 throws InterruptedException {
194 if (tasks == null || unit == null)
195 throw new NullPointerException();
196 long nanos = unit.toNanos(timeout);
197 int n = tasks.size();
198 List<Future<?>> futures = new ArrayList<Future<?>>(n);
199 if (n == 0)
200 return futures;
201 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
202 try {
203 int i = 0;
204 for (Runnable t : tasks) {
205 SignallingFuture<Boolean> f =
206 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
207 futures.add(f);
208 if (!waiter.isDone())
209 execute(f);
210 }
211 int first = waiter.awaitNanos(nanos);
212 if (first > 0)
213 Collections.swap(futures, first, 0);
214 return futures;
215 } finally {
216 for (Future<?> f : futures)
217 f.cancel(true);
218 }
219 }
220
221
222
223 public List<Future<?>> runAll(Collection<Runnable> tasks)
224 throws InterruptedException {
225 if (tasks == null)
226 throw new NullPointerException();
227 int n = tasks.size();
228 List<Future<?>> futures = new ArrayList<Future<?>>(n);
229 if (n == 0)
230 return futures;
231 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
232 int i = 0;
233 try {
234 for (Runnable t : tasks) {
235 SignallingFuture<Boolean> f =
236 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
237 futures.add(f);
238 execute(f);
239 }
240 waiter.await();
241 return futures;
242 } finally {
243 if (!waiter.isDone())
244 for (Future<?> f : futures)
245 f.cancel(true);
246 }
247 }
248
249 public List<Future<?>> runAll(Collection<Runnable> tasks,
250 long timeout, TimeUnit unit)
251 throws InterruptedException {
252 if (tasks == null || unit == null)
253 throw new NullPointerException();
254 long nanos = unit.toNanos(timeout);
255 int n = tasks.size();
256 List<Future<?>> futures = new ArrayList<Future<?>>(n);
257 if (n == 0)
258 return futures;
259 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
260 try {
261 int i = 0;
262 for (Runnable t : tasks) {
263 SignallingFuture<Boolean> f =
264 new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
265 futures.add(f);
266 execute(f);
267 }
268 waiter.awaitNanos(nanos);
269 return futures;
270 } finally {
271 if (!waiter.isDone())
272 for (Future<?> f : futures)
273 f.cancel(true);
274 }
275 }
276
277 public <T> List<Future<T>> callAny(Collection<Callable<T>> tasks)
278 throws InterruptedException {
279 if (tasks == null)
280 throw new NullPointerException();
281 int n = tasks.size();
282 List<Future<T>> futures = new ArrayList<Future<T>>(n);
283 if (n == 0)
284 return futures;
285 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
286 int i = 0;
287 try {
288 for (Callable<T> t : tasks) {
289 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
290 futures.add(f);
291 if (!waiter.isDone())
292 execute(f);
293 }
294 int first = waiter.await();
295 if (first > 0)
296 Collections.swap(futures, first, 0);
297 return futures;
298 } finally {
299 for (Future<T> f : futures)
300 f.cancel(true);
301 }
302 }
303
304 public <T> List<Future<T>> callAny(Collection<Callable<T>> tasks,
305 long timeout, TimeUnit unit)
306 throws InterruptedException {
307 if (tasks == null || unit == null)
308 throw new NullPointerException();
309 long nanos = unit.toNanos(timeout);
310 int n = tasks.size();
311 List<Future<T>> futures= new ArrayList<Future<T>>(n);
312 if (n == 0)
313 return futures;
314 TaskGroupWaiter waiter = new TaskGroupWaiter(1);
315 try {
316 int i = 0;
317 for (Callable<T> t : tasks) {
318 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
319 futures.add(f);
320 if (!waiter.isDone())
321 execute(f);
322 }
323 int first = waiter.awaitNanos(nanos);
324 if (first > 0)
325 Collections.swap(futures, first, 0);
326 return futures;
327 } finally {
328 for (Future<T> f : futures)
329 f.cancel(true);
330 }
331 }
332
333
334 public <T> List<Future<T>> callAll(Collection<Callable<T>> tasks)
335 throws InterruptedException {
336 if (tasks == null)
337 throw new NullPointerException();
338 int n = tasks.size();
339 List<Future<T>> futures = new ArrayList<Future<T>>(n);
340 if (n == 0)
341 return futures;
342 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
343 try {
344 int i = 0;
345 for (Callable<T> t : tasks) {
346 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
347 futures.add(f);
348 execute(f);
349 }
350 waiter.await();
351 return futures;
352 } finally {
353 if (!waiter.isDone())
354 for (Future<T> f : futures)
355 f.cancel(true);
356 }
357 }
358
359 public <T> List<Future<T>> callAll(Collection<Callable<T>> tasks,
360 long timeout, TimeUnit unit)
361 throws InterruptedException {
362 if (tasks == null || unit == null)
363 throw new NullPointerException();
364 long nanos = unit.toNanos(timeout);
365 int n = tasks.size();
366 List<Future<T>> futures = new ArrayList<Future<T>>(n);
367 if (n == 0)
368 return futures;
369 TaskGroupWaiter waiter = new TaskGroupWaiter(n);
370 try {
371 int i = 0;
372 for (Callable<T> t : tasks) {
373 SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
374 futures.add(f);
375 execute(f);
376 }
377 waiter.awaitNanos(nanos);
378 return futures;
379 } finally {
380 if (!waiter.isDone())
381 for (Future<T> f : futures)
382 f.cancel(true);
383 }
384 }
385
386 }