ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FutureTask.java
Revision: 1.71
Committed: Fri Jun 17 23:29:40 2011 UTC (12 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.70: +10 -3 lines
Log Message:
fix race causing interrupt to not be delivered

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4 jsr166 1.59 * http://creativecommons.org/publicdomain/zero/1.0/
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.62 import java.util.concurrent.locks.LockSupport;
9 dl 1.13
10 tim 1.1 /**
11 dl 1.8 * A cancellable asynchronous computation. This class provides a base
12     * implementation of {@link Future}, with methods to start and cancel
13     * a computation, query to see if the computation is complete, and
14 dl 1.4 * retrieve the result of the computation. The result can only be
15 jsr166 1.64 * retrieved when the computation has completed; the {@code get}
16     * methods will block if the computation has not yet completed. Once
17 dl 1.8 * the computation has completed, the computation cannot be restarted
18 jsr166 1.64 * or cancelled (unless the computation is invoked using
19     * {@link #runAndReset}).
20 tim 1.1 *
21 jsr166 1.64 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
22     * {@link Runnable} object. Because {@code FutureTask} implements
23     * {@code Runnable}, a {@code FutureTask} can be submitted to an
24     * {@link Executor} for execution.
25 tim 1.1 *
26 dl 1.14 * <p>In addition to serving as a standalone class, this class provides
27 jsr166 1.64 * {@code protected} functionality that may be useful when creating
28 dl 1.14 * customized task classes.
29     *
30 tim 1.1 * @since 1.5
31 dl 1.4 * @author Doug Lea
32 jsr166 1.64 * @param <V> The result type returned by this FutureTask's {@code get} methods
33 tim 1.1 */
34 peierls 1.39 public class FutureTask<V> implements RunnableFuture<V> {
35 dl 1.62 /*
36     * Revision notes: This differs from previous versions of this
37     * class that relied on AbstractQueuedSynchronizer, mainly to
38     * avoid surprising users about retaining interrupt status during
39     * cancellation races. Sync control in the current design relies
40     * on a "state" field updated via CAS to track completion, along
41     * with a simple Treiber stack to hold waiting threads.
42     *
43     * Style note: As usual, we bypass overhead of using
44     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
45     */
46    
47     /**
48 jsr166 1.67 * The run state of this task, initially UNDECIDED. The run state
49 jsr166 1.69 * transitions to a terminal state only in method setCompletion.
50     * During setCompletion, state may take on transient values of
51     * COMPLETING (while outcome is being set) or INTERRUPTING (only
52     * while interrupting the runner to satisfy a cancel(true)).
53 jsr166 1.70 * State values are highly order-dependent to simplify checks.
54 jsr166 1.69 *
55     * Possible state transitions:
56     * UNDECIDED -> COMPLETING -> NORMAL
57     * UNDECIDED -> COMPLETING -> EXCEPTIONAL
58     * UNDECIDED -> CANCELLED
59     * UNDECIDED -> INTERRUPTING -> INTERRUPTED
60 dl 1.62 */
61     private volatile int state;
62 jsr166 1.70 private static final int UNDECIDED = 0;
63     private static final int COMPLETING = 1;
64     private static final int NORMAL = 2;
65     private static final int EXCEPTIONAL = 3;
66     private static final int CANCELLED = 4;
67     private static final int INTERRUPTING = 5;
68     private static final int INTERRUPTED = 6;
69 dl 1.62
70 jsr166 1.64 /** The underlying callable */
71     private final Callable<V> callable;
72 dl 1.62 /** The result to return or exception to throw from get() */
73     private Object outcome; // non-volatile, protected by state reads/writes
74     /** The thread running the callable; CASed during run() */
75     private volatile Thread runner;
76     /** Treiber stack of waiting threads */
77     private volatile WaitNode waiters;
78    
79     /**
80     * Sets completion status, unless already completed. If
81 jsr166 1.69 * necessary, we first set state to transient states COMPLETING
82     * or INTERRUPTING to establish precedence.
83 dl 1.62 *
84     * @param x the outcome
85     * @param mode the completion state value
86 jsr166 1.67 * @return true if this call caused transition from UNDECIDED to completed
87 dl 1.62 */
88     private boolean setCompletion(Object x, int mode) {
89 jsr166 1.69 // set up transient states
90     int next = (x != null) ? COMPLETING : mode;
91     if (!UNSAFE.compareAndSwapInt(this, stateOffset, UNDECIDED, next))
92 jsr166 1.68 return false;
93     if (next == INTERRUPTING) {
94 jsr166 1.71 // Must check after CAS to avoid missed interrupt
95     Thread t = runner;
96 jsr166 1.68 if (t != null)
97     t.interrupt();
98 jsr166 1.69 state = INTERRUPTED;
99 jsr166 1.68 }
100     else if (next == COMPLETING) {
101     outcome = x;
102     state = mode;
103 dl 1.62 }
104 jsr166 1.68 if (waiters != null)
105     releaseAll();
106     done();
107     return true;
108 dl 1.62 }
109    
110     /**
111 jsr166 1.64 * Returns result or throws exception for completed task.
112     *
113 dl 1.62 * @param s completed state value
114     */
115     private V report(int s) throws ExecutionException {
116     Object x = outcome;
117     if (s == NORMAL)
118     return (V)x;
119 jsr166 1.69 if (s >= CANCELLED)
120 dl 1.62 throw new CancellationException();
121     throw new ExecutionException((Throwable)x);
122     }
123 dl 1.11
124 tim 1.1 /**
125 jsr166 1.64 * Creates a {@code FutureTask} that will, upon running, execute the
126     * given {@code Callable}.
127 tim 1.1 *
128     * @param callable the callable task
129 dl 1.9 * @throws NullPointerException if callable is null
130 tim 1.1 */
131     public FutureTask(Callable<V> callable) {
132 dl 1.9 if (callable == null)
133     throw new NullPointerException();
134 dl 1.62 this.callable = callable;
135 tim 1.1 }
136    
137     /**
138 jsr166 1.64 * Creates a {@code FutureTask} that will, upon running, execute the
139     * given {@code Runnable}, and arrange that {@code get} will return the
140 tim 1.1 * given result on successful completion.
141     *
142 jsr166 1.54 * @param runnable the runnable task
143 tim 1.1 * @param result the result to return on successful completion. If
144 dl 1.9 * you don't need a particular result, consider using
145 dl 1.16 * constructions of the form:
146 jsr166 1.58 * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
147 dl 1.9 * @throws NullPointerException if runnable is null
148 tim 1.1 */
149 dl 1.15 public FutureTask(Runnable runnable, V result) {
150 dl 1.62 this.callable = Executors.callable(runnable, result);
151 dl 1.20 }
152    
153     public boolean isCancelled() {
154 jsr166 1.69 return state >= CANCELLED;
155 dl 1.20 }
156 jsr166 1.35
157 dl 1.20 public boolean isDone() {
158 jsr166 1.67 return state != UNDECIDED;
159 dl 1.13 }
160    
161     public boolean cancel(boolean mayInterruptIfRunning) {
162 jsr166 1.67 return state == UNDECIDED &&
163 jsr166 1.71 setCompletion(null,
164     mayInterruptIfRunning ? INTERRUPTING : CANCELLED);
165 dl 1.13 }
166 jsr166 1.35
167 jsr166 1.43 /**
168     * @throws CancellationException {@inheritDoc}
169     */
170 dl 1.2 public V get() throws InterruptedException, ExecutionException {
171 jsr166 1.64 int s = state;
172     if (s <= COMPLETING)
173     s = awaitDone(false, 0L);
174     return report(s);
175 tim 1.1 }
176    
177 jsr166 1.43 /**
178     * @throws CancellationException {@inheritDoc}
179     */
180 dl 1.2 public V get(long timeout, TimeUnit unit)
181 tim 1.1 throws InterruptedException, ExecutionException, TimeoutException {
182 jsr166 1.64 int s = state;
183     if (s <= COMPLETING &&
184     (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
185 dl 1.62 throw new TimeoutException();
186     return report(s);
187 tim 1.1 }
188    
189     /**
190 dl 1.20 * Protected method invoked when this task transitions to state
191 jsr166 1.64 * {@code isDone} (whether normally or via cancellation). The
192 dl 1.20 * default implementation does nothing. Subclasses may override
193     * this method to invoke completion callbacks or perform
194     * bookkeeping. Note that you can query status inside the
195     * implementation of this method to determine whether this task
196     * has been cancelled.
197     */
198     protected void done() { }
199    
200     /**
201 jsr166 1.64 * Sets the result of this future to the given value unless
202 dl 1.29 * this future has already been set or has been cancelled.
203 jsr166 1.64 *
204     * <p>This method is invoked internally by the {@link #run} method
205 dl 1.40 * upon successful completion of the computation.
206 jsr166 1.64 *
207 tim 1.1 * @param v the value
208 jsr166 1.35 */
209 dl 1.2 protected void set(V v) {
210 dl 1.62 setCompletion(v, NORMAL);
211 tim 1.1 }
212    
213     /**
214 jsr166 1.64 * Causes this future to report an {@link ExecutionException}
215     * with the given throwable as its cause, unless this future has
216 dl 1.24 * already been set or has been cancelled.
217 jsr166 1.64 *
218     * <p>This method is invoked internally by the {@link #run} method
219 dl 1.40 * upon failure of the computation.
220 jsr166 1.64 *
221 jsr166 1.41 * @param t the cause of failure
222 jsr166 1.35 */
223 dl 1.2 protected void setException(Throwable t) {
224 dl 1.62 setCompletion(t, EXCEPTIONAL);
225 tim 1.1 }
226 jsr166 1.35
227 dl 1.24 public void run() {
228 jsr166 1.68 if (state != UNDECIDED ||
229     !UNSAFE.compareAndSwapObject(this, runnerOffset,
230     null, Thread.currentThread()))
231     return;
232    
233     try {
234 jsr166 1.71 // Recheck to avoid missed interrupt.
235     if (state != UNDECIDED)
236     return;
237 dl 1.62 V result;
238     try {
239     result = callable.call();
240     } catch (Throwable ex) {
241     setException(ex);
242     return;
243     }
244     set(result);
245 jsr166 1.68 } finally {
246     runner = null;
247 jsr166 1.69 int s = state;
248     if (s >= INTERRUPTING) {
249     while ((s = state) == INTERRUPTING)
250     Thread.yield(); // wait out pending cancellation interrupt
251     Thread.interrupted(); // clear any interrupt from cancel(true)
252     }
253 dl 1.62 }
254 dl 1.24 }
255    
256     /**
257 dl 1.30 * Executes the computation without setting its result, and then
258 jsr166 1.64 * resets this future to initial state, failing to do so if the
259 dl 1.24 * computation encounters an exception or is cancelled. This is
260     * designed for use with tasks that intrinsically execute more
261     * than once.
262 jsr166 1.64 *
263 dl 1.24 * @return true if successfully run and reset
264     */
265     protected boolean runAndReset() {
266 jsr166 1.67 if (state != UNDECIDED ||
267 jsr166 1.66 !UNSAFE.compareAndSwapObject(this, runnerOffset,
268     null, Thread.currentThread()))
269 dl 1.62 return false;
270 jsr166 1.68
271 dl 1.62 try {
272 jsr166 1.71 // Recheck to avoid missed interrupt.
273     if (state != UNDECIDED)
274     return false;
275 jsr166 1.68 try {
276     callable.call(); // don't set result
277     return (state == UNDECIDED);
278     } catch (Throwable ex) {
279     setException(ex);
280 dl 1.62 return false;
281 jsr166 1.68 }
282     } finally {
283     runner = null;
284 jsr166 1.69 int s = state;
285     if (s >= INTERRUPTING) {
286     while ((s = state) == INTERRUPTING)
287     Thread.yield(); // wait out pending cancellation interrupt
288     Thread.interrupted(); // clear any interrupt from cancel(true)
289     }
290 dl 1.62 }
291 dl 1.14 }
292 dl 1.3
293 dl 1.14 /**
294 dl 1.62 * Simple linked list nodes to record waiting threads in a Treiber
295 jsr166 1.64 * stack. See other classes such as Phaser and SynchronousQueue
296 dl 1.62 * for more detailed explanation.
297 dl 1.20 */
298 dl 1.62 static final class WaitNode {
299     volatile Thread thread;
300     WaitNode next;
301     }
302 dl 1.42
303 dl 1.62 /**
304 jsr166 1.64 * Removes and signals all waiting threads.
305 dl 1.62 */
306     private void releaseAll() {
307     WaitNode q;
308     while ((q = waiters) != null) {
309     if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
310     for (;;) {
311     Thread t = q.thread;
312     if (t != null) {
313     q.thread = null;
314     LockSupport.unpark(t);
315     }
316     WaitNode next = q.next;
317     if (next == null)
318     return;
319     q.next = null; // unlink to help gc
320     q = next;
321     }
322     }
323 dl 1.24 }
324 dl 1.62 }
325 dl 1.24
326 dl 1.62 /**
327 jsr166 1.64 * Awaits completion or aborts on interrupt or timeout.
328     *
329 dl 1.62 * @param timed true if use timed waits
330 jsr166 1.64 * @param nanos time to wait, if timed
331 dl 1.62 * @return state upon completion
332     */
333     private int awaitDone(boolean timed, long nanos)
334     throws InterruptedException {
335 jsr166 1.63 long last = timed ? System.nanoTime() : 0L;
336 dl 1.62 WaitNode q = null;
337     boolean queued = false;
338 jsr166 1.64 for (;;) {
339 dl 1.62 if (Thread.interrupted()) {
340     removeWaiter(q);
341     throw new InterruptedException();
342     }
343 jsr166 1.64
344     int s = state;
345     if (s > COMPLETING) {
346 dl 1.62 if (q != null)
347     q.thread = null;
348     return s;
349     }
350     else if (q == null)
351     q = new WaitNode();
352     else if (!queued)
353     queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
354     q.next = waiters, q);
355     else if (q.thread == null)
356     q.thread = Thread.currentThread();
357     else if (timed) {
358     long now = System.nanoTime();
359     if ((nanos -= (now - last)) <= 0L) {
360     removeWaiter(q);
361     return state;
362 dl 1.50 }
363 dl 1.62 last = now;
364     LockSupport.parkNanos(this, nanos);
365 dl 1.50 }
366 dl 1.62 else
367     LockSupport.park(this);
368 dl 1.24 }
369 dl 1.62 }
370 dl 1.24
371 dl 1.62 /**
372 jsr166 1.64 * Tries to unlink a timed-out or interrupted wait node to avoid
373     * accumulating garbage. Internal nodes are simply unspliced
374 dl 1.62 * without CAS since it is harmless if they are traversed anyway
375     * by releasers or concurrent calls to removeWaiter.
376     */
377     private void removeWaiter(WaitNode node) {
378     if (node != null) {
379     node.thread = null;
380     WaitNode pred = null;
381     WaitNode q = waiters;
382     while (q != null) {
383     WaitNode next = node.next;
384     if (q != node) {
385     pred = q;
386     q = next;
387 dl 1.50 }
388 dl 1.62 else if (pred != null) {
389     pred.next = next;
390     break;
391 dl 1.50 }
392 dl 1.62 else if (UNSAFE.compareAndSwapObject(this, waitersOffset,
393     q, next))
394 jsr166 1.56 break;
395 dl 1.62 else { // restart on CAS failure
396     pred = null;
397     q = waiters;
398 jsr166 1.55 }
399 jsr166 1.56 }
400 dl 1.14 }
401 dl 1.62 }
402 dl 1.14
403 dl 1.62 // Unsafe mechanics
404     private static final sun.misc.Unsafe UNSAFE;
405     private static final long stateOffset;
406     private static final long runnerOffset;
407     private static final long waitersOffset;
408     static {
409     try {
410     UNSAFE = sun.misc.Unsafe.getUnsafe();
411     Class<?> k = FutureTask.class;
412     stateOffset = UNSAFE.objectFieldOffset
413     (k.getDeclaredField("state"));
414     runnerOffset = UNSAFE.objectFieldOffset
415     (k.getDeclaredField("runner"));
416     waitersOffset = UNSAFE.objectFieldOffset
417     (k.getDeclaredField("waiters"));
418     } catch (Exception e) {
419     throw new Error(e);
420 dl 1.14 }
421 dl 1.15 }
422 dl 1.62
423 dl 1.15 }