ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FutureTask.java
Revision: 1.67
Committed: Fri Jun 17 21:07:12 2011 UTC (12 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.66: +14 -12 lines
Log Message:
reintroduce a constant, UNDECIDED, for state 0

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.LockSupport;
9
10 /**
11 * A cancellable asynchronous computation. This class provides a base
12 * implementation of {@link Future}, with methods to start and cancel
13 * a computation, query to see if the computation is complete, and
14 * retrieve the result of the computation. The result can only be
15 * retrieved when the computation has completed; the {@code get}
16 * methods will block if the computation has not yet completed. Once
17 * the computation has completed, the computation cannot be restarted
18 * or cancelled (unless the computation is invoked using
19 * {@link #runAndReset}).
20 *
21 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
22 * {@link Runnable} object. Because {@code FutureTask} implements
23 * {@code Runnable}, a {@code FutureTask} can be submitted to an
24 * {@link Executor} for execution.
25 *
26 * <p>In addition to serving as a standalone class, this class provides
27 * {@code protected} functionality that may be useful when creating
28 * customized task classes.
29 *
30 * @since 1.5
31 * @author Doug Lea
32 * @param <V> The result type returned by this FutureTask's {@code get} methods
33 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     */

    /**
     * The run state of this task, initially UNDECIDED.  The run state
     * transitions to NORMAL, EXCEPTIONAL, or CANCELLED (only) in
     * method setCompletion.  During setCompletion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (while interrupting the runner).  State values are
     * ordered and set to powers of two (UNDECIDED being zero) to
     * simplify checks.
     */
    private volatile int state;
    private static final int UNDECIDED    = 0x00;
    private static final int COMPLETING   = 0x01;
    private static final int INTERRUPTING = 0x02;
    private static final int NORMAL       = 0x04;
    private static final int EXCEPTIONAL  = 0x08;
    private static final int CANCELLED    = 0x10;

    /** The underlying callable */
    private final Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    /**
     * Sets completion status, unless already completed.  If
     * necessary, we first set state to COMPLETING or INTERRUPTING to
     * establish precedence. This intentionally stalls (just via
     * yields) in (uncommon) cases of concurrent calls during
     * cancellation until state is set, to avoid surprising users
     * during cancellation races.
     *
     * @param x the outcome
     * @param mode the completion state value
     * @return true if this call caused transition from UNDECIDED to completed
     */
    private boolean setCompletion(Object x, int mode) {
        Thread r = runner;
        if (r == Thread.currentThread()) // null out runner on completion
            UNSAFE.putObject(this, runnerOffset, r = null); // nonvolatile OK
        // Pick the first state to CAS into: a transient state when there
        // is follow-up work (an interrupt to deliver or an outcome to
        // publish), otherwise the final state directly.
        int next = ((mode == INTERRUPTING) ? // set up transient states
                    (r != null) ? INTERRUPTING : CANCELLED :
                    (x != null) ? COMPLETING : mode);
        for (;;) {
            int s = state;
            if (s == UNDECIDED) {
                // Only the thread that wins this CAS performs the
                // follow-up work and the done() callback.
                if (UNSAFE.compareAndSwapInt(this, stateOffset,
                                             UNDECIDED, next)) {
                    if (next == INTERRUPTING) {
                        Thread t = runner; // recheck
                        if (t != null)
                            t.interrupt();
                        state = CANCELLED;
                    }
                    else if (next == COMPLETING) {
                        // outcome is written before the volatile state
                        // write that publishes it to readers
                        outcome = x;
                        state = mode;
                    }
                    if (waiters != null)
                        releaseAll();
                    done();
                    return true;
                }
            }
            else if (s == INTERRUPTING)
                Thread.yield(); // wait out pending cancellation interrupt
            else
                return false;
        }
    }

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     * @return the outcome, if {@code s} is NORMAL
     * @throws CancellationException if {@code s} is CANCELLED or INTERRUPTING
     * @throws ExecutionException wrapping the recorded outcome, for any
     *         other state value
     */
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL) {
            // Safe: the NORMAL state is only entered via set(V), so
            // outcome is either null or the V passed there.
            @SuppressWarnings("unchecked") V v = (V)x;
            return v;
        }
        if ((s & (CANCELLED | INTERRUPTING)) != 0)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
    }

    /**
     * Returns {@code true} if the state is CANCELLED or the transient
     * INTERRUPTING state that precedes it.
     */
    @Override
    public boolean isCancelled() {
        return (state & (CANCELLED | INTERRUPTING)) != 0;
    }

    /**
     * Returns {@code true} once the state has left UNDECIDED, which
     * includes the transient COMPLETING and INTERRUPTING states.
     */
    @Override
    public boolean isDone() {
        return state != UNDECIDED;
    }

    /**
     * Attempts to cancel this task.
     *
     * @param mayInterruptIfRunning if true, completion is requested in
     *        INTERRUPTING mode (which interrupts the runner thread, if
     *        one is recorded); otherwise in CANCELLED mode
     * @return true if this call moved the task out of the UNDECIDED state
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        // The state pre-check is a cheap fast path; setCompletion
        // re-validates the state with a CAS.
        return state == UNDECIDED &&
            setCompletion(null, mayInterruptIfRunning ?
                          INTERRUPTING : CANCELLED);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    @Override
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) // UNDECIDED or COMPLETING: outcome not yet published
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     * @throws NullPointerException if unit is null
     */
    @Override
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        // Check eagerly so that a null unit fails the same way whether
        // or not the task has already completed.
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        setCompletion(v, NORMAL);
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        setCompletion(t, EXCEPTIONAL);
    }

    /**
     * Runs the underlying callable, provided the state is still
     * UNDECIDED and no other thread is recorded as the runner.  A
     * normal return is passed to {@link #set}; any {@code Throwable}
     * is passed to {@link #setException}.
     */
    @Override
    public void run() {
        // The CAS on runner claims the task for the current thread.
        if (state == UNDECIDED &&
            UNSAFE.compareAndSwapObject(this, runnerOffset,
                                        null, Thread.currentThread())) {
            V result;
            try {
                result = callable.call();
            } catch (Throwable ex) {
                setException(ex);
                return;
            }
            set(result);
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return true if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != UNDECIDED ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        try {
            callable.call(); // don't set result
        } catch (Throwable ex) {
            setException(ex);
            return false;
        }
        runner = null; // release the claim so the task can be run again
        for (;;) {
            int s = state;
            if (s == UNDECIDED)
                return true;
            if (s != INTERRUPTING)
                return false;
            Thread.yield(); // wait out pending cancellation interrupt
        }
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        volatile Thread thread;
        WaitNode next;
    }

    /**
     * Removes and signals all waiting threads.
     */
    private void releaseAll() {
        WaitNode q;
        while ((q = waiters) != null) {
            // Detach the whole stack in one CAS, then walk it privately.
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        return;
                    q.next = null; // unlink to help gc
                    q = next;
                }
            }
        }
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     * @throws InterruptedException if the current thread is found to be
     *         interrupted at the top of a wait iteration
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        long last = timed ? System.nanoTime() : 0L;
        WaitNode q = null;
        boolean queued = false;
        // Each iteration performs at most one step (allocate node, push
        // node, record thread, park) and then re-reads state, so a
        // completion that races with enqueueing is noticed before parking.
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (q.thread == null)
                q.thread = Thread.currentThread();
            else if (timed) {
                long now = System.nanoTime();
                if ((nanos -= (now - last)) <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                last = now;
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers or concurrent calls to removeWaiter.
     *
     * @param node the node to unlink; ignored if null
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            WaitNode pred = null;
            WaitNode q = waiters;
            while (q != null) {
                // Advance from the node being visited.  Reading
                // node.next here instead would skip past the target on
                // the first step, so it would never be found unless it
                // was the head.
                WaitNode next = q.next;
                if (q != node) {
                    pred = q;
                    q = next;
                }
                else if (pred != null) {
                    pred.next = next;
                    break;
                }
                else if (UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q, next))
                    break;
                else { // restart on CAS failure
                    pred = null;
                    q = waiters;
                }
            }
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}