ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/AbstractExecutorService.java (file contents):
Revision 1.5 by dl, Mon Dec 15 00:29:49 2003 UTC vs.
Revision 1.6 by dl, Mon Dec 15 15:41:49 2003 UTC

# Line 29 | Line 29 | import java.util.concurrent.locks.*;
29   */
30   public abstract class AbstractExecutorService implements ExecutorService {
31  
32 <    public Future<?> submit(Runnable task) {
33 <        FutureTask<?> ftask = new FutureTask<Boolean>(task, Boolean.TRUE);
32 >    public <T> Future<T> submit(Runnable task, T result) {
33 >        FutureTask<T> ftask = new FutureTask<T>(task, result);
34          execute(ftask);
35          return ftask;
36      }
# Line 88 | Line 88 | public abstract class AbstractExecutorSe
88      }
89  
90      /**
91     * Helper class to wait for tasks in bulk-execute methods
92     */
93    private static class TaskGroupWaiter {
94        private final ReentrantLock lock = new ReentrantLock();
95        private final Condition done = lock.newCondition();
96        private int firstIndex = -1;
97        private int countDown;
98        TaskGroupWaiter(int ntasks) { countDown = ntasks; }
99
100        void signalDone(int index) {
101            lock.lock();
102            try {
103                if (firstIndex < 0)
104                    firstIndex = index;
105                if (--countDown == 0)
106                    done.signalAll();
107            } finally {
108                lock.unlock();
109            }
110        }
111
112        int await() throws InterruptedException {
113            lock.lock();
114            try {
115                while (countDown > 0)
116                    done.await();
117                return firstIndex;
118            } finally {
119                lock.unlock();
120            }
121        }
122
123        int awaitNanos(long nanos) throws InterruptedException {
124            lock.lock();
125            try {
126                while (countDown > 0 && nanos > 0)
127                    nanos = done.awaitNanos(nanos);
128                return firstIndex;
129            } finally {
130                lock.unlock();
131            }
132        }
133
134        boolean isDone() {
135            lock.lock();
136            try {
137                return countDown <= 0;
138            } finally {
139                lock.unlock();
140            }
141        }
142    }
143
144    /**
91       * FutureTask extension to provide signal when task completes
92       */
93      private static class SignallingFuture<T> extends FutureTask<T> {
94 <        private final TaskGroupWaiter waiter;
95 <        private final int index;
96 <        SignallingFuture(Callable<T> c, TaskGroupWaiter w, int i) {
151 <            super(c); waiter = w; index = i;
94 >        private final CountDownLatch signal;
95 >        SignallingFuture(Callable<T> c, CountDownLatch l) {
96 >            super(c); signal = l;
97          }
98 <        SignallingFuture(Runnable t, T r, TaskGroupWaiter w, int i) {
99 <            super(t, r); waiter = w; index = i;
98 >        SignallingFuture(Runnable t, T r, CountDownLatch l) {
99 >            super(t, r); signal = l;
100          }
101          protected void done() {
102 <            waiter.signalDone(index);
102 >            signal.countDown();
103          }
104      }
105  
106      // any/all methods, each a little bit different than the other
107  
108 <    public List<Future<?>> runAny(List<Runnable> tasks)
108 >    public <T> List<Future<T>> invokeAny(List<Runnable> tasks, T result)
109          throws InterruptedException {
110          if (tasks == null)
111              throw new NullPointerException();
112          int n = tasks.size();
113 <        List<Future<?>> futures = new ArrayList<Future<?>>(n);
113 >        List<Future<T>> futures = new ArrayList<Future<T>>(n);
114          if (n == 0)
115              return futures;
116 <        TaskGroupWaiter waiter = new TaskGroupWaiter(1);
116 >        CountDownLatch waiter = new CountDownLatch(1);;
117          try {
173            int i = 0;
118              for (Runnable t : tasks) {
119 <                SignallingFuture<Boolean> f =
120 <                    new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
119 >                SignallingFuture<T> f =
120 >                    new SignallingFuture<T>(t, result, waiter);
121                  futures.add(f);
122 <                if (!waiter.isDone())
122 >                if (waiter.getCount() > 0)
123                      execute(f);
124              }
125 <            int first = waiter.await();
125 >            waiter.await();
126              return futures;
127          } finally {
128 <            for (Future<?> f : futures)
128 >            for (Future<T> f : futures)
129                  f.cancel(true);
130          }
131      }
132  
133 <    public List<Future<?>> runAny(List<Runnable> tasks,
134 <                                  long timeout, TimeUnit unit)
133 >    public <T> List<Future<T>> invokeAny(List<Runnable> tasks, T result,
134 >                                         long timeout, TimeUnit unit)
135          throws InterruptedException {
136          if (tasks == null || unit == null)
137              throw new NullPointerException();
194        long nanos = unit.toNanos(timeout);
138          int n = tasks.size();
139 <        List<Future<?>> futures = new ArrayList<Future<?>>(n);
139 >        List<Future<T>> futures = new ArrayList<Future<T>>(n);
140          if (n == 0)
141              return futures;
142 <        TaskGroupWaiter waiter = new TaskGroupWaiter(1);
142 >        CountDownLatch waiter = new CountDownLatch(1);;
143          try {
201            int i = 0;
144              for (Runnable t : tasks) {
145 <                SignallingFuture<Boolean> f =
146 <                    new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
145 >                SignallingFuture<T> f =
146 >                    new SignallingFuture<T>(t, result, waiter);
147                  futures.add(f);
148 <                if (!waiter.isDone())
148 >                if (waiter.getCount() > 0)
149                      execute(f);
150              }
151 <            int first = waiter.awaitNanos(nanos);
151 >            waiter.await(timeout, unit);
152              return futures;
153          } finally {
154 <            for (Future<?> f : futures)
154 >            for (Future<T> f : futures)
155                  f.cancel(true);
156          }
157      }
158  
159 <
218 <
219 <    public List<Future<?>> runAll(List<Runnable> tasks)
159 >    public <T> List<Future<T>> invokeAny(List<Callable<T>> tasks)
160          throws InterruptedException {
161          if (tasks == null)
162              throw new NullPointerException();
163          int n = tasks.size();
164 <        List<Future<?>> futures = new ArrayList<Future<?>>(n);
164 >        List<Future<T>> futures = new ArrayList<Future<T>>(n);
165          if (n == 0)
166              return futures;
167 <        TaskGroupWaiter waiter = new TaskGroupWaiter(n);
228 <        int i = 0;
167 >        CountDownLatch waiter = new CountDownLatch(1);;
168          try {
169 <            for (Runnable t : tasks) {
170 <                SignallingFuture<Boolean> f =
232 <                    new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
169 >            for (Callable<T> t : tasks) {
170 >                SignallingFuture<T> f = new SignallingFuture<T>(t, waiter);
171                  futures.add(f);
172 <                execute(f);
172 >                if (waiter.getCount() > 0)
173 >                    execute(f);
174              }
175              waiter.await();
176              return futures;
177          } finally {
178 <            if (!waiter.isDone())
179 <                for (Future<?> f : futures)
241 <                    f.cancel(true);
178 >            for (Future<T> f : futures)
179 >                f.cancel(true);
180          }
181      }
182  
183 <    public List<Future<?>> runAll(List<Runnable> tasks,
184 <                                  long timeout, TimeUnit unit)
183 >    public <T> List<Future<T>> invokeAny(List<Callable<T>> tasks,
184 >                                       long timeout, TimeUnit unit)
185          throws InterruptedException {
186          if (tasks == null || unit == null)
187              throw new NullPointerException();
250        long nanos = unit.toNanos(timeout);
188          int n = tasks.size();
189 <        List<Future<?>> futures = new ArrayList<Future<?>>(n);
189 >        List<Future<T>> futures= new ArrayList<Future<T>>(n);
190          if (n == 0)
191              return futures;
192 <        TaskGroupWaiter waiter = new TaskGroupWaiter(n);
192 >        CountDownLatch waiter = new CountDownLatch(1);;
193          try {
194 <            int i = 0;
195 <            for (Runnable t : tasks) {
259 <                SignallingFuture<Boolean> f =
260 <                    new SignallingFuture<Boolean>(t, Boolean.TRUE, waiter, i++);
194 >            for (Callable<T> t : tasks) {
195 >                SignallingFuture<T> f = new SignallingFuture<T>(t, waiter);
196                  futures.add(f);
197 <                execute(f);
197 >                if (waiter.getCount() > 0)
198 >                    execute(f);
199              }
200 <            waiter.awaitNanos(nanos);
200 >            waiter.await(timeout, unit);
201              return futures;
202          } finally {
203 <            if (!waiter.isDone())
204 <                for (Future<?> f : futures)
269 <                    f.cancel(true);
203 >            for (Future<T> f : futures)
204 >                f.cancel(true);
205          }
206      }
207  
208 <    public <T> List<Future<T>> callAny(List<Callable<T>> tasks)
208 >
209 >    public <T> List<Future<T>> invokeAll(List<Runnable> tasks, T result)
210          throws InterruptedException {
211          if (tasks == null)
212              throw new NullPointerException();
213 <        int n = tasks.size();
214 <        List<Future<T>> futures = new ArrayList<Future<T>>(n);
279 <        if (n == 0)
280 <            return futures;
281 <        TaskGroupWaiter waiter = new TaskGroupWaiter(1);
282 <        int i = 0;
213 >        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
214 >        boolean done = false;
215          try {
216 <            for (Callable<T> t : tasks) {
217 <                SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
216 >            for (Runnable t : tasks) {
217 >                FutureTask<T> f = new FutureTask<T>(t, result);
218                  futures.add(f);
219 <                if (!waiter.isDone())
288 <                    execute(f);
219 >                execute(f);
220              }
221 <            int first = waiter.await();
221 >            for (Future<T> f : futures) {
222 >                if (!f.isDone()) {
223 >                    try {
224 >                        f.get();
225 >                    } catch(CancellationException ignore) {
226 >                    } catch(ExecutionException ignore) {
227 >                    }
228 >                }
229 >            }
230 >            done = true;
231              return futures;
232          } finally {
233 <            for (Future<T> f : futures)
234 <                f.cancel(true);
233 >            if (!done)
234 >                for (Future<T> f : futures)
235 >                    f.cancel(true);
236          }
237      }
238  
239 <    public <T> List<Future<T>> callAny(List<Callable<T>> tasks,
240 <                                       long timeout, TimeUnit unit)
239 >    public <T> List<Future<T>> invokeAll(List<Runnable> tasks, T result,
240 >                                         long timeout, TimeUnit unit)
241          throws InterruptedException {
242          if (tasks == null || unit == null)
243              throw new NullPointerException();
244          long nanos = unit.toNanos(timeout);
245 <        int n = tasks.size();
246 <        List<Future<T>> futures= new ArrayList<Future<T>>(n);
306 <        if (n == 0)
307 <            return futures;
308 <        TaskGroupWaiter waiter = new TaskGroupWaiter(1);
245 >        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
246 >        boolean done = false;
247          try {
248 <            int i = 0;
249 <            for (Callable<T> t : tasks) {
312 <                SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
248 >            for (Runnable t : tasks) {
249 >                FutureTask<T> f = new FutureTask<T>(t, result);
250                  futures.add(f);
251 <                if (!waiter.isDone())
252 <                    execute(f);
251 >                execute(f);
252 >            }
253 >            long lastTime = System.nanoTime();
254 >            for (Future<T> f : futures) {
255 >                if (!f.isDone()) {
256 >                    if (nanos < 0)
257 >                        return futures;
258 >                    try {
259 >                        f.get(nanos, TimeUnit.NANOSECONDS);
260 >                        long now = System.nanoTime();
261 >                        nanos -= now - lastTime;
262 >                        lastTime = now;
263 >                    } catch(CancellationException ignore) {
264 >                    } catch(ExecutionException ignore) {
265 >                    } catch(TimeoutException toe) {
266 >                        return futures;
267 >                    }
268 >                }
269              }
270 <            int first = waiter.awaitNanos(nanos);
270 >            done = true;
271              return futures;
272          } finally {
273 <            for (Future<T> f : futures)
274 <                f.cancel(true);
273 >            if (!done)
274 >                for (Future<T> f : futures)
275 >                    f.cancel(true);
276          }
277      }
278  
279 <
326 <    public <T> List<Future<T>> callAll(List<Callable<T>> tasks)
279 >    public <T> List<Future<T>> invokeAll(List<Callable<T>> tasks)
280          throws InterruptedException {
281          if (tasks == null)
282              throw new NullPointerException();
283 <        int n = tasks.size();
284 <        List<Future<T>> futures = new ArrayList<Future<T>>(n);
332 <        if (n == 0)
333 <            return futures;
334 <        TaskGroupWaiter waiter = new TaskGroupWaiter(n);
283 >        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
284 >        boolean done = false;
285          try {
336            int i = 0;
286              for (Callable<T> t : tasks) {
287 <                SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
287 >                FutureTask<T> f = new FutureTask<T>(t);
288                  futures.add(f);
289                  execute(f);
290              }
291 <            waiter.await();
291 >            for (Future<T> f : futures) {
292 >                if (!f.isDone()) {
293 >                    try {
294 >                        f.get();
295 >                    } catch(CancellationException ignore) {
296 >                    } catch(ExecutionException ignore) {
297 >                    }
298 >                }
299 >            }
300 >            done = true;
301              return futures;
302          } finally {
303 <            if (!waiter.isDone())
303 >            if (!done)
304                  for (Future<T> f : futures)
305                      f.cancel(true);
306          }
307      }
308  
309 <    public <T> List<Future<T>> callAll(List<Callable<T>> tasks,
310 <                                       long timeout, TimeUnit unit)
309 >    public <T> List<Future<T>> invokeAll(List<Callable<T>> tasks,
310 >                                         long timeout, TimeUnit unit)
311          throws InterruptedException {
312          if (tasks == null || unit == null)
313              throw new NullPointerException();
314          long nanos = unit.toNanos(timeout);
315 <        int n = tasks.size();
316 <        List<Future<T>> futures = new ArrayList<Future<T>>(n);
359 <        if (n == 0)
360 <            return futures;
361 <        TaskGroupWaiter waiter = new TaskGroupWaiter(n);
315 >        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
316 >        boolean done = false;
317          try {
363            int i = 0;
318              for (Callable<T> t : tasks) {
319 <                SignallingFuture<T> f = new SignallingFuture<T>(t, waiter, i++);
319 >                FutureTask<T> f = new FutureTask<T>(t);
320                  futures.add(f);
321                  execute(f);
322              }
323 <            waiter.awaitNanos(nanos);
323 >            long lastTime = System.nanoTime();
324 >            for (Future<T> f : futures) {
325 >                if (!f.isDone()) {
326 >                    if (nanos < 0)
327 >                        return futures;
328 >                    try {
329 >                        f.get(nanos, TimeUnit.NANOSECONDS);
330 >                        long now = System.nanoTime();
331 >                        nanos -= now - lastTime;
332 >                        lastTime = now;
333 >                    } catch(CancellationException ignore) {
334 >                    } catch(ExecutionException ignore) {
335 >                    } catch(TimeoutException toe) {
336 >                        return futures;
337 >                    }
338 >                }
339 >            }
340 >            done = true;
341              return futures;
342          } finally {
343 <            if (!waiter.isDone())
343 >            if (!done)
344                  for (Future<T> f : futures)
345                      f.cancel(true);
346          }

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines