ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/FutureTask.java
Revision: 1.2
Committed: Fri Oct 3 23:48:09 2014 UTC (9 years, 8 months ago) by jsr166
Branch: MAIN
Changes since 1.1: +48 -35 lines
Log Message:
backport fixes from src/main

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.concurrent.locks.LockSupport;
9    
10     /**
11     * A cancellable asynchronous computation. This class provides a base
12     * implementation of {@link Future}, with methods to start and cancel
13     * a computation, query to see if the computation is complete, and
14     * retrieve the result of the computation. The result can only be
15     * retrieved when the computation has completed; the {@code get}
16     * methods will block if the computation has not yet completed. Once
17     * the computation has completed, the computation cannot be restarted
18     * or cancelled (unless the computation is invoked using
19     * {@link #runAndReset}).
20     *
21     * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
22     * {@link Runnable} object. Because {@code FutureTask} implements
23     * {@code Runnable}, a {@code FutureTask} can be submitted to an
24     * {@link Executor} for execution.
25     *
26     * <p>In addition to serving as a standalone class, this class provides
27     * {@code protected} functionality that may be useful when creating
28     * customized task classes.
29     *
30     * @since 1.5
31     * @author Doug Lea
32     * @param <V> The result type returned by this FutureTask's {@code get} methods
33     */
34     public class FutureTask<V> implements RunnableFuture<V> {
35     /*
36     * Revision notes: This differs from previous versions of this
37     * class that relied on AbstractQueuedSynchronizer, mainly to
38     * avoid surprising users about retaining interrupt status during
39     * cancellation races. Sync control in the current design relies
40     * on a "state" field updated via CAS to track completion, along
41     * with a simple Treiber stack to hold waiting threads.
42     *
43     * Style note: As usual, we bypass overhead of using
44     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
45     */
46    
47     /**
48     * The run state of this task, initially NEW. The run state
49     * transitions to a terminal state only in methods set,
50     * setException, and cancel. During completion, state may take on
51     * transient values of COMPLETING (while outcome is being set) or
52     * INTERRUPTING (only while interrupting the runner to satisfy a
53     * cancel(true)). Transitions from these intermediate to final
54     * states use cheaper ordered/lazy writes because values are unique
55     * and cannot be further modified.
56     *
57     * Possible state transitions:
58     * NEW -> COMPLETING -> NORMAL
59     * NEW -> COMPLETING -> EXCEPTIONAL
60     * NEW -> CANCELLED
61     * NEW -> INTERRUPTING -> INTERRUPTED
62     */
63     private volatile int state;
64     private static final int NEW = 0;
65     private static final int COMPLETING = 1;
66     private static final int NORMAL = 2;
67     private static final int EXCEPTIONAL = 3;
68     private static final int CANCELLED = 4;
69     private static final int INTERRUPTING = 5;
70     private static final int INTERRUPTED = 6;
71    
72     /** The underlying callable; nulled out after running */
73     private Callable<V> callable;
74     /** The result to return or exception to throw from get() */
75     private Object outcome; // non-volatile, protected by state reads/writes
76     /** The thread running the callable; CASed during run() */
77     private volatile Thread runner;
78     /** Treiber stack of waiting threads */
79     private volatile WaitNode waiters;
80    
81     /**
82     * Returns result or throws exception for completed task.
83     *
84     * @param s completed state value
85     */
86     @SuppressWarnings("unchecked")
87     private V report(int s) throws ExecutionException {
88     Object x = outcome;
89     if (s == NORMAL)
90     return (V)x;
91     if (s >= CANCELLED)
92     throw new CancellationException();
93     throw new ExecutionException((Throwable)x);
94     }
95    
96     /**
97     * Creates a {@code FutureTask} that will, upon running, execute the
98     * given {@code Callable}.
99     *
100     * @param callable the callable task
101     * @throws NullPointerException if the callable is null
102     */
103     public FutureTask(Callable<V> callable) {
104     if (callable == null)
105     throw new NullPointerException();
106     this.callable = callable;
107     this.state = NEW; // ensure visibility of callable
108     }
109    
110     /**
111     * Creates a {@code FutureTask} that will, upon running, execute the
112     * given {@code Runnable}, and arrange that {@code get} will return the
113     * given result on successful completion.
114     *
115     * @param runnable the runnable task
116     * @param result the result to return on successful completion. If
117     * you don't need a particular result, consider using
118     * constructions of the form:
119     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
120     * @throws NullPointerException if the runnable is null
121     */
122     public FutureTask(Runnable runnable, V result) {
123     this.callable = Executors.callable(runnable, result);
124     this.state = NEW; // ensure visibility of callable
125     }
126    
127     public boolean isCancelled() {
128     return state >= CANCELLED;
129     }
130    
131     public boolean isDone() {
132     return state != NEW;
133     }
134    
135     public boolean cancel(boolean mayInterruptIfRunning) {
136     if (!(state == NEW &&
137 jsr166 1.2 U.compareAndSwapInt(this, STATE, NEW,
138 dl 1.1 mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
139     return false;
140     try { // in case call to interrupt throws exception
141     if (mayInterruptIfRunning) {
142     try {
143     Thread t = runner;
144     if (t != null)
145     t.interrupt();
146     } finally { // final state
147 jsr166 1.2 U.putOrderedInt(this, STATE, INTERRUPTED);
148 dl 1.1 }
149     }
150     } finally {
151     finishCompletion();
152     }
153     return true;
154     }
155    
156     /**
157     * @throws CancellationException {@inheritDoc}
158     */
159     public V get() throws InterruptedException, ExecutionException {
160     int s = state;
161     if (s <= COMPLETING)
162     s = awaitDone(false, 0L);
163     return report(s);
164     }
165    
166     /**
167     * @throws CancellationException {@inheritDoc}
168     */
169     public V get(long timeout, TimeUnit unit)
170     throws InterruptedException, ExecutionException, TimeoutException {
171     if (unit == null)
172     throw new NullPointerException();
173     int s = state;
174     if (s <= COMPLETING &&
175     (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
176     throw new TimeoutException();
177     return report(s);
178     }
179    
180     /**
181     * Protected method invoked when this task transitions to state
182     * {@code isDone} (whether normally or via cancellation). The
183     * default implementation does nothing. Subclasses may override
184     * this method to invoke completion callbacks or perform
185     * bookkeeping. Note that you can query status inside the
186     * implementation of this method to determine whether this task
187     * has been cancelled.
188     */
189     protected void done() { }
190    
191     /**
192     * Sets the result of this future to the given value unless
193     * this future has already been set or has been cancelled.
194     *
195     * <p>This method is invoked internally by the {@link #run} method
196     * upon successful completion of the computation.
197     *
198     * @param v the value
199     */
200     protected void set(V v) {
201 jsr166 1.2 if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
202 dl 1.1 outcome = v;
203 jsr166 1.2 U.putOrderedInt(this, STATE, NORMAL); // final state
204 dl 1.1 finishCompletion();
205     }
206     }
207    
208     /**
209     * Causes this future to report an {@link ExecutionException}
210     * with the given throwable as its cause, unless this future has
211     * already been set or has been cancelled.
212     *
213     * <p>This method is invoked internally by the {@link #run} method
214     * upon failure of the computation.
215     *
216     * @param t the cause of failure
217     */
218     protected void setException(Throwable t) {
219 jsr166 1.2 if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
220 dl 1.1 outcome = t;
221 jsr166 1.2 U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
222 dl 1.1 finishCompletion();
223     }
224     }
225    
226     public void run() {
227     if (state != NEW ||
228 jsr166 1.2 !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
229 dl 1.1 return;
230     try {
231     Callable<V> c = callable;
232     if (c != null && state == NEW) {
233     V result;
234     boolean ran;
235     try {
236     result = c.call();
237     ran = true;
238     } catch (Throwable ex) {
239     result = null;
240     ran = false;
241     setException(ex);
242     }
243     if (ran)
244     set(result);
245     }
246     } finally {
247     // runner must be non-null until state is settled to
248     // prevent concurrent calls to run()
249     runner = null;
250     // state must be re-read after nulling runner to prevent
251     // leaked interrupts
252     int s = state;
253     if (s >= INTERRUPTING)
254     handlePossibleCancellationInterrupt(s);
255     }
256     }
257    
258     /**
259     * Executes the computation without setting its result, and then
260     * resets this future to initial state, failing to do so if the
261     * computation encounters an exception or is cancelled. This is
262     * designed for use with tasks that intrinsically execute more
263     * than once.
264     *
265 jsr166 1.2 * @return {@code true} if successfully run and reset
266 dl 1.1 */
267     protected boolean runAndReset() {
268     if (state != NEW ||
269 jsr166 1.2 !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
270 dl 1.1 return false;
271     boolean ran = false;
272     int s = state;
273     try {
274     Callable<V> c = callable;
275     if (c != null && s == NEW) {
276     try {
277     c.call(); // don't set result
278     ran = true;
279     } catch (Throwable ex) {
280     setException(ex);
281     }
282     }
283     } finally {
284     // runner must be non-null until state is settled to
285     // prevent concurrent calls to run()
286     runner = null;
287     // state must be re-read after nulling runner to prevent
288     // leaked interrupts
289     s = state;
290     if (s >= INTERRUPTING)
291     handlePossibleCancellationInterrupt(s);
292     }
293     return ran && s == NEW;
294     }
295    
296     /**
297     * Ensures that any interrupt from a possible cancel(true) is only
298     * delivered to a task while in run or runAndReset.
299     */
300     private void handlePossibleCancellationInterrupt(int s) {
301     // It is possible for our interrupter to stall before getting a
302     // chance to interrupt us. Let's spin-wait patiently.
303     if (s == INTERRUPTING)
304     while (state == INTERRUPTING)
305     Thread.yield(); // wait out pending interrupt
306    
307     // assert state == INTERRUPTED;
308    
309     // We want to clear any interrupt we may have received from
310     // cancel(true). However, it is permissible to use interrupts
311     // as an independent mechanism for a task to communicate with
312     // its caller, and there is no way to clear only the
313     // cancellation interrupt.
314     //
315     // Thread.interrupted();
316     }
317    
318     /**
319     * Simple linked list nodes to record waiting threads in a Treiber
320     * stack. See other classes such as Phaser and SynchronousQueue
321     * for more detailed explanation.
322     */
323     static final class WaitNode {
324     volatile Thread thread;
325     volatile WaitNode next;
326     WaitNode() { thread = Thread.currentThread(); }
327     }
328    
329     /**
330     * Removes and signals all waiting threads, invokes done(), and
331     * nulls out callable.
332     */
333     private void finishCompletion() {
334     // assert state > COMPLETING;
335     for (WaitNode q; (q = waiters) != null;) {
336 jsr166 1.2 if (U.compareAndSwapObject(this, WAITERS, q, null)) {
337 dl 1.1 for (;;) {
338     Thread t = q.thread;
339     if (t != null) {
340     q.thread = null;
341     LockSupport.unpark(t);
342     }
343     WaitNode next = q.next;
344     if (next == null)
345     break;
346     q.next = null; // unlink to help gc
347     q = next;
348     }
349     break;
350     }
351     }
352    
353     done();
354    
355     callable = null; // to reduce footprint
356     }
357    
358     /**
359     * Awaits completion or aborts on interrupt or timeout.
360     *
361     * @param timed true if use timed waits
362     * @param nanos time to wait, if timed
363 jsr166 1.2 * @return state upon completion or at timeout
364 dl 1.1 */
365     private int awaitDone(boolean timed, long nanos)
366     throws InterruptedException {
367 jsr166 1.2 // The code below is very delicate, to achieve these goals:
368     // - call nanoTime exactly once for each call to park
369     // - if nanos <= 0, return promptly without allocation or nanoTime
370     // - if nanos == Long.MIN_VALUE, don't underflow
371     // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
372     // and we suffer a spurious wakeup, we will do no worse than
373     // to park-spin for a while
374     long startTime = 0L; // Special value 0L means not yet parked
375 dl 1.1 WaitNode q = null;
376     boolean queued = false;
377     for (;;) {
378     if (Thread.interrupted()) {
379     removeWaiter(q);
380     throw new InterruptedException();
381     }
382    
383     int s = state;
384     if (s > COMPLETING) {
385     if (q != null)
386     q.thread = null;
387     return s;
388     }
389     else if (s == COMPLETING) // cannot time out yet
390     Thread.yield();
391 jsr166 1.2 else if (q == null) {
392     if (timed && nanos <= 0L)
393     return s;
394 dl 1.1 q = new WaitNode();
395 jsr166 1.2 }
396 dl 1.1 else if (!queued)
397 jsr166 1.2 queued = U.compareAndSwapObject(this, WAITERS,
398     q.next = waiters, q);
399 dl 1.1 else if (timed) {
400 jsr166 1.2 final long parkNanos;
401     if (startTime == 0L) { // first time
402     startTime = System.nanoTime();
403     if (startTime == 0L)
404     startTime = 1L;
405     parkNanos = nanos;
406     } else {
407     long elapsed = System.nanoTime() - startTime;
408     if (elapsed >= nanos) {
409     removeWaiter(q);
410     return state;
411     }
412     parkNanos = nanos - elapsed;
413 dl 1.1 }
414 jsr166 1.2 LockSupport.parkNanos(this, parkNanos);
415 dl 1.1 }
416     else
417     LockSupport.park(this);
418     }
419     }
420    
421     /**
422     * Tries to unlink a timed-out or interrupted wait node to avoid
423     * accumulating garbage. Internal nodes are simply unspliced
424     * without CAS since it is harmless if they are traversed anyway
425     * by releasers. To avoid effects of unsplicing from already
426     * removed nodes, the list is retraversed in case of an apparent
427     * race. This is slow when there are a lot of nodes, but we don't
428     * expect lists to be long enough to outweigh higher-overhead
429     * schemes.
430     */
431     private void removeWaiter(WaitNode node) {
432     if (node != null) {
433     node.thread = null;
434     retry:
435     for (;;) { // restart on removeWaiter race
436     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
437     s = q.next;
438     if (q.thread != null)
439     pred = q;
440     else if (pred != null) {
441     pred.next = s;
442     if (pred.thread == null) // check for race
443     continue retry;
444     }
445 jsr166 1.2 else if (!U.compareAndSwapObject(this, WAITERS, q, s))
446 dl 1.1 continue retry;
447     }
448     break;
449     }
450     }
451     }
452    
453     // Unsafe mechanics
454 jsr166 1.2 private static final sun.misc.Unsafe U;
455     private static final long STATE;
456     private static final long RUNNER;
457     private static final long WAITERS;
458 dl 1.1 static {
459     try {
460 jsr166 1.2 U = sun.misc.Unsafe.getUnsafe();
461 dl 1.1 Class<?> k = FutureTask.class;
462 jsr166 1.2 STATE = U.objectFieldOffset(k.getDeclaredField("state"));
463     RUNNER = U.objectFieldOffset(k.getDeclaredField("runner"));
464     WAITERS = U.objectFieldOffset(k.getDeclaredField("waiters"));
465 dl 1.1 } catch (Exception e) {
466     throw new Error(e);
467     }
468     }
469    
470     }