ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FutureTask.java
Revision: 1.70
Committed: Fri Jun 17 22:54:47 2011 UTC (12 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.69: +8 -9 lines
Log Message:
use ordinary ordered integers for state values

File Contents

# User Rev Content
1 tim 1.1 /*
2 dl 1.2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 dl 1.23 * Expert Group and released to the public domain, as explained at
4 jsr166 1.59 * http://creativecommons.org/publicdomain/zero/1.0/
5 tim 1.1 */
6    
7     package java.util.concurrent;
8 dl 1.62 import java.util.concurrent.locks.LockSupport;
9 dl 1.13
/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     */

    /**
     * The run state of this task, initially UNDECIDED.  The run state
     * transitions to a terminal state only in method setCompletion.
     * During setCompletion, state may take on transient values of
     * COMPLETING (while outcome is being set) or INTERRUPTING (only
     * while interrupting the runner to satisfy a cancel(true)).
     * State values are highly order-dependent to simplify checks:
     * {@code s <= COMPLETING} means "no outcome available yet",
     * {@code s >= CANCELLED} means "cancelled", and
     * {@code s >= INTERRUPTING} means "a cancel(true) is or was in flight".
     *
     * Possible state transitions:
     * UNDECIDED -> COMPLETING -> NORMAL
     * UNDECIDED -> COMPLETING -> EXCEPTIONAL
     * UNDECIDED -> CANCELLED
     * UNDECIDED -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int UNDECIDED    = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable */
    private final Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    /**
     * Sets completion status, unless already completed.  If
     * necessary, we first set state to transient states COMPLETING
     * or INTERRUPTING to establish precedence.
     *
     * <p>A {@code null} outcome needs no COMPLETING phase because the
     * {@code outcome} field already holds {@code null}; in that case
     * the state is CASed straight to {@code mode}.
     *
     * <p>Once the initial CAS has succeeded this call owns the
     * transition, so the transient state is always resolved, waiters
     * are always released and {@link #done} is always invoked, even if
     * interrupting the runner throws.
     *
     * @param x the outcome
     * @param mode the completion state value
     * @return true if this call caused transition from UNDECIDED to completed
     */
    private boolean setCompletion(Object x, int mode) {
        // set up transient states
        int next = (x != null) ? COMPLETING : mode;
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, UNDECIDED, next))
            return false;
        try {
            if (next == INTERRUPTING) {
                try {
                    Thread t = runner; // recheck to avoid leaked interrupt
                    if (t != null)
                        t.interrupt();
                } finally {
                    // Must leave INTERRUPTING even if interrupt() throws;
                    // otherwise the runner would yield-spin forever.
                    state = INTERRUPTED;
                }
            }
            else if (next == COMPLETING) {
                outcome = x;
                state = mode; // volatile write publishes outcome
            }
        } finally {
            if (waiters != null)
                releaseAll();
            done();
        }
        return true;
    }

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     * @return the outcome, if {@code s} is NORMAL
     * @throws CancellationException if {@code s} denotes cancellation
     * @throws ExecutionException if {@code s} is EXCEPTIONAL
     */
    @SuppressWarnings("unchecked") // outcome is only ever a V when s == NORMAL
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
    }

    /** Returns true if the state is CANCELLED, INTERRUPTING or INTERRUPTED. */
    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    /** Returns true once the state has left UNDECIDED (including transient states). */
    public boolean isDone() {
        return state != UNDECIDED;
    }

    /**
     * Attempts to cancel this task.  The runner is interrupted only if
     * {@code mayInterruptIfRunning} is true and a runner thread is
     * currently recorded.
     *
     * @param mayInterruptIfRunning whether a running task may be interrupted
     * @return true if this call moved the task out of UNDECIDED
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        return state == UNDECIDED &&
            setCompletion(null, (mayInterruptIfRunning && runner != null) ?
                          INTERRUPTING : CANCELLED);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        int s = state;
        // Still not past COMPLETING after waiting means the wait timed out.
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        setCompletion(v, NORMAL);
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        setCompletion(t, EXCEPTIONAL);
    }

    /**
     * Runs the underlying callable and records its result or failure,
     * unless this task has already left UNDECIDED or another thread
     * has already claimed the runner slot.
     */
    public void run() {
        if (state != UNDECIDED ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;

        try {
            V result;
            try {
                result = callable.call();
            } catch (Throwable ex) {
                setException(ex);
                return;
            }
            set(result);
        } finally {
            // runner must be cleared before state is re-read so that a
            // concurrent cancel(true) either sees no runner or is waited for.
            runner = null;
            awaitPendingCancellationInterrupt();
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return true if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != UNDECIDED ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;

        try {
            try {
                callable.call(); // don't set result
                return (state == UNDECIDED);
            } catch (Throwable ex) {
                setException(ex);
                return false;
            }
        } finally {
            // Same ordering requirement as in run(): clear runner first.
            runner = null;
            awaitPendingCancellationInterrupt();
        }
    }

    /**
     * Called by the running thread after it has cleared {@code runner}.
     * If a cancel(true) is or was in flight, waits for it to leave the
     * transient INTERRUPTING state and then clears the calling
     * thread's interrupt status.
     */
    private void awaitPendingCancellationInterrupt() {
        if (state >= INTERRUPTING) {
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending cancellation interrupt
            Thread.interrupted(); // clear any interrupt from cancel(true)
        }
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        volatile Thread thread;
        WaitNode next;
    }

    /**
     * Removes and signals all waiting threads.
     */
    private void releaseAll() {
        WaitNode q;
        while ((q = waiters) != null) {
            // Detach the whole stack at once, then walk it privately.
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        return;
                    q.next = null; // unlink to help gc
                    q = next;
                }
            }
        }
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * <p>Each loop iteration performs at most one step (allocate node,
     * enqueue node, record thread, park) and then re-reads the state,
     * so a completion racing with enqueueing is noticed before parking.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion, or the current state on timeout
     * @throws InterruptedException if the calling thread is interrupted
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        long last = timed ? System.nanoTime() : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (q.thread == null)
                q.thread = Thread.currentThread();
            else if (timed) {
                long now = System.nanoTime();
                if ((nanos -= (now - last)) <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                last = now;
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers or concurrent calls to removeWaiter.
     *
     * <p>The node may legitimately be absent from the stack (never
     * successfully enqueued, or already detached by a releaser); in
     * that case the traversal simply runs off the end.
     *
     * @param node the node to unlink; ignored if null
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            WaitNode pred = null;
            WaitNode q = waiters;
            while (q != null) {
                // Advance from the cursor, not from the target node.
                WaitNode next = q.next;
                if (q != node) {
                    pred = q;
                    q = next;
                }
                else if (pred != null) {
                    pred.next = next;
                    break;
                }
                else if (UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q, next))
                    break;
                else { // restart on CAS failure
                    pred = null;
                    q = waiters;
                }
            }
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}