runnerUpdater =
AtomicReferenceFieldUpdater.newUpdater
(CancellableTask.class, Object.class, "runner");
/**
 * The runnable underlying this task. It is assigned by the
 * one-argument constructor or by setRunnable; after the no-argument
 * constructor it remains null until setRunnable is called.
 */
private volatile Runnable runnable;
/**
 * Constructs a CancellableTask whose execution invokes the supplied
 * Runnable.
 * @param r the runnable action
 * @throws NullPointerException if r is null
 */
public CancellableTask(Runnable r) {
    if (r == null) {
        throw new NullPointerException();
    }
    runnable = r;
}
/**
 * Creates a new CancellableTask without a runnable action. The action
 * must be supplied through setRunnable before the task is used; this
 * constructor itself leaves the runnable unset. It is intended for
 * subclasses that must complete superclass construction before they
 * can establish the runnable action.
 */
protected CancellableTask() {
}
/**
 * Attempts to cancel this task. The attempt fails if the task has
 * already completed or has already been cancelled. On success the
 * state becomes Cancelled and, if the task was running and
 * mayInterruptIfRunning is true, the thread executing it is
 * interrupted.
 *
 * @param mayInterruptIfRunning true if the thread executing this
 * task should be interrupted; otherwise a running task is allowed to
 * continue, although its state is still recorded as Cancelled
 * @return false if the task was already Done or Cancelled;
 * true otherwise
 */
public boolean cancel(boolean mayInterruptIfRunning) {
    // The transition to CANCELLED must be a compare-and-set against the
    // state that was actually observed. An unconditional store could
    // overwrite a DONE recorded after the read, and could mark a task
    // that has just started running as cancelled without interrupting it.
    for (;;) {
        Object r = runner;
        if (r == DONE || r == CANCELLED)
            return false;
        if (runnerUpdater.compareAndSet(this, r, CANCELLED)) {
            // setRunning stores the executing Thread in runner, so a
            // Thread here means the task was running when cancelled.
            if (mayInterruptIfRunning && r instanceof Thread)
                ((Thread) r).interrupt();
            return true;
        }
        // Lost a race with another state change; re-read and retry.
    }
}
/**
 * Reports whether this task's state is currently Cancelled.
 * @return true if the recorded state is Cancelled
 */
public boolean isCancelled() {
    Object state = runner;
    return state == CANCELLED;
}
/**
 * Reports whether this task has finished, either by completing or by
 * being cancelled.
 * @return true if the recorded state is Done or Cancelled
 */
public boolean isDone() {
    // Read the state once so both comparisons see the same value.
    Object state = runner;
    if (state == CANCELLED) {
        return true;
    }
    return state == DONE;
}
/**
 * Returns the Runnable that this task executes.
 * @return the runnable action, as last assigned
 * @see #setRunnable
 */
protected Runnable getRunnable() {
    return this.runnable;
}
/**
 * Replaces the Runnable that this task executes.
 * @param r the runnable action
 * @see #getRunnable
 */
protected void setRunnable(Runnable r) {
    this.runnable = r;
}
/**
 * Set the state of this task to Cancelled. The store is
 * unconditional: whatever state was recorded before the call,
 * including Done, is replaced.
 */
protected void setCancelled() {
runnerUpdater.set(this, CANCELLED);
}
/**
 * Moves this task to the Done state. If the task is already Done or
 * Cancelled the call has no effect, so a Cancelled status is never
 * replaced by Done.
 */
protected void setDone() {
    Object seen;
    do {
        seen = runner;
        if (seen == DONE || seen == CANCELLED) {
            return;
        }
        // Retry if another thread changed the state after it was read.
    } while (!runnerUpdater.compareAndSet(this, seen, DONE));
}
/**
 * Tries to mark this task as Running by recording the calling thread
 * as its runner. The attempt succeeds only from the initial state,
 * that is, when the task is not already Running, Done, or Cancelled.
 * @return true if the calling thread was recorded as the runner
 */
protected boolean setRunning() {
    Thread self = Thread.currentThread();
    return runnerUpdater.compareAndSet(this, null, self);
}
/**
 * Executes the underlying runnable if this task can be moved to the
 * Running state, and records completion afterwards. If the task cannot
 * be marked Running, this method does nothing.
 */
public void run() {
    if (!setRunning()) {
        return;
    }
    try {
        runnable.run();
    } finally {
        // Record completion even when the runnable throws.
        setDone();
    }
}
/**
 * Returns this task to its initial run state, unless it has been
 * cancelled. A cancelled task cannot be reset.
 * @return true if the state was reset; false if the task is Cancelled
 */
