ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.3
Committed: Sun Dec 14 15:50:13 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.2: +313 -1 lines
Log Message:
Added any/all methods

File Contents

# User Rev Content
1 tim 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8    
9     import java.security.AccessControlContext;
10     import java.security.AccessController;
11     import java.security.PrivilegedAction;
12     import java.security.PrivilegedExceptionAction;
13 dl 1.3 import java.util.*;
14     import java.util.concurrent.locks.*;
15 tim 1.1
16     /**
17 dl 1.2 * Provides default implementation of {@link ExecutorService}
18     * execution methods. This class implements the <tt>submit</tt> and
19     * <tt>invoke</tt> methods using the default {@link FutureTask} and
20     * {@link PrivilegedFutureTask} classes provided in this package. For
21     * example, the the implementation of <tt>submit(Runnable)</tt>
22     * creates an associated <tt>FutureTask</tt> that is executed and
23     * returned. Subclasses overriding these methods to use different
24     * {@link Future} implementations should do so consistently for each
25     * of these methods.
26 tim 1.1 *
27     * @since 1.5
28     * @author Doug Lea
29     */
30     public abstract class AbstractExecutorService implements ExecutorService {
31    
32     public Future<?> submit(Runnable task) {
33     FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
34     execute(ftask);
35     return ftask;
36     }
37    
38     public <T> Future<T> submit(Callable<T> task) {
39     FutureTask<T> ftask = new FutureTask<T>(task);
40     execute(ftask);
41     return ftask;
42     }
43    
44     public void invoke(Runnable task) throws ExecutionException, InterruptedException {
45     FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
46     execute(ftask);
47     ftask.get();
48     }
49    
50     public <T> T invoke(Callable<T> task) throws ExecutionException, InterruptedException {
51     FutureTask<T> ftask = new FutureTask<T>(task);
52     execute(ftask);
53     return ftask.get();
54     }
55    
56     public Future<Object> submit(PrivilegedAction action) {
57     Callable<Object> task = new PrivilegedActionAdapter(action);
58     FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
59     execute(future);
60     return future;
61     }
62    
63     public Future<Object> submit(PrivilegedExceptionAction action) {
64     Callable<Object> task = new PrivilegedExceptionActionAdapter(action);
65     FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
66     execute(future);
67     return future;
68     }
69    
70     private static class PrivilegedActionAdapter implements Callable<Object> {
71     PrivilegedActionAdapter(PrivilegedAction action) {
72     this.action = action;
73     }
74     public Object call () {
75     return action.run();
76     }
77     private final PrivilegedAction action;
78     }
79    
80     private static class PrivilegedExceptionActionAdapter implements Callable<Object> {
81     PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
82     this.action = action;
83     }
84     public Object call () throws Exception {
85     return action.run();
86     }
87     private final PrivilegedExceptionAction action;
88     }
89 dl 1.3
90     /**
91     * Helper class to wait for tasks in bulk-execute methods
92     */
93     private static class TaskGroupWaiter {
94     private final ReentrantLock lock = new ReentrantLock();
95     private final Condition monitor = lock.newCondition();
96     private int firstIndex = -1;
97     private int countDown;
98     TaskGroupWaiter(int ntasks) { countDown = ntasks; }
99    
100     void signalDone(int index) {
101     lock.lock();
102     try {
103     if (firstIndex < 0)
104     firstIndex = index;
105     if (--countDown == 0)
106     monitor.signalAll();
107     }
108     finally {
109     lock.unlock();
110     }
111     }
112    
113     int await() throws InterruptedException {
114     lock.lock();
115     try {
116     while (countDown > 0)
117     monitor.await();
118     return firstIndex;
119     }
120     finally {
121     lock.unlock();
122     }
123     }
124    
125     int awaitNanos(long nanos) throws InterruptedException {
126     lock.lock();
127     try {
128     while (countDown > 0 && nanos > 0)
129     nanos = monitor.awaitNanos(nanos);
130     return firstIndex;
131     }
132     finally {
133     lock.unlock();
134     }
135     }
136    
137     boolean isDone() {
138     lock.lock();
139     try {
140     return countDown <= 0;
141     }
142     finally {
143     lock.unlock();
144     }
145     }
146     }
147    
148     /**
149     * FutureTask extension to provide signal when task completes
150     */
151     private static class SignallingFuture<T> extends FutureTask<T> {
152     private final TaskGroupWaiter waiter;
153     private final int index;
154     SignallingFuture(Callable<T> c, TaskGroupWaiter w, int i) {
155     super(c); waiter = w; index = i;
156     }
157     SignallingFuture(Runnable t, T r, TaskGroupWaiter w, int i) {
158     super(t, r); waiter = w; index = i;
159     }
160     protected void done() {
161     waiter.signalDone(index);
162     }
163     }
164    
165    
166     /**
167     * Helper method to cancel unfinished tasks before return of
168     * bulk execute methods
169     */
170     private static void cancelUnfinishedTasks(List<Future<?>> futures) {
171     for (Future<?> f : futures)
172     f.cancel(true);
173     }
174    
175     /**
176     * Same as cancelUnfinishedTasks; Workaround for compiler oddity
177     */
178     private static <T> void cancelUnfinishedTasks2(List<Future<T>> futures) {
179     for (Future<T> f : futures)
180     f.cancel(true);
181     }
182    
183     // any/all methods, each a little bit different than each other
184    
185     public List<Future<?>> runAny(Collection<Runnable> tasks)
186     throws InterruptedException {
187     if (tasks == null)
188     throw new NullPointerException();
189     int n = tasks.size();
190     List<Future<?>> futures = new ArrayList<Future<?>>(n);
191     if (n == 0)
192     return futures;
193     TaskGroupWaiter waiter = new TaskGroupWaiter(1);
194     try {
195     int i = 0;
196     for (Runnable t : tasks) {
197     SignallingFuture<Boolean> f =
198     new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
199     futures.add(f);
200     if (!waiter.isDone())
201     execute(f);
202     }
203     int first = waiter.await();
204     if (first > 0)
205     Collections.swap(futures, first, 0);
206     return futures;
207     } finally {
208     cancelUnfinishedTasks(futures);
209     }
210     }
211    
212     public List<Future<?>> runAny(Collection<Runnable> tasks,
213     long timeout, TimeUnit unit)
214     throws InterruptedException {
215     if (tasks == null || unit == null)
216     throw new NullPointerException();
217     long nanos = unit.toNanos(timeout);
218     int n = tasks.size();
219     List<Future<?>> futures = new ArrayList<Future<?>>(n);
220     if (n == 0)
221     return futures;
222     TaskGroupWaiter waiter = new TaskGroupWaiter(1);
223     try {
224     int i = 0;
225     for (Runnable t : tasks) {
226     SignallingFuture<Boolean> f =
227     new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
228     futures.add(f);
229     if (!waiter.isDone())
230     execute(f);
231     }
232     int first = waiter.awaitNanos(nanos);
233     if (first > 0)
234     Collections.swap(futures, first, 0);
235     return futures;
236     } finally {
237     cancelUnfinishedTasks(futures);
238     }
239     }
240    
241    
242    
243     public List<Future<?>> runAll(Collection<Runnable> tasks)
244     throws InterruptedException {
245     if (tasks == null)
246     throw new NullPointerException();
247     int n = tasks.size();
248     List<Future<?>> futures = new ArrayList<Future<?>>(n);
249     if (n == 0)
250     return futures;
251     TaskGroupWaiter waiter = new TaskGroupWaiter(n);
252     int i = 0;
253     try {
254     for (Runnable t : tasks) {
255     SignallingFuture<Boolean> f =
256     new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
257     futures.add(f);
258     execute(f);
259     }
260     waiter.await();
261     return futures;
262     } finally {
263     if (!waiter.isDone())
264     cancelUnfinishedTasks(futures);
265     }
266     }
267    
268     public List<Future<?>> runAll(Collection<Runnable> tasks,
269     long timeout, TimeUnit unit)
270     throws InterruptedException {
271     if (tasks == null || unit == null)
272     throw new NullPointerException();
273     long nanos = unit.toNanos(timeout);
274     int n = tasks.size();
275     List<Future<?>> futures = new ArrayList<Future<?>>(n);
276     if (n == 0)
277     return futures;
278     TaskGroupWaiter waiter = new TaskGroupWaiter(n);
279     try {
280     int i = 0;
281     for (Runnable t : tasks) {
282     SignallingFuture<Boolean> f =
283     new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
284     futures.add(f);
285     execute(f);
286     }
287     waiter.awaitNanos(nanos);
288     return futures;
289     } finally {
290     if (!waiter.isDone())
291     cancelUnfinishedTasks(futures);
292     }
293     }
294    
295     public <T> List<Future<T>> callAny(Collection<Callable<T>> tasks)
296     throws InterruptedException {
297     if (tasks == null)
298     throw new NullPointerException();
299     int n = tasks.size();
300     List<Future<T>> futures = new ArrayList<Future<T>>(n);
301     if (n == 0)
302     return futures;
303     TaskGroupWaiter waiter = new TaskGroupWaiter(1);
304     int i = 0;
305     try {
306     for (Callable<T> t : tasks) {
307     SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
308     futures.add(f);
309     if (!waiter.isDone())
310     execute(f);
311     }
312     int first = waiter.await();
313     if (first > 0)
314     Collections.swap(futures, first, 0);
315     return futures;
316     } finally {
317     cancelUnfinishedTasks2(futures);
318     }
319     }
320    
321     public <T> List<Future<T>> callAny(Collection<Callable<T>> tasks,
322     long timeout, TimeUnit unit)
323     throws InterruptedException {
324     if (tasks == null || unit == null)
325     throw new NullPointerException();
326     long nanos = unit.toNanos(timeout);
327     int n = tasks.size();
328     List<Future<T>> futures= new ArrayList<Future<T>>(n);
329     if (n == 0)
330     return futures;
331     TaskGroupWaiter waiter = new TaskGroupWaiter(1);
332     try {
333     int i = 0;
334     for (Callable<T> t : tasks) {
335     SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
336     futures.add(f);
337     if (!waiter.isDone())
338     execute(f);
339     }
340     int first = waiter.awaitNanos(nanos);
341     if (first > 0)
342     Collections.swap(futures, first, 0);
343     return futures;
344     } finally {
345     cancelUnfinishedTasks2(futures);
346     }
347     }
348    
349    
350     public <T> List<Future<T>> callAll(Collection<Callable<T>> tasks)
351     throws InterruptedException {
352     if (tasks == null)
353     throw new NullPointerException();
354     int n = tasks.size();
355     List<Future<T>> futures = new ArrayList<Future<T>>(n);
356     if (n == 0)
357     return futures;
358     TaskGroupWaiter waiter = new TaskGroupWaiter(n);
359     try {
360     int i = 0;
361     for (Callable<T> t : tasks) {
362     SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
363     futures.add(f);
364     execute(f);
365     }
366     waiter.await();
367     return futures;
368     } finally {
369     if (!waiter.isDone())
370     cancelUnfinishedTasks2(futures);
371     }
372     }
373    
374     public <T> List<Future<T>> callAll(Collection<Callable<T>> tasks,
375     long timeout, TimeUnit unit)
376     throws InterruptedException {
377     if (tasks == null || unit == null)
378     throw new NullPointerException();
379     long nanos = unit.toNanos(timeout);
380     int n = tasks.size();
381     List<Future<T>> futures = new ArrayList<Future<T>>(n);
382     if (n == 0)
383     return futures;
384     TaskGroupWaiter waiter = new TaskGroupWaiter(n);
385     try {
386     int i = 0;
387     for (Callable<T> t : tasks) {
388     SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
389     futures.add(f);
390     execute(f);
391     }
392     waiter.awaitNanos(nanos);
393     return futures;
394     } finally {
395     if (!waiter.isDone())
396     cancelUnfinishedTasks2(futures);
397     }
398     }
399    
400 tim 1.1 }