ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk7/java/util/concurrent/FutureTask.java
Revision: 1.1
Committed: Sun Dec 16 20:55:16 2012 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Log Message:
Create src/jdk7 package

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.concurrent.locks.LockSupport;
9    
10     /**
11     * A cancellable asynchronous computation. This class provides a base
12     * implementation of {@link Future}, with methods to start and cancel
13     * a computation, query to see if the computation is complete, and
14     * retrieve the result of the computation. The result can only be
15     * retrieved when the computation has completed; the {@code get}
16     * methods will block if the computation has not yet completed. Once
17     * the computation has completed, the computation cannot be restarted
18     * or cancelled (unless the computation is invoked using
19     * {@link #runAndReset}).
20     *
21     * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
22     * {@link Runnable} object. Because {@code FutureTask} implements
23     * {@code Runnable}, a {@code FutureTask} can be submitted to an
24     * {@link Executor} for execution.
25     *
26     * <p>In addition to serving as a standalone class, this class provides
27     * {@code protected} functionality that may be useful when creating
28     * customized task classes.
29     *
30     * @since 1.5
31     * @author Doug Lea
32     * @param <V> The result type returned by this FutureTask's {@code get} methods
33     */
34     public class FutureTask<V> implements RunnableFuture<V> {
35     /*
36     * Revision notes: This differs from previous versions of this
37     * class that relied on AbstractQueuedSynchronizer, mainly to
38     * avoid surprising users about retaining interrupt status during
39     * cancellation races. Sync control in the current design relies
40     * on a "state" field updated via CAS to track completion, along
41     * with a simple Treiber stack to hold waiting threads.
42     *
43     * Style note: As usual, we bypass overhead of using
44     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
45     */
46    
47     /**
48     * The run state of this task, initially NEW. The run state
49     * transitions to a terminal state only in methods set,
50     * setException, and cancel. During completion, state may take on
51     * transient values of COMPLETING (while outcome is being set) or
52     * INTERRUPTING (only while interrupting the runner to satisfy a
53     * cancel(true)). Transitions from these intermediate to final
54     * states use cheaper ordered/lazy writes because values are unique
55     * and cannot be further modified.
56     *
57     * Possible state transitions:
58     * NEW -> COMPLETING -> NORMAL
59     * NEW -> COMPLETING -> EXCEPTIONAL
60     * NEW -> CANCELLED
61     * NEW -> INTERRUPTING -> INTERRUPTED
62     */
63     private volatile int state;
64     private static final int NEW = 0;
65     private static final int COMPLETING = 1;
66     private static final int NORMAL = 2;
67     private static final int EXCEPTIONAL = 3;
68     private static final int CANCELLED = 4;
69     private static final int INTERRUPTING = 5;
70     private static final int INTERRUPTED = 6;
71    
72     /** The underlying callable; nulled out after running */
73     private Callable<V> callable;
74     /** The result to return or exception to throw from get() */
75     private Object outcome; // non-volatile, protected by state reads/writes
76     /** The thread running the callable; CASed during run() */
77     private volatile Thread runner;
78     /** Treiber stack of waiting threads */
79     private volatile WaitNode waiters;
80    
81     /**
82     * Returns result or throws exception for completed task.
83     *
84     * @param s completed state value
85     */
86     @SuppressWarnings("unchecked")
87     private V report(int s) throws ExecutionException {
88     Object x = outcome;
89     if (s == NORMAL)
90     return (V)x;
91     if (s >= CANCELLED)
92     throw new CancellationException();
93     throw new ExecutionException((Throwable)x);
94     }
95    
/**
 * Builds a {@code FutureTask} whose {@link #run} method will invoke
 * the supplied {@code Callable}.
 *
 * @param callable the callable task
 * @throws NullPointerException if the callable is null
 */
public FutureTask(Callable<V> callable) {
    if (callable == null) {
        throw new NullPointerException();
    }
    this.callable = callable;
    // The volatile write to state comes last so that it publishes callable.
    this.state = NEW;
}
109    
110     /**
111     * Creates a {@code FutureTask} that will, upon running, execute the
112     * given {@code Runnable}, and arrange that {@code get} will return the
113     * given result on successful completion.
114     *
115     * @param runnable the runnable task
116     * @param result the result to return on successful completion. If
117     * you don't need a particular result, consider using
118     * constructions of the form:
119     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
120     * @throws NullPointerException if the runnable is null
121     */
122     public FutureTask(Runnable runnable, V result) {
123     this.callable = Executors.callable(runnable, result);
124     this.state = NEW; // ensure visibility of callable
125     }
126    
127     public boolean isCancelled() {
128     return state >= CANCELLED;
129     }
130    
131     public boolean isDone() {
132     return state != NEW;
133     }
134    
135     public boolean cancel(boolean mayInterruptIfRunning) {
136     if (!(state == NEW &&
137     UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
138     mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
139     return false;
140     try { // in case call to interrupt throws exception
141     if (mayInterruptIfRunning) {
142     try {
143     Thread t = runner;
144     if (t != null)
145     t.interrupt();
146     } finally { // final state
147     UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
148     }
149     }
150     } finally {
151     finishCompletion();
152     }
153     return true;
154     }
155    
156     /**
157     * @throws CancellationException {@inheritDoc}
158     */
159     public V get() throws InterruptedException, ExecutionException {
160     int s = state;
161     if (s <= COMPLETING)
162     s = awaitDone(false, 0L);
163     return report(s);
164     }
165    
166     /**
167     * @throws CancellationException {@inheritDoc}
168     */
169     public V get(long timeout, TimeUnit unit)
170     throws InterruptedException, ExecutionException, TimeoutException {
171     if (unit == null)
172     throw new NullPointerException();
173     int s = state;
174     if (s <= COMPLETING &&
175     (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
176     throw new TimeoutException();
177     return report(s);
178     }
179    
180     /**
181     * Protected method invoked when this task transitions to state
182     * {@code isDone} (whether normally or via cancellation). The
183     * default implementation does nothing. Subclasses may override
184     * this method to invoke completion callbacks or perform
185     * bookkeeping. Note that you can query status inside the
186     * implementation of this method to determine whether this task
187     * has been cancelled.
188     */
189     protected void done() { }
190    
191     /**
192     * Sets the result of this future to the given value unless
193     * this future has already been set or has been cancelled.
194     *
195     * <p>This method is invoked internally by the {@link #run} method
196     * upon successful completion of the computation.
197     *
198     * @param v the value
199     */
200     protected void set(V v) {
201     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
202     outcome = v;
203     UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
204     finishCompletion();
205     }
206     }
207    
208     /**
209     * Causes this future to report an {@link ExecutionException}
210     * with the given throwable as its cause, unless this future has
211     * already been set or has been cancelled.
212     *
213     * <p>This method is invoked internally by the {@link #run} method
214     * upon failure of the computation.
215     *
216     * @param t the cause of failure
217     */
218     protected void setException(Throwable t) {
219     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
220     outcome = t;
221     UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
222     finishCompletion();
223     }
224     }
225    
226     public void run() {
227     if (state != NEW ||
228     !UNSAFE.compareAndSwapObject(this, runnerOffset,
229     null, Thread.currentThread()))
230     return;
231     try {
232     Callable<V> c = callable;
233     if (c != null && state == NEW) {
234     V result;
235     boolean ran;
236     try {
237     result = c.call();
238     ran = true;
239     } catch (Throwable ex) {
240     result = null;
241     ran = false;
242     setException(ex);
243     }
244     if (ran)
245     set(result);
246     }
247     } finally {
248     // runner must be non-null until state is settled to
249     // prevent concurrent calls to run()
250     runner = null;
251     // state must be re-read after nulling runner to prevent
252     // leaked interrupts
253     int s = state;
254     if (s >= INTERRUPTING)
255     handlePossibleCancellationInterrupt(s);
256     }
257     }
258    
259     /**
260     * Executes the computation without setting its result, and then
261     * resets this future to initial state, failing to do so if the
262     * computation encounters an exception or is cancelled. This is
263     * designed for use with tasks that intrinsically execute more
264     * than once.
265     *
266     * @return true if successfully run and reset
267     */
268     protected boolean runAndReset() {
269     if (state != NEW ||
270     !UNSAFE.compareAndSwapObject(this, runnerOffset,
271     null, Thread.currentThread()))
272     return false;
273     boolean ran = false;
274     int s = state;
275     try {
276     Callable<V> c = callable;
277     if (c != null && s == NEW) {
278     try {
279     c.call(); // don't set result
280     ran = true;
281     } catch (Throwable ex) {
282     setException(ex);
283     }
284     }
285     } finally {
286     // runner must be non-null until state is settled to
287     // prevent concurrent calls to run()
288     runner = null;
289     // state must be re-read after nulling runner to prevent
290     // leaked interrupts
291     s = state;
292     if (s >= INTERRUPTING)
293     handlePossibleCancellationInterrupt(s);
294     }
295     return ran && s == NEW;
296     }
297    
298     /**
299     * Ensures that any interrupt from a possible cancel(true) is only
300     * delivered to a task while in run or runAndReset.
301     */
302     private void handlePossibleCancellationInterrupt(int s) {
303     // It is possible for our interrupter to stall before getting a
304     // chance to interrupt us. Let's spin-wait patiently.
305     if (s == INTERRUPTING)
306     while (state == INTERRUPTING)
307     Thread.yield(); // wait out pending interrupt
308    
309     // assert state == INTERRUPTED;
310    
311     // We want to clear any interrupt we may have received from
312     // cancel(true). However, it is permissible to use interrupts
313     // as an independent mechanism for a task to communicate with
314     // its caller, and there is no way to clear only the
315     // cancellation interrupt.
316     //
317     // Thread.interrupted();
318     }
319    
320     /**
321     * Simple linked list nodes to record waiting threads in a Treiber
322     * stack. See other classes such as Phaser and SynchronousQueue
323     * for more detailed explanation.
324     */
325     static final class WaitNode {
326     volatile Thread thread;
327     volatile WaitNode next;
328     WaitNode() { thread = Thread.currentThread(); }
329     }
330    
331     /**
332     * Removes and signals all waiting threads, invokes done(), and
333     * nulls out callable.
334     */
335     private void finishCompletion() {
336     // assert state > COMPLETING;
337     for (WaitNode q; (q = waiters) != null;) {
338     if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
339     for (;;) {
340     Thread t = q.thread;
341     if (t != null) {
342     q.thread = null;
343     LockSupport.unpark(t);
344     }
345     WaitNode next = q.next;
346     if (next == null)
347     break;
348     q.next = null; // unlink to help gc
349     q = next;
350     }
351     break;
352     }
353     }
354    
355     done();
356    
357     callable = null; // to reduce footprint
358     }
359    
360     /**
361     * Awaits completion or aborts on interrupt or timeout.
362     *
363     * @param timed true if use timed waits
364     * @param nanos time to wait, if timed
365     * @return state upon completion
366     */
367     private int awaitDone(boolean timed, long nanos)
368     throws InterruptedException {
369     final long deadline = timed ? System.nanoTime() + nanos : 0L;
370     WaitNode q = null;
371     boolean queued = false;
372     for (;;) {
373     if (Thread.interrupted()) {
374     removeWaiter(q);
375     throw new InterruptedException();
376     }
377    
378     int s = state;
379     if (s > COMPLETING) {
380     if (q != null)
381     q.thread = null;
382     return s;
383     }
384     else if (s == COMPLETING) // cannot time out yet
385     Thread.yield();
386     else if (q == null)
387     q = new WaitNode();
388     else if (!queued)
389     queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
390     q.next = waiters, q);
391     else if (timed) {
392     nanos = deadline - System.nanoTime();
393     if (nanos <= 0L) {
394     removeWaiter(q);
395     return state;
396     }
397     LockSupport.parkNanos(this, nanos);
398     }
399     else
400     LockSupport.park(this);
401     }
402     }
403    
404     /**
405     * Tries to unlink a timed-out or interrupted wait node to avoid
406     * accumulating garbage. Internal nodes are simply unspliced
407     * without CAS since it is harmless if they are traversed anyway
408     * by releasers. To avoid effects of unsplicing from already
409     * removed nodes, the list is retraversed in case of an apparent
410     * race. This is slow when there are a lot of nodes, but we don't
411     * expect lists to be long enough to outweigh higher-overhead
412     * schemes.
413     */
414     private void removeWaiter(WaitNode node) {
415     if (node != null) {
416     node.thread = null;
417     retry:
418     for (;;) { // restart on removeWaiter race
419     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
420     s = q.next;
421     if (q.thread != null)
422     pred = q;
423     else if (pred != null) {
424     pred.next = s;
425     if (pred.thread == null) // check for race
426     continue retry;
427     }
428     else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
429     q, s))
430     continue retry;
431     }
432     break;
433     }
434     }
435     }
436    
// Unsafe mechanics: the Unsafe instance plus the raw offsets of the
// state, runner and waiters fields, used by the CAS and ordered-write
// calls throughout this class.
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        final Class<?> self = FutureTask.class;
        stateOffset =
            UNSAFE.objectFieldOffset(self.getDeclaredField("state"));
        runnerOffset =
            UNSAFE.objectFieldOffset(self.getDeclaredField("runner"));
        waitersOffset =
            UNSAFE.objectFieldOffset(self.getDeclaredField("waiters"));
    } catch (Exception cause) {
        // Any failure here makes the class unusable.
        throw new Error(cause);
    }
}
456    
457     }