/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent; import java.util.concurrent.locks.LockSupport; /** * A cancellable asynchronous computation. This class provides a base * implementation of {@link Future}, with methods to start and cancel * a computation, query to see if the computation is complete, and * retrieve the result of the computation. The result can only be * retrieved when the computation has completed; the {@code get} * methods will block if the computation has not yet completed. Once * the computation has completed, the computation cannot be restarted * or cancelled (unless the computation is invoked using * {@link #runAndReset}). * *
A {@code FutureTask} can be used to wrap a {@link Callable} or * {@link Runnable} object. Because {@code FutureTask} implements * {@code Runnable}, a {@code FutureTask} can be submitted to an * {@link Executor} for execution. * *
In addition to serving as a standalone class, this class provides
* {@code protected} functionality that may be useful when creating
* customized task classes.
*
* @since 1.5
* @author Doug Lea
* @param This method is invoked internally by the {@link #run} method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v) {
setCompletion(v, NORMAL);
}
/**
* Causes this future to report an {@link ExecutionException}
* with the given throwable as its cause, unless this future has
* already been set or has been cancelled.
*
* This method is invoked internally by the {@link #run} method
* upon failure of the computation.
*
* @param t the cause of failure
*/
protected void setException(Throwable t) {
setCompletion(t, EXCEPTIONAL);
}
public void run() {
Thread r = Thread.currentThread();
if (state == 0 &&
UNSAFE.compareAndSwapObject(this, runnerOffset, null, r)) {
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
set(result);
}
}
/**
* Executes the computation without setting its result, and then
* resets this future to initial state, failing to do so if the
* computation encounters an exception or is cancelled. This is
* designed for use with tasks that intrinsically execute more
* than once.
*
* @return true if successfully run and reset
*/
protected boolean runAndReset() {
Thread r = Thread.currentThread();
if (state != 0 ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, r))
return false;
try {
callable.call(); // don't set result
} catch (Throwable ex) {
setException(ex);
return false;
}
runner = null;
for (;;) {
int s = state;
if (s == 0)
return true;
if (s != INTERRUPTING)
return false;
Thread.yield(); // wait out pending cancellation interrupt
}
}
/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread;
WaitNode next;
}
/**
* Removes and signals all waiting threads.
*/
private void releaseAll() {
WaitNode q;
while ((q = waiters) != null) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
return;
q.next = null; // unlink to help gc
q = next;
}
}
}
}
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
long last = timed ? System.nanoTime() : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (q.thread == null)
q.thread = Thread.currentThread();
else if (timed) {
long now = System.nanoTime();
if ((nanos -= (now - last)) <= 0L) {
removeWaiter(q);
return state;
}
last = now;
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
/**
* Tries to unlink a timed-out or interrupted wait node to avoid
* accumulating garbage. Internal nodes are simply unspliced
* without CAS since it is harmless if they are traversed anyway
* by releasers or concurrent calls to removeWaiter.
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
WaitNode pred = null;
WaitNode q = waiters;
while (q != null) {
WaitNode next = node.next;
if (q != node) {
pred = q;
q = next;
}
else if (pred != null) {
pred.next = next;
break;
}
else if (UNSAFE.compareAndSwapObject(this, waitersOffset,
q, next))
break;
else { // restart on CAS failure
pred = null;
q = waiters;
}
}
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
}