ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.11
Committed: Fri Dec 19 20:38:31 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.10: +2 -194 lines
Log Message:
Define and use Executors.callable instead of submit/invoke variants

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain. Use, modify, and
4 * redistribute this code in any way without acknowledgement.
5 */
6
7 package java.util.concurrent;
8
9 import java.security.AccessControlContext;
10 import java.security.AccessController;
11 import java.security.PrivilegedAction;
12 import java.security.PrivilegedExceptionAction;
13 import java.util.*;
14 import java.util.concurrent.locks.*;
15
16 /**
17 * Provides default implementation of {@link ExecutorService}
18 * execution methods. This class implements the <tt>submit</tt> and
19 * <tt>invoke</tt> methods using the default {@link FutureTask} and
20 * {@link PrivilegedFutureTask} classes provided in this package. For
21 * example, the the implementation of <tt>submit(Runnable)</tt>
22 * creates an associated <tt>FutureTask</tt> that is executed and
23 * returned. Subclasses overriding these methods to use different
24 * {@link Future} implementations should do so consistently for each
25 * of these methods.
26 *
27 * @since 1.5
28 * @author Doug Lea
29 */
30 public abstract class AbstractExecutorService implements ExecutorService {
31
32 public Future<?> submit(Runnable task) {
33 FutureTask<Object> ftask = new FutureTask<Object>(task, null);
34 execute(ftask);
35 return ftask;
36 }
37
38 public <T> Future<T> submit(Callable<T> task) {
39 FutureTask<T> ftask = new FutureTask<T>(task);
40 execute(ftask);
41 return ftask;
42 }
43
44 public <T> T invoke(Callable<T> task) throws ExecutionException, InterruptedException {
45 FutureTask<T> ftask = new FutureTask<T>(task);
46 execute(ftask);
47 return ftask.get();
48 }
49
50 // any/all methods, each a little bit different than the other
51
52
53 public <T> T invokeAny(Collection<Callable<T>> tasks)
54 throws InterruptedException, ExecutionException {
55 if (tasks == null)
56 throw new NullPointerException();
57 int n = tasks.size();
58 if (n == 0)
59 throw new IllegalArgumentException();
60 List<Future<T>> futures= new ArrayList<Future<T>>(n);
61 ExecutorCompletionService<T> ecs =
62 new ExecutorCompletionService<T>(this);
63 try {
64 for (Callable<T> t : tasks)
65 futures.add(ecs.submit(t));
66 ExecutionException ee = null;
67 RuntimeException re = null;
68 while (n-- > 0) {
69 Future<T> f = ecs.take();
70 try {
71 return f.get();
72 } catch(ExecutionException eex) {
73 ee = eex;
74 } catch(RuntimeException rex) {
75 re = rex;
76 }
77 }
78 if (ee != null)
79 throw ee;
80 if (re != null)
81 throw new ExecutionException(re);
82 throw new ExecutionException();
83 } finally {
84 for (Future<T> f : futures)
85 f.cancel(true);
86 }
87 }
88
89 public <T> T invokeAny(Collection<Callable<T>> tasks,
90 long timeout, TimeUnit unit)
91 throws InterruptedException, ExecutionException, TimeoutException {
92 if (tasks == null || unit == null)
93 throw new NullPointerException();
94 long nanos = unit.toNanos(timeout);
95 int n = tasks.size();
96 if (n == 0)
97 throw new IllegalArgumentException();
98 List<Future<T>> futures= new ArrayList<Future<T>>(n);
99 ExecutorCompletionService<T> ecs =
100 new ExecutorCompletionService<T>(this);
101 try {
102 for (Callable<T> t : tasks)
103 futures.add(ecs.submit(t));
104 ExecutionException ee = null;
105 RuntimeException re = null;
106 long lastTime = System.nanoTime();
107 while (n-- > 0) {
108 Future<T> f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
109 if (f == null)
110 throw new TimeoutException();
111 try {
112 return f.get();
113 } catch(ExecutionException eex) {
114 ee = eex;
115 } catch(RuntimeException rex) {
116 re = rex;
117 }
118 long now = System.nanoTime();
119 nanos -= now - lastTime;
120 lastTime = now;
121 }
122 if (ee != null)
123 throw ee;
124 if (re != null)
125 throw new ExecutionException(re);
126 throw new ExecutionException();
127 } finally {
128 for (Future<T> f : futures)
129 f.cancel(true);
130 }
131 }
132
133 public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
134 throws InterruptedException {
135 if (tasks == null)
136 throw new NullPointerException();
137 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
138 boolean done = false;
139 try {
140 for (Callable<T> t : tasks) {
141 FutureTask<T> f = new FutureTask<T>(t);
142 futures.add(f);
143 execute(f);
144 }
145 for (Future<T> f : futures) {
146 if (!f.isDone()) {
147 try {
148 f.get();
149 } catch(CancellationException ignore) {
150 } catch(ExecutionException ignore) {
151 }
152 }
153 }
154 done = true;
155 return futures;
156 } finally {
157 if (!done)
158 for (Future<T> f : futures)
159 f.cancel(true);
160 }
161 }
162
163 public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
164 long timeout, TimeUnit unit)
165 throws InterruptedException {
166 if (tasks == null || unit == null)
167 throw new NullPointerException();
168 long nanos = unit.toNanos(timeout);
169 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
170 boolean done = false;
171 try {
172 for (Callable<T> t : tasks) {
173 FutureTask<T> f = new FutureTask<T>(t);
174 futures.add(f);
175 execute(f);
176 }
177 long lastTime = System.nanoTime();
178 for (Future<T> f : futures) {
179 if (!f.isDone()) {
180 if (nanos < 0)
181 return futures;
182 try {
183 f.get(nanos, TimeUnit.NANOSECONDS);
184 } catch(CancellationException ignore) {
185 } catch(ExecutionException ignore) {
186 } catch(TimeoutException toe) {
187 return futures;
188 }
189 long now = System.nanoTime();
190 nanos -= now - lastTime;
191 lastTime = now;
192 }
193 }
194 done = true;
195 return futures;
196 } finally {
197 if (!done)
198 for (Future<T> f : futures)
199 f.cancel(true);
200 }
201 }
202
203 }