7 |
|
package java.util.concurrent; |
8 |
|
import java.util.concurrent.locks.*; |
9 |
|
|
10 |
– |
|
10 |
|
/** |
11 |
|
* A cancellable asynchronous computation. This class provides a base |
12 |
|
* implementation of {@link Future}, with methods to start and cancel |
31 |
|
* @param <V> The result type returned by this FutureTask's <tt>get</tt> method |
32 |
|
*/ |
33 |
|
public class FutureTask<V> implements Future<V>, Runnable { |
34 |
< |
/** |
35 |
< |
* Special value for "runState" indicating task is cancelled |
37 |
< |
*/ |
38 |
< |
private static final Object CANCELLED = new Object(); |
39 |
< |
|
40 |
< |
/** |
41 |
< |
* Special value for "runState" indicating task is completed |
42 |
< |
*/ |
43 |
< |
private static final Object DONE = new Object(); |
44 |
< |
|
45 |
< |
/** |
46 |
< |
* Holds the run-state, taking on values: |
47 |
< |
* null = not yet started, |
48 |
< |
* [some thread ref] = running, |
49 |
< |
* DONE = completed normally, |
50 |
< |
* CANCELLED = cancelled (may or may not have ever run). |
51 |
< |
*/ |
52 |
< |
private volatile Object runState; |
53 |
< |
|
54 |
< |
/** The underlying callable */ |
55 |
< |
private final Callable<V> callable; |
56 |
< |
/** The result to return from get() */ |
57 |
< |
private V result; |
58 |
< |
/** The exception to throw from get() */ |
59 |
< |
private Throwable exception; |
60 |
< |
|
61 |
< |
private final ReentrantLock lock = new ReentrantLock(); |
62 |
< |
private final Condition accessible = lock.newCondition(); |
34 |
> |
/** Synchronization control for FutureTask */ |
35 |
> |
private final Sync sync; |
36 |
|
|
37 |
|
/** |
38 |
|
* Constructs a <tt>FutureTask</tt> that will upon running, execute the |
44 |
|
public FutureTask(Callable<V> callable) { |
45 |
|
if (callable == null) |
46 |
|
throw new NullPointerException(); |
47 |
< |
this.callable = callable; |
47 |
> |
sync = new Sync(callable); |
48 |
|
} |
49 |
|
|
50 |
|
/** |
60 |
|
* @throws NullPointerException if runnable is null |
61 |
|
*/ |
62 |
|
public FutureTask(Runnable runnable, V result) { |
63 |
< |
this.callable = Executors.callable(runnable, result); |
63 |
> |
sync = new Sync(Executors.callable(runnable, result)); |
64 |
|
} |
65 |
|
|
66 |
|
public boolean isCancelled() { |
67 |
< |
return runState == CANCELLED; |
67 |
> |
return sync.doIsCancelled(); |
68 |
|
} |
69 |
|
|
70 |
|
public boolean isDone() { |
71 |
< |
Object r = runState; |
99 |
< |
return r == DONE || r == CANCELLED; |
71 |
> |
return sync.doIsDone(); |
72 |
|
} |
73 |
|
|
74 |
|
public boolean cancel(boolean mayInterruptIfRunning) { |
75 |
< |
final ReentrantLock lock = this.lock; |
104 |
< |
Thread interruptThread = null; |
105 |
< |
lock.lock(); |
106 |
< |
try { |
107 |
< |
Object r = runState; |
108 |
< |
if (r == DONE || r == CANCELLED) |
109 |
< |
return false; |
110 |
< |
if (mayInterruptIfRunning && r != null && r instanceof Thread) |
111 |
< |
interruptThread = (Thread)r; |
112 |
< |
accessible.signalAll(); |
113 |
< |
runState = CANCELLED; |
114 |
< |
} |
115 |
< |
finally{ |
116 |
< |
lock.unlock(); |
117 |
< |
} |
118 |
< |
if (interruptThread != null) |
119 |
< |
interruptThread.interrupt(); |
120 |
< |
done(); |
121 |
< |
return true; |
75 |
> |
return sync.doCancel(mayInterruptIfRunning); |
76 |
|
} |
77 |
|
|
78 |
|
/** |
88 |
|
* while waiting |
89 |
|
*/ |
90 |
|
public V get() throws InterruptedException, ExecutionException { |
91 |
< |
final ReentrantLock lock = this.lock; |
138 |
< |
lock.lock(); |
139 |
< |
try { |
140 |
< |
for (;;) { |
141 |
< |
Object r = runState; |
142 |
< |
if (r == CANCELLED) |
143 |
< |
throw new CancellationException(); |
144 |
< |
if (r == DONE) { |
145 |
< |
if (exception != null) |
146 |
< |
throw new ExecutionException(exception); |
147 |
< |
else |
148 |
< |
return result; |
149 |
< |
} |
150 |
< |
accessible.await(); |
151 |
< |
} |
152 |
< |
} finally { |
153 |
< |
lock.unlock(); |
154 |
< |
} |
91 |
> |
return sync.doGet(); |
92 |
|
} |
93 |
|
|
94 |
|
/** |
108 |
|
*/ |
109 |
|
public V get(long timeout, TimeUnit unit) |
110 |
|
throws InterruptedException, ExecutionException, TimeoutException { |
111 |
< |
long nanos = unit.toNanos(timeout); |
175 |
< |
final ReentrantLock lock = this.lock; |
176 |
< |
lock.lock(); |
177 |
< |
try { |
178 |
< |
for (;;) { |
179 |
< |
Object r = runState; |
180 |
< |
if (r == CANCELLED) |
181 |
< |
throw new CancellationException(); |
182 |
< |
if (r == DONE) { |
183 |
< |
if (exception != null) |
184 |
< |
throw new ExecutionException(exception); |
185 |
< |
else |
186 |
< |
return result; |
187 |
< |
} |
188 |
< |
if (nanos <= 0) |
189 |
< |
throw new TimeoutException(); |
190 |
< |
nanos = accessible.awaitNanos(nanos); |
191 |
< |
} |
192 |
< |
} finally { |
193 |
< |
lock.unlock(); |
194 |
< |
} |
111 |
> |
return sync.doGet(unit.toNanos(timeout)); |
112 |
|
} |
113 |
|
|
114 |
|
/** |
124 |
|
|
125 |
|
/** |
126 |
|
* Sets the result of this Future to the given value unless |
127 |
< |
* this future has been cancelled |
127 |
> |
* this future has already been set or has been cancelled |
128 |
|
* @param v the value |
129 |
|
*/ |
130 |
|
protected void set(V v) { |
131 |
< |
final ReentrantLock lock = this.lock; |
215 |
< |
lock.lock(); |
216 |
< |
try { |
217 |
< |
if (runState == CANCELLED) |
218 |
< |
return; |
219 |
< |
result = v; |
220 |
< |
accessible.signalAll(); |
221 |
< |
runState = DONE; |
222 |
< |
} finally { |
223 |
< |
lock.unlock(); |
224 |
< |
} |
225 |
< |
done(); |
131 |
> |
sync.doSet(v); |
132 |
|
} |
133 |
|
|
134 |
|
/** |
135 |
|
* Causes this future to report an <tt>ExecutionException</tt> |
136 |
|
* with the given throwable as its cause, unless this Future has |
137 |
< |
* been cancelled. |
137 |
> |
* already been set or has been cancelled. |
138 |
|
* @param t the cause of failure. |
139 |
|
*/ |
140 |
|
protected void setException(Throwable t) { |
141 |
< |
final ReentrantLock lock = this.lock; |
236 |
< |
lock.lock(); |
237 |
< |
try { |
238 |
< |
if (runState == CANCELLED) |
239 |
< |
return; |
240 |
< |
exception = t; |
241 |
< |
accessible.signalAll(); |
242 |
< |
runState = DONE; |
243 |
< |
} finally { |
244 |
< |
lock.unlock(); |
245 |
< |
} |
246 |
< |
done(); |
141 |
> |
sync.doSetException(t); |
142 |
|
} |
143 |
|
|
144 |
|
/** |
145 |
< |
* Attempts to set the state of this task to Running, succeeding |
146 |
< |
* only if the state is currently NOT Done, Running, or Cancelled. |
147 |
< |
* @return true if successful |
148 |
< |
*/ |
149 |
< |
private boolean setRunning() { |
255 |
< |
final ReentrantLock lock = this.lock; |
256 |
< |
lock.lock(); |
257 |
< |
try { |
258 |
< |
if (runState != null) |
259 |
< |
return false; |
260 |
< |
runState = Thread.currentThread(); |
261 |
< |
return true; |
262 |
< |
} |
263 |
< |
finally { |
264 |
< |
lock.unlock(); |
265 |
< |
} |
145 |
> |
* Sets this Future to the result of computation unless |
146 |
> |
* it has been cancelled. |
147 |
> |
*/ |
148 |
> |
public void run() { |
149 |
> |
sync.doRun(); |
150 |
|
} |
151 |
|
|
152 |
|
/** |
153 |
< |
* Resets the run state of this task to its initial state unless |
154 |
< |
* it has been cancelled. (Note that a cancelled task cannot be |
155 |
< |
* reset.) |
156 |
< |
* @return true if successful |
153 |
> |
* Executes the computation without setting result, and then |
154 |
> |
* resets this Future to initial state; failing to do so if the |
155 |
> |
* computation encounters an exception or is cancelled. This is |
156 |
> |
* designed for use with tasks that intrinsically execute more |
157 |
> |
* than once. |
158 |
> |
* @return true if successfully run and reset |
159 |
|
*/ |
160 |
< |
private boolean reset() { |
161 |
< |
final ReentrantLock lock = this.lock; |
276 |
< |
lock.lock(); |
277 |
< |
try { |
278 |
< |
if (runState == CANCELLED) |
279 |
< |
return false; |
280 |
< |
runState = null; |
281 |
< |
return true; |
282 |
< |
} |
283 |
< |
finally { |
284 |
< |
lock.unlock(); |
285 |
< |
} |
160 |
> |
protected boolean runAndReset() { |
161 |
> |
return sync.doRunAndReset(); |
162 |
|
} |
163 |
|
|
164 |
|
/** |
165 |
< |
* Sets this Future to the result of computation unless |
166 |
< |
* it has been cancelled. |
165 |
> |
* Synchronization control for FutureTask. Note that this must be |
166 |
> |
* a non-static inner class in order to invoke protected |
167 |
> |
* <tt>done</tt> method. For clarity, all inner class support |
168 |
> |
* methods are same as outer, prefixed with "do". |
169 |
> |
* |
170 |
> |
* Uses AQS sync state to represent run status |
171 |
|
*/ |
172 |
< |
public void run() { |
173 |
< |
if (setRunning()) { |
172 |
> |
private final class Sync extends AbstractQueuedSynchronizer { |
173 |
> |
/** State value representing that task is running */ |
174 |
> |
private static final int RUNNING = 1; |
175 |
> |
/** State value representing that task ran */ |
176 |
> |
private static final int RAN = 2; |
177 |
> |
/** State value representing that task was cancelled */ |
178 |
> |
private static final int CANCELLED = 4; |
179 |
> |
|
180 |
> |
/** The underlying callable */ |
181 |
> |
private final Callable<V> callable; |
182 |
> |
/** The result to return from get() */ |
183 |
> |
private V result; |
184 |
> |
/** The exception to throw from get() */ |
185 |
> |
private Throwable exception; |
186 |
> |
|
187 |
> |
/** |
188 |
> |
* The thread running task. When nulled after set/cancel, this |
189 |
> |
* indicates that the results are accessible. Must be |
190 |
> |
* volatile, to serve as write barrier on completion. |
191 |
> |
*/ |
192 |
> |
private volatile Thread runner; |
193 |
> |
|
194 |
> |
Sync(Callable<V> callable) { |
195 |
> |
this.callable = callable; |
196 |
> |
} |
197 |
> |
|
198 |
> |
private boolean ranOrCancelled(int state) { |
199 |
> |
return (state & (RAN | CANCELLED)) != 0; |
200 |
> |
} |
201 |
> |
|
202 |
> |
/** |
203 |
> |
* Implements AQS base acquire to succeed if Done/cancelled |
204 |
> |
*/ |
205 |
> |
protected int tryAcquireSharedState(boolean b, int ignore) { |
206 |
> |
return doIsDone()? 1 : -1; |
207 |
> |
} |
208 |
> |
|
209 |
> |
/** |
210 |
> |
* Implements AQS base release to always signal after setting |
211 |
> |
* final done status by nulling runner thread. |
212 |
> |
*/ |
213 |
> |
protected boolean releaseSharedState(int ignore) { |
214 |
> |
runner = null; |
215 |
> |
return true; |
216 |
> |
} |
217 |
> |
|
218 |
> |
boolean doIsCancelled() { |
219 |
> |
return getState() == CANCELLED; |
220 |
> |
} |
221 |
> |
|
222 |
> |
boolean doIsDone() { |
223 |
> |
return ranOrCancelled(getState()) && runner == null; |
224 |
> |
} |
225 |
> |
|
226 |
> |
V doGet() throws InterruptedException, ExecutionException { |
227 |
> |
acquireSharedInterruptibly(0); |
228 |
> |
if (getState() == CANCELLED) |
229 |
> |
throw new CancellationException(); |
230 |
> |
if (exception != null) |
231 |
> |
throw new ExecutionException(exception); |
232 |
> |
return result; |
233 |
> |
} |
234 |
> |
|
235 |
> |
V doGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { |
236 |
> |
if (!acquireSharedTimed(0, nanosTimeout)) |
237 |
> |
throw new TimeoutException(); |
238 |
> |
if (getState() == CANCELLED) |
239 |
> |
throw new CancellationException(); |
240 |
> |
if (exception != null) |
241 |
> |
throw new ExecutionException(exception); |
242 |
> |
return result; |
243 |
> |
} |
244 |
> |
|
245 |
> |
void doSet(V v) { |
246 |
> |
int s = getState(); |
247 |
> |
if (ranOrCancelled(s) || !compareAndSetState(s, RAN)) |
248 |
> |
return; |
249 |
> |
result = v; |
250 |
> |
releaseShared(0); |
251 |
> |
done(); |
252 |
> |
} |
253 |
> |
|
254 |
> |
void doSetException(Throwable t) { |
255 |
> |
int s = getState(); |
256 |
> |
if (ranOrCancelled(s) || !compareAndSetState(s, RAN)) |
257 |
> |
return; |
258 |
> |
exception = t; |
259 |
> |
result = null; |
260 |
> |
releaseShared(0); |
261 |
> |
done(); |
262 |
> |
} |
263 |
> |
|
264 |
> |
boolean doCancel(boolean mayInterruptIfRunning) { |
265 |
> |
int s = getState(); |
266 |
> |
if (ranOrCancelled(s) || !compareAndSetState(s, CANCELLED)) |
267 |
> |
return false; |
268 |
> |
if (mayInterruptIfRunning) { |
269 |
> |
Thread r = runner; |
270 |
> |
if (r != null) |
271 |
> |
r.interrupt(); |
272 |
> |
} |
273 |
> |
releaseShared(0); |
274 |
> |
done(); |
275 |
> |
return true; |
276 |
> |
} |
277 |
> |
|
278 |
> |
void doRun() { |
279 |
> |
if (!compareAndSetState(0, RUNNING)) |
280 |
> |
return; |
281 |
|
try { |
282 |
< |
set(callable.call()); |
282 |
> |
runner = Thread.currentThread(); |
283 |
> |
doSet(callable.call()); |
284 |
|
} catch(Throwable ex) { |
285 |
< |
setException(ex); |
286 |
< |
} |
285 |
> |
doSetException(ex); |
286 |
> |
} |
287 |
|
} |
300 |
– |
} |
288 |
|
|
289 |
< |
/** |
290 |
< |
* Executes the computation and then resets this Future to initial |
291 |
< |
* state; failing to do so if the computation encounters an |
305 |
< |
* exception or is cancelled. This is designed for use with tasks |
306 |
< |
* that intrinsically execute more than once. |
307 |
< |
* @return true if successfully run and reset |
308 |
< |
*/ |
309 |
< |
protected boolean runAndReset() { |
310 |
< |
if (setRunning()) { |
289 |
> |
boolean doRunAndReset() { |
290 |
> |
if (!compareAndSetState(0, RUNNING)) |
291 |
> |
return false; |
292 |
|
try { |
293 |
< |
// don't bother to set result; it can't be accessed |
294 |
< |
callable.call(); |
295 |
< |
return reset(); |
293 |
> |
runner = Thread.currentThread(); |
294 |
> |
callable.call(); // don't set result |
295 |
> |
runner = null; |
296 |
> |
return compareAndSetState(RUNNING, 0); |
297 |
|
} catch(Throwable ex) { |
298 |
< |
setException(ex); |
298 |
> |
doSetException(ex); |
299 |
> |
return false; |
300 |
|
} |
301 |
|
} |
319 |
– |
return false; |
302 |
|
} |
321 |
– |
|
303 |
|
} |
323 |
– |
|