/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain. Use, modify, and
* redistribute this code in any way without acknowledgement.
*/
package java.util.concurrent;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.locks.*;
/**
* Provides default implementation of {@link ExecutorService}
* execution methods. This class implements the submit and
* invoke methods using the default {@link FutureTask} and
* {@link PrivilegedFutureTask} classes provided in this package. For
* example, the the implementation of submit(Runnable)
* creates an associated FutureTask that is executed and
* returned. Subclasses overriding these methods to use different
* {@link Future} implementations should do so consistently for each
* of these methods.
*
* @since 1.5
* @author Doug Lea
*/
public abstract class AbstractExecutorService implements ExecutorService {
public Future> submit(Runnable task) {
FutureTask> ftask = new FutureTask(task, Boolean.TRUE);
execute(ftask);
return ftask;
}
public Future submit(Callable task) {
FutureTask ftask = new FutureTask(task);
execute(ftask);
return ftask;
}
public void invoke(Runnable task) throws ExecutionException, InterruptedException {
FutureTask> ftask = new FutureTask(task, Boolean.TRUE);
execute(ftask);
ftask.get();
}
public T invoke(Callable task) throws ExecutionException, InterruptedException {
FutureTask ftask = new FutureTask(task);
execute(ftask);
return ftask.get();
}
public Future submit(PrivilegedAction action) {
Callable task = new PrivilegedActionAdapter(action);
FutureTask future = new PrivilegedFutureTask(task);
execute(future);
return future;
}
public Future submit(PrivilegedExceptionAction action) {
Callable task = new PrivilegedExceptionActionAdapter(action);
FutureTask future = new PrivilegedFutureTask(task);
execute(future);
return future;
}
private static class PrivilegedActionAdapter implements Callable {
PrivilegedActionAdapter(PrivilegedAction action) {
this.action = action;
}
public Object call () {
return action.run();
}
private final PrivilegedAction action;
}
private static class PrivilegedExceptionActionAdapter implements Callable {
PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) {
this.action = action;
}
public Object call () throws Exception {
return action.run();
}
private final PrivilegedExceptionAction action;
}
/**
* Helper class to wait for tasks in bulk-execute methods
*/
private static class TaskGroupWaiter {
private final ReentrantLock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private int firstIndex = -1;
private int countDown;
TaskGroupWaiter(int ntasks) { countDown = ntasks; }
void signalDone(int index) {
lock.lock();
try {
if (firstIndex < 0)
firstIndex = index;
if (--countDown == 0)
done.signalAll();
} finally {
lock.unlock();
}
}
int await() throws InterruptedException {
lock.lock();
try {
while (countDown > 0)
done.await();
return firstIndex;
} finally {
lock.unlock();
}
}
int awaitNanos(long nanos) throws InterruptedException {
lock.lock();
try {
while (countDown > 0 && nanos > 0)
nanos = done.awaitNanos(nanos);
return firstIndex;
} finally {
lock.unlock();
}
}
boolean isDone() {
lock.lock();
try {
return countDown <= 0;
} finally {
lock.unlock();
}
}
}
/**
* FutureTask extension to provide signal when task completes
*/
private static class SignallingFuture extends FutureTask {
private final TaskGroupWaiter waiter;
private final int index;
SignallingFuture(Callable c, TaskGroupWaiter w, int i) {
super(c); waiter = w; index = i;
}
SignallingFuture(Runnable t, T r, TaskGroupWaiter w, int i) {
super(t, r); waiter = w; index = i;
}
protected void done() {
waiter.signalDone(index);
}
}
// any/all methods, each a little bit different than the other
public List> runAny(List tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
int n = tasks.size();
List> futures = new ArrayList>(n);
if (n == 0)
return futures;
TaskGroupWaiter waiter = new TaskGroupWaiter(1);
try {
int i = 0;
for (Runnable t : tasks) {
SignallingFuture f =
new SignallingFuture(t, Boolean.TRUE, waiter, i++);
futures.add(f);
if (!waiter.isDone())
execute(f);
}
int first = waiter.await();
return futures;
} finally {
for (Future> f : futures)
f.cancel(true);
}
}
public List> runAny(List tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int n = tasks.size();
List> futures = new ArrayList>(n);
if (n == 0)
return futures;
TaskGroupWaiter waiter = new TaskGroupWaiter(1);
try {
int i = 0;
for (Runnable t : tasks) {
SignallingFuture f =
new SignallingFuture(t, Boolean.TRUE, waiter, i++);
futures.add(f);
if (!waiter.isDone())
execute(f);
}
int first = waiter.awaitNanos(nanos);
return futures;
} finally {
for (Future> f : futures)
f.cancel(true);
}
}
public List> runAll(List tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
int n = tasks.size();
List> futures = new ArrayList>(n);
if (n == 0)
return futures;
TaskGroupWaiter waiter = new TaskGroupWaiter(n);
int i = 0;
try {
for (Runnable t : tasks) {
SignallingFuture f =
new SignallingFuture(t, Boolean.TRUE, waiter, i++);
futures.add(f);
execute(f);
}
waiter.await();
return futures;
} finally {
if (!waiter.isDone())
for (Future> f : futures)
f.cancel(true);
}
}
public List> runAll(List tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int n = tasks.size();
List> futures = new ArrayList>(n);
if (n == 0)
return futures;
TaskGroupWaiter waiter = new TaskGroupWaiter(n);
try {
int i = 0;
for (Runnable t : tasks) {
SignallingFuture f =
new SignallingFuture(t, Boolean.TRUE, waiter, i++);
futures.add(f);
execute(f);
}
waiter.awaitNanos(nanos);
return futures;
} finally {
if (!waiter.isDone())
for (Future> f : futures)
f.cancel(true);
}
}
public List> callAny(List> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
int n = tasks.size();
List> futures = new ArrayList>(n);
if (n == 0)
return futures;
TaskGroupWaiter waiter = new TaskGroupWaiter(1);
int i = 0;
try {
for (Callable t : tasks) {
SignallingFuture f = new SignallingFuture(t, waiter, i++);
futures.add(f);
if (!waiter.isDone())
execute(f);
}
int first = waiter.await();
return futures;
} finally {
for (Future f : futures)
f.cancel(true);
}
}
public List> callAny(List> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int n = tasks.size();
List> futures= new ArrayList>(n);
if (n == 0)
return futures;
TaskGroupWaiter waiter = new TaskGroupWaiter(1);
try {
int i = 0;
for (Callable t : tasks) {
SignallingFuture f = new SignallingFuture(t, waiter, i++);
futures.add(f);
if (!waiter.isDone())
execute(f);
}
int first = waiter.awaitNanos(nanos);
return futures;
} finally {
for (Future f : futures)
f.cancel(true);
}
}
public List> callAll(List> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
int n = tasks.size();
List> futures = new ArrayList>(n);
if (n == 0)
return futures;
TaskGroupWaiter waiter = new TaskGroupWaiter(n);
try {
int i = 0;
for (Callable t : tasks) {
SignallingFuture f = new SignallingFuture(t, waiter, i++);
futures.add(f);
execute(f);
}
waiter.await();
return futures;
} finally {
if (!waiter.isDone())
for (Future f : futures)
f.cancel(true);
}
}
public List> callAll(List> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int n = tasks.size();
List> futures = new ArrayList>(n);
if (n == 0)
return futures;
TaskGroupWaiter waiter = new TaskGroupWaiter(n);
try {
int i = 0;
for (Callable t : tasks) {
SignallingFuture f = new SignallingFuture(t, waiter, i++);
futures.add(f);
execute(f);
}
waiter.awaitNanos(nanos);
return futures;
} finally {
if (!waiter.isDone())
for (Future f : futures)
f.cancel(true);
}
}
}