protected boolean reset() {
    Object seen;
    do {
        seen = runner;
        if (seen == CANCELLED) {
            return false;
        }
        // Retry if another thread changed the state after it was read.
    } while (!runnerUpdater.compareAndSet(this, seen, null));
    return true;
}
/**
 * Implementation of Future methods under the control of the enclosing
 * CancellableTask, which it relies on for methods isDone, isCancelled
 * and cancel. This class is split into an inner class to permit Future
 * support to be mixed-in with other flavors of tasks. Normally, such a
 * class will delegate Future get methods to the
 * InnerCancellableFuture, and internally arrange that set methods be
 * invoked when computations are ready.
 *
 * <p><b>Sample Usage.</b> Here are fragments of an example subclass.
 * <pre>
 *  class MyFutureTask&lt;V&gt; extends CancellableTask implements Future&lt;V&gt; {
 *
 *    MyFutureTask(Callable&lt;V&gt; callable) {
 *      setRunnable(new InnerCancellableFuture&lt;V&gt;(callable));
 *    }
 *
 *    public V get() throws InterruptedException, ExecutionException {
 *      return ((InnerCancellableFuture&lt;V&gt;)getRunnable()).get();
 *    }
 *    // (And similarly for timeout version.)
 *
 *    void action() { // whatever action causes execution
 *      try {
 *        ((InnerCancellableFuture&lt;V&gt;)getRunnable()).set(compute());
 *      } catch (Exception ex) {
 *        ((InnerCancellableFuture&lt;V&gt;)getRunnable()).setException(ex);
 *      }
 *    }
 *  }
 * </pre>
 *
 * @param <V> the type of the result produced by the callable
 */
protected class InnerCancellableFuture<V> implements Future<V>, Runnable {
    private final Callable<V> callable;
    /** Guards result and exception, and backs the accessible condition. */
    private final ReentrantLock lock = new ReentrantLock();
    /** Signalled when a result or exception is set, or on cancel via this object. */
    private final Condition accessible = lock.newCondition();
    private V result;
    private Throwable exception;

    /**
     * Create an InnerCancellableFuture that will execute the
     * given callable.
     * @param callable the function to execute
     */
    protected InnerCancellableFuture(Callable<V> callable) {
        this.callable = callable;
    }

    /**
     * Cancels the enclosing task and, if that succeeds, wakes any
     * threads blocked in get so they can observe the cancellation.
     * @param mayInterruptIfRunning passed through to the enclosing
     * task's cancel method
     * @return the value returned by the enclosing task's cancel method
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = CancellableTask.this.cancel(mayInterruptIfRunning);
        if (cancelled) {
            // Threads in get() wait on 'accessible' until isDone() holds.
            // Without a signal here, a task cancelled before it ever runs
            // would leave them waiting forever.
            // NOTE(review): calling cancel directly on the enclosing task
            // bypasses this method and therefore does not signal waiters.
            lock.lock();
            try {
                accessible.signalAll();
            } finally {
                lock.unlock();
            }
        }
        return cancelled;
    }

    public boolean isCancelled() {
        return CancellableTask.this.isCancelled();
    }

    public boolean isDone() {
        return CancellableTask.this.isDone();
    }

    /**
     * Sets this Future to the result of callable.call, or to the
     * Throwable it throws.
     */
    public void run() {
        try {
            set(callable.call());
        } catch (Throwable ex) {
            setException(ex);
        }
    }

    /**
     * Waits if necessary for the call to callable.call to
     * complete, and then retrieves its result.
     *
     * @return computed result
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if underlying computation threw an
     * exception
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     */
    public V get() throws InterruptedException, ExecutionException {
        lock.lock();
        try {
            while (!isDone())
                accessible.await();
            return outcome();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Waits if necessary for at most the given time for the call to
     * callable.call to complete, and then retrieves its
     * result.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return computed result
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if underlying computation threw an
     * exception
     * @throws InterruptedException if current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        lock.lock();
        try {
            if (!isDone()) {
                long nanos = unit.toNanos(timeout);
                do {
                    if (nanos <= 0)
                        throw new TimeoutException();
                    // awaitNanos returns the remaining wait time.
                    nanos = accessible.awaitNanos(nanos);
                } while (!isDone());
            }
            return outcome();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Reports the outcome of a finished task: throws for cancellation
     * or failure, otherwise returns the result. Called with the lock
     * held, after isDone has become true.
     */
    private V outcome() throws ExecutionException {
        if (isCancelled())
            throw new CancellationException();
        if (exception != null)
            throw new ExecutionException(exception);
        return result;
    }

    /**
     * Sets the result of this Future to the given value, marks the
     * enclosing task Done (unless it is already Done or Cancelled),
     * and wakes all waiting threads.
     * @param v the value
     */
    protected void set(V v) {
        lock.lock();
        try {
            result = v;
            setDone();
            accessible.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Causes this future to report an ExecutionException
     * with the given throwable as its cause, marks the enclosing task
     * Done (unless it is already Done or Cancelled), and wakes all
     * waiting threads.
     * @param t the cause of failure.
     */
    protected void setException(Throwable t) {
        lock.lock();
        try {
            exception = t;
            setDone();
            accessible.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
}