ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FutureTask.java
Revision: 1.62
Committed: Fri Jun 17 14:43:54 2011 UTC (12 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.61: +251 -175 lines
Log Message:
Reimplement to avoid surprises about interrupt status on cancellation

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.LockSupport;
9
10 /**
11 * A cancellable asynchronous computation. This class provides a base
12 * implementation of {@link Future}, with methods to start and cancel
13 * a computation, query to see if the computation is complete, and
14 * retrieve the result of the computation. The result can only be
15 * retrieved when the computation has completed; the <tt>get</tt>
16 * method will block if the computation has not yet completed. Once
17 * the computation has completed, the computation cannot be restarted
18 * or cancelled.
19 *
20 * <p>A <tt>FutureTask</tt> can be used to wrap a {@link Callable} or
21 * {@link java.lang.Runnable} object. Because <tt>FutureTask</tt>
22 * implements <tt>Runnable</tt>, a <tt>FutureTask</tt> can be
23 * submitted to an {@link Executor} for execution.
24 *
25 * <p>In addition to serving as a standalone class, this class provides
26 * <tt>protected</tt> functionality that may be useful when creating
27 * customized task classes.
28 *
29 * @since 1.5
30 * @author Doug Lea
31 * @param <V> The result type returned by this FutureTask's <tt>get</tt> method
32 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     */

    /**
     * The run state of this task, initially 0. The run state
     * transitions to NORMAL, EXCEPTIONAL, or CANCELLED (only) in
     * method setCompletion. During setCompletion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (while interrupting the runner). State values
     * are ordered and set to powers of two to simplify checks.
     */
    private volatile int state;
    private static final int COMPLETING   = 0x01;
    private static final int INTERRUPTING = 0x02;
    private static final int NORMAL       = 0x04;
    private static final int EXCEPTIONAL  = 0x08;
    private static final int CANCELLED    = 0x10;

    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** The underlying callable */
    private final Callable<V> callable;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    /**
     * Sets completion status, unless already completed. If
     * necessary, we first set state to COMPLETING or INTERRUPTING to
     * establish precedence. This intentionally stalls (just via
     * yields) in (uncommon) cases of concurrent calls during
     * cancellation until state is set, to avoid surprising users
     * during cancellation races.
     *
     * @param x the outcome
     * @param mode the completion state value
     * @return true if this call caused transition from 0 to completed
     */
    private boolean setCompletion(Object x, int mode) {
        Thread r = runner;
        if (r == Thread.currentThread()) // null out runner on completion
            UNSAFE.putObject(this, runnerOffset, r = null); // nonvolatile OK
        // Pick the first state to CAS into:
        //  - interrupting cancel with a live runner -> INTERRUPTING
        //  - interrupting cancel without a runner   -> CANCELLED directly
        //  - non-null outcome to publish            -> COMPLETING
        //  - otherwise                              -> the final mode
        int next = ((mode == INTERRUPTING) ? // set up transient states
                    (r != null) ? INTERRUPTING : CANCELLED :
                    (x != null) ? COMPLETING : mode);
        for (int s;;) {
            if ((s = state) == 0) {
                if (UNSAFE.compareAndSwapInt(this, stateOffset, 0, next)) {
                    if (next == INTERRUPTING) {
                        Thread t = runner; // recheck
                        if (t != null)
                            t.interrupt();
                        state = CANCELLED;
                    }
                    else if (next == COMPLETING) {
                        // outcome is written before the volatile state
                        // write that publishes it
                        outcome = x;
                        state = mode;
                    }
                    if (waiters != null)
                        releaseAll();
                    done();
                    return true;
                }
            }
            else if (s == INTERRUPTING)
                Thread.yield(); // wait out cancellation
            else
                return false;
        }
    }

    /**
     * Returns result or throws exception for completed task
     *
     * @param s completed state value
     * @return the outcome, if {@code s} is NORMAL
     * @throws CancellationException if {@code s} is CANCELLED or INTERRUPTING
     * @throws ExecutionException wrapping the outcome for any other state
     */
    @SuppressWarnings("unchecked") // outcome holds a V whenever state is NORMAL
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if ((s & (CANCELLED | INTERRUPTING)) != 0)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    /**
     * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     * given <tt>Callable</tt>.
     *
     * @param callable the callable task
     * @throws NullPointerException if callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
    }

    /**
     * Creates a <tt>FutureTask</tt> that will, upon running, execute the
     * given <tt>Runnable</tt>, and arrange that <tt>get</tt> will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
    }

    public boolean isCancelled() {
        // INTERRUPTING is the transient phase of a cancellation in progress
        return (state & (CANCELLED | INTERRUPTING)) != 0;
    }

    public boolean isDone() {
        // any nonzero state counts, including the transient ones
        return state != 0;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return state == 0 &&
            setCompletion(null, mayInterruptIfRunning ?
                          INTERRUPTING : CANCELLED);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s;
        // states at or below COMPLETING have no outcome to report yet
        return report((s = state) > COMPLETING ? s : awaitDone(false, 0L));
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        int s;
        long nanos = unit.toNanos(timeout);
        if ((s = state) <= COMPLETING &&
            (s = awaitDone(true, nanos)) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * <tt>isDone</tt> (whether normally or via cancellation). The
     * default implementation does nothing. Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this Future to the given value unless
     * this future has already been set or has been cancelled.
     * This method is invoked internally by the <tt>run</tt> method
     * upon successful completion of the computation.
     * @param v the value
     */
    protected void set(V v) {
        setCompletion(v, NORMAL);
    }

    /**
     * Causes this future to report an <tt>ExecutionException</tt>
     * with the given throwable as its cause, unless this Future has
     * already been set or has been cancelled.
     * This method is invoked internally by the <tt>run</tt> method
     * upon failure of the computation.
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        setCompletion(t, EXCEPTIONAL);
    }

    public void run() {
        Thread r = Thread.currentThread();
        // proceed only if not yet completed and this thread wins the
        // CAS that claims the runner slot
        if (state == 0 &&
            UNSAFE.compareAndSwapObject(this, runnerOffset, null, r)) {
            V result;
            try {
                result = callable.call();
            } catch (Throwable ex) {
                setException(ex);
                return;
            }
            set(result);
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this Future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled. This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     * @return true if successfully run and reset
     */
    protected boolean runAndReset() {
        Thread r = Thread.currentThread();
        if (state != 0 ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, r))
            return false;
        try {
            callable.call(); // don't set result
        } catch (Throwable ex) {
            setException(ex);
            return false;
        }
        runner = null; // release the runner slot so the task can run again
        for (;;) {
            int s = state;
            if (s == 0)
                return true;
            if (s != INTERRUPTING)
                return false;
            Thread.yield(); // wait out racing cancellation
        }
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack. See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        volatile Thread thread;
        WaitNode next;
    }

    /**
     * Removes and signals all waiting threads
     */
    private void releaseAll() {
        WaitNode q;
        while ((q = waiters) != null) {
            // detach the whole stack in one CAS, then walk it privately
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        return;
                    q.next = null; // unlink to help gc
                    q = next;
                }
            }
        }
    }

    /**
     * Awaits completion or aborts on interrupt or timeout
     *
     * @param timed true if use timed waits
     * @param nanos time to wait if timed
     * @return state upon completion, or the current state if a timed
     *         wait elapsed first
     * @throws InterruptedException if the current thread was interrupted
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        long last = timed ? System.nanoTime() : 0L;
        WaitNode q = null;
        boolean queued = false;
        // Each pass performs one step and then rechecks interrupt status
        // and state: allocate node, push it, record thread, then park.
        for (int s;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if ((s = state) > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (q.thread == null)
                q.thread = Thread.currentThread();
            else if (timed) {
                long now = System.nanoTime();
                if ((nanos -= (now - last)) <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                last = now;
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

    /**
     * Try to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage. Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers or concurrent calls to removeWaiter.
     *
     * @param node the node to unlink; ignored if null
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            WaitNode pred = null;
            WaitNode q = waiters;
            while (q != null) {
                // Successor of the node being examined. Following q's own
                // link is what lets the walk reach node and then terminate.
                WaitNode next = q.next;
                if (q != node) {
                    pred = q;
                    q = next;
                }
                else if (pred != null) {
                    pred.next = next; // splice out an interior node
                    break;
                }
                else if (UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q, next))
                    break; // node was the head and has been popped
                else { // restart on CAS failure
                    pred = null;
                    q = waiters;
                }
            }
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}