ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/FutureTask.java
Revision: 1.64
Committed: Fri Jun 17 20:46:43 2011 UTC (12 years, 11 months ago) by jsr166
Branch: MAIN
Changes since 1.63: +56 -44 lines
Log Message:
style update

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.concurrent.locks.LockSupport;
9
10 /**
11 * A cancellable asynchronous computation. This class provides a base
12 * implementation of {@link Future}, with methods to start and cancel
13 * a computation, query to see if the computation is complete, and
14 * retrieve the result of the computation. The result can only be
15 * retrieved when the computation has completed; the {@code get}
16 * methods will block if the computation has not yet completed. Once
17 * the computation has completed, the computation cannot be restarted
18 * or cancelled (unless the computation is invoked using
19 * {@link #runAndReset}).
20 *
21 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
22 * {@link Runnable} object. Because {@code FutureTask} implements
23 * {@code Runnable}, a {@code FutureTask} can be submitted to an
24 * {@link Executor} for execution.
25 *
26 * <p>In addition to serving as a standalone class, this class provides
27 * {@code protected} functionality that may be useful when creating
28 * customized task classes.
29 *
30 * @since 1.5
31 * @author Doug Lea
32 * @param <V> The result type returned by this FutureTask's {@code get} methods
33 */
34 public class FutureTask<V> implements RunnableFuture<V> {
35 /*
36 * Revision notes: This differs from previous versions of this
37 * class that relied on AbstractQueuedSynchronizer, mainly to
38 * avoid surprising users about retaining interrupt status during
39 * cancellation races. Sync control in the current design relies
40 * on a "state" field updated via CAS to track completion, along
41 * with a simple Treiber stack to hold waiting threads.
42 *
43 * Style note: As usual, we bypass overhead of using
44 * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
45 */
46
47 /**
48 * The run state of this task, initially 0. The run state
49 * transitions to NORMAL, EXCEPTIONAL, or CANCELLED (only) in
50 * method setCompletion. During setCompletion, state may take on
51 * transient values of COMPLETING (while outcome is being set) or
52 * INTERRUPTING (while interrupting the runner). State values
53 * are ordered and set to powers of two to simplify checks.
54 */
55 private volatile int state;
56 private static final int COMPLETING = 0x01;
57 private static final int INTERRUPTING = 0x02;
58 private static final int NORMAL = 0x04;
59 private static final int EXCEPTIONAL = 0x08;
60 private static final int CANCELLED = 0x10;
61
62 /** The underlying callable */
63 private final Callable<V> callable;
64 /** The result to return or exception to throw from get() */
65 private Object outcome; // non-volatile, protected by state reads/writes
66 /** The thread running the callable; CASed during run() */
67 private volatile Thread runner;
68 /** Treiber stack of waiting threads */
69 private volatile WaitNode waiters;
70
71 /**
72 * Sets completion status, unless already completed. If
73 * necessary, we first set state to COMPLETING or INTERRUPTING to
74 * establish precedence. This intentionally stalls (just via
75 * yields) in (uncommon) cases of concurrent calls during
76 * cancellation until state is set, to avoid surprising users
77 * during cancellation races.
78 *
79 * @param x the outcome
80 * @param mode the completion state value
81 * @return true if this call caused transition from 0 to completed
82 */
83 private boolean setCompletion(Object x, int mode) {
84 Thread r = runner;
85 if (r == Thread.currentThread()) // null out runner on completion
86 UNSAFE.putObject(this, runnerOffset, r = null); // nonvolatile OK
87 int next = ((mode == INTERRUPTING) ? // set up transient states
88 (r != null) ? INTERRUPTING : CANCELLED :
89 (x != null) ? COMPLETING : mode);
90 for (;;) {
91 int s = state;
92 if (s == 0) {
93 if (UNSAFE.compareAndSwapInt(this, stateOffset, 0, next)) {
94 if (next == INTERRUPTING) {
95 Thread t = runner; // recheck
96 if (t != null)
97 t.interrupt();
98 state = CANCELLED;
99 }
100 else if (next == COMPLETING) {
101 outcome = x;
102 state = mode;
103 }
104 if (waiters != null)
105 releaseAll();
106 done();
107 return true;
108 }
109 }
110 else if (s == INTERRUPTING)
111 Thread.yield(); // wait out pending cancellation interrupt
112 else
113 return false;
114 }
115 }
116
117 /**
118 * Returns result or throws exception for completed task.
119 *
120 * @param s completed state value
121 */
122 private V report(int s) throws ExecutionException {
123 Object x = outcome;
124 if (s == NORMAL)
125 return (V)x;
126 if ((s & (CANCELLED | INTERRUPTING)) != 0)
127 throw new CancellationException();
128 throw new ExecutionException((Throwable)x);
129 }
130
/**
 * Builds a {@code FutureTask} whose {@code run} method will invoke
 * the supplied {@code Callable}.
 *
 * @param callable the callable task
 * @throws NullPointerException if callable is null
 */
public FutureTask(Callable<V> callable) {
    if (callable != null) {
        this.callable = callable;
    } else {
        throw new NullPointerException();
    }
}
143
/**
 * Builds a {@code FutureTask} whose {@code run} method will execute
 * the supplied {@code Runnable}; on successful completion
 * {@code get} returns the supplied result. The runnable is adapted
 * to a callable through {@link Executors#callable(Runnable, Object)}.
 *
 * @param runnable the runnable task
 * @param result the result to return on successful completion. If
 * you don't need a particular result, consider using
 * constructions of the form:
 * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
 * @throws NullPointerException if runnable is null
 */
public FutureTask(Runnable runnable, V result) {
    Callable<V> adapter = Executors.callable(runnable, result);
    this.callable = adapter;
}
159
160 public boolean isCancelled() {
161 return (state & (CANCELLED | INTERRUPTING)) != 0;
162 }
163
164 public boolean isDone() {
165 return state != 0;
166 }
167
168 public boolean cancel(boolean mayInterruptIfRunning) {
169 return state == 0 &&
170 setCompletion(null, mayInterruptIfRunning ?
171 INTERRUPTING : CANCELLED);
172 }
173
174 /**
175 * @throws CancellationException {@inheritDoc}
176 */
177 public V get() throws InterruptedException, ExecutionException {
178 int s = state;
179 if (s <= COMPLETING)
180 s = awaitDone(false, 0L);
181 return report(s);
182 }
183
184 /**
185 * @throws CancellationException {@inheritDoc}
186 */
187 public V get(long timeout, TimeUnit unit)
188 throws InterruptedException, ExecutionException, TimeoutException {
189 int s = state;
190 if (s <= COMPLETING &&
191 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
192 throw new TimeoutException();
193 return report(s);
194 }
195
196 /**
197 * Protected method invoked when this task transitions to state
198 * {@code isDone} (whether normally or via cancellation). The
199 * default implementation does nothing. Subclasses may override
200 * this method to invoke completion callbacks or perform
201 * bookkeeping. Note that you can query status inside the
202 * implementation of this method to determine whether this task
203 * has been cancelled.
204 */
205 protected void done() { }
206
207 /**
208 * Sets the result of this future to the given value unless
209 * this future has already been set or has been cancelled.
210 *
211 * <p>This method is invoked internally by the {@link #run} method
212 * upon successful completion of the computation.
213 *
214 * @param v the value
215 */
216 protected void set(V v) {
217 setCompletion(v, NORMAL);
218 }
219
220 /**
221 * Causes this future to report an {@link ExecutionException}
222 * with the given throwable as its cause, unless this future has
223 * already been set or has been cancelled.
224 *
225 * <p>This method is invoked internally by the {@link #run} method
226 * upon failure of the computation.
227 *
228 * @param t the cause of failure
229 */
230 protected void setException(Throwable t) {
231 setCompletion(t, EXCEPTIONAL);
232 }
233
234 public void run() {
235 Thread r = Thread.currentThread();
236 if (state == 0 &&
237 UNSAFE.compareAndSwapObject(this, runnerOffset, null, r)) {
238 V result;
239 try {
240 result = callable.call();
241 } catch (Throwable ex) {
242 setException(ex);
243 return;
244 }
245 set(result);
246 }
247 }
248
249 /**
250 * Executes the computation without setting its result, and then
251 * resets this future to initial state, failing to do so if the
252 * computation encounters an exception or is cancelled. This is
253 * designed for use with tasks that intrinsically execute more
254 * than once.
255 *
256 * @return true if successfully run and reset
257 */
258 protected boolean runAndReset() {
259 Thread r = Thread.currentThread();
260 if (state != 0 ||
261 !UNSAFE.compareAndSwapObject(this, runnerOffset, null, r))
262 return false;
263 try {
264 callable.call(); // don't set result
265 } catch (Throwable ex) {
266 setException(ex);
267 return false;
268 }
269 runner = null;
270 for (;;) {
271 int s = state;
272 if (s == 0)
273 return true;
274 if (s != INTERRUPTING)
275 return false;
276 Thread.yield(); // wait out pending cancellation interrupt
277 }
278 }
279
280 /**
281 * Simple linked list nodes to record waiting threads in a Treiber
282 * stack. See other classes such as Phaser and SynchronousQueue
283 * for more detailed explanation.
284 */
285 static final class WaitNode {
286 volatile Thread thread;
287 WaitNode next;
288 }
289
290 /**
291 * Removes and signals all waiting threads.
292 */
293 private void releaseAll() {
294 WaitNode q;
295 while ((q = waiters) != null) {
296 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
297 for (;;) {
298 Thread t = q.thread;
299 if (t != null) {
300 q.thread = null;
301 LockSupport.unpark(t);
302 }
303 WaitNode next = q.next;
304 if (next == null)
305 return;
306 q.next = null; // unlink to help gc
307 q = next;
308 }
309 }
310 }
311 }
312
313 /**
314 * Awaits completion or aborts on interrupt or timeout.
315 *
316 * @param timed true if use timed waits
317 * @param nanos time to wait, if timed
318 * @return state upon completion
319 */
320 private int awaitDone(boolean timed, long nanos)
321 throws InterruptedException {
322 long last = timed ? System.nanoTime() : 0L;
323 WaitNode q = null;
324 boolean queued = false;
325 for (;;) {
326 if (Thread.interrupted()) {
327 removeWaiter(q);
328 throw new InterruptedException();
329 }
330
331 int s = state;
332 if (s > COMPLETING) {
333 if (q != null)
334 q.thread = null;
335 return s;
336 }
337 else if (q == null)
338 q = new WaitNode();
339 else if (!queued)
340 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
341 q.next = waiters, q);
342 else if (q.thread == null)
343 q.thread = Thread.currentThread();
344 else if (timed) {
345 long now = System.nanoTime();
346 if ((nanos -= (now - last)) <= 0L) {
347 removeWaiter(q);
348 return state;
349 }
350 last = now;
351 LockSupport.parkNanos(this, nanos);
352 }
353 else
354 LockSupport.park(this);
355 }
356 }
357
358 /**
359 * Tries to unlink a timed-out or interrupted wait node to avoid
360 * accumulating garbage. Internal nodes are simply unspliced
361 * without CAS since it is harmless if they are traversed anyway
362 * by releasers or concurrent calls to removeWaiter.
363 */
364 private void removeWaiter(WaitNode node) {
365 if (node != null) {
366 node.thread = null;
367 WaitNode pred = null;
368 WaitNode q = waiters;
369 while (q != null) {
370 WaitNode next = node.next;
371 if (q != node) {
372 pred = q;
373 q = next;
374 }
375 else if (pred != null) {
376 pred.next = next;
377 break;
378 }
379 else if (UNSAFE.compareAndSwapObject(this, waitersOffset,
380 q, next))
381 break;
382 else { // restart on CAS failure
383 pred = null;
384 q = waiters;
385 }
386 }
387 }
388 }
389
// Unsafe mechanics: raw field offsets used for the CAS operations on
// state, runner and waiters.
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> self = FutureTask.class;
        java.lang.reflect.Field field = self.getDeclaredField("state");
        stateOffset = UNSAFE.objectFieldOffset(field);
        field = self.getDeclaredField("runner");
        runnerOffset = UNSAFE.objectFieldOffset(field);
        field = self.getDeclaredField("waiters");
        waitersOffset = UNSAFE.objectFieldOffset(field);
    } catch (Exception e) {
        // Any failure to obtain Unsafe or an offset is fatal.
        throw new Error(e);
    }
}
409
410 }