--- jsr166/src/jsr166y/ForkJoinTask.java 2009/07/27 20:57:44 1.23 +++ jsr166/src/jsr166y/ForkJoinTask.java 2009/07/31 16:27:08 1.25 @@ -25,12 +25,11 @@ import java.util.WeakHashMap; *

A "main" ForkJoinTask begins execution when submitted to a * {@link ForkJoinPool}. Once started, it will usually in turn start * other subtasks. As indicated by the name of this class, many - * programs using ForkJoinTasks employ only methods {@code fork} - * and {@code join}, or derivatives such as - * {@code invokeAll}. However, this class also provides a number - * of other methods that can come into play in advanced usages, as - * well as extension mechanics that allow support of new forms of - * fork/join processing. + * programs using ForkJoinTasks employ only methods {@code fork} and + * {@code join}, or derivatives such as {@code invokeAll}. However, + * this class also provides a number of other methods that can come + * into play in advanced usages, as well as extension mechanics that + * allow support of new forms of fork/join processing. * *

A ForkJoinTask is a lightweight form of {@link Future}. The * efficiency of ForkJoinTasks stems from a set of restrictions (that @@ -94,8 +93,8 @@ import java.util.WeakHashMap; * lightweight task scheduling framework, and so cannot be overridden. * Developers creating new basic styles of fork/join processing should * minimally implement {@code protected} methods - * {@code exec}, {@code setRawResult}, and - * {@code getRawResult}, while also introducing an abstract + * {@link #exec}, {@link #setRawResult}, and + * {@link #getRawResult}, while also introducing an abstract * computational method that can be implemented in its subclasses, * possibly relying on other {@code protected} methods provided * by this class. @@ -248,7 +247,7 @@ public abstract class ForkJoinTask im synchronized (this) { try { while (status >= 0) { - long nt = nanos - System.nanoTime() - startTime; + long nt = nanos - (System.nanoTime() - startTime); if (nt <= 0) break; wait(nt / 1000000, (int) (nt % 1000000)); @@ -673,13 +672,13 @@ public abstract class ForkJoinTask im /** * Asserts that the results of this task's computation will not be * used. If a cancellation occurs before attempting to execute this - * task, then execution will be suppressed, {@code isCancelled} - * will report true, and {@code join} will result in a + * task, execution will be suppressed, {@link #isCancelled} + * will report true, and {@link #join} will result in a * {@code CancellationException} being thrown. Otherwise, when * cancellation races with completion, there are no guarantees - * about whether {@code isCancelled} will report true, whether - * {@code join} will return normally or via an exception, or - * whether these behaviors will remain consistent upon repeated + * about whether {@code isCancelled} will report {@code true}, + * whether {@code join} will return normally or via an exception, + * or whether these behaviors will remain consistent upon repeated * invocation. * *

This method may be overridden in subclasses, but if so, must @@ -689,7 +688,7 @@ public abstract class ForkJoinTask im *

This method is designed to be invoked by other * tasks. To terminate the current task, you can just return or * throw an unchecked exception from its computation method, or - * invoke {@code completeExceptionally}. + * invoke {@link #completeExceptionally}. * * @param mayInterruptIfRunning this value is ignored in the * default implementation because tasks are not in general @@ -778,9 +777,10 @@ public abstract class ForkJoinTask im public final V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + long nanos = unit.toNanos(timeout); ForkJoinWorkerThread w = getWorker(); if (w == null || status < 0 || !w.unpushTask(this) || !tryQuietlyInvoke()) - awaitDone(w, unit.toNanos(timeout)); + awaitDone(w, nanos); return reportTimedFutureResult(); } @@ -984,15 +984,18 @@ public abstract class ForkJoinTask im protected abstract boolean exec(); /** - * Returns, but does not unschedule or execute, the task queued by - * the current thread but not yet executed, if one is + * Returns, but does not unschedule or execute, a task queued by + * the current thread but not yet executed, if one is immediately * available. There is no guarantee that this task will actually - * be polled or executed next. This method is designed primarily - * to support extensions, and is unlikely to be useful otherwise. - * This method may be invoked only from within ForkJoinTask - * computations (as may be determined using method {@link - * #inForkJoinPool}). Attempts to invoke in other contexts result - * in exceptions or errors, possibly including ClassCastException. + * be polled or executed next. Conversely, this method may return + * null even if a task exists but cannot be accessed without + * contention with other threads. This method is designed + * primarily to support extensions, and is unlikely to be useful + * otherwise. This method may be invoked only from within + * ForkJoinTask computations (as may be determined using method + * {@link #inForkJoinPool}). Attempts to invoke in other contexts + * result in exceptions or errors, possibly including + * ClassCastException. * * @return the next task, or {@code null} if none are available */ @@ -1039,7 +1042,60 @@ public abstract class ForkJoinTask im .pollTask(); } - // adaptors + /** + * Adaptor for Runnables. This implements RunnableFuture + * to be compliant with AbstractExecutorService constraints + * when used in ForkJoinPool. + */ + static final class AdaptedRunnable extends ForkJoinTask + implements RunnableFuture { + final Runnable runnable; + final T resultOnCompletion; + T result; + AdaptedRunnable(Runnable runnable, T result) { + if (runnable == null) throw new NullPointerException(); + this.runnable = runnable; + this.resultOnCompletion = result; + } + public T getRawResult() { return result; } + public void setRawResult(T v) { result = v; } + public boolean exec() { + runnable.run(); + result = resultOnCompletion; + return true; + } + public void run() { invoke(); } + private static final long serialVersionUID = 5232453952276885070L; + } + + /** + * Adaptor for Callables + */ + static final class AdaptedCallable extends ForkJoinTask + implements RunnableFuture { + final Callable callable; + T result; + AdaptedCallable(Callable callable) { + if (callable == null) throw new NullPointerException(); + this.callable = callable; + } + public T getRawResult() { return result; } + public void setRawResult(T v) { result = v; } + public boolean exec() { + try { + result = callable.call(); + return true; + } catch (Error err) { + throw err; + } catch (RuntimeException rex) { + throw rex; + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + public void run() { invoke(); } + private static final long serialVersionUID = 2838392045355241008L; + } /** * Returns a new ForkJoinTask that performs the {@code run} @@ -1050,7 +1106,7 @@ public abstract class ForkJoinTask im * @return the task */ public static ForkJoinTask adapt(Runnable runnable) { - return new ForkJoinPool.AdaptedRunnable(runnable, null); + return new AdaptedRunnable(runnable, null); } /** @@ -1063,7 +1119,7 @@ public abstract class ForkJoinTask im * @return the task */ public static ForkJoinTask adapt(Runnable runnable, T result) { - return new ForkJoinPool.AdaptedRunnable(runnable, result); + return new AdaptedRunnable(runnable, result); } /** @@ -1076,7 +1132,7 @@ public abstract class ForkJoinTask im * @return the task */ public static ForkJoinTask adapt(Callable callable) { - return new ForkJoinPool.AdaptedCallable(callable); + return new AdaptedCallable(callable); } // Serialization support