--- jsr166/src/jsr166e/CompletableFuture.java 2013/02/06 07:04:48 1.9
+++ jsr166/src/jsr166e/CompletableFuture.java 2013/07/14 19:55:05 1.19
@@ -6,9 +6,9 @@
package jsr166e;
import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executor;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CancellationException;
@@ -18,52 +18,69 @@ import java.util.concurrent.locks.LockSu
/**
* A {@link Future} that may be explicitly completed (setting its
* value and status), and may include dependent functions and actions
- * that trigger upon its completion. Methods are available for adding
- * those based on Functions, Blocks, and Runnables, depending on
- * whether they require arguments and/or produce results, as well as
- * those triggered after either or both the current and another
- * CompletableFuture complete. Functions and actions supplied for
- * dependent completions (mainly using methods with prefix {@code
- * then}) may be performed by the thread that completes the current
+ * that trigger upon its completion.
+ *
+ *
When two or more threads attempt to
+ * {@link #complete complete},
+ * {@link #completeExceptionally completeExceptionally}, or
+ * {@link #cancel cancel}
+ * a CompletableFuture, only one of them succeeds.
+ *
+ *
Methods are available for adding dependents based on
+ * user-provided Functions, Actions, or Runnables. The appropriate
+ * form to use depends on whether actions require arguments and/or
+ * produce results. Completion of a dependent action will trigger the
+ * completion of another CompletableFuture. Actions may also be
+ * triggered after either or both the current and another
+ * CompletableFuture complete. Multiple CompletableFutures may also
+ * be grouped as one using {@link #anyOf(CompletableFuture...)} and
+ * {@link #allOf(CompletableFuture...)}.
+ *
+ *
CompletableFutures themselves do not execute asynchronously.
+ * However, actions supplied for dependent completions of another
+ * CompletableFuture may do so, depending on whether they are provided
+ * via one of the async methods (that is, methods with names
+ * of the form xxxAsync). The async
+ * methods provide a way to commence asynchronous processing of an
+ * action using either a given {@link Executor} or by default the
+ * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
+ * debugging, and tracking, all generated asynchronous tasks are
+ * instances of the marker interface {@link AsynchronousCompletionTask}.
+ *
+ *
Actions supplied for dependent completions of non-async
+ * methods may be performed by the thread that completes the current
* CompletableFuture, or by any other caller of these methods. There
* are no guarantees about the order of processing completions unless
* constrained by these methods.
*
- *
When two or more threads attempt to {@link #complete} or {@link
- * #completeExceptionally} a CompletableFuture, only one of them
- * succeeds.
+ *
Since (unlike {@link FutureTask}) this class has no direct
+ * control over the computation that causes it to be completed,
+ * cancellation is treated as just another form of exceptional completion.
+ * Method {@link #cancel cancel} has the same effect as
+ * {@code completeExceptionally(new CancellationException())}.
*
- *
Upon exceptional completion, or when a completion entails
- * computation of a function or action, and it terminates abruptly
- * with an (unchecked) exception or error, then further completions
- * act as {@code completeExceptionally} with a {@link
- * CompletionException} holding that exception as its cause. If a
- * CompletableFuture completes exceptionally, and is not followed by a
- * {@link #exceptionally} or {@link #handle} completion, then all of
- * its dependents (and their dependents) also complete exceptionally
- * with CompletionExceptions holding the ultimate cause. In case of a
- * CompletionException, methods {@link #get()} and {@link #get(long,
- * TimeUnit)} throw an {@link ExecutionException} with the same cause
- * as would be held in the corresponding CompletionException. However,
- * in these cases, methods {@link #join()} and {@link #getNow} throw
- * the CompletionException, which simplifies usage especially within
- * other completion functions.
+ *
Upon exceptional completion (including cancellation), or when a
+ * completion entails an additional computation which terminates
+ * abruptly with an (unchecked) exception or error, then all of their
+ * dependent completions (and their dependents in turn) generally act
+ * as {@code completeExceptionally} with a {@link CompletionException}
+ * holding that exception as its cause. However, the {@link
+ * #exceptionally exceptionally} and {@link #handle handle}
+ * completions are able to handle exceptional completions of
+ * the CompletableFutures they depend on.
*
- *
CompletableFutures themselves do not execute asynchronously.
- * However, the {@code async} methods provide commonly useful ways to
- * commence asynchronous processing, using either a given {@link
- * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
- * function or action that will result in the completion of a new
- * CompletableFuture. To simplify monitoring, debugging, and tracking,
- * all generated asynchronous tasks are instances of the tagging
- * interface {@link AsynchronousCompletionTask}.
+ *
In case of exceptional completion with a CompletionException,
+ * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
+ * {@link ExecutionException} with the same cause as held in the
+ * corresponding CompletionException. However, in these cases,
+ * methods {@link #join()} and {@link #getNow} throw the
+ * CompletionException, which simplifies usage.
*
- *
jsr166e note: During transition, this class
- * uses nested functional interfaces with different names but the
- * same forms as those expected for JDK8.
+ *
Arguments used to pass a completion result (that is, for parameters
+ * of type {@code T}) may be null, but passing a null value for any other
+ * parameter will result in a {@link NullPointerException} being thrown.
*
* @author Doug Lea
- * @since 1.8
*/
public class CompletableFuture implements Future {
// jsr166e nested interfaces
@@ -110,9 +127,10 @@ public class CompletableFuture implem
* extends AtomicInteger so callers can claim the action via
* compareAndSet(0, 1). The Completion.run methods are all
* written a boringly similar uniform way (that sometimes includes
- * unnecessary-looking checks, kept to maintain uniformity). There
- * are enough dimensions upon which they differ that factoring to
- * use common code isn't worthwhile.
+ * unnecessary-looking checks, kept to maintain uniformity).
+ * There are enough dimensions upon which they differ that
+ * attempts to factor commonalities while maintaining efficiency
+ * require more lines of code than they would save.
*
* 4. The exported then/and/or methods do support a bit of
* factoring (see doThenApply etc). They must cope with the
@@ -169,7 +187,7 @@ public class CompletableFuture implem
* CompletionException unless it is one already. Otherwise uses
* the given result, boxed as NIL if null.
*/
- final void internalComplete(Object v, Throwable ex) {
+ final void internalComplete(T v, Throwable ex) {
if (result == null)
UNSAFE.compareAndSwapObject
(this, RESULT, null,
@@ -189,11 +207,14 @@ public class CompletableFuture implem
/* ------------- waiting for completions -------------- */
+ /** Number of processors, for spin control */
+ static final int NCPU = Runtime.getRuntime().availableProcessors();
+
/**
* Heuristic spin value for waitingGet() before blocking on
* multiprocessors
*/
- static final int WAITING_GET_SPINS = 256;
+ static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
/**
* Linked nodes to record waiting threads in a Treiber stack. See
@@ -248,7 +269,7 @@ public class CompletableFuture implem
private Object waitingGet(boolean interruptible) {
WaitNode q = null;
boolean queued = false;
- int h = 0, spins = 0;
+ int spins = SPINS;
for (Object r;;) {
if ((r = result) != null) {
if (q != null) { // suppress unpark
@@ -264,15 +285,9 @@ public class CompletableFuture implem
postComplete(); // help release others
return r;
}
- else if (h == 0) {
- h = ThreadLocalRandom.current().nextInt();
- if (Runtime.getRuntime().availableProcessors() > 1)
- spins = WAITING_GET_SPINS;
- }
else if (spins > 0) {
- h ^= h << 1; // xorshift
- h ^= h >>> 3;
- if ((h ^= h << 10) >= 0)
+ int rnd = ThreadLocalRandom.current().nextInt();
+ if (rnd >= 0)
--spins;
}
else if (q == null)
@@ -380,9 +395,11 @@ public class CompletableFuture implem
/* ------------- Async tasks -------------- */
/**
- * A tagging interface identifying asynchronous tasks produced by
+ * A marker interface identifying asynchronous tasks produced by
* {@code async} methods. This may be useful for monitoring,
* debugging, and tracking asynchronous activities.
+ *
+ * @since 1.8
*/
public static interface AsynchronousCompletionTask {
}
@@ -441,11 +458,11 @@ public class CompletableFuture implem
}
static final class AsyncApply extends Async {
- final Fun super T,? extends U> fn;
final T arg;
+ final Fun super T,? extends U> fn;
final CompletableFuture dst;
AsyncApply(T arg, Fun super T,? extends U> fn,
- CompletableFuture dst) {
+ CompletableFuture dst) {
this.arg = arg; this.fn = fn; this.dst = dst;
}
public final boolean exec() {
@@ -465,14 +482,14 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
- static final class AsyncBiApply extends Async {
- final BiFun super T,? super U,? extends V> fn;
+ static final class AsyncCombine extends Async {
final T arg1;
final U arg2;
+ final BiFun super T,? super U,? extends V> fn;
final CompletableFuture dst;
- AsyncBiApply(T arg1, U arg2,
- BiFun super T,? super U,? extends V> fn,
- CompletableFuture dst) {
+ AsyncCombine(T arg1, U arg2,
+ BiFun super T,? super U,? extends V> fn,
+ CompletableFuture dst) {
this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
}
public final boolean exec() {
@@ -493,11 +510,11 @@ public class CompletableFuture implem
}
static final class AsyncAccept extends Async {
- final Action super T> fn;
final T arg;
+ final Action super T> fn;
final CompletableFuture dst;
AsyncAccept(T arg, Action super T> fn,
- CompletableFuture dst) {
+ CompletableFuture dst) {
this.arg = arg; this.fn = fn; this.dst = dst;
}
public final boolean exec() {
@@ -516,14 +533,14 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
- static final class AsyncBiAccept extends Async {
- final BiAction super T,? super U> fn;
+ static final class AsyncAcceptBoth extends Async {
final T arg1;
final U arg2;
+ final BiAction super T,? super U> fn;
final CompletableFuture dst;
- AsyncBiAccept(T arg1, U arg2,
- BiAction super T,? super U> fn,
- CompletableFuture dst) {
+ AsyncAcceptBoth(T arg1, U arg2,
+ BiAction super T,? super U> fn,
+ CompletableFuture dst) {
this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
}
public final boolean exec() {
@@ -542,6 +559,47 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
+ static final class AsyncCompose extends Async {
+ final T arg;
+ final Fun super T, CompletableFuture> fn;
+ final CompletableFuture dst;
+ AsyncCompose(T arg,
+ Fun super T, CompletableFuture> fn,
+ CompletableFuture dst) {
+ this.arg = arg; this.fn = fn; this.dst = dst;
+ }
+ public final boolean exec() {
+ CompletableFuture d, fr; U u; Throwable ex;
+ if ((d = this.dst) != null && d.result == null) {
+ try {
+ fr = fn.apply(arg);
+ ex = (fr == null) ? new NullPointerException() : null;
+ } catch (Throwable rex) {
+ ex = rex;
+ fr = null;
+ }
+ if (ex != null)
+ u = null;
+ else {
+ Object r = fr.result;
+ if (r == null)
+ r = fr.waitingGet(false);
+ if (r instanceof AltResult) {
+ ex = ((AltResult)r).ex;
+ u = null;
+ }
+ else {
+ @SuppressWarnings("unchecked") U ur = (U) r;
+ u = ur;
+ }
+ }
+ d.internalComplete(u, ex);
+ }
+ return true;
+ }
+ private static final long serialVersionUID = 5232453952276885070L;
+ }
+
/* ------------- Completions -------------- */
/**
@@ -560,14 +618,15 @@ public class CompletableFuture implem
abstract static class Completion extends AtomicInteger implements Runnable {
}
- static final class ApplyCompletion extends Completion {
+ static final class ThenApply extends Completion {
final CompletableFuture extends T> src;
final Fun super T,? extends U> fn;
final CompletableFuture dst;
final Executor executor;
- ApplyCompletion(CompletableFuture extends T> src,
- Fun super T,? extends U> fn,
- CompletableFuture dst, Executor executor) {
+ ThenApply(CompletableFuture extends T> src,
+ Fun super T,? extends U> fn,
+ CompletableFuture dst,
+ Executor executor) {
this.src = src; this.fn = fn; this.dst = dst;
this.executor = executor;
}
@@ -609,14 +668,15 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
- static final class AcceptCompletion extends Completion {
+ static final class ThenAccept extends Completion {
final CompletableFuture extends T> src;
final Action super T> fn;
final CompletableFuture dst;
final Executor executor;
- AcceptCompletion(CompletableFuture extends T> src,
- Action super T> fn,
- CompletableFuture dst, Executor executor) {
+ ThenAccept(CompletableFuture extends T> src,
+ Action super T> fn,
+ CompletableFuture dst,
+ Executor executor) {
this.src = src; this.fn = fn; this.dst = dst;
this.executor = executor;
}
@@ -657,20 +717,20 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
- static final class RunCompletion extends Completion {
- final CompletableFuture extends T> src;
+ static final class ThenRun extends Completion {
+ final CompletableFuture> src;
final Runnable fn;
final CompletableFuture dst;
final Executor executor;
- RunCompletion(CompletableFuture extends T> src,
- Runnable fn,
- CompletableFuture dst,
- Executor executor) {
+ ThenRun(CompletableFuture> src,
+ Runnable fn,
+ CompletableFuture dst,
+ Executor executor) {
this.src = src; this.fn = fn; this.dst = dst;
this.executor = executor;
}
public final void run() {
- final CompletableFuture extends T> a;
+ final CompletableFuture> a;
final Runnable fn;
final CompletableFuture dst;
Object r; Throwable ex;
@@ -701,16 +761,17 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
- static final class BiApplyCompletion extends Completion {
+ static final class ThenCombine extends Completion {
final CompletableFuture extends T> src;
final CompletableFuture extends U> snd;
final BiFun super T,? super U,? extends V> fn;
final CompletableFuture dst;
final Executor executor;
- BiApplyCompletion(CompletableFuture extends T> src,
- CompletableFuture extends U> snd,
- BiFun super T,? super U,? extends V> fn,
- CompletableFuture dst, Executor executor) {
+ ThenCombine(CompletableFuture extends T> src,
+ CompletableFuture extends U> snd,
+ BiFun super T,? super U,? extends V> fn,
+ CompletableFuture dst,
+ Executor executor) {
this.src = src; this.snd = snd;
this.fn = fn; this.dst = dst;
this.executor = executor;
@@ -752,7 +813,7 @@ public class CompletableFuture implem
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncBiApply(t, u, fn, dst));
+ e.execute(new AsyncCombine(t, u, fn, dst));
else
v = fn.apply(t, u);
} catch (Throwable rex) {
@@ -766,16 +827,17 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
- static final class BiAcceptCompletion extends Completion {
+ static final class ThenAcceptBoth extends Completion {
final CompletableFuture extends T> src;
final CompletableFuture extends U> snd;
final BiAction super T,? super U> fn;
final CompletableFuture dst;
final Executor executor;
- BiAcceptCompletion(CompletableFuture extends T> src,
- CompletableFuture extends U> snd,
- BiAction super T,? super U> fn,
- CompletableFuture dst, Executor executor) {
+ ThenAcceptBoth(CompletableFuture extends T> src,
+ CompletableFuture extends U> snd,
+ BiAction super T,? super U> fn,
+ CompletableFuture dst,
+ Executor executor) {
this.src = src; this.snd = snd;
this.fn = fn; this.dst = dst;
this.executor = executor;
@@ -816,7 +878,7 @@ public class CompletableFuture implem
if (ex == null) {
try {
if (e != null)
- e.execute(new AsyncBiAccept(t, u, fn, dst));
+ e.execute(new AsyncAcceptBoth(t, u, fn, dst));
else
fn.accept(t, u);
} catch (Throwable rex) {
@@ -830,22 +892,23 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
- static final class BiRunCompletion extends Completion {
- final CompletableFuture extends T> src;
+ static final class RunAfterBoth extends Completion {
+ final CompletableFuture> src;
final CompletableFuture> snd;
final Runnable fn;
final CompletableFuture dst;
final Executor executor;
- BiRunCompletion(CompletableFuture extends T> src,
- CompletableFuture> snd,
- Runnable fn,
- CompletableFuture dst, Executor executor) {
+ RunAfterBoth(CompletableFuture> src,
+ CompletableFuture> snd,
+ Runnable fn,
+ CompletableFuture dst,
+ Executor executor) {
this.src = src; this.snd = snd;
this.fn = fn; this.dst = dst;
this.executor = executor;
}
public final void run() {
- final CompletableFuture extends T> a;
+ final CompletableFuture> a;
final CompletableFuture> b;
final Runnable fn;
final CompletableFuture dst;
@@ -881,16 +944,49 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
- static final class OrApplyCompletion extends Completion {
+ static final class AndCompletion extends Completion {
+ final CompletableFuture> src;
+ final CompletableFuture> snd;
+ final CompletableFuture dst;
+ AndCompletion(CompletableFuture> src,
+ CompletableFuture> snd,
+ CompletableFuture dst) {
+ this.src = src; this.snd = snd; this.dst = dst;
+ }
+ public final void run() {
+ final CompletableFuture> a;
+ final CompletableFuture> b;
+ final CompletableFuture dst;
+ Object r, s; Throwable ex;
+ if ((dst = this.dst) != null &&
+ (a = this.src) != null &&
+ (r = a.result) != null &&
+ (b = this.snd) != null &&
+ (s = b.result) != null &&
+ compareAndSet(0, 1)) {
+ if (r instanceof AltResult)
+ ex = ((AltResult)r).ex;
+ else
+ ex = null;
+ if (ex == null && (s instanceof AltResult))
+ ex = ((AltResult)s).ex;
+ dst.internalComplete(null, ex);
+ }
+ }
+ private static final long serialVersionUID = 5232453952276885070L;
+ }
+
+ static final class ApplyToEither extends Completion {
final CompletableFuture extends T> src;
final CompletableFuture extends T> snd;
final Fun super T,? extends U> fn;
final CompletableFuture dst;
final Executor executor;
- OrApplyCompletion(CompletableFuture extends T> src,
- CompletableFuture extends T> snd,
- Fun super T,? extends U> fn,
- CompletableFuture dst, Executor executor) {
+ ApplyToEither(CompletableFuture extends T> src,
+ CompletableFuture extends T> snd,
+ Fun super T,? extends U> fn,
+ CompletableFuture dst,
+ Executor executor) {
this.src = src; this.snd = snd;
this.fn = fn; this.dst = dst;
this.executor = executor;
@@ -934,16 +1030,17 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
- static final class OrAcceptCompletion extends Completion {
+ static final class AcceptEither extends Completion {
final CompletableFuture extends T> src;
final CompletableFuture extends T> snd;
final Action super T> fn;
final CompletableFuture dst;
final Executor executor;
- OrAcceptCompletion(CompletableFuture extends T> src,
- CompletableFuture extends T> snd,
- Action super T> fn,
- CompletableFuture dst, Executor executor) {
+ AcceptEither(CompletableFuture extends T> src,
+ CompletableFuture extends T> snd,
+ Action super T> fn,
+ CompletableFuture dst,
+ Executor executor) {
this.src = src; this.snd = snd;
this.fn = fn; this.dst = dst;
this.executor = executor;
@@ -986,22 +1083,23 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
- static final class OrRunCompletion extends Completion {
- final CompletableFuture extends T> src;
+ static final class RunAfterEither extends Completion {
+ final CompletableFuture> src;
final CompletableFuture> snd;
final Runnable fn;
final CompletableFuture dst;
final Executor executor;
- OrRunCompletion(CompletableFuture extends T> src,
- CompletableFuture> snd,
- Runnable fn,
- CompletableFuture dst, Executor executor) {
+ RunAfterEither(CompletableFuture> src,
+ CompletableFuture> snd,
+ Runnable fn,
+ CompletableFuture dst,
+ Executor executor) {
this.src = src; this.snd = snd;
this.fn = fn; this.dst = dst;
this.executor = executor;
}
public final void run() {
- final CompletableFuture extends T> a;
+ final CompletableFuture> a;
final CompletableFuture> b;
final Runnable fn;
final CompletableFuture dst;
@@ -1033,6 +1131,38 @@ public class CompletableFuture implem
private static final long serialVersionUID = 5232453952276885070L;
}
+ static final class OrCompletion extends Completion {
+ final CompletableFuture> src;
+ final CompletableFuture> snd;
+ final CompletableFuture