ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.4
Committed: Sun Dec 14 22:36:36 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.3: +26 -40 lines
Log Message:
Minor improvements to any/all methods

File Contents

# User Rev Content
1 tim 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain. Use, modify, and
4     * redistribute this code in any way without acknowledgement.
5     */
6    
7     package java.util.concurrent;
8    
9     import java.security.AccessControlContext;
10     import java.security.AccessController;
11     import java.security.PrivilegedAction;
12     import java.security.PrivilegedExceptionAction;
13 dl 1.3 import java.util.*;
14     import java.util.concurrent.locks.*;
15 tim 1.1
16     /**
17 dl 1.2 * Provides default implementation of {@link ExecutorService}
18     * execution methods. This class implements the <tt>submit</tt> and
19     * <tt>invoke</tt> methods using the default {@link FutureTask} and
20     * {@link PrivilegedFutureTask} classes provided in this package. For
21     * example, the implementation of <tt>submit(Runnable)</tt>
22     * creates an associated <tt>FutureTask</tt> that is executed and
23     * returned. Subclasses overriding these methods to use different
24     * {@link Future} implementations should do so consistently for each
25     * of these methods.
26 tim 1.1 *
27     * @since 1.5
28     * @author Doug Lea
29     */
30     public abstract class AbstractExecutorService implements ExecutorService {
31    
32     public Future<?> submit(Runnable task) {
33     FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
34     execute(ftask);
35     return ftask;
36     }
37    
38     public <T> Future<T> submit(Callable<T> task) {
39     FutureTask<T> ftask = new FutureTask<T>(task);
40     execute(ftask);
41     return ftask;
42     }
43    
44     public void invoke(Runnable task) throws ExecutionException, InterruptedException {
45     FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
46     execute(ftask);
47     ftask.get();
48     }
49    
50     public <T> T invoke(Callable<T> task) throws ExecutionException, InterruptedException {
51     FutureTask<T> ftask = new FutureTask<T>(task);
52     execute(ftask);
53     return ftask.get();
54     }
55    
56     public Future<Object> submit(PrivilegedAction action) {
57     Callable<Object> task = new PrivilegedActionAdapter(action);
58     FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
59     execute(future);
60     return future;
61     }
62    
63     public Future<Object> submit(PrivilegedExceptionAction action) {
64     Callable<Object> task = new PrivilegedExceptionActionAdapter(action);
65     FutureTask<Object> future = new PrivilegedFutureTask<Object>(task);
66     execute(future);
67     return future;
68     }
69    
70     private static class PrivilegedActionAdapter implements Callable<Object> {
71     PrivilegedActionAdapter(PrivilegedAction action) {
72     this.action = action;
73     }
74     public Object call () {
75     return action.run();
76     }
77     private final PrivilegedAction action;
78     }
79    
80     private static class PrivilegedExceptionActionAdapter implements Callable<Object> {
81     PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
82     this.action = action;
83     }
84     public Object call () throws Exception {
85     return action.run();
86     }
87     private final PrivilegedExceptionAction action;
88     }
89 dl 1.3
90     /**
91     * Helper class to wait for tasks in bulk-execute methods
92     */
93     private static class TaskGroupWaiter {
94     private final ReentrantLock lock = new ReentrantLock();
95 dl 1.4 private final Condition done = lock.newCondition();
96 dl 1.3 private int firstIndex = -1;
97     private int countDown;
98     TaskGroupWaiter(int ntasks) { countDown = ntasks; }
99    
100     void signalDone(int index) {
101     lock.lock();
102     try {
103     if (firstIndex < 0)
104     firstIndex = index;
105     if (--countDown == 0)
106 dl 1.4 done.signalAll();
107     } finally {
108 dl 1.3 lock.unlock();
109     }
110     }
111    
112     int await() throws InterruptedException {
113     lock.lock();
114     try {
115     while (countDown > 0)
116 dl 1.4 done.await();
117 dl 1.3 return firstIndex;
118 dl 1.4 } finally {
119 dl 1.3 lock.unlock();
120     }
121     }
122    
123     int awaitNanos(long nanos) throws InterruptedException {
124     lock.lock();
125     try {
126     while (countDown > 0 && nanos > 0)
127 dl 1.4 nanos = done.awaitNanos(nanos);
128 dl 1.3 return firstIndex;
129 dl 1.4 } finally {
130 dl 1.3 lock.unlock();
131     }
132     }
133    
134     boolean isDone() {
135     lock.lock();
136     try {
137     return countDown <= 0;
138 dl 1.4 } finally {
139 dl 1.3 lock.unlock();
140     }
141     }
142     }
143    
144     /**
145     * FutureTask extension to provide signal when task completes
146     */
147     private static class SignallingFuture<T> extends FutureTask<T> {
148     private final TaskGroupWaiter waiter;
149     private final int index;
150     SignallingFuture(Callable<T> c, TaskGroupWaiter w, int i) {
151     super(c); waiter = w; index = i;
152     }
153     SignallingFuture(Runnable t, T r, TaskGroupWaiter w, int i) {
154     super(t, r); waiter = w; index = i;
155     }
156     protected void done() {
157     waiter.signalDone(index);
158     }
159     }
160    
161 dl 1.4 // any/all methods, each a little bit different than the other
162 dl 1.3
163     public List<Future<?>> runAny(Collection<Runnable> tasks)
164     throws InterruptedException {
165     if (tasks == null)
166     throw new NullPointerException();
167     int n = tasks.size();
168     List<Future<?>> futures = new ArrayList<Future<?>>(n);
169     if (n == 0)
170     return futures;
171     TaskGroupWaiter waiter = new TaskGroupWaiter(1);
172     try {
173     int i = 0;
174     for (Runnable t : tasks) {
175     SignallingFuture<Boolean> f =
176     new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
177     futures.add(f);
178     if (!waiter.isDone())
179     execute(f);
180     }
181     int first = waiter.await();
182     if (first > 0)
183     Collections.swap(futures, first, 0);
184     return futures;
185     } finally {
186 dl 1.4 for (Future<?> f : futures)
187     f.cancel(true);
188 dl 1.3 }
189     }
190    
191     public List<Future<?>> runAny(Collection<Runnable> tasks,
192     long timeout, TimeUnit unit)
193     throws InterruptedException {
194     if (tasks == null || unit == null)
195     throw new NullPointerException();
196     long nanos = unit.toNanos(timeout);
197     int n = tasks.size();
198     List<Future<?>> futures = new ArrayList<Future<?>>(n);
199     if (n == 0)
200     return futures;
201     TaskGroupWaiter waiter = new TaskGroupWaiter(1);
202     try {
203     int i = 0;
204     for (Runnable t : tasks) {
205     SignallingFuture<Boolean> f =
206     new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
207     futures.add(f);
208     if (!waiter.isDone())
209     execute(f);
210     }
211     int first = waiter.awaitNanos(nanos);
212     if (first > 0)
213     Collections.swap(futures, first, 0);
214     return futures;
215     } finally {
216 dl 1.4 for (Future<?> f : futures)
217     f.cancel(true);
218 dl 1.3 }
219     }
220    
221    
222    
223     public List<Future<?>> runAll(Collection<Runnable> tasks)
224     throws InterruptedException {
225     if (tasks == null)
226     throw new NullPointerException();
227     int n = tasks.size();
228     List<Future<?>> futures = new ArrayList<Future<?>>(n);
229     if (n == 0)
230     return futures;
231     TaskGroupWaiter waiter = new TaskGroupWaiter(n);
232     int i = 0;
233     try {
234     for (Runnable t : tasks) {
235     SignallingFuture<Boolean> f =
236     new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
237     futures.add(f);
238     execute(f);
239     }
240     waiter.await();
241     return futures;
242     } finally {
243     if (!waiter.isDone())
244 dl 1.4 for (Future<?> f : futures)
245     f.cancel(true);
246 dl 1.3 }
247     }
248    
249     public List<Future<?>> runAll(Collection<Runnable> tasks,
250 dl 1.4 long timeout, TimeUnit unit)
251 dl 1.3 throws InterruptedException {
252     if (tasks == null || unit == null)
253     throw new NullPointerException();
254     long nanos = unit.toNanos(timeout);
255     int n = tasks.size();
256     List<Future<?>> futures = new ArrayList<Future<?>>(n);
257     if (n == 0)
258     return futures;
259     TaskGroupWaiter waiter = new TaskGroupWaiter(n);
260     try {
261     int i = 0;
262     for (Runnable t : tasks) {
263     SignallingFuture<Boolean> f =
264     new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
265     futures.add(f);
266     execute(f);
267     }
268     waiter.awaitNanos(nanos);
269     return futures;
270     } finally {
271     if (!waiter.isDone())
272 dl 1.4 for (Future<?> f : futures)
273     f.cancel(true);
274 dl 1.3 }
275     }
276    
277     public <T> List<Future<T>> callAny(Collection<Callable<T>> tasks)
278     throws InterruptedException {
279     if (tasks == null)
280     throw new NullPointerException();
281     int n = tasks.size();
282     List<Future<T>> futures = new ArrayList<Future<T>>(n);
283     if (n == 0)
284     return futures;
285     TaskGroupWaiter waiter = new TaskGroupWaiter(1);
286     int i = 0;
287     try {
288     for (Callable<T> t : tasks) {
289     SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
290     futures.add(f);
291     if (!waiter.isDone())
292     execute(f);
293     }
294     int first = waiter.await();
295     if (first > 0)
296     Collections.swap(futures, first, 0);
297     return futures;
298     } finally {
299 dl 1.4 for (Future<T> f : futures)
300     f.cancel(true);
301 dl 1.3 }
302     }
303    
304     public <T> List<Future<T>> callAny(Collection<Callable<T>> tasks,
305     long timeout, TimeUnit unit)
306     throws InterruptedException {
307     if (tasks == null || unit == null)
308     throw new NullPointerException();
309     long nanos = unit.toNanos(timeout);
310     int n = tasks.size();
311     List<Future<T>> futures= new ArrayList<Future<T>>(n);
312     if (n == 0)
313     return futures;
314     TaskGroupWaiter waiter = new TaskGroupWaiter(1);
315     try {
316     int i = 0;
317     for (Callable<T> t : tasks) {
318     SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
319     futures.add(f);
320     if (!waiter.isDone())
321     execute(f);
322     }
323     int first = waiter.awaitNanos(nanos);
324     if (first > 0)
325     Collections.swap(futures, first, 0);
326     return futures;
327     } finally {
328 dl 1.4 for (Future<T> f : futures)
329     f.cancel(true);
330 dl 1.3 }
331     }
332    
333    
334     public <T> List<Future<T>> callAll(Collection<Callable<T>> tasks)
335     throws InterruptedException {
336     if (tasks == null)
337     throw new NullPointerException();
338     int n = tasks.size();
339     List<Future<T>> futures = new ArrayList<Future<T>>(n);
340     if (n == 0)
341     return futures;
342     TaskGroupWaiter waiter = new TaskGroupWaiter(n);
343     try {
344     int i = 0;
345     for (Callable<T> t : tasks) {
346     SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
347     futures.add(f);
348     execute(f);
349     }
350     waiter.await();
351     return futures;
352     } finally {
353     if (!waiter.isDone())
354 dl 1.4 for (Future<T> f : futures)
355     f.cancel(true);
356 dl 1.3 }
357     }
358    
359     public <T> List<Future<T>> callAll(Collection<Callable<T>> tasks,
360     long timeout, TimeUnit unit)
361     throws InterruptedException {
362     if (tasks == null || unit == null)
363     throw new NullPointerException();
364     long nanos = unit.toNanos(timeout);
365     int n = tasks.size();
366     List<Future<T>> futures = new ArrayList<Future<T>>(n);
367     if (n == 0)
368     return futures;
369     TaskGroupWaiter waiter = new TaskGroupWaiter(n);
370     try {
371     int i = 0;
372     for (Callable<T> t : tasks) {
373     SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
374     futures.add(f);
375     execute(f);
376     }
377     waiter.awaitNanos(nanos);
378     return futures;
379     } finally {
380     if (!waiter.isDone())
381 dl 1.4 for (Future<T> f : futures)
382     f.cancel(true);
383 dl 1.3 }
384     }
385    
386 tim 1.1 }