ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
(Generate patch)

Comparing jsr166/src/main/java/util/concurrent/AbstractExecutorService.java (file contents):
Revision 1.6 by dl, Mon Dec 15 15:41:49 2003 UTC vs.
Revision 1.7 by dl, Tue Dec 16 12:45:06 2003 UTC

# Line 87 | Line 87 | public abstract class AbstractExecutorSe
87          private final PrivilegedExceptionAction action;
88      }
89  
90    /**
91     * FutureTask extension to provide signal when task completes
92     */
93    private static class SignallingFuture<T> extends FutureTask<T> {
94        private final CountDownLatch signal;
95        SignallingFuture(Callable<T> c, CountDownLatch l) {
96            super(c); signal = l;
97        }
98        SignallingFuture(Runnable t, T r, CountDownLatch l) {
99            super(t, r); signal = l;
100        }
101        protected void done() {
102            signal.countDown();
103        }
104    }
105
90      // any/all methods, each a little bit different than the other
91  
92 <    public <T> List<Future<T>> invokeAny(List<Runnable> tasks, T result)
93 <        throws InterruptedException {
92 >
93 >    public <T> T invokeAny(Collection<Callable<T>> tasks)
94 >        throws InterruptedException, ExecutionException {
95          if (tasks == null)
96              throw new NullPointerException();
97          int n = tasks.size();
113        List<Future<T>> futures = new ArrayList<Future<T>>(n);
98          if (n == 0)
99 <            return futures;
100 <        CountDownLatch waiter = new CountDownLatch(1);;
99 >            throw new IllegalArgumentException();
100 >        List<Future<T>> futures= new ArrayList<Future<T>>(n);
101 >        ExecutorCompletionService<T> ecs =
102 >            new ExecutorCompletionService<T>(this);
103          try {
104 <            for (Runnable t : tasks) {
105 <                SignallingFuture<T> f =
106 <                    new SignallingFuture<T>(t, result, waiter);
107 <                futures.add(f);
108 <                if (waiter.getCount() > 0)
109 <                    execute(f);
110 <            }
111 <            waiter.await();
112 <            return futures;
104 >            for (Callable<T> t : tasks)
105 >                futures.add(ecs.submit(t));
106 >            ExecutionException ee = null;
107 >            RuntimeException re = null;
108 >            while (n-- > 0) {
109 >                Future<T> f = ecs.take();
110 >                try {
111 >                    return f.get();
112 >                } catch(ExecutionException eex) {
113 >                    ee = eex;
114 >                } catch(RuntimeException rex) {
115 >                    re = rex;
116 >                }
117 >            }    
118 >            if (ee != null)
119 >                throw ee;
120 >            if (re != null)
121 >                throw new ExecutionException(re);
122 >            throw new ExecutionException();
123          } finally {
124              for (Future<T> f : futures)
125                  f.cancel(true);
126          }
127      }
128  
129 <    public <T> List<Future<T>> invokeAny(List<Runnable> tasks, T result,
130 <                                         long timeout, TimeUnit unit)
131 <        throws InterruptedException {
129 >    public <T> T invokeAny(Collection<Callable<T>> tasks,
130 >                           long timeout, TimeUnit unit)
131 >        throws InterruptedException, ExecutionException, TimeoutException {
132          if (tasks == null || unit == null)
133              throw new NullPointerException();
134 +        long nanos = unit.toNanos(timeout);
135          int n = tasks.size();
139        List<Future<T>> futures = new ArrayList<Future<T>>(n);
136          if (n == 0)
137 <            return futures;
138 <        CountDownLatch waiter = new CountDownLatch(1);;
137 >            throw new IllegalArgumentException();
138 >        List<Future<T>> futures= new ArrayList<Future<T>>(n);
139 >        ExecutorCompletionService<T> ecs =
140 >            new ExecutorCompletionService<T>(this);
141          try {
142 <            for (Runnable t : tasks) {
143 <                SignallingFuture<T> f =
144 <                    new SignallingFuture<T>(t, result, waiter);
145 <                futures.add(f);
146 <                if (waiter.getCount() > 0)
147 <                    execute(f);
148 <            }
149 <            waiter.await(timeout, unit);
150 <            return futures;
142 >            for (Callable<T> t : tasks)
143 >                futures.add(ecs.submit(t));
144 >            ExecutionException ee = null;
145 >            RuntimeException re = null;
146 >            long lastTime = System.nanoTime();
147 >            while (n-- > 0) {
148 >                Future<T> f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
149 >                if (f == null) {
150 >                    if (nanos <= 0)
151 >                        throw new TimeoutException();
152 >                    long now = System.nanoTime();
153 >                    nanos -= now - lastTime;
154 >                    lastTime = now;
155 >                }
156 >                try {
157 >                    return f.get();
158 >                } catch(ExecutionException eex) {
159 >                    ee = eex;
160 >                } catch(RuntimeException rex) {
161 >                    re = rex;
162 >                }
163 >            }    
164 >            if (ee != null)
165 >                throw ee;
166 >            if (re != null)
167 >                throw new ExecutionException(re);
168 >            throw new ExecutionException();
169          } finally {
170              for (Future<T> f : futures)
171                  f.cancel(true);
172          }
173      }
174  
175 <    public <T> List<Future<T>> invokeAny(List<Callable<T>> tasks)
176 <        throws InterruptedException {
175 >
176 >    public <T> T invokeAny(Collection<Runnable> tasks, T result)
177 >        throws InterruptedException, ExecutionException {
178          if (tasks == null)
179              throw new NullPointerException();
180          int n = tasks.size();
164        List<Future<T>> futures = new ArrayList<Future<T>>(n);
181          if (n == 0)
182 <            return futures;
183 <        CountDownLatch waiter = new CountDownLatch(1);;
182 >            throw new IllegalArgumentException();
183 >        List<Future<T>> futures= new ArrayList<Future<T>>(n);
184 >        ExecutorCompletionService<T> ecs =
185 >            new ExecutorCompletionService<T>(this);
186          try {
187 <            for (Callable<T> t : tasks) {
188 <                SignallingFuture<T> f = new SignallingFuture<T>(t, waiter);
189 <                futures.add(f);
190 <                if (waiter.getCount() > 0)
191 <                    execute(f);
192 <            }
193 <            waiter.await();
194 <            return futures;
187 >            for (Runnable t : tasks)
188 >                futures.add(ecs.submit(t, result));
189 >            ExecutionException ee = null;
190 >            RuntimeException re = null;
191 >            while (n-- > 0) {
192 >                Future<T> f = ecs.take();
193 >                try {
194 >                    return f.get();
195 >                } catch(ExecutionException eex) {
196 >                    ee = eex;
197 >                } catch(RuntimeException rex) {
198 >                    re = rex;
199 >                }
200 >            }    
201 >            if (ee != null)
202 >                throw ee;
203 >            if (re != null)
204 >                throw new ExecutionException(re);
205 >            throw new ExecutionException();
206          } finally {
207              for (Future<T> f : futures)
208                  f.cancel(true);
209          }
210      }
211  
212 <    public <T> List<Future<T>> invokeAny(List<Callable<T>> tasks,
213 <                                       long timeout, TimeUnit unit)
214 <        throws InterruptedException {
212 >    public <T> T invokeAny(Collection<Runnable> tasks, T result,
213 >                           long timeout, TimeUnit unit)
214 >        throws InterruptedException, ExecutionException, TimeoutException {
215          if (tasks == null || unit == null)
216              throw new NullPointerException();
217 +        long nanos = unit.toNanos(timeout);
218          int n = tasks.size();
189        List<Future<T>> futures= new ArrayList<Future<T>>(n);
219          if (n == 0)
220 <            return futures;
221 <        CountDownLatch waiter = new CountDownLatch(1);;
220 >            throw new IllegalArgumentException();
221 >        List<Future<T>> futures= new ArrayList<Future<T>>(n);
222 >        ExecutorCompletionService<T> ecs =
223 >            new ExecutorCompletionService<T>(this);
224          try {
225 <            for (Callable<T> t : tasks) {
226 <                SignallingFuture<T> f = new SignallingFuture<T>(t, waiter);
227 <                futures.add(f);
228 <                if (waiter.getCount() > 0)
229 <                    execute(f);
230 <            }
231 <            waiter.await(timeout, unit);
232 <            return futures;
225 >            for (Runnable t : tasks)
226 >                futures.add(ecs.submit(t, result));
227 >            ExecutionException ee = null;
228 >            RuntimeException re = null;
229 >            long lastTime = System.nanoTime();
230 >            while (n-- > 0) {
231 >                Future<T> f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
232 >                if (f == null) {
233 >                    if (nanos <= 0)
234 >                        throw new TimeoutException();
235 >                    long now = System.nanoTime();
236 >                    nanos -= now - lastTime;
237 >                    lastTime = now;
238 >                }
239 >                try {
240 >                    return f.get();
241 >                } catch(ExecutionException eex) {
242 >                    ee = eex;
243 >                } catch(RuntimeException rex) {
244 >                    re = rex;
245 >                }
246 >            }    
247 >            if (ee != null)
248 >                throw ee;
249 >            if (re != null)
250 >                throw new ExecutionException(re);
251 >            throw new ExecutionException();
252          } finally {
253              for (Future<T> f : futures)
254                  f.cancel(true);

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines