ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java
Revision: 1.17
Committed: Sat Dec 27 19:26:25 2003 UTC (20 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.16: +2 -2 lines
Log Message:
Headers reference Creative Commons

File Contents

# User Rev Content
1 tim 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.17 * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/licenses/publicdomain
5 tim 1.1 */
6    
7     package java.util.concurrent;
8    
9 dl 1.3 import java.util.*;
10 tim 1.1
11     /**
12 dl 1.2 * Provides default implementation of {@link ExecutorService}
13 dl 1.12 * execution methods. This class implements the <tt>submit</tt>,
14     * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using the default
15     * {@link FutureTask} class provided in this package. For example,
16     * the the implementation of <tt>submit(Runnable)</tt> creates an
17     * associated <tt>FutureTask</tt> that is executed and
18 dl 1.2 * returned. Subclasses overriding these methods to use different
19     * {@link Future} implementations should do so consistently for each
20     * of these methods.
21 tim 1.1 *
22     * @since 1.5
23     * @author Doug Lea
24     */
25     public abstract class AbstractExecutorService implements ExecutorService {
26    
27 dl 1.11 public Future<?> submit(Runnable task) {
28 dl 1.13 if (task == null) throw new NullPointerException();
29 dl 1.11 FutureTask<Object> ftask = new FutureTask<Object>(task, null);
30 tim 1.1 execute(ftask);
31     return ftask;
32     }
33    
34 dl 1.13 public <T> Future<T> submit(Runnable task, T result) {
35     if (task == null) throw new NullPointerException();
36     FutureTask<T> ftask = new FutureTask<T>(task, result);
37     execute(ftask);
38     return ftask;
39     }
40    
41 tim 1.1 public <T> Future<T> submit(Callable<T> task) {
42 dl 1.13 if (task == null) throw new NullPointerException();
43 tim 1.1 FutureTask<T> ftask = new FutureTask<T>(task);
44     execute(ftask);
45     return ftask;
46     }
47    
48 dl 1.15 /**
49     * the main mechanics of invokeAny.
50     */
51     private <T> T doInvokeAny(Collection<Callable<T>> tasks,
52     boolean timed, long nanos)
53     throws InterruptedException, ExecutionException, TimeoutException {
54 dl 1.3 if (tasks == null)
55     throw new NullPointerException();
56 dl 1.15 int ntasks = tasks.size();
57     if (ntasks == 0)
58 dl 1.7 throw new IllegalArgumentException();
59 dl 1.15 List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
60 dl 1.7 ExecutorCompletionService<T> ecs =
61     new ExecutorCompletionService<T>(this);
62 dl 1.15
63     // For efficiency, especially in executors with limited
64     // parallelism, check to see if previously submitted tasks are
65     // done before submitting more of them. This interleaving,
66     // plus the exception mechanics account for messiness of main
67     // loop
68    
69 dl 1.3 try {
70 dl 1.15 // Record exceptions so that if we fail to obtain any
71     // result, we can throw the last exception we got.
72 dl 1.7 ExecutionException ee = null;
73 dl 1.15 long lastTime = (timed)? System.nanoTime() : 0;
74     Iterator<Callable<T>> it = tasks.iterator();
75    
76     // Start one task for sure; the rest incrementally
77     futures.add(ecs.submit(it.next()));
78     --ntasks;
79     int active = 1;
80    
81     for (;;) {
82     Future<T> f = ecs.poll();
83     if (f == null) {
84     if (ntasks > 0) {
85     --ntasks;
86     futures.add(ecs.submit(it.next()));
87     ++active;
88     }
89     else if (active == 0)
90     break;
91     else if (timed) {
92     f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
93     if (f == null)
94     throw new TimeoutException();
95     long now = System.nanoTime();
96     nanos -= now - lastTime;
97     lastTime = now;
98     }
99     else
100     f = ecs.take();
101     }
102     if (f != null) {
103     --active;
104     try {
105     return f.get();
106     } catch(InterruptedException ie) {
107     throw ie;
108     } catch(ExecutionException eex) {
109     ee = eex;
110     } catch(RuntimeException rex) {
111     ee = new ExecutionException(rex);
112     }
113 dl 1.7 }
114     }
115 dl 1.15
116     if (ee == null)
117     ee = new ExecutionException();
118     throw ee;
119    
120 dl 1.3 } finally {
121 dl 1.6 for (Future<T> f : futures)
122 dl 1.4 f.cancel(true);
123 dl 1.3 }
124     }
125    
126 dl 1.15 public <T> T invokeAny(Collection<Callable<T>> tasks)
127     throws InterruptedException, ExecutionException {
128     try {
129     return doInvokeAny(tasks, false, 0);
130     } catch (TimeoutException cannotHappen) {
131     assert false;
132     return null;
133     }
134     }
135    
136 dl 1.7 public <T> T invokeAny(Collection<Callable<T>> tasks,
137     long timeout, TimeUnit unit)
138     throws InterruptedException, ExecutionException, TimeoutException {
139 dl 1.15 return doInvokeAny(tasks, true, unit.toNanos(timeout));
140 dl 1.3 }
141    
142 dl 1.9 public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
143 dl 1.3 throws InterruptedException {
144     if (tasks == null)
145     throw new NullPointerException();
146 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
147     boolean done = false;
148 dl 1.3 try {
149     for (Callable<T> t : tasks) {
150 dl 1.6 FutureTask<T> f = new FutureTask<T>(t);
151 dl 1.3 futures.add(f);
152     execute(f);
153     }
154 dl 1.6 for (Future<T> f : futures) {
155     if (!f.isDone()) {
156     try {
157     f.get();
158     } catch(CancellationException ignore) {
159     } catch(ExecutionException ignore) {
160     }
161     }
162     }
163     done = true;
164 dl 1.3 return futures;
165     } finally {
166 dl 1.6 if (!done)
167 dl 1.4 for (Future<T> f : futures)
168     f.cancel(true);
169 dl 1.3 }
170     }
171    
172 dl 1.9 public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
173 dl 1.6 long timeout, TimeUnit unit)
174 dl 1.3 throws InterruptedException {
175     if (tasks == null || unit == null)
176     throw new NullPointerException();
177     long nanos = unit.toNanos(timeout);
178 dl 1.6 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
179     boolean done = false;
180 dl 1.3 try {
181 dl 1.16 for (Callable<T> t : tasks)
182     futures.add(new FutureTask<T>(t));
183    
184     long lastTime = System.nanoTime();
185    
186     // Interleave time checks and calls to execute in case
187     // executor doesn't have any/much parallelism.
188     Iterator<Future<T>> it = futures.iterator();
189     while (it.hasNext()) {
190     execute((Runnable)(it.next()));
191     long now = System.nanoTime();
192     nanos -= now - lastTime;
193     lastTime = now;
194     if (nanos <= 0)
195     return futures;
196 dl 1.3 }
197 dl 1.16
198 dl 1.6 for (Future<T> f : futures) {
199     if (!f.isDone()) {
200 dl 1.16 if (nanos <= 0)
201 dl 1.6 return futures;
202     try {
203     f.get(nanos, TimeUnit.NANOSECONDS);
204     } catch(CancellationException ignore) {
205     } catch(ExecutionException ignore) {
206     } catch(TimeoutException toe) {
207     return futures;
208     }
209 dl 1.8 long now = System.nanoTime();
210     nanos -= now - lastTime;
211     lastTime = now;
212 dl 1.6 }
213     }
214     done = true;
215 dl 1.3 return futures;
216     } finally {
217 dl 1.6 if (!done)
218 dl 1.4 for (Future<T> f : futures)
219     f.cancel(true);
220 dl 1.3 }
221     }
222    
223 tim 1.1 }