/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain. Use, modify, and * redistribute this code in any way without acknowledgement. */ package java.util.concurrent; import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.*; import java.util.concurrent.locks.*; /** * Provides default implementation of {@link ExecutorService} * execution methods. This class implements the submit and * invoke methods using the default {@link FutureTask} and * {@link PrivilegedFutureTask} classes provided in this package. For * example, the the implementation of submit(Runnable) * creates an associated FutureTask that is executed and * returned. Subclasses overriding these methods to use different * {@link Future} implementations should do so consistently for each * of these methods. * * @since 1.5 * @author Doug Lea */ public abstract class AbstractExecutorService implements ExecutorService { public Future submit(Runnable task) { FutureTask ftask = new FutureTask(task, Boolean.TRUE); execute(ftask); return ftask; } public Future submit(Callable task) { FutureTask ftask = new FutureTask(task); execute(ftask); return ftask; } public void invoke(Runnable task) throws ExecutionException, InterruptedException { FutureTask ftask = new FutureTask(task, Boolean.TRUE); execute(ftask); ftask.get(); } public T invoke(Callable task) throws ExecutionException, InterruptedException { FutureTask ftask = new FutureTask(task); execute(ftask); return ftask.get(); } public Future submit(PrivilegedAction action) { Callable task = new PrivilegedActionAdapter(action); FutureTask future = new PrivilegedFutureTask(task); execute(future); return future; } public Future submit(PrivilegedExceptionAction action) { Callable task = new PrivilegedExceptionActionAdapter(action); FutureTask future = new PrivilegedFutureTask(task); execute(future); return future; } private static class PrivilegedActionAdapter implements Callable { PrivilegedActionAdapter(PrivilegedAction action) { this.action = action; } public Object call () { return action.run(); } private final PrivilegedAction action; } private static class PrivilegedExceptionActionAdapter implements Callable { PrivilegedExceptionActionAdapter(PrivilegedExceptionAction action) { this.action = action; } public Object call () throws Exception { return action.run(); } private final PrivilegedExceptionAction action; } /** * Helper class to wait for tasks in bulk-execute methods */ private static class TaskGroupWaiter { private final ReentrantLock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private int firstIndex = -1; private int countDown; TaskGroupWaiter(int ntasks) { countDown = ntasks; } void signalDone(int index) { lock.lock(); try { if (firstIndex < 0) firstIndex = index; if (--countDown == 0) done.signalAll(); } finally { lock.unlock(); } } int await() throws InterruptedException { lock.lock(); try { while (countDown > 0) done.await(); return firstIndex; } finally { lock.unlock(); } } int awaitNanos(long nanos) throws InterruptedException { lock.lock(); try { while (countDown > 0 && nanos > 0) nanos = done.awaitNanos(nanos); return firstIndex; } finally { lock.unlock(); } } boolean isDone() { lock.lock(); try { return countDown <= 0; } finally { lock.unlock(); } } } /** * FutureTask extension to provide signal when task completes */ private static class SignallingFuture extends FutureTask { private final TaskGroupWaiter waiter; private final int index; SignallingFuture(Callable c, TaskGroupWaiter w, int i) { super(c); waiter = w; index = i; } SignallingFuture(Runnable t, T r, TaskGroupWaiter w, int i) { super(t, r); waiter = w; index = i; } protected void done() { waiter.signalDone(index); } } // any/all methods, each a little bit different than the other public List> runAny(List tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); int n = tasks.size(); List> futures = new ArrayList>(n); if (n == 0) return futures; TaskGroupWaiter waiter = new TaskGroupWaiter(1); try { int i = 0; for (Runnable t : tasks) { SignallingFuture f = new SignallingFuture(t, Boolean.TRUE, waiter, i++); futures.add(f); if (!waiter.isDone()) execute(f); } int first = waiter.await(); return futures; } finally { for (Future f : futures) f.cancel(true); } } public List> runAny(List tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null || unit == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int n = tasks.size(); List> futures = new ArrayList>(n); if (n == 0) return futures; TaskGroupWaiter waiter = new TaskGroupWaiter(1); try { int i = 0; for (Runnable t : tasks) { SignallingFuture f = new SignallingFuture(t, Boolean.TRUE, waiter, i++); futures.add(f); if (!waiter.isDone()) execute(f); } int first = waiter.awaitNanos(nanos); return futures; } finally { for (Future f : futures) f.cancel(true); } } public List> runAll(List tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); int n = tasks.size(); List> futures = new ArrayList>(n); if (n == 0) return futures; TaskGroupWaiter waiter = new TaskGroupWaiter(n); int i = 0; try { for (Runnable t : tasks) { SignallingFuture f = new SignallingFuture(t, Boolean.TRUE, waiter, i++); futures.add(f); execute(f); } waiter.await(); return futures; } finally { if (!waiter.isDone()) for (Future f : futures) f.cancel(true); } } public List> runAll(List tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null || unit == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int n = tasks.size(); List> futures = new ArrayList>(n); if (n == 0) return futures; TaskGroupWaiter waiter = new TaskGroupWaiter(n); try { int i = 0; for (Runnable t : tasks) { SignallingFuture f = new SignallingFuture(t, Boolean.TRUE, waiter, i++); futures.add(f); execute(f); } waiter.awaitNanos(nanos); return futures; } finally { if (!waiter.isDone()) for (Future f : futures) f.cancel(true); } } public List> callAny(List> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); int n = tasks.size(); List> futures = new ArrayList>(n); if (n == 0) return futures; TaskGroupWaiter waiter = new TaskGroupWaiter(1); int i = 0; try { for (Callable t : tasks) { SignallingFuture f = new SignallingFuture(t, waiter, i++); futures.add(f); if (!waiter.isDone()) execute(f); } int first = waiter.await(); return futures; } finally { for (Future f : futures) f.cancel(true); } } public List> callAny(List> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null || unit == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int n = tasks.size(); List> futures= new ArrayList>(n); if (n == 0) return futures; TaskGroupWaiter waiter = new TaskGroupWaiter(1); try { int i = 0; for (Callable t : tasks) { SignallingFuture f = new SignallingFuture(t, waiter, i++); futures.add(f); if (!waiter.isDone()) execute(f); } int first = waiter.awaitNanos(nanos); return futures; } finally { for (Future f : futures) f.cancel(true); } } public List> callAll(List> tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); int n = tasks.size(); List> futures = new ArrayList>(n); if (n == 0) return futures; TaskGroupWaiter waiter = new TaskGroupWaiter(n); try { int i = 0; for (Callable t : tasks) { SignallingFuture f = new SignallingFuture(t, waiter, i++); futures.add(f); execute(f); } waiter.await(); return futures; } finally { if (!waiter.isDone()) for (Future f : futures) f.cancel(true); } } public List> callAll(List> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null || unit == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int n = tasks.size(); List> futures = new ArrayList>(n); if (n == 0) return futures; TaskGroupWaiter waiter = new TaskGroupWaiter(n); try { int i = 0; for (Callable t : tasks) { SignallingFuture f = new SignallingFuture(t, waiter, i++); futures.add(f); execute(f); } waiter.awaitNanos(nanos); return futures; } finally { if (!waiter.isDone()) for (Future f : futures) f.cancel(true); } } }