/* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent; import java.util.function.Supplier; import java.util.function.Consumer; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.BiFunction; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.locks.LockSupport; /** * A {@link Future} that may be explicitly completed (setting its * value and status), and may be used as a {@link CompletionStage}, * supporting dependent functions and actions that trigger upon its * completion. * *

When two or more threads attempt to * {@link #complete complete}, * {@link #completeExceptionally completeExceptionally}, or * {@link #cancel cancel} * a CompletableFuture, only one of them succeeds. * *

In addition to these and related methods for directly * manipulating status and results, CompletableFuture implements * interface {@link CompletionStage} with the following policies:

* *

CompletableFuture also implements {@link Future} with the following * policies:

* * @author Doug Lea * @since 1.8 */ public class CompletableFuture implements Future, CompletionStage { /* * Overview: * * A CompletableFuture may have dependent completion actions, * collected in a linked stack. It atomically completes by CASing * a result field, and then pops off and runs those actions. This * applies across normal vs exceptional outcomes, sync vs async * actions, binary triggers, and various forms of completions. * * Non-nullness of field result (set via CAS) indicates done. An * AltResult is used to box null as a result, as well as to hold * exceptions. Using a single field makes completion simple to * detect and trigger. Encoding and decoding is straightforward * but adds vertical sprawl. One minor simplification relies on * the (static) NIL (to box null results) being the only AltResult * with a null exception field, so we don't usually need explicit * comparisons with NIL. Exception propagation mechanics * surrounding decoding rely on unchecked casts of decoded results * really being unchecked, and user type errors being caught at * point of use, as is currently the case in Java. These are * highlighted by using SuppressWarnings annotated temporaries. * * Dependent actions are represented by Completion objects linked * as Treiber stacks headed by field completions. There are four * kinds of Completions: single-source (UniCompletion), two-source * (BiCompletion), shared (CoBiCompletion, used by the second * source of a BiCompletion), and Signallers that unblock waiters. * * The same patterns of methods and classes are used for each form * of Completion (apply, combine, etc), and are written in a * similar style. For each form X there is, when applicable: * * * Method nowX (for example nowApply) that immediately executes * a supplied function and sets result * * Class AsyncX (for example AsyncApply) that calls nowX * from another task, * * Class DelayedX (for example DelayedApply) that holds * arguments and calls nowX when ready. 
* * For each public CompletionStage method M* (for example * thenApply{Async}), there is a method doM (for example * doThenApply) that creates and/or invokes the appropriate form. * Each deals with three cases that can arise when adding a * dependent completion to CompletableFuture f: * * * f is already complete, so the dependent action is run * immediately, via a "now" method, which, if async, * starts the action in a new task. * * f is not complete, so a Completion action is created and * pushed to f's completions. It is triggered via * f.postComplete when f completes. * * f is not complete, but completes while adding the completion * action, so we try to trigger it upon adding (see method * unipush and derivatives) to cover races. * * Methods with two sources (for example thenCombine) must deal * with races across both while pushing actions. The second * completion is a CoBiCompletion pointing to the first, shared * to ensure that at most one claims and performs the action. The * multiple-arity method allOf does this pairwise to form a tree * of completions. (Method anyOf just uses a depth-one Or tree.) * * Upon setting results, method postComplete is called unless * the target is guaranteed not to be observable (i.e., not yet * returned or linked). Multiple threads can call postComplete, * which atomically pops each dependent action, and tries to * trigger it via method tryAct. Any such action must be performed * only once, even if called from several threads, so Completions * maintain status via CAS, and on success run one of the "now" * methods. Triggering can propagate recursively, so tryAct * returns a completed dependent (if one exists) for further * processing by its caller. * * Blocking methods get() and join() rely on Signaller Completions * that wake up waiting threads. The mechanics are similar to * Treiber stack wait-nodes used in FutureTask, Phaser, and * SynchronousQueue. See their internal documentation for * algorithmic details. 
* * Without precautions, CompletableFutures would be prone to * garbage accumulation as chains of completions build up, each * pointing back to its sources. So we detach (null out) most * Completion fields as soon as possible. To support this, * internal methods check for and harmlessly ignore null arguments * that may have been obtained during races with threads nulling * out fields. (Some of these checked cases cannot currently * happen.) Fields of Async classes can be but currently are not * fully detached, because they do not in general form cycles. */ volatile Object result; // Either the result or boxed AltResult volatile Completion completions; // Treiber stack of dependent actions final boolean internalComplete(Object r) { // CAS from null to r return UNSAFE.compareAndSwapObject(this, RESULT, null, r); } final boolean casCompletions(Completion cmp, Completion val) { return UNSAFE.compareAndSwapObject(this, COMPLETIONS, cmp, val); } /* ------------- Encoding and decoding outcomes -------------- */ static final class AltResult { // See above final Throwable ex; // null only for NIL AltResult(Throwable x) { this.ex = x; } } static final AltResult NIL = new AltResult(null); /** * Returns the encoding of the given (non-null) exception as a * wrapped CompletionException unless it is one already. */ static AltResult altThrowable(Throwable x) { return new AltResult((x instanceof CompletionException) ? x : new CompletionException(x)); } /** * Returns the encoding of the given arguments: if the exception * is non-null, encodes as altThrowable. Otherwise uses the given * value, boxed as NIL if null. */ static Object encodeOutcome(Object v, Throwable x) { return (x != null) ? altThrowable(x) : (v == null) ? 
NIL : v; } /** * Decodes outcome to return result or throw unchecked exception */ private static T reportJoin(Object r) { if (r instanceof AltResult) { Throwable x; if ((x = ((AltResult)r).ex) == null) return null; if (x instanceof CancellationException) throw (CancellationException)x; if (x instanceof CompletionException) throw (CompletionException)x; throw new CompletionException(x); } @SuppressWarnings("unchecked") T tr = (T) r; return tr; } /** * Reports result using Future.get conventions */ private static T reportGet(Object r) throws InterruptedException, ExecutionException { if (r == null) // by convention below, null means interrupted throw new InterruptedException(); if (r instanceof AltResult) { Throwable x, cause; if ((x = ((AltResult)r).ex) == null) return null; if (x instanceof CancellationException) throw (CancellationException)x; if ((x instanceof CompletionException) && (cause = x.getCause()) != null) x = cause; throw new ExecutionException(x); } @SuppressWarnings("unchecked") T tr = (T) r; return tr; } /* ------------- Async Tasks -------------- */ /** * Default executor -- ForkJoinPool.commonPool() unless it cannot * support parallelism. */ static final Executor asyncPool = (ForkJoinPool.getCommonPoolParallelism() > 1) ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ static final class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } } /** * A marker interface identifying asynchronous tasks produced by * {@code async} methods. This may be useful for monitoring, * debugging, and tracking asynchronous activities. * * @since 1.8 */ public static interface AsynchronousCompletionTask { } /** * Base class for tasks that can act as either FJ or plain * Runnables. Abstract method compute calls an associated "now" * method. 
Method exec calls compute if its CompletableFuture is * not already done, and runs completions if done. Fields are not * in general final and can be nulled out after use (but most * currently are not). Classes include serialVersionUIDs even * though they are currently never serialized. */ abstract static class Async extends ForkJoinTask implements Runnable, AsynchronousCompletionTask { CompletableFuture dep; // the CompletableFuture to trigger Async(CompletableFuture dep) { this.dep = dep; } abstract void compute(); // call the associated "now" method public final boolean exec() { CompletableFuture d; if ((d = dep) != null) { if (d.result == null) // suppress if cancelled compute(); if (d.result != null) d.postComplete(); dep = null; // detach } return true; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final void run() { exec(); } private static final long serialVersionUID = 5232453952276885070L; } /* ------------- Completions -------------- */ static abstract class Completion { // See above volatile Completion next; // Treiber stack link /** * Performs completion action if enabled, returning a * completed dependent Completablefuture, if one exists. */ abstract CompletableFuture tryAct(); } /** * Triggers all reaachble enabled dependents. Call only when * known to be done. */ final void postComplete() { /* * On each step, variable f holds current completions to pop * and run. It is extended along only one path at a time, * pushing others to avoid StackOverflowErrors on recursion. */ CompletableFuture f = this; Completion h; while ((h = f.completions) != null || (f != this && (h = (f = this).completions) != null)) { CompletableFuture d; Completion t; if (f.casCompletions(h, t = h.next)) { if (t != null) { if (f != this) { // push do {} while (!casCompletions(h.next = completions, h)); continue; } h.next = null; // detach } f = (d = h.tryAct()) == null ? 
this : d; } } } /* ------------- One-source Completions -------------- */ /** * A Completion with a source and dependent. The "dep" field acts * as a claim, nulled out to disable further attempts to * trigger. Fields can only be observed by other threads upon * successful push; and should be nulled out after claim. */ static abstract class UniCompletion extends Completion { Executor async; // executor to use (null if none) CompletableFuture dep; // the dependent to complete CompletableFuture src; // source of value for tryAct UniCompletion(Executor async, CompletableFuture dep, CompletableFuture src) { this.async = async; this.dep = dep; this.src = src; } /** Tries to claim completion action by CASing dep to null */ final boolean claim(CompletableFuture d) { return UNSAFE.compareAndSwapObject(this, DEP, d, null); } private static final sun.misc.Unsafe UNSAFE; private static final long DEP; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = UniCompletion.class; DEP = UNSAFE.objectFieldOffset (k.getDeclaredField("dep")); } catch (Exception x) { throw new Error(x); } } } /** Pushes c on to completions, and triggers c if done. 
*/ private void unipush(UniCompletion c) { if (c != null) { CompletableFuture d; while (result == null && !casCompletions(c.next = completions, c)) c.next = null; // clear on CAS failure if ((d = c.tryAct()) != null) // cover races d.postComplete(); if (result != null) // clean stack postComplete(); } } // Immediate, async, delayed, and routing support for Function/apply static void nowApply(Executor e, CompletableFuture d, Object r, Function f) { if (d != null && f != null) { T t; U u; Throwable x; if (r instanceof AltResult) { t = null; x = ((AltResult)r).ex; } else { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; x = null; } if (x == null) { try { if (e != null) { e.execute(new AsyncApply(d, t, f)); return; } u = f.apply(t); } catch (Throwable ex) { x = ex; u = null; } } else u = null; d.internalComplete(encodeOutcome(u, x)); } } static final class AsyncApply extends Async { T arg; Function fn; AsyncApply(CompletableFuture dep, T arg, Function fn) { super(dep); this.arg = arg; this.fn = fn; } final void compute() { nowApply(null, dep, arg, fn); } private static final long serialVersionUID = 5232453952276885070L; } static final class DelayedApply extends UniCompletion { Function fn; DelayedApply(Executor async, CompletableFuture dep, CompletableFuture src, Function fn) { super(async, dep, src); this.fn = fn; } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a; Object r; if ((d = dep) != null && (a = src) != null && (r = a.result) != null && claim(d)) { nowApply(async, d, r, fn); src = null; fn = null; if (d.result != null) return d; } return null; } } private CompletableFuture doThenApply( Function fn, Executor e) { if (fn == null) throw new NullPointerException(); CompletableFuture d = new CompletableFuture(); Object r = result; if (r == null) unipush(new DelayedApply(e, d, this, fn)); else nowApply(e, d, r, fn); return d; } // Consumer/accept static void nowAccept(Executor e, CompletableFuture d, Object r, Consumer f) { if (d != 
null && f != null) { T t; Throwable x; if (r instanceof AltResult) { t = null; x = ((AltResult)r).ex; } else { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; x = null; } if (x == null) { try { if (e != null) { e.execute(new AsyncAccept(d, t, f)); return; } f.accept(t); } catch (Throwable ex) { x = ex; } } d.internalComplete(encodeOutcome(null, x)); } } static final class AsyncAccept extends Async { T arg; Consumer fn; AsyncAccept(CompletableFuture dep, T arg, Consumer fn) { super(dep); this.arg = arg; this.fn = fn; } final void compute() { nowAccept(null, dep, arg, fn); } private static final long serialVersionUID = 5232453952276885070L; } static final class DelayedAccept extends UniCompletion { Consumer fn; DelayedAccept(Executor async, CompletableFuture dep, CompletableFuture src, Consumer fn) { super(async, dep, src); this.fn = fn; } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a; Object r; if ((d = dep) != null && (a = src) != null && (r = a.result) != null && claim(d)) { nowAccept(async, d, r, fn); src = null; fn = null; if (d.result != null) return d; } return null; } } private CompletableFuture doThenAccept(Consumer fn, Executor e) { if (fn == null) throw new NullPointerException(); CompletableFuture d = new CompletableFuture(); Object r = result; if (r == null) unipush(new DelayedAccept(e, d, this, fn)); else nowAccept(e, d, r, fn); return d; } // Runnable/run static void nowRun(Executor e, CompletableFuture d, Object r, Runnable f) { if (d != null && f != null) { Throwable x = (r instanceof AltResult) ? 
((AltResult)r).ex : null; if (x == null) { try { if (e != null) { e.execute(new AsyncRun(d, f)); return; } f.run(); } catch (Throwable ex) { x = ex; } } d.internalComplete(encodeOutcome(null, x)); } } static final class AsyncRun extends Async { Runnable fn; AsyncRun(CompletableFuture dep, Runnable fn) { super(dep); this.fn = fn; } final void compute() { nowRun(null, dep, null, fn); } private static final long serialVersionUID = 5232453952276885070L; } static final class DelayedRun extends UniCompletion { Runnable fn; DelayedRun(Executor async, CompletableFuture dep, CompletableFuture src, Runnable fn) { super(async, dep, src); this.fn = fn; } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a; Object r; if ((d = dep) != null && (a = src) != null && (r = a.result) != null && claim(d)) { nowRun(async, d, r, fn); src = null; fn = null; // clear refs if (d.result != null) return d; } return null; } } private CompletableFuture doThenRun(Runnable fn, Executor e) { if (fn == null) throw new NullPointerException(); CompletableFuture d = new CompletableFuture(); Object r = result; if (r == null) unipush(new DelayedRun(e, d, this, fn)); else nowRun(e, d, r, fn); return d; } // Supplier/get static void nowSupply(CompletableFuture d, Supplier f) { if (d != null && f != null) { T t; Throwable x; try { t = f.get(); x = null; } catch (Throwable ex) { x = ex; t = null; } d.internalComplete(encodeOutcome(t, x)); } } static final class AsyncSupply extends Async { Supplier fn; AsyncSupply(CompletableFuture dep, Supplier fn) { super(dep); this.fn = fn; } final void compute() { nowSupply(dep, fn); } private static final long serialVersionUID = 5232453952276885070L; } // WhenComplete static void nowWhen(Executor e, CompletableFuture d, Object r, BiConsumer f) { if (d != null && f != null) { T t; Throwable x, dx; if (r instanceof AltResult) { t = null; x = ((AltResult)r).ex; } else { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; x = null; } try { if (e != 
null) { e.execute(new AsyncWhen(d, r, f)); return; } f.accept(t, x); dx = null; } catch (Throwable ex) { dx = ex; } d.internalComplete(encodeOutcome(t, x != null ? x : dx)); } } static final class AsyncWhen extends Async { Object arg; BiConsumer fn; AsyncWhen(CompletableFuture dep, Object arg, BiConsumer fn) { super(dep); this.arg = arg; this.fn = fn; } final void compute() { nowWhen(null, dep, arg, fn); } private static final long serialVersionUID = 5232453952276885070L; } static final class DelayedWhen extends UniCompletion { BiConsumer fn; DelayedWhen(Executor async, CompletableFuture dep, CompletableFuture src, BiConsumer fn) { super(async, dep, src); this.fn = fn; } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a; Object r; if ((d = dep) != null && (a = src) != null && (r = a.result) != null && claim(d)) { nowWhen(async, d, r, fn); src = null; fn = null; if (d.result != null) return d; } return null; } } private CompletableFuture doWhenComplete( BiConsumer fn, Executor e) { if (fn == null) throw new NullPointerException(); CompletableFuture d = new CompletableFuture(); Object r = result; if (r == null) unipush(new DelayedWhen(e, d, this, fn)); else nowWhen(e, d, r, fn); return d; } // Handle static void nowHandle(Executor e, CompletableFuture d, Object r, BiFunction f) { if (d != null && f != null) { T t; U u; Throwable x, dx; if (r instanceof AltResult) { t = null; x = ((AltResult)r).ex; } else { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; x = null; } try { if (e != null) { e.execute(new AsyncCombine(d, t, x, f)); return; } u = f.apply(t, x); dx = null; } catch (Throwable ex) { dx = ex; u = null; } d.internalComplete(encodeOutcome(u, dx)); } } static final class DelayedHandle extends UniCompletion { BiFunction fn; DelayedHandle(Executor async, CompletableFuture dep, CompletableFuture src, BiFunction fn) { super(async, dep, src); this.fn = fn; } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a; 
Object r; if ((d = dep) != null && (a = src) != null && (r = a.result) != null && claim(d)) { nowHandle(async, d, r, fn); src = null; fn = null; if (d.result != null) return d; } return null; } } private CompletableFuture doHandle( BiFunction fn, Executor e) { if (fn == null) throw new NullPointerException(); CompletableFuture d = new CompletableFuture(); Object r = result; if (r == null) unipush(new DelayedHandle(e, d, this, fn)); else nowHandle(e, d, r, fn); return d; } // Exceptionally static void nowExceptionally(CompletableFuture d, Object r, Function f) { if (d != null && f != null) { T t; Throwable x, dx; if ((r instanceof AltResult) && (x = ((AltResult)r).ex) != null) { try { t = f.apply(x); dx = null; } catch (Throwable ex) { dx = ex; t = null; } } else { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; dx = null; } d.internalComplete(encodeOutcome(t, dx)); } } static final class DelayedExceptionally extends UniCompletion { Function fn; DelayedExceptionally(CompletableFuture dep, CompletableFuture src, Function fn) { super(null, dep, src); this.fn = fn; } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a; Object r; if ((d = dep) != null && (a = src) != null && (r = a.result) != null && claim(d)) { nowExceptionally(d, r, fn); src = null; fn = null; if (d.result != null) return d; } return null; } } private CompletableFuture doExceptionally( Function fn) { if (fn == null) throw new NullPointerException(); CompletableFuture d = new CompletableFuture(); Object r = result; if (r == null) unipush(new DelayedExceptionally(d, this, fn)); else nowExceptionally(d, r, fn); return d; } // Identity function used by nowCompose and anyOf static void nowCopy(CompletableFuture d, Object r) { if (d != null && d.result == null) { Throwable x; d.internalComplete(((r instanceof AltResult) && (x = ((AltResult)r).ex) != null && !(x instanceof CompletionException)) ? 
new AltResult(new CompletionException(x)): r); } } static final class DelayedCopy extends UniCompletion { DelayedCopy(CompletableFuture dep, CompletableFuture src) { super(null, dep, src); } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a; Object r; if ((d = dep) != null && (a = src) != null && (r = a.result) != null && claim(d)) { nowCopy(d, r); src = null; if (d.result != null) return d; } return null; } } // Compose static void nowCompose(Executor e, CompletableFuture d, Object r, Function> f) { if (d != null && f != null) { T t; Throwable x; if (r instanceof AltResult) { t = null; x = ((AltResult)r).ex; } else { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; x = null; } if (x == null) { try { if (e != null) e.execute(new AsyncCompose(d, t, f)); else { CompletableFuture c = f.apply(t).toCompletableFuture(); Object s = c.result; if (s == null) c.unipush(new DelayedCopy(d, c)); else nowCopy(d, s); } return; } catch (Throwable ex) { x = ex; } } d.internalComplete(encodeOutcome(null, x)); } } static final class AsyncCompose extends Async { T arg; Function> fn; AsyncCompose(CompletableFuture dep, T arg, Function> fn) { super(dep); this.arg = arg; this.fn = fn; } final void compute() { nowCompose(null, dep, arg, fn); } private static final long serialVersionUID = 5232453952276885070L; } static final class DelayedCompose extends UniCompletion { Function> fn; DelayedCompose(Executor async, CompletableFuture dep, CompletableFuture src, Function> fn) { super(async, dep, src); this.fn = fn; } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a; Object r; if ((d = dep) != null && (a = src) != null && (r = a.result) != null && claim(d)) { nowCompose(async, d, r, fn); src = null; fn = null; if (d.result != null) return d; } return null; } } private CompletableFuture doThenCompose( Function> fn, Executor e) { if (fn == null) throw new NullPointerException(); Object r = result; if (r == null || e != null) { 
CompletableFuture d = new CompletableFuture(); if (r == null) unipush(new DelayedCompose(e, d, this, fn)); else nowCompose(e, d, r, fn); return d; } else { // try to return function result T t; Throwable x; if (r instanceof AltResult) { t = null; x = ((AltResult)r).ex; } else { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; x = null; } if (x == null) { try { return fn.apply(t).toCompletableFuture(); } catch (Throwable ex) { x = ex; } } CompletableFuture d = new CompletableFuture(); d.result = encodeOutcome(null, x); return d; } } /* ------------- Two-source Completions -------------- */ /** A Completion with two sources */ static abstract class BiCompletion extends UniCompletion { CompletableFuture snd; // second source for tryAct BiCompletion(Executor async, CompletableFuture dep, CompletableFuture src, CompletableFuture snd) { super(async, dep, src); this.snd = snd; } } /** A Completion delegating to a shared BiCompletion */ static final class CoBiCompletion extends Completion { BiCompletion completion; CoBiCompletion(BiCompletion completion) { this.completion = completion; } final CompletableFuture tryAct() { BiCompletion c; return (c = completion) == null ? null : c.tryAct(); } } /* ------------- Two-source Anded -------------- */ /* Pushes c on to completions and o's completions unless both done. */ private void bipushAnded(CompletableFuture o, BiCompletion c) { if (c != null && o != null) { Object r; CompletableFuture d; while ((r = result) == null && !casCompletions(c.next = completions, c)) c.next = null; if (o.result == null) { Completion q = (r != null) ? 
c : new CoBiCompletion(c); while (o.result == null && !o.casCompletions(q.next = o.completions, q)) q.next = null; } if ((d = c.tryAct()) != null) d.postComplete(); if (o.result != null) o.postComplete(); if (result != null) postComplete(); } } // BiFunction/combine static void nowCombine(Executor e, CompletableFuture d, Object r, Object s, BiFunction f) { if (d != null && f != null) { T t; U u; V v; Throwable x; if (r instanceof AltResult) { t = null; x = ((AltResult)r).ex; } else { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; x = null; } if (x != null) u = null; else if (s instanceof AltResult) { x = ((AltResult)s).ex; u = null; } else { @SuppressWarnings("unchecked") U us = (U) s; u = us; } if (x == null) { try { if (e != null) { e.execute(new AsyncCombine(d, t, u, f)); return; } v = f.apply(t, u); } catch (Throwable ex) { x = ex; v = null; } } else v = null; d.internalComplete(encodeOutcome(v, x)); } } static final class AsyncCombine extends Async { T arg1; U arg2; BiFunction fn; AsyncCombine(CompletableFuture dep, T arg1, U arg2, BiFunction fn) { super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; } final void compute() { nowCombine(null, dep, arg1, arg2, fn); } private static final long serialVersionUID = 5232453952276885070L; } static final class DelayedCombine extends BiCompletion { BiFunction fn; DelayedCombine(Executor async, CompletableFuture dep, CompletableFuture src, CompletableFuture snd, BiFunction fn) { super(async, dep, src, snd); this.fn = fn; } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a, b; Object r, s; if ((d = dep) != null && (a = src) != null && (b = snd) != null && (r = a.result) != null && (s = b.result) != null && claim(d)) { nowCombine(async, d, r, s, fn); src = null; snd = null; fn = null; if (d.result != null) return d; } return null; } } private CompletableFuture doThenCombine( CompletableFuture o, BiFunction fn, Executor e) { if (o == null || fn == null) throw new 
NullPointerException(); CompletableFuture d = new CompletableFuture(); Object r = result, s = o.result; if (r == null || s == null) bipushAnded(o, new DelayedCombine(e, d, this, o, fn)); else nowCombine(e, d, r, s, fn); return d; } // BiConsumer/AcceptBoth static void nowAcceptBoth(Executor e, CompletableFuture d, Object r, Object s, BiConsumer f) { if (d != null && f != null) { T t; U u; Throwable x; if (r instanceof AltResult) { t = null; x = ((AltResult)r).ex; } else { @SuppressWarnings("unchecked") T tr = (T) r; t = tr; x = null; } if (x != null) u = null; else if (s instanceof AltResult) { x = ((AltResult)s).ex; u = null; } else { @SuppressWarnings("unchecked") U us = (U) s; u = us; } if (x == null) { try { if (e != null) { e.execute(new AsyncAcceptBoth(d, t, u, f)); return; } f.accept(t, u); } catch (Throwable ex) { x = ex; } } d.internalComplete(encodeOutcome(null, x)); } } static final class AsyncAcceptBoth extends Async { T arg1; U arg2; BiConsumer fn; AsyncAcceptBoth(CompletableFuture dep, T arg1, U arg2, BiConsumer fn) { super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; } final void compute() { nowAcceptBoth(null, dep, arg1, arg2, fn); } private static final long serialVersionUID = 5232453952276885070L; } static final class DelayedAcceptBoth extends BiCompletion { BiConsumer fn; DelayedAcceptBoth(Executor async, CompletableFuture dep, CompletableFuture src, CompletableFuture snd, BiConsumer fn) { super(async, dep, src, snd); this.fn = fn; } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a, b; Object r, s; if ((d = dep) != null && (a = src) != null && (b = snd) != null && (r = a.result) != null && (s = b.result) != null && claim(d)) { nowAcceptBoth(async, d, r, s, fn); src = null; snd = null; fn = null; if (d.result != null) return d; } return null; } } private CompletableFuture doThenAcceptBoth( CompletableFuture o, BiConsumer fn, Executor e) { if (o == null || fn == null) throw new NullPointerException(); 
CompletableFuture d = new CompletableFuture(); Object r = result, s = o.result; if (r == null || s == null) bipushAnded(o, new DelayedAcceptBoth(e, d, this, o, fn)); else nowAcceptBoth(e, d, r, s, fn); return d; } // Runnable/both static final class DelayedRunAfterBoth extends BiCompletion { Runnable fn; DelayedRunAfterBoth(Executor async, CompletableFuture dep, CompletableFuture src, CompletableFuture snd, Runnable fn) { super(async, dep, src, snd); this.fn = fn; } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a, b; Object r, s; if ((d = dep) != null && (a = src) != null && (b = snd) != null && (r = a.result) != null && (s = b.result) != null && claim(d)) { Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null; nowRun(async, d, (x == null) ? s : r, fn); src = null; snd = null; fn = null; if (d.result != null) return d; } return null; } } private CompletableFuture doRunAfterBoth( CompletableFuture o, Runnable fn, Executor e) { if (o == null || fn == null) throw new NullPointerException(); CompletableFuture d = new CompletableFuture(); Object r = result, s = o.result; if (r == null || s == null) bipushAnded(o, new DelayedRunAfterBoth(e, d, this, o, fn)); else { Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null; nowRun(e, d, (x == null) ? s : r, fn); } return d; } // allOf static void nowAnd(CompletableFuture d, Object r, Object s) { if (d != null) { Throwable x = (r instanceof AltResult) ? 
((AltResult)r).ex : null; if (x == null && (s instanceof AltResult)) x = ((AltResult)s).ex; d.internalComplete(encodeOutcome(null, x)); } } static final class DelayedAnd extends BiCompletion { DelayedAnd(CompletableFuture dep, CompletableFuture src, CompletableFuture snd) { super(null, dep, src, snd); } final CompletableFuture tryAct() { CompletableFuture d; CompletableFuture a, b; Object r, s; if ((d = dep) != null && (a = src) != null && (b = snd) != null && (r = a.result) != null && (s = b.result) != null && claim(d)) { nowAnd(d, r, s); src = null; snd = null; if (d.result != null) return d; } return null; } } /** Recursively constructs a tree of And completions */ private static CompletableFuture doAllOf(CompletableFuture[] cfs, int lo, int hi) { CompletableFuture d = new CompletableFuture(); if (lo > hi) // empty d.result = NIL; else { int mid = (lo + hi) >>> 1; CompletableFuture fst = (lo == mid ? cfs[lo] : doAllOf(cfs, lo, mid)); CompletableFuture snd = (lo == hi ? fst : // and fst with self (hi == mid+1) ? cfs[hi] : doAllOf(cfs, mid+1, hi)); Object r = fst.result, s = snd.result; // throw NPE if null elements if (r == null || s == null) { DelayedAnd a = new DelayedAnd(d, fst, snd); if (fst == snd) fst.unipush(a); else fst.bipushAnded(snd, a); } else nowAnd(d, r, s); } return d; } /* ------------- Two-source Ored -------------- */ /* Pushes c on to completions and o's completions unless either done. 
*/
    private void bipushOred(CompletableFuture o, BiCompletion c) {
        if (c != null && o != null) {
            CompletableFuture d;
            // retry until c is linked onto this stack or either source is done
            while (o.result == null && result == null) {
                if (casCompletions(c.next = completions, c)) {
                    // c is linked here; also link a CoBiCompletion wrapper
                    // onto o's stack, retrying while neither source is done
                    CoBiCompletion q = new CoBiCompletion(c);
                    while (result == null && o.result == null &&
                           !o.casCompletions(q.next = o.completions, q))
                        q.next = null;
                    break;
                }
                c.next = null; // CAS lost; unlink before retrying
            }
            // a source may have completed meanwhile: try to act now and
            // run completions for whichever futures have results
            if ((d = c.tryAct()) != null)
                d.postComplete();
            if (o.result != null)
                o.postComplete();
            if (result != null)
                postComplete();
        }
    }

    // Function/applyEither

    /**
     * Two-source completion that hands a Function to {@code nowApply}
     * once either {@code src} or {@code snd} has a result.
     */
    static final class DelayedApplyToEither extends BiCompletion {
        Function fn; // function to apply; cleared once tryAct has acted
        DelayedApplyToEither(Executor async,
                             CompletableFuture dep,
                             CompletableFuture src,
                             CompletableFuture snd,
                             Function fn) {
            super(async, dep, src, snd); this.fn = fn;
        }
        /**
         * Acts only if dep, src and snd are all still set, at least one
         * source has a result (src's is used when both do), and
         * {@code claim(dep)} returns true.
         *
         * @return dep if it holds a result after acting, otherwise null
         */
        final CompletableFuture tryAct() {
            CompletableFuture d; CompletableFuture a, b; Object r;
            if ((d = dep) != null &&
                (a = src) != null &&
                (b = snd) != null &&
                ((r = a.result) != null || (r = b.result) != null) &&
                claim(d)) {
                nowApply(async, d, r, fn);
                src = null; snd = null; fn = null; // drop references once acted
                if (d.result != null) return d;
            }
            return null;
        }
    }

    /**
     * Creates a new CompletableFuture tied to whichever of this future
     * and {@code o} has a result. If neither has one yet, a
     * DelayedApplyToEither is registered through {@link #bipushOred};
     * otherwise {@code nowApply} is invoked immediately, using this
     * future's result in preference to {@code o}'s.
     *
     * @param o the other source
     * @param fn the function to apply
     * @param e the executor forwarded to nowApply / the delayed completion
     * @return the new dependent CompletableFuture
     * @throws NullPointerException if {@code o} or {@code fn} is null
     */
    private CompletableFuture doApplyToEither(
        CompletableFuture o,
        Function fn,
        Executor e) {
        if (o == null || fn == null) throw new NullPointerException();
        CompletableFuture d = new CompletableFuture();
        Object r = result;
        if (r == null && (r = o.result) == null)
            bipushOred(o, new DelayedApplyToEither(e, d, this, o, fn));
        else
            nowApply(e, d, r, fn);
        return d;
    }

    // Consumer/acceptEither

    /**
     * Two-source completion that hands a Consumer to {@code nowAccept}
     * once either {@code src} or {@code snd} has a result.
     */
    static final class DelayedAcceptEither extends BiCompletion {
        Consumer fn; // consumer to invoke; cleared once tryAct has acted
        DelayedAcceptEither(Executor async,
                            CompletableFuture dep,
                            CompletableFuture src,
                            CompletableFuture snd,
                            Consumer fn) {
            super(async, dep, src, snd); this.fn = fn;
        }
        /**
         * Acts only if dep, src and snd are all still set, at least one
         * source has a result (src's is used when both do), and
         * {@code claim(dep)} returns true.
         *
         * @return dep if it holds a result after acting, otherwise null
         */
        final CompletableFuture tryAct() {
            CompletableFuture d; CompletableFuture a, b; Object r;
            if ((d = dep) != null &&
                (a = src) != null &&
                (b = snd) != null &&
                ((r = a.result) != null || (r = b.result) != null) &&
                claim(d)) {
                nowAccept(async, d, r, fn);
                src = null; snd = null; fn = null; // drop references once acted
                if (d.result != null) return d;
            }
            return null;
        }
    }

    /**
     * Consumer counterpart of {@link #doApplyToEither}: registers a
     * DelayedAcceptEither when neither source has a result, otherwise
     * calls {@code nowAccept} immediately with this future's result in
     * preference to {@code o}'s.
     *
     * @param o the other source
     * @param fn the consumer to invoke
     * @param e the executor forwarded to nowAccept / the delayed completion
     * @return the new dependent CompletableFuture
     * @throws NullPointerException if {@code o} or {@code fn} is null
     */
    private CompletableFuture doAcceptEither(
        CompletableFuture o,
        Consumer fn,
        Executor e) {
        if (o == null || fn == null) throw new NullPointerException();
        CompletableFuture d = new CompletableFuture();
        Object r = result;
        if (r == null && (r = o.result) == null)
            bipushOred(o, new DelayedAcceptEither(e, d, this, o, fn));
        else
            nowAccept(e, d, r, fn);
        return d;
    }

    // Runnable/runEither

    /**
     * Two-source completion that hands a Runnable to {@code nowRun}
     * once either {@code src} or {@code snd} has a result.
     */
    static final class DelayedRunAfterEither extends BiCompletion {
        Runnable fn; // action to run; cleared once tryAct has acted
        DelayedRunAfterEither(Executor async,
                              CompletableFuture dep,
                              CompletableFuture src,
                              CompletableFuture snd,
                              Runnable fn) {
            super(async, dep, src, snd); this.fn = fn;
        }
        /**
         * Acts only if dep, src and snd are all still set, at least one
         * source has a result (src's is used when both do), and
         * {@code claim(dep)} returns true.
         *
         * @return dep if it holds a result after acting, otherwise null
         */
        final CompletableFuture tryAct() {
            CompletableFuture d; CompletableFuture a, b; Object r;
            if ((d = dep) != null &&
                (a = src) != null &&
                (b = snd) != null &&
                ((r = a.result) != null || (r = b.result) != null) &&
                claim(d)) {
                nowRun(async, d, r, fn);
                src = null; snd = null; fn = null; // drop references once acted
                if (d.result != null) return d;
            }
            return null;
        }
    }

    /**
     * Runnable counterpart of {@link #doApplyToEither}: registers a
     * DelayedRunAfterEither when neither source has a result, otherwise
     * calls {@code nowRun} immediately with this future's result in
     * preference to {@code o}'s.
     *
     * @param o the other source
     * @param fn the action to run
     * @param e the executor forwarded to nowRun / the delayed completion
     * @return the new dependent CompletableFuture
     * @throws NullPointerException if {@code o} or {@code fn} is null
     */
    private CompletableFuture doRunAfterEither(
        CompletableFuture o,
        Runnable fn,
        Executor e) {
        if (o == null || fn == null) throw new NullPointerException();
        CompletableFuture d = new CompletableFuture();
        Object r = result;
        if (r == null && (r = o.result) == null)
            bipushOred(o, new DelayedRunAfterEither(e, d, this, o, fn));
        else
            nowRun(e, d, r, fn);
        return d;
    }

    /* ------------- Signallers -------------- */

    /**
     * Heuristic spin value for waitingGet() before blocking on
     * multiprocessors: 256 when more than one processor is available,
     * otherwise 0.
     */
    static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
                              1 << 8 : 0);

    /**
     * Completion for recording and releasing a waiting thread.  See
     * other classes such as Phaser and SynchronousQueue for more
     * detailed explanation. This class implements ManagedBlocker to
     * avoid starvation when blocking actions pile up in
     * ForkJoinPools.
*/
    static final class Signaller extends Completion
        implements ForkJoinPool.ManagedBlocker {
        long nanos;          // wait time if timed
        final long deadline; // non-zero if timed
        volatile int interruptControl; // > 0: interruptible, < 0: interrupted
        volatile Thread thread; // waiting thread; null once released or cancelled

        /**
         * Records the calling thread as the waiter.
         *
         * @param interruptible whether interruptControl starts at 1 (else 0)
         * @param nanos initial wait time, used only when deadline is non-zero
         * @param deadline value compared against System.nanoTime(), or 0
         *        for an untimed wait
         */
        Signaller(boolean interruptible, long nanos, long deadline) {
            this.thread = Thread.currentThread();
            this.interruptControl = interruptible ? 1 : 0;
            this.nanos = nanos;
            this.deadline = deadline;
        }
        /**
         * Clears the recorded thread, if any, and unparks it.
         *
         * @return always null
         */
        final CompletableFuture tryAct() {
            Thread w = thread;
            if (w != null) {
                thread = null; // no need to CAS
                LockSupport.unpark(w);
            }
            return null;
        }
        /**
         * Reports whether blocking is unnecessary: the waiter was already
         * released, an interruptible wait was interrupted, or a timed
         * wait has run out.
         */
        public boolean isReleasable() {
            if (thread == null)
                return true;
            // Thread.interrupted() clears the interrupt status, so the
            // interruption is remembered in interruptControl instead
            if (Thread.interrupted()) {
                int i = interruptControl;
                interruptControl = -1;
                if (i > 0)
                    return true;
            }
            // timed wait: refresh the remaining nanos and give up at the deadline
            if (deadline != 0L &&
                (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
                thread = null;
                return true;
            }
            return false;
        }
        /**
         * Parks the current thread (indefinitely when untimed, for
         * {@code nanos} otherwise) unless already releasable.
         *
         * @return the value of isReleasable() after any parking
         */
        public boolean block() {
            if (isReleasable())
                return true;
            else if (deadline == 0L)
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }

    /**
     * Returns raw result after waiting, or null if interruptible and
     * interrupted.
*/
    private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = SPINS;
        Object r;
        // each iteration performs exactly one step, then re-reads result
        while ((r = result) == null) {
            if (spins > 0) {
                // spin phase: count down only when the secondary seed is non-negative
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L); // untimed waiter
            else if (!queued)
                queued = casCompletions(q.next = completions, q);
            else if (interruptible && q.interruptControl < 0) {
                // interrupted while waiting: cancel the node and report null
                q.thread = null;
                removeCancelledSignallers();
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    // recorded in the node; acted upon above or after the loop
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt(); // restore interrupt status
            }
        }
        postComplete();
        return r;
    }

    /**
     * Returns raw result after waiting, or null if interrupted, or
     * throws TimeoutException on timeout.
     *
     * @param nanos the maximum time to wait, in nanoseconds
     * @throws TimeoutException if {@code nanos} is not positive or the
     *         wait times out before a result is available
     */
    private Object timedGet(long nanos) throws TimeoutException {
        if (Thread.interrupted())
            return null;
        if (nanos <= 0L)
            throw new TimeoutException();
        long d = System.nanoTime() + nanos;
        Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
        boolean queued = false;
        Object r;
        // each iteration performs exactly one step, then re-reads result
        while ((r = result) == null) {
            if (!queued)
                queued = casCompletions(q.next = completions, q);
            else if (q.interruptControl < 0 || q.nanos <= 0L) {
                // interrupted or out of time: cancel the node, then report which
                q.thread = null;
                removeCancelledSignallers();
                if (q.interruptControl < 0)
                    return null;
                throw new TimeoutException();
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        q.thread = null;
        postComplete();
        // an interruption is reported as null even though a result arrived
        return (q.interruptControl < 0) ? null : r;
    }

    /**
     * Unlinks cancelled Signallers to avoid accumulating garbage.
     * Internal nodes are simply unspliced without CAS since it is
     * harmless if they are traversed anyway.  To avoid effects of
     * unsplicing from already removed nodes, the list is retraversed
     * in case of an apparent race.
*/
    private void removeCancelledSignallers() {
        // p trails q through the completions stack; p == null means q is the head
        for (Completion p = null, q = completions; q != null;) {
            Completion s = q.next;
            // a Signaller whose thread is null counts as cancelled
            if ((q instanceof Signaller) &&
                ((Signaller)q).thread == null) {
                if (p != null) {
                    p.next = s; // plain write: unsplice an internal node
                    // stop unless the predecessor is itself a cancelled Signaller
                    if (!(p instanceof Signaller) ||
                        ((Signaller)p).thread != null)
                        break;
                }
                else if (casCompletions(q, s)) // head node is removed via CAS
                    break;
                p = null; // restart
                q = completions;
            }
            else {
                p = q;
                q = s;
            }
        }
    }

    /* ------------- public methods -------------- */

    /**
     * Creates a new incomplete CompletableFuture.
     */
    public CompletableFuture() {
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} with
     * the value obtained by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param the function's return type
     * @return the new CompletableFuture
     * @throws NullPointerException if {@code supplier} is null
     */
    public static CompletableFuture supplyAsync(Supplier supplier) {
        if (supplier == null) throw new NullPointerException();
        CompletableFuture d = new CompletableFuture();
        asyncPool.execute(new AsyncSupply(d, supplier));
        return d;
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor with the value obtained
     * by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @param the function's return type
     * @return the new CompletableFuture
     * @throws NullPointerException if {@code supplier} or
     * {@code executor} is null
     */
    public static CompletableFuture supplyAsync(Supplier supplier,
                                                Executor executor) {
        if (executor == null || supplier == null)
            throw new NullPointerException();
        CompletableFuture d = new CompletableFuture();
        executor.execute(new AsyncSupply(d, supplier));
        return d;
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} after
     * it runs the given action.
*
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public static CompletableFuture runAsync(Runnable runnable) {
        if (runnable == null)
            throw new NullPointerException();
        CompletableFuture future = new CompletableFuture();
        AsyncRun task = new AsyncRun(future, runnable);
        asyncPool.execute(task);
        return future;
    }

    /**
     * Submits an AsyncRun wrapping the given action to the given
     * executor and returns the CompletableFuture handed to that task.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public static CompletableFuture runAsync(Runnable runnable,
                                             Executor executor) {
        if (executor == null || runnable == null)
            throw new NullPointerException();
        CompletableFuture future = new CompletableFuture();
        AsyncRun task = new AsyncRun(future, runnable);
        executor.execute(task);
        return future;
    }

    /**
     * Creates a CompletableFuture whose result is already set to the
     * given value (a null value is stored as NIL).
     *
     * @param value the value
     * @param the type of the value
     * @return the completed CompletableFuture
     */
    public static CompletableFuture completedFuture(U value) {
        CompletableFuture future = new CompletableFuture();
        if (value == null)
            future.result = NIL;
        else
            future.result = value;
        return future;
    }

    /**
     * Tells whether this future has completed in any fashion:
     * normally, exceptionally, or via cancellation.
     *
     * @return {@code true} if completed
     */
    public boolean isDone() {
        Object current = result;
        return current != null;
    }

    /**
     * Blocks, if needed, until this future completes and then
     * reports its outcome.
     *
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public T get() throws InterruptedException, ExecutionException {
        Object outcome = result;
        if (outcome == null)
            outcome = waitingGet(true); // interruptible wait
        return reportGet(outcome);
    }

    /**
     * Blocks, if needed, for no longer than the given time until this
     * future completes and then reports its outcome, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        // converted before result is inspected, so a null unit always fails
        long nanos = unit.toNanos(timeout);
        Object outcome = result;
        if (outcome == null)
            outcome = timedGet(nanos);
        return reportGet(outcome);
    }

    /**
     * Returns the result value when complete, or throws an
     * (unchecked) exception if completed exceptionally. To fit common
     * functional forms, an exception thrown by a computation involved
     * in completing this CompletableFuture surfaces as an (unchecked)
     * {@link CompletionException} with the underlying exception as
     * its cause.
     *
     * @return the result value
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if this future completed
     * exceptionally or a completion computation threw an exception
     */
    public T join() {
        Object outcome = result;
        if (outcome == null)
            outcome = waitingGet(false); // non-interruptible wait
        return reportJoin(outcome);
    }

    /**
     * Returns the result value (or throws any encountered exception)
     * if already completed; otherwise hands back the supplied
     * fallback without waiting.
     *
     * @param valueIfAbsent the value to return if not completed
     * @return the result value, if completed, else the given valueIfAbsent
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if this future completed
     * exceptionally or a completion computation threw an exception
     */
    public T getNow(T valueIfAbsent) {
        Object outcome = result;
        if (outcome == null)
            return valueIfAbsent;
        return reportJoin(outcome);
    }

    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean complete(T value) {
        Object encoded = (value == null) ? NIL : value; // null is boxed as NIL
        boolean triggered = internalComplete(encoded);
        postComplete();
        return triggered;
    }

    /**
     * If not already completed, causes invocations of {@link #get()}
     * and related methods to throw the given exception.
     *
     * @param ex the exception
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean completeExceptionally(Throwable ex) {
        if (ex == null)
            throw new NullPointerException();
        AltResult boxed = new AltResult(ex);
        boolean triggered = internalComplete(boxed);
        postComplete();
        return triggered;
    }

    // thenApply family: each variant forwards to doThenApply with a null
    // executor, asyncPool, or the caller's (non-null) executor

    public CompletableFuture thenApply(Function fn) {
        return doThenApply(fn, null);
    }

    public CompletableFuture thenApplyAsync(Function fn) {
        return doThenApply(fn, asyncPool);
    }

    public CompletableFuture thenApplyAsync(Function fn,
                                            Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        return doThenApply(fn, executor);
    }

    // thenAccept family: forwards to doThenAccept in the same way

    public CompletableFuture thenAccept(Consumer action) {
        return doThenAccept(action, null);
    }

    public CompletableFuture thenAcceptAsync(Consumer action) {
        return doThenAccept(action, asyncPool);
    }

    public CompletableFuture thenAcceptAsync(Consumer action,
                                             Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        return doThenAccept(action, executor);
    }

    // thenRun family: forwards to doThenRun in the same way

    public CompletableFuture thenRun(Runnable action) {
        return doThenRun(action, null);
    }

    public CompletableFuture thenRunAsync(Runnable action) {
        return doThenRun(action, asyncPool);
    }

    public CompletableFuture thenRunAsync(Runnable action,
                                          Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        return doThenRun(action, executor);
    }
public CompletableFuture thenCombine( CompletionStage other, BiFunction fn) { return doThenCombine(other.toCompletableFuture(), fn, null); } public CompletableFuture thenCombineAsync( CompletionStage other, BiFunction fn) { return doThenCombine(other.toCompletableFuture(), fn, asyncPool); } public CompletableFuture thenCombineAsync( CompletionStage other, BiFunction fn, Executor executor) { if (executor == null) throw new NullPointerException(); return doThenCombine(other.toCompletableFuture(), fn, executor); } public CompletableFuture thenAcceptBoth( CompletionStage other, BiConsumer action) { return doThenAcceptBoth(other.toCompletableFuture(), action, null); } public CompletableFuture thenAcceptBothAsync( CompletionStage other, BiConsumer action) { return doThenAcceptBoth(other.toCompletableFuture(), action, asyncPool); } public CompletableFuture thenAcceptBothAsync( CompletionStage other, BiConsumer action, Executor executor) { if (executor == null) throw new NullPointerException(); return doThenAcceptBoth(other.toCompletableFuture(), action, executor); } public CompletableFuture runAfterBoth( CompletionStage other, Runnable action) { return doRunAfterBoth(other.toCompletableFuture(), action, null); } public CompletableFuture runAfterBothAsync( CompletionStage other, Runnable action) { return doRunAfterBoth(other.toCompletableFuture(), action, asyncPool); } public CompletableFuture runAfterBothAsync( CompletionStage other, Runnable action, Executor executor) { if (executor == null) throw new NullPointerException(); return doRunAfterBoth(other.toCompletableFuture(), action, executor); } public CompletableFuture applyToEither( CompletionStage other, Function fn) { return doApplyToEither(other.toCompletableFuture(), fn, null); } public CompletableFuture applyToEitherAsync( CompletionStage other, Function fn) { return doApplyToEither(other.toCompletableFuture(), fn, asyncPool); } public CompletableFuture applyToEitherAsync (CompletionStage other, Function fn, 
Executor executor) { if (executor == null) throw new NullPointerException(); return doApplyToEither(other.toCompletableFuture(), fn, executor); } public CompletableFuture acceptEither( CompletionStage other, Consumer action) { return doAcceptEither(other.toCompletableFuture(), action, null); } public CompletableFuture acceptEitherAsync (CompletionStage other, Consumer action) { return doAcceptEither(other.toCompletableFuture(), action, asyncPool); } public CompletableFuture acceptEitherAsync( CompletionStage other, Consumer action, Executor executor) { if (executor == null) throw new NullPointerException(); return doAcceptEither(other.toCompletableFuture(), action, executor); } public CompletableFuture runAfterEither( CompletionStage other, Runnable action) { return doRunAfterEither(other.toCompletableFuture(), action, null); } public CompletableFuture runAfterEitherAsync( CompletionStage other, Runnable action) { return doRunAfterEither(other.toCompletableFuture(), action, asyncPool); } public CompletableFuture runAfterEitherAsync( CompletionStage other, Runnable action, Executor executor) { if (executor == null) throw new NullPointerException(); return doRunAfterEither(other.toCompletableFuture(), action, executor); } public CompletableFuture thenCompose (Function> fn) { return doThenCompose(fn, null); } public CompletableFuture thenComposeAsync( Function> fn) { return doThenCompose(fn, asyncPool); } public CompletableFuture thenComposeAsync( Function> fn, Executor executor) { if (executor == null) throw new NullPointerException(); return doThenCompose(fn, executor); } public CompletableFuture whenComplete( BiConsumer action) { return doWhenComplete(action, null); } public CompletableFuture whenCompleteAsync( BiConsumer action) { return doWhenComplete(action, asyncPool); } public CompletableFuture whenCompleteAsync( BiConsumer action, Executor executor) { if (executor == null) throw new NullPointerException(); return doWhenComplete(action, executor); } public 
CompletableFuture handle( BiFunction fn) { return doHandle(fn, null); } public CompletableFuture handleAsync( BiFunction fn) { return doHandle(fn, asyncPool); } public CompletableFuture handleAsync( BiFunction fn, Executor executor) { if (executor == null) throw new NullPointerException(); return doHandle(fn, executor); } /** * Returns this CompletableFuture * * @return this CompletableFuture */ public CompletableFuture toCompletableFuture() { return this; } // not in interface CompletionStage /** * Returns a new CompletableFuture that is completed when this * CompletableFuture completes, with the result of the given * function of the exception triggering this CompletableFuture's * completion when it completes exceptionally; otherwise, if this * CompletableFuture completes normally, then the returned * CompletableFuture also completes normally with the same value. * Note: More flexible versions of this functionality are * available using methods {@code whenComplete} and {@code handle}. * * @param fn the function to use to compute the value of the * returned CompletableFuture if this CompletableFuture completed * exceptionally * @return the new CompletableFuture */ public CompletableFuture exceptionally( Function fn) { return doExceptionally(fn); } /* ------------- Arbitrary-arity constructions -------------- */ /** * Returns a new CompletableFuture that is completed when all of * the given CompletableFutures complete. If any of the given * CompletableFutures complete exceptionally, then the returned * CompletableFuture also does so, with a CompletionException * holding this exception as its cause. Otherwise, the results, * if any, of the given CompletableFutures are not reflected in * the returned CompletableFuture, but may be obtained by * inspecting them individually. If no CompletableFutures are * provided, returns a CompletableFuture completed with the value * {@code null}. * *

Among the applications of this method is to await completion * of a set of independent CompletableFutures before continuing a * program, as in: {@code CompletableFuture.allOf(c1, c2, * c3).join();}. * * @param cfs the CompletableFutures * @return a new CompletableFuture that is completed when all of the * given CompletableFutures complete * @throws NullPointerException if the array or any of its elements are * {@code null} */ public static CompletableFuture allOf(CompletableFuture... cfs) { return doAllOf(cfs, 0, cfs.length - 1); } /** * Returns a new CompletableFuture that is completed when any of * the given CompletableFutures complete, with the same result. * Otherwise, if it completed exceptionally, the returned * CompletableFuture also does so, with a CompletionException * holding this exception as its cause. If no CompletableFutures * are provided, returns an incomplete CompletableFuture. * * @param cfs the CompletableFutures * @return a new CompletableFuture that is completed with the * result or exception of any of the given CompletableFutures when * one completes * @throws NullPointerException if the array or any of its elements are * {@code null} */ public static CompletableFuture anyOf(CompletableFuture... cfs) { CompletableFuture d = new CompletableFuture(); for (int i = 0; i < cfs.length; ++i) { CompletableFuture c = cfs[i]; Object r = c.result; // throw NPE if null element if (d.result == null) { if (r == null) c.unipush(new DelayedCopy(d, c)); else nowCopy(d, r); } } return d; } /* ------------- Control and status methods -------------- */ /** * If not already completed, completes this CompletableFuture with * a {@link CancellationException}. Dependent CompletableFutures * that have not already completed will also complete * exceptionally, with a {@link CompletionException} caused by * this {@code CancellationException}. 
* * @param mayInterruptIfRunning this value has no effect in this * implementation because interrupts are not used to control * processing. * * @return {@code true} if this task is now cancelled */ public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = (result == null) && internalComplete(new AltResult(new CancellationException())); postComplete(); return cancelled || isCancelled(); } /** * Returns {@code true} if this CompletableFuture was cancelled * before it completed normally. * * @return {@code true} if this CompletableFuture was cancelled * before it completed normally */ public boolean isCancelled() { Object r; return ((r = result) instanceof AltResult) && (((AltResult)r).ex instanceof CancellationException); } /** * Returns {@code true} if this CompletableFuture completed * exceptionally, in any way. Possible causes include * cancellation, explicit invocation of {@code * completeExceptionally}, and abrupt termination of a * CompletionStage action. * * @return {@code true} if this CompletableFuture completed * exceptionally */ public boolean isCompletedExceptionally() { Object r; return ((r = result) instanceof AltResult) && r != NIL; } /** * Forcibly sets or resets the value subsequently returned by * method {@link #get()} and related methods, whether or not * already completed. This method is designed for use only in * error recovery actions, and even in such situations may result * in ongoing dependent completions using established versus * overwritten outcomes. * * @param value the completion value */ public void obtrudeValue(T value) { result = (value == null) ? NIL : value; postComplete(); } /** * Forcibly causes subsequent invocations of method {@link #get()} * and related methods to throw the given exception, whether or * not already completed. This method is designed for use only in * recovery actions, and even in such situations may result in * ongoing dependent completions using established versus * overwritten outcomes. 
*
     * @param ex the exception
     */
    public void obtrudeException(Throwable ex) {
        if (ex == null)
            throw new NullPointerException();
        result = new AltResult(ex);
        postComplete();
    }

    /**
     * Returns the estimated number of CompletableFutures whose
     * completions are awaiting completion of this CompletableFuture.
     * Intended for monitoring system state, not for synchronization
     * control.
     *
     * @return the number of dependent CompletableFutures
     */
    public int getNumberOfDependents() {
        int total = 0;
        Completion node = completions;
        while (node != null) {
            ++total;
            node = node.next;
        }
        return total;
    }

    /**
     * Returns a string identifying this CompletableFuture, as well as
     * its completion state.  The state, in brackets, contains the
     * String {@code "Completed normally"} or the String {@code
     * "Completed exceptionally"}, or the String {@code "Not
     * completed"} followed by the number of CompletableFutures
     * dependent upon its completion, if any.
     *
     * @return a string identifying this CompletableFuture, as well as its state
     */
    public String toString() {
        Object outcome = result;
        String identity = super.toString();
        String state;
        if (outcome == null) {
            int dependents = getNumberOfDependents();
            if (dependents == 0)
                state = "[Not completed]";
            else
                state = "[Not completed, " + dependents + " dependents]";
        }
        else if ((outcome instanceof AltResult) &&
                 ((AltResult)outcome).ex != null)
            state = "[Completed exceptionally]";
        else
            state = "[Completed normally]";
        return identity + state;
    }

    // Unsafe mechanics: field offsets of result and completions,
    // looked up once when the class is initialized
    private static final sun.misc.Unsafe UNSAFE;
    private static final long RESULT;
    private static final long COMPLETIONS;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class self = CompletableFuture.class;
            RESULT = UNSAFE.objectFieldOffset
                (self.getDeclaredField("result"));
            COMPLETIONS = UNSAFE.objectFieldOffset
                (self.getDeclaredField("completions"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}