ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jdk8/java/util/concurrent/FutureTask.java
Revision: 1.2
Committed: Tue Sep 26 03:44:53 2017 UTC (6 years, 7 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.1: +35 -0 lines
Log Message:
backport 8186265: Make toString() methods of "task" objects more useful

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8
9 import java.util.concurrent.locks.LockSupport;
10
/**
 * A cancellable asynchronous computation. This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation. The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed. Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object. Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     */

    /**
     * The run state of this task, initially NEW. The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel. During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     *
     * The numeric order of the constants is significant: the code
     * below tests ranges rather than individual values
     * ({@code <= COMPLETING} for "result not yet available",
     * {@code > COMPLETING} for "finished", {@code >= CANCELLED} for
     * "cancelled", {@code >= INTERRUPTING} for "an interrupt may be
     * pending"), so the values must not be reordered.
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     * @return the stored outcome, when {@code s} is NORMAL
     * @throws CancellationException if {@code s} is CANCELLED,
     *         INTERRUPTING or INTERRUPTED
     * @throws ExecutionException wrapping the stored outcome for any
     *         other value of {@code s} (EXCEPTIONAL in practice)
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        // The null check is delegated to Executors.callable.
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    public boolean isCancelled() {
        // CANCELLED, INTERRUPTING and INTERRUPTED are the three highest values
        return state >= CANCELLED;
    }

    public boolean isDone() {
        // Any state other than NEW counts as done, including the transient
        // COMPLETING and INTERRUPTING states.
        return state != NEW;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        // Only a task still in NEW can be cancelled. The CAS moves it to the
        // transient INTERRUPTING state when an interrupt has to be delivered,
        // otherwise directly to the terminal CANCELLED state.
        if (!(state == NEW &&
              U.compareAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            // Waiters are released even if interrupt() threw.
            finishCompletion();
        }
        return true;
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // NEW or COMPLETING: the outcome is not published yet, so wait.
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        // awaitDone returns the state seen at timeout; if that is still
        // NEW or COMPLETING the wait timed out without a result.
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing. Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        // Only the caller that wins the NEW -> COMPLETING CAS may write
        // outcome; the write is published by the following state write.
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        // Same protocol as set(V), ending in EXCEPTIONAL instead of NORMAL.
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = t;
            U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    public void run() {
        // Claim the task: give up unless it is still NEW and this thread
        // wins the CAS that installs it as the runner.
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled. This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        // Success requires that the call returned normally and that the
        // state is still NEW, i.e. no set/setException/cancel intervened.
        return ran && s == NEW;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     *
     * @param s the state observed by the caller after it cleared
     *        {@code runner}; callers pass a value {@code >= INTERRUPTING}
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us. Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true). However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack. See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        /** The waiting thread; set to null once the node is no longer waiting */
        volatile Thread thread;
        /** The next node in the stack, or null at the end */
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            // Detach the whole stack in one CAS; on failure re-read and retry.
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if the calling thread is found to be
     *         interrupted while the task is still in state NEW
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        // Each pass performs at most one step (create node, enqueue, park)
        // and then loops to re-read state before taking the next one.
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                // Push q onto the stack: link to the current head, then CAS.
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    // Avoid colliding with the "not yet parked" marker.
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);
            }
            else
                LockSupport.park(this);
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage. Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers. To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race. This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     *
     * @param node the node to unlink; a null argument is ignored
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            // A null thread marks the node as removable for the sweep below.
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    // q is the head and is removable: swing the head past it.
                    else if (!U.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /**
     * Returns a string representation of this FutureTask.
     *
     * @implSpec
     * The default implementation returns a string identifying this
     * FutureTask, as well as its completion state. The state, in
     * brackets, contains one of the strings {@code "Completed normally"},
     * {@code "Completed exceptionally"}, {@code "Cancelled"}, or {@code
     * "Not completed"}. The exceptional form is followed by a colon and
     * the string form of the stored outcome; the not-completed form is
     * followed by {@code ", task = "} and the string form of the
     * underlying callable when that callable is still referenced.
     *
     * @return a string representation of this FutureTask
     */
    public String toString() {
        final String status;
        switch (state) {
        case NORMAL:
            status = "[Completed normally]";
            break;
        case EXCEPTIONAL:
            status = "[Completed exceptionally: " + outcome + "]";
            break;
        case CANCELLED:
        case INTERRUPTING:
        case INTERRUPTED:
            status = "[Cancelled]";
            break;
        default:
            // NEW or COMPLETING
            final Callable<?> callable = this.callable;
            status = (callable == null)
                ? "[Not completed]"
                : "[Not completed, task = " + callable + "]";
        }
        return super.toString() + status;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long STATE;
    private static final long RUNNER;
    private static final long WAITERS;
    static {
        try {
            // Field offsets used by the compareAndSwap*/putOrderedInt calls.
            STATE = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("state"));
            RUNNER = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("runner"));
            WAITERS = U.objectFieldOffset
                (FutureTask.class.getDeclaredField("waiters"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }

}