ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.109
Committed: Sat May 3 12:16:03 2014 UTC (10 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.108: +47 -44 lines
Log Message:
screen explicit commonPool arguments, for compatibility

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.88 import java.util.concurrent.CompletionException;
23     import java.util.concurrent.CompletionStage;
24 dl 1.1 import java.util.concurrent.locks.LockSupport;
25    
26     /**
27     * A {@link Future} that may be explicitly completed (setting its
28 dl 1.88 * value and status), and may be used as a {@link CompletionStage},
29     * supporting dependent functions and actions that trigger upon its
30     * completion.
31 dl 1.1 *
32 jsr166 1.50 * <p>When two or more threads attempt to
33     * {@link #complete complete},
34 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
35 jsr166 1.50 * {@link #cancel cancel}
36     * a CompletableFuture, only one of them succeeds.
37 dl 1.19 *
38 dl 1.91 * <p>In addition to these and related methods for directly
39     * manipulating status and results, CompletableFuture implements
40     * interface {@link CompletionStage} with the following policies: <ul>
41 dl 1.35 *
42 dl 1.88 * <li>Actions supplied for dependent completions of
43     * <em>non-async</em> methods may be performed by the thread that
44     * completes the current CompletableFuture, or by any other caller of
45 dl 1.96 * a completion method.</li>
46 jsr166 1.65 *
47 dl 1.88 * <li>All <em>async</em> methods without an explicit Executor
48 dl 1.96 * argument are performed using the {@link ForkJoinPool#commonPool()}
49     * (unless it does not support a parallelism level of at least two, in
50     * which case, a new Thread is used). To simplify monitoring,
51     * debugging, and tracking, all generated asynchronous tasks are
52     * instances of the marker interface {@link
53     * AsynchronousCompletionTask}. </li>
54 dl 1.35 *
55 dl 1.88 * <li>All CompletionStage methods are implemented independently of
56     * other public methods, so the behavior of one method is not impacted
57     * by overrides of others in subclasses. </li> </ul>
58     *
59 dl 1.91 * <p>CompletableFuture also implements {@link Future} with the following
60 dl 1.88 * policies: <ul>
61     *
62     * <li>Since (unlike {@link FutureTask}) this class has no direct
63 jsr166 1.55 * control over the computation that causes it to be completed,
64 dl 1.88 * cancellation is treated as just another form of exceptional
65     * completion. Method {@link #cancel cancel} has the same effect as
66     * {@code completeExceptionally(new CancellationException())}. Method
67     * {@link #isCompletedExceptionally} can be used to determine if a
68     * CompletableFuture completed in any exceptional fashion.</li>
69 jsr166 1.55 *
70 dl 1.88 * <li>In case of exceptional completion with a CompletionException,
71 jsr166 1.55 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
72     * {@link ExecutionException} with the same cause as held in the
73 dl 1.88 * corresponding CompletionException. To simplify usage in most
74     * contexts, this class also defines methods {@link #join()} and
75     * {@link #getNow} that instead throw the CompletionException directly
76     * in these cases.</li> </ul>
77 jsr166 1.80 *
78 dl 1.1 * @author Doug Lea
79     * @since 1.8
80     */
81 dl 1.88 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
82 dl 1.28
83 dl 1.1 /*
84 dl 1.20 * Overview:
85 dl 1.1 *
86 dl 1.104 * A CompletableFuture may have dependent completion actions,
87     * collected in a linked stack. It atomically completes by CASing
88     * a result field, and then pops off and runs those actions. This
89     * applies across normal vs exceptional outcomes, sync vs async
90     * actions, binary triggers, and various forms of completions.
91     *
92     * Non-nullness of field result (set via CAS) indicates done. An
93     * AltResult is used to box null as a result, as well as to hold
94     * exceptions. Using a single field makes completion simple to
95     * detect and trigger. Encoding and decoding is straightforward
96     * but adds vertical sprawl. One minor simplification relies on
97     * the (static) NIL (to box null results) being the only AltResult
98     * with a null exception field, so we don't usually need explicit
99     * comparisons with NIL. Exception propagation mechanics
100     * surrounding decoding rely on unchecked casts of decoded results
101     * really being unchecked, and user type errors being caught at
102     * point of use, as is currently the case in Java. These are
103     * highlighted by using SuppressWarnings annotated temporaries.
104     *
105     * Dependent actions are represented by Completion objects linked
106     * as Treiber stacks headed by field completions. There are four
107     * kinds of Completions: single-source (UniCompletion), two-source
108     * (BiCompletion), shared (CoBiCompletion, used by the second
109     * source of a BiCompletion), and Signallers that unblock waiters.
110     *
111     * The same patterns of methods and classes are used for each form
112     * of Completion (apply, combine, etc), and are written in a
113     * similar style. For each form X there is, when applicable:
114     *
115     * * Method nowX (for example nowApply) that immediately executes
116     * a supplied function and sets result
117     * * Class AsyncX class (for example AsyncApply) that calls nowX
118     * from another task,
119     * * Class DelayedX (for example DelayedApply) that holds
120 dl 1.109 * arguments and calls nowX when ready.
121 dl 1.104 *
122     * For each public CompletionStage method M* (for example
123     * thenApply{Async}), there is a method doM (for example
124     * doThenApply) that creates and/or invokes the appropriate form.
125     * Each deals with three cases that can arise when adding a
126     * dependent completion to CompletableFuture f:
127     *
128     * * f is already complete, so the dependent action is run
129     * immediately, via a "now" method, which, if async,
130     * starts the action in a new task.
131     * * f is not complete, so a Completion action is created and
132     * pushed to f's completions. It is triggered via
133     * f.postComplete when f completes.
134     * * f is not complete, but completes while adding the completion
135     * action, so we try to trigger it upon adding (see method
136     * unipush and derivatives) to cover races.
137     *
138     * Methods with two sources (for example thenCombine) must deal
139     * with races across both while pushing actions. The second
140     * completion is an CoBiCompletion pointing to the first, shared
141     * to ensure that at most one claims and performs the action. The
142     * multiple-arity method allOf does this pairwise to form a tree
143     * of completions. (Method anyOf just uses a depth-one Or tree.)
144     *
145     * Upon setting results, method postComplete is called unless
146     * the target is guaranteed not to be observable (i.e., not yet
147     * returned or linked). Multiple threads can call postComplete,
148     * which atomically pops each dependent action, and tries to
149     * trigger it via method tryAct. Any such action must be performed
150     * only once, even if called from several threads, so Completions
151     * maintain status via CAS, and on success run one of the "now"
152     * methods. Triggering can propagate recursively, so tryAct
153     * returns a completed dependent (if one exists) for further
154     * processing by its caller.
155     *
156     * Blocking methods get() and join() rely on Signaller Completions
157     * that wake up waiting threads. The mechanics are similar to
158     * Treiber stack wait-nodes used in FutureTask, Phaser, and
159     * SynchronousQueue. See their internal documentation for
160     * algorithmic details.
161     *
162     * Without precautions, CompletableFutures would be prone to
163     * garbage accumulation as chains of completions build up, each
164     * pointing back to its sources. So we detach (null out) most
165     * Completion fields as soon as possible. To support this,
166     * internal methods check for and harmlessly ignore null arguments
167     * that may have been obtained during races with threads nulling
168     * out fields. (Some of these checked cases cannot currently
169     * happen.) Fields of Async classes can be but currently are not
170     * fully detached, because they do not in general form cycles.
171     */
172    
173     volatile Object result; // Either the result or boxed AltResult
174     volatile Completion<?> completions; // Treiber stack of dependent actions
175    
176     final boolean internalComplete(Object r) { // CAS from null to r
177     return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
178     }
179    
180     final boolean casCompletions(Completion<?> cmp, Completion<?> val) {
181     return UNSAFE.compareAndSwapObject(this, COMPLETIONS, cmp, val);
182     }
183    
184     /* ------------- Encoding and decoding outcomes -------------- */
185    
186     static final class AltResult { // See above
187     final Throwable ex; // null only for NIL
188     AltResult(Throwable x) { this.ex = x; }
189 dl 1.1 }
190    
191     static final AltResult NIL = new AltResult(null);
192    
193 dl 1.20 /**
194 dl 1.104 * Returns the encoding of the given (non-null) exception as a
195     * wrapped CompletionException unless it is one already.
196 dl 1.99 */
197 dl 1.104 static AltResult altThrowable(Throwable x) {
198     return new AltResult((x instanceof CompletionException) ? x :
199     new CompletionException(x));
200 dl 1.99 }
201    
202     /**
203 dl 1.104 * Returns the encoding of the given arguments: if the exception
204     * is non-null, encodes as altThrowable. Otherwise uses the given
205     * value, boxed as NIL if null.
206 dl 1.99 */
207 dl 1.104 static Object encodeOutcome(Object v, Throwable x) {
208     return (x != null) ? altThrowable(x) : (v == null) ? NIL : v;
209 dl 1.20 }
210    
211 dl 1.1 /**
212 jsr166 1.108 * Decodes outcome to return result or throw unchecked exception.
213 dl 1.1 */
214 dl 1.104 private static <T> T reportJoin(Object r) {
215     if (r instanceof AltResult) {
216     Throwable x;
217     if ((x = ((AltResult)r).ex) == null)
218     return null;
219     if (x instanceof CancellationException)
220     throw (CancellationException)x;
221     if (x instanceof CompletionException)
222     throw (CompletionException)x;
223     throw new CompletionException(x);
224 dl 1.28 }
225 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r;
226     return tr;
227 dl 1.1 }
228    
229     /**
230 jsr166 1.108 * Reports result using Future.get conventions.
231 dl 1.1 */
232 dl 1.104 private static <T> T reportGet(Object r)
233     throws InterruptedException, ExecutionException {
234     if (r == null) // by convention below, null means interrupted
235     throw new InterruptedException();
236     if (r instanceof AltResult) {
237     Throwable x, cause;
238     if ((x = ((AltResult)r).ex) == null)
239 dl 1.28 return null;
240 dl 1.104 if (x instanceof CancellationException)
241     throw (CancellationException)x;
242     if ((x instanceof CompletionException) &&
243     (cause = x.getCause()) != null)
244     x = cause;
245     throw new ExecutionException(x);
246 dl 1.28 }
247 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r;
248     return tr;
249 dl 1.1 }
250    
251 dl 1.104 /* ------------- Async Tasks -------------- */
252    
253 dl 1.1 /**
254 jsr166 1.56 * A marker interface identifying asynchronous tasks produced by
255 dl 1.28 * {@code async} methods. This may be useful for monitoring,
256     * debugging, and tracking asynchronous activities.
257 jsr166 1.57 *
258     * @since 1.8
259 dl 1.1 */
260 dl 1.28 public static interface AsynchronousCompletionTask {
261 dl 1.1 }
262    
263 dl 1.104 /**
264     * Base class for tasks that can act as either FJ or plain
265     * Runnables. Abstract method compute calls an associated "now"
266     * method. Method exec calls compute if its CompletableFuture is
267     * not already done, and runs completions if done. Fields are not
268     * in general final and can be nulled out after use (but most
269     * currently are not). Classes include serialVersionUIDs even
270     * though they are currently never serialized.
271     */
272     abstract static class Async<T> extends ForkJoinTask<Void>
273 dl 1.28 implements Runnable, AsynchronousCompletionTask {
274 dl 1.104 CompletableFuture<T> dep; // the CompletableFuture to trigger
275     Async(CompletableFuture<T> dep) { this.dep = dep; }
276    
277 jsr166 1.105 abstract void compute(); // call the associated "now" method
278 dl 1.104
279 jsr166 1.105 public final boolean exec() {
280 dl 1.104 CompletableFuture<T> d;
281     if ((d = dep) != null) {
282     if (d.result == null) // suppress if cancelled
283     compute();
284     if (d.result != null)
285     d.postComplete();
286     dep = null; // detach
287     }
288     return true;
289     }
290 dl 1.28 public final Void getRawResult() { return null; }
291     public final void setRawResult(Void v) { }
292     public final void run() { exec(); }
293 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
294 jsr166 1.26 }
295    
296 dl 1.109 /**
297     * Default executor -- ForkJoinPool.commonPool() unless it cannot
298     * support parallelism.
299     */
300     static final Executor asyncPool =
301     (ForkJoinPool.getCommonPoolParallelism() > 1) ?
302     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
303    
304     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
305     static final class ThreadPerTaskExecutor implements Executor {
306     public void execute(Runnable r) { new Thread(r).start(); }
307     }
308    
309     /**
310     * Null-checks user executor argument, and translates uses of
311     * commonPool to asyncPool in case parallelism disabled.
312     */
313     static Executor screenExecutor(Executor e) {
314     if (e == null) throw new NullPointerException();
315     return (e == ForkJoinPool.commonPool()) ? asyncPool : e;
316     }
317    
318 dl 1.104 /* ------------- Completions -------------- */
319    
320 jsr166 1.106 abstract static class Completion<T> { // See above
321 dl 1.104 volatile Completion<?> next; // Treiber stack link
322    
323     /**
324     * Performs completion action if enabled, returning a
325 jsr166 1.107 * completed dependent CompletableFuture, if one exists.
326 dl 1.104 */
327     abstract CompletableFuture<?> tryAct();
328 dl 1.96 }
329    
330 dl 1.104 /**
331 jsr166 1.107 * Triggers all reachable enabled dependents. Call only when
332 dl 1.104 * known to be done.
333     */
334     final void postComplete() {
335     /*
336     * On each step, variable f holds current completions to pop
337     * and run. It is extended along only one path at a time,
338     * pushing others to avoid StackOverflowErrors on recursion.
339     */
340     CompletableFuture<?> f = this; Completion<?> h;
341     while ((h = f.completions) != null ||
342     (f != this && (h = (f = this).completions) != null)) {
343     CompletableFuture<?> d; Completion<?> t;
344     if (f.casCompletions(h, t = h.next)) {
345     if (t != null) {
346     if (f != this) { // push
347     do {} while (!casCompletions(h.next = completions, h));
348     continue;
349     }
350     h.next = null; // detach
351 dl 1.28 }
352 dl 1.104 f = (d = h.tryAct()) == null ? this : d;
353 dl 1.19 }
354     }
355     }
356    
357 dl 1.104 /* ------------- One-source Completions -------------- */
358    
359     /**
360     * A Completion with a source and dependent. The "dep" field acts
361     * as a claim, nulled out to disable further attempts to
362     * trigger. Fields can only be observed by other threads upon
363     * successful push; and should be nulled out after claim.
364     */
365 jsr166 1.106 abstract static class UniCompletion<T> extends Completion<T> {
366 dl 1.104 Executor async; // executor to use (null if none)
367     CompletableFuture<T> dep; // the dependent to complete
368     CompletableFuture<?> src; // source of value for tryAct
369    
370     UniCompletion(Executor async, CompletableFuture<T> dep,
371     CompletableFuture<?> src) {
372     this.async = async; this.dep = dep; this.src = src;
373     }
374    
375     /** Tries to claim completion action by CASing dep to null */
376     final boolean claim(CompletableFuture<T> d) {
377     return UNSAFE.compareAndSwapObject(this, DEP, d, null);
378     }
379    
380     private static final sun.misc.Unsafe UNSAFE;
381     private static final long DEP;
382     static {
383     try {
384     UNSAFE = sun.misc.Unsafe.getUnsafe();
385     Class<?> k = UniCompletion.class;
386     DEP = UNSAFE.objectFieldOffset
387     (k.getDeclaredField("dep"));
388     } catch (Exception x) {
389     throw new Error(x);
390 dl 1.1 }
391     }
392     }
393    
394 dl 1.104 /** Pushes c on to completions, and triggers c if done. */
395     private void unipush(UniCompletion<?> c) {
396     if (c != null) {
397     CompletableFuture<?> d;
398     while (result == null && !casCompletions(c.next = completions, c))
399     c.next = null; // clear on CAS failure
400     if ((d = c.tryAct()) != null) // cover races
401     d.postComplete();
402     if (result != null) // clean stack
403     postComplete();
404     }
405     }
406    
407     // Immediate, async, delayed, and routing support for Function/apply
408    
409     static <T,U> void nowApply(Executor e, CompletableFuture<U> d, Object r,
410     Function<? super T,? extends U> f) {
411     if (d != null && f != null) {
412     T t; U u; Throwable x;
413     if (r instanceof AltResult) {
414     t = null;
415     x = ((AltResult)r).ex;
416     }
417     else {
418     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
419     x = null;
420     }
421     if (x == null) {
422 dl 1.21 try {
423 dl 1.104 if (e != null) {
424     e.execute(new AsyncApply<T,U>(d, t, f));
425     return;
426     }
427     u = f.apply(t);
428     } catch (Throwable ex) {
429     x = ex;
430 dl 1.21 u = null;
431     }
432 dl 1.1 }
433 dl 1.104 else
434     u = null;
435     d.internalComplete(encodeOutcome(u, x));
436 dl 1.1 }
437     }
438    
439 dl 1.104 static final class AsyncApply<T,U> extends Async<U> {
440 jsr166 1.105 T arg; Function<? super T,? extends U> fn;
441 dl 1.104 AsyncApply(CompletableFuture<U> dep, T arg,
442     Function<? super T,? extends U> fn) {
443     super(dep); this.arg = arg; this.fn = fn;
444 dl 1.1 }
445 jsr166 1.105 final void compute() { nowApply(null, dep, arg, fn); }
446 dl 1.1 private static final long serialVersionUID = 5232453952276885070L;
447     }
448    
449 dl 1.104 static final class DelayedApply<T,U> extends UniCompletion<U> {
450     Function<? super T,? extends U> fn;
451     DelayedApply(Executor async, CompletableFuture<U> dep,
452     CompletableFuture<?> src,
453     Function<? super T,? extends U> fn) {
454     super(async, dep, src); this.fn = fn;
455     }
456     final CompletableFuture<?> tryAct() {
457     CompletableFuture<U> d; CompletableFuture<?> a; Object r;
458     if ((d = dep) != null && (a = src) != null &&
459     (r = a.result) != null && claim(d)) {
460     nowApply(async, d, r, fn);
461     src = null; fn = null;
462     if (d.result != null) return d;
463 dl 1.7 }
464 dl 1.104 return null;
465 dl 1.7 }
466     }
467    
468 dl 1.104 private <U> CompletableFuture<U> doThenApply(
469     Function<? super T,? extends U> fn, Executor e) {
470     if (fn == null) throw new NullPointerException();
471     CompletableFuture<U> d = new CompletableFuture<U>();
472     Object r = result;
473     if (r == null)
474     unipush(new DelayedApply<T,U>(e, d, this, fn));
475     else
476     nowApply(e, d, r, fn);
477     return d;
478     }
479    
480     // Consumer/accept
481    
482     static <T,U> void nowAccept(Executor e, CompletableFuture<U> d,
483     Object r, Consumer<? super T> f) {
484     if (d != null && f != null) {
485     T t; Throwable x;
486     if (r instanceof AltResult) {
487     t = null;
488     x = ((AltResult)r).ex;
489     }
490     else {
491     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
492     x = null;
493     }
494     if (x == null) {
495 dl 1.21 try {
496 dl 1.104 if (e != null) {
497     e.execute(new AsyncAccept<T,U>(d, t, f));
498     return;
499     }
500     f.accept(t);
501     } catch (Throwable ex) {
502     x = ex;
503 dl 1.21 }
504 dl 1.7 }
505 dl 1.104 d.internalComplete(encodeOutcome(null, x));
506 dl 1.7 }
507     }
508    
509 dl 1.104 static final class AsyncAccept<T,U> extends Async<U> {
510 jsr166 1.105 T arg; Consumer<? super T> fn;
511 dl 1.104 AsyncAccept(CompletableFuture<U> dep, T arg,
512     Consumer<? super T> fn) {
513     super(dep); this.arg = arg; this.fn = fn;
514 dl 1.37 }
515 jsr166 1.105 final void compute() { nowAccept(null, dep, arg, fn); }
516 dl 1.37 private static final long serialVersionUID = 5232453952276885070L;
517     }
518    
519 dl 1.104 static final class DelayedAccept<T> extends UniCompletion<Void> {
520     Consumer<? super T> fn;
521     DelayedAccept(Executor async, CompletableFuture<Void> dep,
522     CompletableFuture<?> src, Consumer<? super T> fn) {
523     super(async, dep, src); this.fn = fn;
524 dl 1.91 }
525 dl 1.104 final CompletableFuture<?> tryAct() {
526     CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
527     if ((d = dep) != null && (a = src) != null &&
528     (r = a.result) != null && claim(d)) {
529     nowAccept(async, d, r, fn);
530     src = null; fn = null;
531     if (d.result != null) return d;
532 dl 1.91 }
533 dl 1.104 return null;
534 dl 1.91 }
535     }
536    
537 dl 1.104 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
538     Executor e) {
539     if (fn == null) throw new NullPointerException();
540     CompletableFuture<Void> d = new CompletableFuture<Void>();
541     Object r = result;
542     if (r == null)
543     unipush(new DelayedAccept<T>(e, d, this, fn));
544     else
545     nowAccept(e, d, r, fn);
546     return d;
547 dl 1.28 }
548    
549 dl 1.104 // Runnable/run
550 dl 1.1
551 dl 1.104 static <T> void nowRun(Executor e, CompletableFuture<T> d, Object r,
552     Runnable f) {
553     if (d != null && f != null) {
554     Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
555     if (x == null) {
556     try {
557     if (e != null) {
558     e.execute(new AsyncRun<T>(d, f));
559     return;
560 dl 1.17 }
561 dl 1.104 f.run();
562     } catch (Throwable ex) {
563     x = ex;
564 dl 1.17 }
565 dl 1.1 }
566 dl 1.104 d.internalComplete(encodeOutcome(null, x));
567 dl 1.1 }
568     }
569    
570 dl 1.104 static final class AsyncRun<T> extends Async<T> {
571 jsr166 1.105 Runnable fn;
572 dl 1.104 AsyncRun(CompletableFuture<T> dep, Runnable fn) {
573     super(dep); this.fn = fn;
574 dl 1.7 }
575 jsr166 1.105 final void compute() { nowRun(null, dep, null, fn); }
576 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
577 dl 1.7 }
578    
579 dl 1.104 static final class DelayedRun extends UniCompletion<Void> {
580 jsr166 1.105 Runnable fn;
581 dl 1.104 DelayedRun(Executor async, CompletableFuture<Void> dep,
582     CompletableFuture<?> src, Runnable fn) {
583     super(async, dep, src); this.fn = fn;
584     }
585     final CompletableFuture<?> tryAct() {
586     CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
587     if ((d = dep) != null && (a = src) != null &&
588     (r = a.result) != null && claim(d)) {
589     nowRun(async, d, r, fn);
590     src = null; fn = null; // clear refs
591     if (d.result != null) return d;
592 dl 1.1 }
593 dl 1.104 return null;
594 dl 1.1 }
595     }
596    
597 dl 1.104 private CompletableFuture<Void> doThenRun(Runnable fn, Executor e) {
598     if (fn == null) throw new NullPointerException();
599     CompletableFuture<Void> d = new CompletableFuture<Void>();
600     Object r = result;
601     if (r == null)
602     unipush(new DelayedRun(e, d, this, fn));
603     else
604     nowRun(e, d, r, fn);
605     return d;
606     }
607    
608     // Supplier/get
609    
610     static <T> void nowSupply(CompletableFuture<T> d, Supplier<T> f) {
611     if (d != null && f != null) {
612     T t; Throwable x;
613     try {
614     t = f.get();
615     x = null;
616     } catch (Throwable ex) {
617     x = ex;
618     t = null;
619 jsr166 1.2 }
620 dl 1.104 d.internalComplete(encodeOutcome(t, x));
621 jsr166 1.2 }
622 dl 1.1 }
623    
624 dl 1.104 static final class AsyncSupply<T> extends Async<T> {
625 jsr166 1.105 Supplier<T> fn;
626 dl 1.104 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
627     super(dep); this.fn = fn;
628 dl 1.7 }
629 jsr166 1.105 final void compute() { nowSupply(dep, fn); }
630 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
631 dl 1.7 }
632    
633 dl 1.104 // WhenComplete
634    
635     static <T> void nowWhen(Executor e, CompletableFuture<T> d, Object r,
636     BiConsumer<? super T,? super Throwable> f) {
637     if (d != null && f != null) {
638     T t; Throwable x, dx;
639     if (r instanceof AltResult) {
640     t = null;
641     x = ((AltResult)r).ex;
642     }
643     else {
644     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
645     x = null;
646     }
647     try {
648     if (e != null) {
649     e.execute(new AsyncWhen<T>(d, r, f));
650     return;
651 jsr166 1.2 }
652 dl 1.104 f.accept(t, x);
653     dx = null;
654     } catch (Throwable ex) {
655     dx = ex;
656 jsr166 1.2 }
657 dl 1.104 d.internalComplete(encodeOutcome(t, x != null ? x : dx));
658 jsr166 1.2 }
659 dl 1.1 }
660    
661 dl 1.104 static final class AsyncWhen<T> extends Async<T> {
662 jsr166 1.105 Object arg; BiConsumer<? super T,? super Throwable> fn;
663 dl 1.104 AsyncWhen(CompletableFuture<T> dep, Object arg,
664     BiConsumer<? super T,? super Throwable> fn) {
665     super(dep); this.arg = arg; this.fn = fn;
666 dl 1.35 }
667 jsr166 1.105 final void compute() { nowWhen(null, dep, arg, fn); }
668 dl 1.35 private static final long serialVersionUID = 5232453952276885070L;
669     }
670    
671 dl 1.104 static final class DelayedWhen<T> extends UniCompletion<T> {
672     BiConsumer<? super T, ? super Throwable> fn;
673     DelayedWhen(Executor async, CompletableFuture<T> dep,
674     CompletableFuture<?> src,
675     BiConsumer<? super T, ? super Throwable> fn) {
676     super(async, dep, src); this.fn = fn;
677     }
678     final CompletableFuture<?> tryAct() {
679     CompletableFuture<T> d; CompletableFuture<?> a; Object r;
680     if ((d = dep) != null && (a = src) != null &&
681     (r = a.result) != null && claim(d)) {
682     nowWhen(async, d, r, fn);
683     src = null; fn = null;
684     if (d.result != null) return d;
685 jsr166 1.2 }
686 dl 1.104 return null;
687 jsr166 1.2 }
688 dl 1.1 }
689    
690 dl 1.104 private CompletableFuture<T> doWhenComplete(
691     BiConsumer<? super T, ? super Throwable> fn, Executor e) {
692     if (fn == null) throw new NullPointerException();
693     CompletableFuture<T> d = new CompletableFuture<T>();
694     Object r = result;
695     if (r == null)
696     unipush(new DelayedWhen<T>(e, d, this, fn));
697     else
698     nowWhen(e, d, r, fn);
699     return d;
700 dl 1.7 }
701    
702 dl 1.104 // Handle
703    
704     static <T,U> void nowHandle(Executor e, CompletableFuture<U> d, Object r,
705     BiFunction<? super T, Throwable, ? extends U> f) {
706     if (d != null && f != null) {
707     T t; U u; Throwable x, dx;
708     if (r instanceof AltResult) {
709     t = null;
710     x = ((AltResult)r).ex;
711     }
712     else {
713     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
714     x = null;
715     }
716     try {
717     if (e != null) {
718     e.execute(new AsyncCombine<T,Throwable,U>(d, t, x, f));
719     return;
720 dl 1.1 }
721 dl 1.104 u = f.apply(t, x);
722     dx = null;
723     } catch (Throwable ex) {
724     dx = ex;
725     u = null;
726 jsr166 1.2 }
727 dl 1.104 d.internalComplete(encodeOutcome(u, dx));
728 jsr166 1.2 }
729 dl 1.1 }
730    
731 dl 1.104 static final class DelayedHandle<T,U> extends UniCompletion<U> {
732     BiFunction<? super T, Throwable, ? extends U> fn;
733     DelayedHandle(Executor async, CompletableFuture<U> dep,
734     CompletableFuture<?> src,
735     BiFunction<? super T, Throwable, ? extends U> fn) {
736     super(async, dep, src); this.fn = fn;
737     }
738     final CompletableFuture<?> tryAct() {
739     CompletableFuture<U> d; CompletableFuture<?> a; Object r;
740     if ((d = dep) != null && (a = src) != null &&
741     (r = a.result) != null && claim(d)) {
742     nowHandle(async, d, r, fn);
743     src = null; fn = null;
744     if (d.result != null) return d;
745 dl 1.35 }
746 dl 1.104 return null;
747 dl 1.35 }
748     }
749    
750 dl 1.104 private <U> CompletableFuture<U> doHandle(
751     BiFunction<? super T, Throwable, ? extends U> fn,
752     Executor e) {
753     if (fn == null) throw new NullPointerException();
754     CompletableFuture<U> d = new CompletableFuture<U>();
755     Object r = result;
756     if (r == null)
757     unipush(new DelayedHandle<T,U>(e, d, this, fn));
758     else
759     nowHandle(e, d, r, fn);
760     return d;
761 dl 1.1 }
762    
763 dl 1.104 // Exceptionally
764    
765     static <T> void nowExceptionally(CompletableFuture<T> d, Object r,
766     Function<? super Throwable, ? extends T> f) {
767     if (d != null && f != null) {
768     T t; Throwable x, dx;
769     if ((r instanceof AltResult) && (x = ((AltResult)r).ex) != null) {
770     try {
771     t = f.apply(x);
772     dx = null;
773     } catch (Throwable ex) {
774     dx = ex;
775 dl 1.88 t = null;
776     }
777     }
778 dl 1.104 else {
779     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
780     dx = null;
781     }
782     d.internalComplete(encodeOutcome(t, dx));
783 dl 1.88 }
784     }
785    
786 dl 1.104 static final class DelayedExceptionally<T> extends UniCompletion<T> {
787     Function<? super Throwable, ? extends T> fn;
788     DelayedExceptionally(CompletableFuture<T> dep, CompletableFuture<?> src,
789     Function<? super Throwable, ? extends T> fn) {
790     super(null, dep, src); this.fn = fn;
791     }
792     final CompletableFuture<?> tryAct() {
793     CompletableFuture<T> d; CompletableFuture<?> a; Object r;
794     if ((d = dep) != null && (a = src) != null &&
795     (r = a.result) != null && claim(d)) {
796     nowExceptionally(d, r, fn);
797     src = null; fn = null;
798     if (d.result != null) return d;
799 dl 1.17 }
800 dl 1.104 return null;
801 dl 1.17 }
802     }
803    
804 dl 1.104 private CompletableFuture<T> doExceptionally(
805     Function<Throwable, ? extends T> fn) {
806     if (fn == null) throw new NullPointerException();
807     CompletableFuture<T> d = new CompletableFuture<T>();
808     Object r = result;
809     if (r == null)
810     unipush(new DelayedExceptionally<T>(d, this, fn));
811     else
812     nowExceptionally(d, r, fn);
813     return d;
814 dl 1.75 }
815    
816 dl 1.104 // Identity function used by nowCompose and anyOf
817    
818     static <T> void nowCopy(CompletableFuture<T> d, Object r) {
819     if (d != null && d.result == null) {
820     Throwable x;
821     d.internalComplete(((r instanceof AltResult) &&
822     (x = ((AltResult)r).ex) != null &&
823     !(x instanceof CompletionException)) ?
824     new AltResult(new CompletionException(x)): r);
825 dl 1.17 }
826     }
827    
828 dl 1.104 static final class DelayedCopy<T> extends UniCompletion<T> {
829     DelayedCopy(CompletableFuture<T> dep, CompletableFuture<?> src) {
830     super(null, dep, src);
831     }
832     final CompletableFuture<?> tryAct() {
833     CompletableFuture<T> d; CompletableFuture<?> a; Object r;
834     if ((d = dep) != null && (a = src) != null &&
835     (r = a.result) != null && claim(d)) {
836     nowCopy(d, r);
837     src = null;
838     if (d.result != null) return d;
839 dl 1.28 }
840 dl 1.104 return null;
841 dl 1.28 }
842     }
843    
844 dl 1.104 // Compose
845 dl 1.28
846 dl 1.104 static <T,U> void nowCompose(Executor e, CompletableFuture<U> d, Object r,
847     Function<? super T, ? extends CompletionStage<U>> f) {
848     if (d != null && f != null) {
849     T t; Throwable x;
850 dl 1.88 if (r instanceof AltResult) {
851     t = null;
852 dl 1.104 x = ((AltResult)r).ex;
853 dl 1.88 }
854     else {
855 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
856     x = null;
857 dl 1.88 }
858 dl 1.104 if (x == null) {
859 dl 1.88 try {
860     if (e != null)
861 dl 1.104 e.execute(new AsyncCompose<T,U>(d, t, f));
862     else {
863     CompletableFuture<U> c =
864     f.apply(t).toCompletableFuture();
865     Object s = c.result;
866     if (s == null)
867     c.unipush(new DelayedCopy<U>(d, c));
868     else
869     nowCopy(d, s);
870     }
871     return;
872     } catch (Throwable ex) {
873     x = ex;
874 dl 1.88 }
875     }
876 dl 1.104 d.internalComplete(encodeOutcome(null, x));
877 dl 1.88 }
878 dl 1.28 }
879    
880 dl 1.104 static final class AsyncCompose<T,U> extends Async<U> {
881 jsr166 1.105 T arg; Function<? super T, ? extends CompletionStage<U>> fn;
882 dl 1.104 AsyncCompose(CompletableFuture<U> dep, T arg,
883     Function<? super T, ? extends CompletionStage<U>> fn) {
884     super(dep); this.arg = arg; this.fn = fn;
885     }
886 jsr166 1.105 final void compute() { nowCompose(null, dep, arg, fn); }
887 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
888     }
889    
890     static final class DelayedCompose<T,U> extends UniCompletion<U> {
891     Function<? super T, ? extends CompletionStage<U>> fn;
892     DelayedCompose(Executor async, CompletableFuture<U> dep,
893     CompletableFuture<?> src,
894     Function<? super T, ? extends CompletionStage<U>> fn) {
895     super(async, dep, src); this.fn = fn;
896     }
897     final CompletableFuture<?> tryAct() {
898     CompletableFuture<U> d; CompletableFuture<?> a; Object r;
899     if ((d = dep) != null && (a = src) != null &&
900     (r = a.result) != null && claim(d)) {
901     nowCompose(async, d, r, fn);
902     src = null; fn = null;
903     if (d.result != null) return d;
904     }
905     return null;
906     }
907     }
908    
909     private <U> CompletableFuture<U> doThenCompose(
910     Function<? super T, ? extends CompletionStage<U>> fn, Executor e) {
911 dl 1.88 if (fn == null) throw new NullPointerException();
912 dl 1.104 Object r = result;
913     if (r == null || e != null) {
914     CompletableFuture<U> d = new CompletableFuture<U>();
915     if (r == null)
916     unipush(new DelayedCompose<T,U>(e, d, this, fn));
917     else
918     nowCompose(e, d, r, fn);
919     return d;
920 dl 1.88 }
921 dl 1.104 else { // try to return function result
922     T t; Throwable x;
923 dl 1.88 if (r instanceof AltResult) {
924     t = null;
925 dl 1.104 x = ((AltResult)r).ex;
926 dl 1.88 }
927     else {
928 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
929     x = null;
930 dl 1.88 }
931 dl 1.104 if (x == null) {
932 dl 1.88 try {
933 dl 1.104 return fn.apply(t).toCompletableFuture();
934     } catch (Throwable ex) {
935     x = ex;
936 dl 1.88 }
937     }
938 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
939     d.result = encodeOutcome(null, x);
940     return d;
941 dl 1.88 }
942 dl 1.28 }
943    
944 dl 1.104 /* ------------- Two-source Completions -------------- */
945    
946     /** A Completion with two sources */
947 jsr166 1.106 abstract static class BiCompletion<T> extends UniCompletion<T> {
948 dl 1.104 CompletableFuture<?> snd; // second source for tryAct
949     BiCompletion(Executor async, CompletableFuture<T> dep,
950     CompletableFuture<?> src, CompletableFuture<?> snd) {
951     super(async, dep, src); this.snd = snd;
952     }
953     }
954    
955     /** A Completion delegating to a shared BiCompletion */
956     static final class CoBiCompletion<T> extends Completion<T> {
957     BiCompletion<T> completion;
958     CoBiCompletion(BiCompletion<T> completion) {
959     this.completion = completion;
960 dl 1.88 }
961 dl 1.104 final CompletableFuture<?> tryAct() {
962     BiCompletion<T> c;
963     return (c = completion) == null ? null : c.tryAct();
964 dl 1.88 }
965     }
966    
967 dl 1.104 /* ------------- Two-source Anded -------------- */
968    
969     /* Pushes c on to completions and o's completions unless both done. */
970     private <U> void bipushAnded(CompletableFuture<?> o, BiCompletion<U> c) {
971     if (c != null && o != null) {
972     Object r; CompletableFuture<?> d;
973     while ((r = result) == null &&
974     !casCompletions(c.next = completions, c))
975     c.next = null;
976     if (o.result == null) {
977     Completion<U> q = (r != null) ? c : new CoBiCompletion<U>(c);
978     while (o.result == null &&
979     !o.casCompletions(q.next = o.completions, q))
980     q.next = null;
981 dl 1.88 }
982 dl 1.104 if ((d = c.tryAct()) != null)
983     d.postComplete();
984     if (o.result != null)
985     o.postComplete();
986     if (result != null)
987     postComplete();
988 dl 1.88 }
989 dl 1.104 }
990    
991     // BiFunction/combine
992    
993     static <T,U,V> void nowCombine(Executor e, CompletableFuture<V> d,
994     Object r, Object s,
995     BiFunction<? super T,? super U,? extends V> f) {
996     if (d != null && f != null) {
997     T t; U u; V v; Throwable x;
998 dl 1.88 if (r instanceof AltResult) {
999     t = null;
1000 dl 1.104 x = ((AltResult)r).ex;
1001 dl 1.88 }
1002     else {
1003 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1004     x = null;
1005 dl 1.88 }
1006 dl 1.104 if (x != null)
1007 dl 1.88 u = null;
1008     else if (s instanceof AltResult) {
1009 dl 1.104 x = ((AltResult)s).ex;
1010 dl 1.88 u = null;
1011     }
1012     else {
1013 dl 1.104 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1014 dl 1.88 }
1015 dl 1.104 if (x == null) {
1016 dl 1.88 try {
1017 dl 1.104 if (e != null) {
1018     e.execute(new AsyncCombine<T,U,V>(d, t, u, f));
1019     return;
1020     }
1021     v = f.apply(t, u);
1022     } catch (Throwable ex) {
1023     x = ex;
1024     v = null;
1025 dl 1.88 }
1026     }
1027 dl 1.104 else
1028     v = null;
1029     d.internalComplete(encodeOutcome(v, x));
1030 dl 1.88 }
1031     }
1032    
1033 dl 1.104 static final class AsyncCombine<T,U,V> extends Async<V> {
1034 jsr166 1.105 T arg1; U arg2; BiFunction<? super T,? super U,? extends V> fn;
1035 dl 1.104 AsyncCombine(CompletableFuture<V> dep, T arg1, U arg2,
1036     BiFunction<? super T,? super U,? extends V> fn) {
1037     super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1038     }
1039 jsr166 1.105 final void compute() { nowCombine(null, dep, arg1, arg2, fn); }
1040 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
1041     }
1042    
1043     static final class DelayedCombine<T,U,V> extends BiCompletion<V> {
1044     BiFunction<? super T,? super U,? extends V> fn;
1045     DelayedCombine(Executor async, CompletableFuture<V> dep,
1046     CompletableFuture<?> src, CompletableFuture<?> snd,
1047     BiFunction<? super T,? super U,? extends V> fn) {
1048     super(async, dep, src, snd); this.fn = fn;
1049     }
1050     final CompletableFuture<?> tryAct() {
1051     CompletableFuture<V> d; CompletableFuture<?> a, b; Object r, s;
1052     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1053     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1054     nowCombine(async, d, r, s, fn);
1055     src = null; snd = null; fn = null;
1056     if (d.result != null) return d;
1057 dl 1.88 }
1058 dl 1.104 return null;
1059 dl 1.88 }
1060 dl 1.104 }
1061    
1062     private <U,V> CompletableFuture<V> doThenCombine(
1063     CompletableFuture<? extends U> o,
1064     BiFunction<? super T,? super U,? extends V> fn,
1065     Executor e) {
1066     if (o == null || fn == null) throw new NullPointerException();
1067     CompletableFuture<V> d = new CompletableFuture<V>();
1068     Object r = result, s = o.result;
1069     if (r == null || s == null)
1070     bipushAnded(o, new DelayedCombine<T,U,V>(e, d, this, o, fn));
1071     else
1072     nowCombine(e, d, r, s, fn);
1073     return d;
1074     }
1075    
1076     // BiConsumer/AcceptBoth
1077    
1078     static <T,U,V> void nowAcceptBoth(Executor e, CompletableFuture<V> d,
1079     Object r, Object s,
1080     BiConsumer<? super T,? super U> f) {
1081     if (d != null && f != null) {
1082     T t; U u; Throwable x;
1083 dl 1.88 if (r instanceof AltResult) {
1084     t = null;
1085 dl 1.104 x = ((AltResult)r).ex;
1086 dl 1.88 }
1087     else {
1088 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1089     x = null;
1090 dl 1.88 }
1091 dl 1.104 if (x != null)
1092 dl 1.88 u = null;
1093     else if (s instanceof AltResult) {
1094 dl 1.104 x = ((AltResult)s).ex;
1095 dl 1.88 u = null;
1096     }
1097     else {
1098 dl 1.104 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1099 dl 1.88 }
1100 dl 1.104 if (x == null) {
1101 dl 1.88 try {
1102 dl 1.104 if (e != null) {
1103     e.execute(new AsyncAcceptBoth<T,U,V>(d, t, u, f));
1104     return;
1105     }
1106     f.accept(t, u);
1107     } catch (Throwable ex) {
1108     x = ex;
1109 dl 1.88 }
1110     }
1111 dl 1.104 d.internalComplete(encodeOutcome(null, x));
1112     }
1113     }
1114    
1115     static final class AsyncAcceptBoth<T,U,V> extends Async<V> {
1116 jsr166 1.105 T arg1; U arg2; BiConsumer<? super T,? super U> fn;
1117 dl 1.104 AsyncAcceptBoth(CompletableFuture<V> dep, T arg1, U arg2,
1118     BiConsumer<? super T,? super U> fn) {
1119     super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1120     }
1121 jsr166 1.105 final void compute() { nowAcceptBoth(null, dep, arg1, arg2, fn); }
1122 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
1123     }
1124    
1125     static final class DelayedAcceptBoth<T,U> extends BiCompletion<Void> {
1126     BiConsumer<? super T,? super U> fn;
1127     DelayedAcceptBoth(Executor async, CompletableFuture<Void> dep,
1128     CompletableFuture<?> src, CompletableFuture<?> snd,
1129     BiConsumer<? super T,? super U> fn) {
1130     super(async, dep, src, snd); this.fn = fn;
1131     }
1132     final CompletableFuture<?> tryAct() {
1133     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1134     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1135     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1136     nowAcceptBoth(async, d, r, s, fn);
1137     src = null; snd = null; fn = null;
1138     if (d.result != null) return d;
1139     }
1140     return null;
1141 dl 1.88 }
1142     }
1143    
1144 dl 1.104 private <U> CompletableFuture<Void> doThenAcceptBoth(
1145     CompletableFuture<? extends U> o,
1146     BiConsumer<? super T, ? super U> fn,
1147     Executor e) {
1148     if (o == null || fn == null) throw new NullPointerException();
1149     CompletableFuture<Void> d = new CompletableFuture<Void>();
1150     Object r = result, s = o.result;
1151     if (r == null || s == null)
1152     bipushAnded(o, new DelayedAcceptBoth<T,U>(e, d, this, o, fn));
1153     else
1154     nowAcceptBoth(e, d, r, s, fn);
1155     return d;
1156     }
1157    
1158     // Runnable/both
1159    
1160     static final class DelayedRunAfterBoth extends BiCompletion<Void> {
1161     Runnable fn;
1162     DelayedRunAfterBoth(Executor async, CompletableFuture<Void> dep,
1163     CompletableFuture<?> src, CompletableFuture<?> snd,
1164     Runnable fn) {
1165     super(async, dep, src, snd); this.fn = fn;
1166     }
1167     final CompletableFuture<?> tryAct() {
1168     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1169     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1170     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1171     Throwable x = (r instanceof AltResult) ?
1172     ((AltResult)r).ex : null;
1173     nowRun(async, d, (x == null) ? s : r, fn);
1174     src = null; snd = null; fn = null;
1175     if (d.result != null) return d;
1176 dl 1.88 }
1177 dl 1.104 return null;
1178 dl 1.88 }
1179 dl 1.104 }
1180    
1181     private CompletableFuture<Void> doRunAfterBoth(
1182     CompletableFuture<?> o, Runnable fn, Executor e) {
1183     if (o == null || fn == null) throw new NullPointerException();
1184     CompletableFuture<Void> d = new CompletableFuture<Void>();
1185     Object r = result, s = o.result;
1186     if (r == null || s == null)
1187     bipushAnded(o, new DelayedRunAfterBoth(e, d, this, o, fn));
1188 dl 1.101 else {
1189 dl 1.104 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1190     nowRun(e, d, (x == null) ? s : r, fn);
1191     }
1192     return d;
1193     }
1194    
1195     // allOf
1196    
1197     static <T> void nowAnd(CompletableFuture<T> d, Object r, Object s) {
1198     if (d != null) {
1199     Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1200     if (x == null && (s instanceof AltResult))
1201     x = ((AltResult)s).ex;
1202     d.internalComplete(encodeOutcome(null, x));
1203 dl 1.88 }
1204     }
1205    
1206 dl 1.104 static final class DelayedAnd extends BiCompletion<Void> {
1207     DelayedAnd(CompletableFuture<Void> dep,
1208     CompletableFuture<?> src, CompletableFuture<?> snd) {
1209     super(null, dep, src, snd);
1210     }
1211     final CompletableFuture<?> tryAct() {
1212     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1213     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1214     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1215     nowAnd(d, r, s);
1216     src = null; snd = null;
1217     if (d.result != null) return d;
1218 dl 1.88 }
1219 dl 1.104 return null;
1220 dl 1.88 }
1221 dl 1.104 }
1222    
1223     /** Recursively constructs a tree of And completions */
1224     private static CompletableFuture<Void> doAllOf(CompletableFuture<?>[] cfs,
1225     int lo, int hi) {
1226     CompletableFuture<Void> d = new CompletableFuture<Void>();
1227     if (lo > hi) // empty
1228     d.result = NIL;
1229 dl 1.101 else {
1230 dl 1.104 int mid = (lo + hi) >>> 1;
1231     CompletableFuture<?> fst = (lo == mid ? cfs[lo] :
1232     doAllOf(cfs, lo, mid));
1233     CompletableFuture<?> snd = (lo == hi ? fst : // and fst with self
1234     (hi == mid+1) ? cfs[hi] :
1235     doAllOf(cfs, mid+1, hi));
1236     Object r = fst.result, s = snd.result; // throw NPE if null elements
1237     if (r == null || s == null) {
1238     DelayedAnd a = new DelayedAnd(d, fst, snd);
1239     if (fst == snd)
1240     fst.unipush(a);
1241     else
1242     fst.bipushAnded(snd, a);
1243 dl 1.88 }
1244 dl 1.104 else
1245     nowAnd(d, r, s);
1246 dl 1.88 }
1247 dl 1.104 return d;
1248 dl 1.88 }
1249    
1250 dl 1.104 /* ------------- Two-source Ored -------------- */
1251    
1252     /* Pushes c on to completions and o's completions unless either done. */
1253     private <U> void bipushOred(CompletableFuture<?> o, BiCompletion<U> c) {
1254     if (c != null && o != null) {
1255     CompletableFuture<?> d;
1256     while (o.result == null && result == null) {
1257     if (casCompletions(c.next = completions, c)) {
1258     CoBiCompletion<U> q = new CoBiCompletion<U>(c);
1259     while (result == null && o.result == null &&
1260     !o.casCompletions(q.next = o.completions, q))
1261     q.next = null;
1262     break;
1263 dl 1.88 }
1264 dl 1.104 c.next = null;
1265 dl 1.88 }
1266 dl 1.104 if ((d = c.tryAct()) != null)
1267     d.postComplete();
1268     if (o.result != null)
1269     o.postComplete();
1270     if (result != null)
1271     postComplete();
1272 dl 1.88 }
1273 dl 1.104 }
1274    
1275     // Function/applyEither
1276    
1277     static final class DelayedApplyToEither<T,U> extends BiCompletion<U> {
1278     Function<? super T,? extends U> fn;
1279     DelayedApplyToEither(Executor async, CompletableFuture<U> dep,
1280     CompletableFuture<?> src, CompletableFuture<?> snd,
1281     Function<? super T,? extends U> fn) {
1282     super(async, dep, src, snd); this.fn = fn;
1283     }
1284     final CompletableFuture<?> tryAct() {
1285     CompletableFuture<U> d; CompletableFuture<?> a, b; Object r;
1286     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1287     ((r = a.result) != null || (r = b.result) != null) &&
1288     claim(d)) {
1289     nowApply(async, d, r, fn);
1290     src = null; snd = null; fn = null;
1291     if (d.result != null) return d;
1292 dl 1.88 }
1293 dl 1.104 return null;
1294 dl 1.88 }
1295     }
1296    
1297 dl 1.104 private <U> CompletableFuture<U> doApplyToEither(
1298     CompletableFuture<? extends T> o,
1299     Function<? super T, U> fn, Executor e) {
1300     if (o == null || fn == null) throw new NullPointerException();
1301     CompletableFuture<U> d = new CompletableFuture<U>();
1302     Object r = result;
1303     if (r == null && (r = o.result) == null)
1304     bipushOred(o, new DelayedApplyToEither<T,U>(e, d, this, o, fn));
1305     else
1306     nowApply(e, d, r, fn);
1307     return d;
1308     }
1309    
1310     // Consumer/acceptEither
1311    
1312     static final class DelayedAcceptEither<T> extends BiCompletion<Void> {
1313     Consumer<? super T> fn;
1314     DelayedAcceptEither(Executor async, CompletableFuture<Void> dep,
1315     CompletableFuture<?> src, CompletableFuture<?> snd,
1316     Consumer<? super T> fn) {
1317     super(async, dep, src, snd); this.fn = fn;
1318     }
1319     final CompletableFuture<?> tryAct() {
1320     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1321     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1322     ((r = a.result) != null || (r = b.result) != null) &&
1323     claim(d)) {
1324     nowAccept(async, d, r, fn);
1325     src = null; snd = null; fn = null;
1326     if (d.result != null) return d;
1327 dl 1.88 }
1328 dl 1.104 return null;
1329 dl 1.88 }
1330 dl 1.104 }
1331    
1332     private CompletableFuture<Void> doAcceptEither(
1333     CompletableFuture<? extends T> o,
1334     Consumer<? super T> fn, Executor e) {
1335     if (o == null || fn == null) throw new NullPointerException();
1336     CompletableFuture<Void> d = new CompletableFuture<Void>();
1337     Object r = result;
1338     if (r == null && (r = o.result) == null)
1339     bipushOred(o, new DelayedAcceptEither<T>(e, d, this, o, fn));
1340     else
1341     nowAccept(e, d, r, fn);
1342     return d;
1343     }
1344    
1345     // Runnable/runEither
1346    
1347     static final class DelayedRunAfterEither extends BiCompletion<Void> {
1348     Runnable fn;
1349     DelayedRunAfterEither(Executor async, CompletableFuture<Void> dep,
1350     CompletableFuture<?> src,
1351     CompletableFuture<?> snd, Runnable fn) {
1352     super(async, dep, src, snd); this.fn = fn;
1353     }
1354     final CompletableFuture<?> tryAct() {
1355     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1356     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1357     ((r = a.result) != null || (r = b.result) != null) &&
1358     claim(d)) {
1359     nowRun(async, d, r, fn);
1360     src = null; snd = null; fn = null;
1361     if (d.result != null) return d;
1362 dl 1.88 }
1363 dl 1.104 return null;
1364 dl 1.88 }
1365     }
1366    
1367 dl 1.104 private CompletableFuture<Void> doRunAfterEither(
1368     CompletableFuture<?> o, Runnable fn, Executor e) {
1369     if (o == null || fn == null) throw new NullPointerException();
1370     CompletableFuture<Void> d = new CompletableFuture<Void>();
1371     Object r = result;
1372     if (r == null && (r = o.result) == null)
1373     bipushOred(o, new DelayedRunAfterEither(e, d, this, o, fn));
1374     else
1375     nowRun(e, d, r, fn);
1376     return d;
1377     }
1378    
1379     /* ------------- Signallers -------------- */
1380    
1381     /**
1382     * Heuristic spin value for waitingGet() before blocking on
1383     * multiprocessors
1384     */
1385     static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
1386     1 << 8 : 0);
1387    
1388     /**
1389     * Completion for recording and releasing a waiting thread. See
1390     * other classes such as Phaser and SynchronousQueue for more
1391     * detailed explanation. This class implements ManagedBlocker to
1392     * avoid starvation when blocking actions pile up in
1393     * ForkJoinPools.
1394     */
1395     static final class Signaller extends Completion<Void>
1396     implements ForkJoinPool.ManagedBlocker {
1397     long nanos; // wait time if timed
1398     final long deadline; // non-zero if timed
1399     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
1400     volatile Thread thread;
1401     Signaller(boolean interruptible, long nanos, long deadline) {
1402     this.thread = Thread.currentThread();
1403     this.interruptControl = interruptible ? 1 : 0;
1404     this.nanos = nanos;
1405     this.deadline = deadline;
1406     }
1407     final CompletableFuture<?> tryAct() {
1408     Thread w = thread;
1409     if (w != null) {
1410     thread = null; // no need to CAS
1411     LockSupport.unpark(w);
1412 dl 1.88 }
1413 dl 1.104 return null;
1414 dl 1.88 }
1415 dl 1.104 public boolean isReleasable() {
1416     if (thread == null)
1417     return true;
1418     if (Thread.interrupted()) {
1419     int i = interruptControl;
1420     interruptControl = -1;
1421     if (i > 0)
1422     return true;
1423 dl 1.88 }
1424 dl 1.104 if (deadline != 0L &&
1425     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
1426     thread = null;
1427     return true;
1428 dl 1.88 }
1429 dl 1.104 return false;
1430     }
1431     public boolean block() {
1432     if (isReleasable())
1433     return true;
1434     else if (deadline == 0L)
1435     LockSupport.park(this);
1436     else if (nanos > 0L)
1437     LockSupport.parkNanos(this, nanos);
1438     return isReleasable();
1439 dl 1.88 }
1440     }
1441    
1442 dl 1.104 /**
1443     * Returns raw result after waiting, or null if interruptible and
1444     * interrupted.
1445     */
1446     private Object waitingGet(boolean interruptible) {
1447     Signaller q = null;
1448     boolean queued = false;
1449     int spins = SPINS;
1450 dl 1.88 Object r;
1451 dl 1.104 while ((r = result) == null) {
1452     if (spins > 0) {
1453     if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1454     --spins;
1455 dl 1.88 }
1456 dl 1.104 else if (q == null)
1457     q = new Signaller(interruptible, 0L, 0L);
1458     else if (!queued)
1459     queued = casCompletions(q.next = completions, q);
1460     else if (interruptible && q.interruptControl < 0) {
1461     q.thread = null;
1462     removeCancelledSignallers();
1463     return null;
1464 dl 1.88 }
1465 dl 1.104 else if (q.thread != null && result == null) {
1466     try {
1467     ForkJoinPool.managedBlock(q);
1468     } catch (InterruptedException ie) {
1469     q.interruptControl = -1;
1470     }
1471 dl 1.88 }
1472 dl 1.104 }
1473     if (q != null) {
1474     q.thread = null;
1475     if (q.interruptControl < 0) {
1476     if (interruptible)
1477     r = null; // report interruption
1478 dl 1.88 else
1479 dl 1.104 Thread.currentThread().interrupt();
1480 dl 1.88 }
1481     }
1482 dl 1.104 postComplete();
1483     return r;
1484 dl 1.88 }
1485    
1486 dl 1.104 /**
1487     * Returns raw result after waiting, or null if interrupted, or
1488     * throws TimeoutException on timeout.
1489     */
1490     private Object timedGet(long nanos) throws TimeoutException {
1491     if (Thread.interrupted())
1492     return null;
1493     if (nanos <= 0L)
1494     throw new TimeoutException();
1495     long d = System.nanoTime() + nanos;
1496     Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
1497     boolean queued = false;
1498 dl 1.88 Object r;
1499 dl 1.104 while ((r = result) == null) {
1500     if (!queued)
1501     queued = casCompletions(q.next = completions, q);
1502     else if (q.interruptControl < 0 || q.nanos <= 0L) {
1503     q.thread = null;
1504     removeCancelledSignallers();
1505     if (q.interruptControl < 0)
1506     return null;
1507     throw new TimeoutException();
1508     }
1509     else if (q.thread != null && result == null) {
1510     try {
1511     ForkJoinPool.managedBlock(q);
1512     } catch (InterruptedException ie) {
1513     q.interruptControl = -1;
1514     }
1515 dl 1.88 }
1516     }
1517 dl 1.104 q.thread = null;
1518     postComplete();
1519     return (q.interruptControl < 0) ? null : r;
1520     }
1521    
1522     /**
1523     * Unlinks cancelled Signallers to avoid accumulating garbage.
1524     * Internal nodes are simply unspliced without CAS since it is
1525     * harmless if they are traversed anyway. To avoid effects of
1526     * unsplicing from already removed nodes, the list is retraversed
1527     * in case of an apparent race.
1528     */
1529     private void removeCancelledSignallers() {
1530     for (Completion<?> p = null, q = completions; q != null;) {
1531     Completion<?> s = q.next;
1532     if ((q instanceof Signaller) && ((Signaller)q).thread == null) {
1533     if (p != null) {
1534     p.next = s;
1535     if (!(p instanceof Signaller) ||
1536     ((Signaller)p).thread != null)
1537     break;
1538     }
1539     else if (casCompletions(q, s))
1540     break;
1541     p = null; // restart
1542     q = completions;
1543 dl 1.88 }
1544     else {
1545 dl 1.104 p = q;
1546     q = s;
1547 dl 1.88 }
1548     }
1549     }
1550    
1551 dl 1.104 /* ------------- public methods -------------- */
1552 dl 1.88
1553     /**
1554     * Creates a new incomplete CompletableFuture.
1555     */
1556     public CompletableFuture() {
1557     }
1558    
1559     /**
1560     * Returns a new CompletableFuture that is asynchronously completed
1561     * by a task running in the {@link ForkJoinPool#commonPool()} with
1562     * the value obtained by calling the given Supplier.
1563     *
1564     * @param supplier a function returning the value to be used
1565     * to complete the returned CompletableFuture
1566 jsr166 1.95 * @param <U> the function's return type
1567 dl 1.88 * @return the new CompletableFuture
1568     */
1569     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1570     if (supplier == null) throw new NullPointerException();
1571 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
1572     asyncPool.execute(new AsyncSupply<U>(d, supplier));
1573     return d;
1574 dl 1.88 }
1575    
1576     /**
1577     * Returns a new CompletableFuture that is asynchronously completed
1578     * by a task running in the given executor with the value obtained
1579     * by calling the given Supplier.
1580     *
1581     * @param supplier a function returning the value to be used
1582     * to complete the returned CompletableFuture
1583     * @param executor the executor to use for asynchronous execution
1584 jsr166 1.95 * @param <U> the function's return type
1585 dl 1.88 * @return the new CompletableFuture
1586     */
1587     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1588     Executor executor) {
1589 dl 1.109 if (supplier == null) throw new NullPointerException();
1590     Executor e = screenExecutor(executor);
1591 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
1592 dl 1.109 e.execute(new AsyncSupply<U>(d, supplier));
1593 dl 1.104 return d;
1594 dl 1.28 }
1595    
1596     /**
1597 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1598     * by a task running in the {@link ForkJoinPool#commonPool()} after
1599     * it runs the given action.
1600 dl 1.28 *
1601     * @param runnable the action to run before completing the
1602     * returned CompletableFuture
1603 jsr166 1.58 * @return the new CompletableFuture
1604 dl 1.28 */
1605     public static CompletableFuture<Void> runAsync(Runnable runnable) {
1606     if (runnable == null) throw new NullPointerException();
1607 dl 1.104 CompletableFuture<Void> d = new CompletableFuture<Void>();
1608     asyncPool.execute(new AsyncRun<Void>(d, runnable));
1609     return d;
1610 dl 1.28 }
1611    
1612     /**
1613 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1614     * by a task running in the given executor after it runs the given
1615     * action.
1616 dl 1.28 *
1617     * @param runnable the action to run before completing the
1618     * returned CompletableFuture
1619     * @param executor the executor to use for asynchronous execution
1620 jsr166 1.58 * @return the new CompletableFuture
1621 dl 1.28 */
1622     public static CompletableFuture<Void> runAsync(Runnable runnable,
1623     Executor executor) {
1624 dl 1.109 if (runnable == null) throw new NullPointerException();
1625     Executor e = screenExecutor(executor);
1626 dl 1.104 CompletableFuture<Void> d = new CompletableFuture<Void>();
1627 dl 1.109 e.execute(new AsyncRun<Void>(d, runnable));
1628 dl 1.104 return d;
1629 dl 1.28 }
1630    
1631     /**
1632 dl 1.77 * Returns a new CompletableFuture that is already completed with
1633     * the given value.
1634     *
1635     * @param value the value
1636 jsr166 1.95 * @param <U> the type of the value
1637 dl 1.77 * @return the completed CompletableFuture
1638     */
1639     public static <U> CompletableFuture<U> completedFuture(U value) {
1640 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
1641     d.result = (value == null) ? NIL : value;
1642     return d;
1643 dl 1.77 }
1644    
1645     /**
1646 dl 1.28 * Returns {@code true} if completed in any fashion: normally,
1647     * exceptionally, or via cancellation.
1648     *
1649     * @return {@code true} if completed
1650     */
1651     public boolean isDone() {
1652     return result != null;
1653     }
1654    
1655     /**
1656 dl 1.49 * Waits if necessary for this future to complete, and then
1657 dl 1.48 * returns its result.
1658 dl 1.28 *
1659 dl 1.48 * @return the result value
1660     * @throws CancellationException if this future was cancelled
1661     * @throws ExecutionException if this future completed exceptionally
1662 dl 1.28 * @throws InterruptedException if the current thread was interrupted
1663     * while waiting
1664     */
1665     public T get() throws InterruptedException, ExecutionException {
1666 jsr166 1.105 Object r;
1667 dl 1.104 return reportGet((r = result) == null ? waitingGet(true) : r);
1668 dl 1.28 }
1669    
1670     /**
1671 dl 1.49 * Waits if necessary for at most the given time for this future
1672     * to complete, and then returns its result, if available.
1673 dl 1.28 *
1674     * @param timeout the maximum time to wait
1675     * @param unit the time unit of the timeout argument
1676 dl 1.48 * @return the result value
1677     * @throws CancellationException if this future was cancelled
1678     * @throws ExecutionException if this future completed exceptionally
1679 dl 1.28 * @throws InterruptedException if the current thread was interrupted
1680     * while waiting
1681     * @throws TimeoutException if the wait timed out
1682     */
1683     public T get(long timeout, TimeUnit unit)
1684     throws InterruptedException, ExecutionException, TimeoutException {
1685 jsr166 1.105 Object r;
1686 dl 1.28 long nanos = unit.toNanos(timeout);
1687 dl 1.104 return reportGet((r = result) == null ? timedGet(nanos) : r);
1688 dl 1.28 }
1689    
1690     /**
1691     * Returns the result value when complete, or throws an
1692     * (unchecked) exception if completed exceptionally. To better
1693     * conform with the use of common functional forms, if a
1694     * computation involved in the completion of this
1695     * CompletableFuture threw an exception, this method throws an
1696     * (unchecked) {@link CompletionException} with the underlying
1697     * exception as its cause.
1698     *
1699     * @return the result value
1700     * @throws CancellationException if the computation was cancelled
1701 jsr166 1.55 * @throws CompletionException if this future completed
1702     * exceptionally or a completion computation threw an exception
1703 dl 1.28 */
1704     public T join() {
1705 dl 1.104 Object r;
1706 jsr166 1.105 return reportJoin((r = result) == null ? waitingGet(false) : r);
1707 dl 1.28 }
1708    
1709     /**
1710     * Returns the result value (or throws any encountered exception)
1711     * if completed, else returns the given valueIfAbsent.
1712     *
1713     * @param valueIfAbsent the value to return if not completed
1714     * @return the result value, if completed, else the given valueIfAbsent
1715     * @throws CancellationException if the computation was cancelled
1716 jsr166 1.55 * @throws CompletionException if this future completed
1717     * exceptionally or a completion computation threw an exception
1718 dl 1.28 */
1719     public T getNow(T valueIfAbsent) {
1720 dl 1.104 Object r;
1721 jsr166 1.106 return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
1722 dl 1.28 }
1723    
1724     /**
1725     * If not already completed, sets the value returned by {@link
1726     * #get()} and related methods to the given value.
1727     *
1728     * @param value the result value
1729     * @return {@code true} if this invocation caused this CompletableFuture
1730     * to transition to a completed state, else {@code false}
1731     */
1732     public boolean complete(T value) {
1733 dl 1.104 boolean triggered = internalComplete(value == null ? NIL : value);
1734     postComplete();
1735 dl 1.28 return triggered;
1736     }
1737    
1738     /**
1739     * If not already completed, causes invocations of {@link #get()}
1740     * and related methods to throw the given exception.
1741     *
1742     * @param ex the exception
1743     * @return {@code true} if this invocation caused this CompletableFuture
1744     * to transition to a completed state, else {@code false}
1745     */
1746     public boolean completeExceptionally(Throwable ex) {
1747     if (ex == null) throw new NullPointerException();
1748 dl 1.104 boolean triggered = internalComplete(new AltResult(ex));
1749     postComplete();
1750 dl 1.28 return triggered;
1751     }
1752    
1753 dl 1.104 public <U> CompletableFuture<U> thenApply(
1754     Function<? super T,? extends U> fn) {
1755 dl 1.28 return doThenApply(fn, null);
1756     }
1757    
1758 dl 1.104 public <U> CompletableFuture<U> thenApplyAsync(
1759     Function<? super T,? extends U> fn) {
1760     return doThenApply(fn, asyncPool);
1761 dl 1.17 }
1762    
1763 dl 1.104 public <U> CompletableFuture<U> thenApplyAsync(
1764     Function<? super T,? extends U> fn, Executor executor) {
1765 dl 1.109 return doThenApply(fn, screenExecutor(executor));
1766 dl 1.28 }
1767 dl 1.1
1768 dl 1.104 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
1769 dl 1.88 return doThenAccept(action, null);
1770 dl 1.28 }
1771    
1772 dl 1.104 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
1773     return doThenAccept(action, asyncPool);
1774 dl 1.28 }
1775    
1776 dl 1.104 public CompletableFuture<Void> thenAcceptAsync(
1777     Consumer<? super T> action, Executor executor) {
1778 dl 1.109 return doThenAccept(action, screenExecutor(executor));
1779 dl 1.7 }
1780    
1781 dl 1.104 public CompletableFuture<Void> thenRun(Runnable action) {
1782 dl 1.28 return doThenRun(action, null);
1783     }
1784    
1785 dl 1.104 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1786     return doThenRun(action, asyncPool);
1787 dl 1.28 }
1788    
1789 dl 1.104 public CompletableFuture<Void> thenRunAsync(
1790     Runnable action, Executor executor) {
1791 dl 1.109 return doThenRun(action, screenExecutor(executor));
1792 dl 1.28 }
1793    
1794 dl 1.104 public <U,V> CompletableFuture<V> thenCombine(
1795     CompletionStage<? extends U> other,
1796     BiFunction<? super T,? super U,? extends V> fn) {
1797 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, null);
1798 dl 1.28 }
1799    
1800 dl 1.104 public <U,V> CompletableFuture<V> thenCombineAsync(
1801     CompletionStage<? extends U> other,
1802     BiFunction<? super T,? super U,? extends V> fn) {
1803     return doThenCombine(other.toCompletableFuture(), fn, asyncPool);
1804 dl 1.28 }
1805    
1806 dl 1.104 public <U,V> CompletableFuture<V> thenCombineAsync(
1807     CompletionStage<? extends U> other,
1808     BiFunction<? super T,? super U,? extends V> fn,
1809     Executor executor) {
1810 dl 1.109 return doThenCombine(other.toCompletableFuture(), fn,
1811     screenExecutor(executor));
1812 dl 1.1 }
1813    
1814 dl 1.104 public <U> CompletableFuture<Void> thenAcceptBoth(
1815     CompletionStage<? extends U> other,
1816     BiConsumer<? super T, ? super U> action) {
1817 dl 1.88 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
1818 dl 1.28 }
1819    
1820 dl 1.104 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1821     CompletionStage<? extends U> other,
1822     BiConsumer<? super T, ? super U> action) {
1823     return doThenAcceptBoth(other.toCompletableFuture(), action, asyncPool);
1824 dl 1.28 }
1825    
1826 dl 1.104 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1827     CompletionStage<? extends U> other,
1828     BiConsumer<? super T, ? super U> action,
1829     Executor executor) {
1830 dl 1.109 return doThenAcceptBoth(other.toCompletableFuture(), action,
1831     screenExecutor(executor));
1832 dl 1.28 }
1833    
1834 dl 1.104 public CompletableFuture<Void> runAfterBoth(
1835     CompletionStage<?> other, Runnable action) {
1836 dl 1.88 return doRunAfterBoth(other.toCompletableFuture(), action, null);
1837 dl 1.7 }
1838    
1839 dl 1.104 public CompletableFuture<Void> runAfterBothAsync(
1840     CompletionStage<?> other, Runnable action) {
1841     return doRunAfterBoth(other.toCompletableFuture(), action, asyncPool);
1842 dl 1.28 }
1843    
1844 dl 1.104 public CompletableFuture<Void> runAfterBothAsync(
1845     CompletionStage<?> other, Runnable action, Executor executor) {
1846 dl 1.109 return doRunAfterBoth(other.toCompletableFuture(), action,
1847     screenExecutor(executor));
1848 dl 1.28 }
1849    
1850 dl 1.104 public <U> CompletableFuture<U> applyToEither(
1851     CompletionStage<? extends T> other, Function<? super T, U> fn) {
1852 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, null);
1853 dl 1.28 }
1854    
1855 dl 1.104 public <U> CompletableFuture<U> applyToEitherAsync(
1856     CompletionStage<? extends T> other, Function<? super T, U> fn) {
1857     return doApplyToEither(other.toCompletableFuture(), fn, asyncPool);
1858 dl 1.28 }
1859    
1860 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
1861 dl 1.104 (CompletionStage<? extends T> other, Function<? super T, U> fn,
1862 dl 1.48 Executor executor) {
1863 dl 1.109 return doApplyToEither(other.toCompletableFuture(), fn,
1864     screenExecutor(executor));
1865 dl 1.1 }
1866    
1867 dl 1.104 public CompletableFuture<Void> acceptEither(
1868     CompletionStage<? extends T> other, Consumer<? super T> action) {
1869 dl 1.88 return doAcceptEither(other.toCompletableFuture(), action, null);
1870 dl 1.28 }
1871    
1872 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
1873 dl 1.104 (CompletionStage<? extends T> other, Consumer<? super T> action) {
1874     return doAcceptEither(other.toCompletableFuture(), action, asyncPool);
1875 dl 1.28 }
1876    
1877 dl 1.104 public CompletableFuture<Void> acceptEitherAsync(
1878     CompletionStage<? extends T> other, Consumer<? super T> action,
1879     Executor executor) {
1880 dl 1.109 return doAcceptEither(other.toCompletableFuture(), action,
1881     screenExecutor(executor));
1882 dl 1.7 }
1883    
1884 dl 1.104 public CompletableFuture<Void> runAfterEither(
1885     CompletionStage<?> other, Runnable action) {
1886 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, null);
1887 dl 1.28 }
1888    
1889 dl 1.104 public CompletableFuture<Void> runAfterEitherAsync(
1890     CompletionStage<?> other, Runnable action) {
1891     return doRunAfterEither(other.toCompletableFuture(), action, asyncPool);
1892 dl 1.28 }
1893    
1894 dl 1.104 public CompletableFuture<Void> runAfterEitherAsync(
1895     CompletionStage<?> other, Runnable action, Executor executor) {
1896 dl 1.109 return doRunAfterEither(other.toCompletableFuture(), action,
1897     screenExecutor(executor));
1898 dl 1.1 }
1899    
1900 dl 1.48 public <U> CompletableFuture<U> thenCompose
1901 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
1902 jsr166 1.81 return doThenCompose(fn, null);
1903 dl 1.37 }
1904    
1905 dl 1.104 public <U> CompletableFuture<U> thenComposeAsync(
1906     Function<? super T, ? extends CompletionStage<U>> fn) {
1907     return doThenCompose(fn, asyncPool);
1908 dl 1.37 }
1909    
1910 dl 1.104 public <U> CompletableFuture<U> thenComposeAsync(
1911     Function<? super T, ? extends CompletionStage<U>> fn,
1912     Executor executor) {
1913 dl 1.109 return doThenCompose(fn, screenExecutor(executor));
1914 dl 1.37 }
1915    
1916 dl 1.104 public CompletableFuture<T> whenComplete(
1917     BiConsumer<? super T, ? super Throwable> action) {
1918 dl 1.88 return doWhenComplete(action, null);
1919     }
1920    
1921 dl 1.104 public CompletableFuture<T> whenCompleteAsync(
1922     BiConsumer<? super T, ? super Throwable> action) {
1923     return doWhenComplete(action, asyncPool);
1924 dl 1.88 }
1925    
1926 dl 1.104 public CompletableFuture<T> whenCompleteAsync(
1927     BiConsumer<? super T, ? super Throwable> action, Executor executor) {
1928 dl 1.109 return doWhenComplete(action, screenExecutor(executor));
1929 dl 1.88 }
1930    
1931 dl 1.104 public <U> CompletableFuture<U> handle(
1932     BiFunction<? super T, Throwable, ? extends U> fn) {
1933 dl 1.88 return doHandle(fn, null);
1934     }
1935    
1936 dl 1.104 public <U> CompletableFuture<U> handleAsync(
1937     BiFunction<? super T, Throwable, ? extends U> fn) {
1938     return doHandle(fn, asyncPool);
1939 dl 1.88 }
1940    
1941 dl 1.104 public <U> CompletableFuture<U> handleAsync(
1942     BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
1943 dl 1.109 return doHandle(fn, screenExecutor(executor));
1944 dl 1.88 }
1945    
1946     /**
1947 jsr166 1.108 * Returns this CompletableFuture.
1948 dl 1.88 *
1949     * @return this CompletableFuture
1950     */
1951     public CompletableFuture<T> toCompletableFuture() {
1952     return this;
1953 dl 1.28 }
1954    
1955 dl 1.88 // not in interface CompletionStage
1956    
1957 dl 1.28 /**
1958 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
1959     * CompletableFuture completes, with the result of the given
1960     * function of the exception triggering this CompletableFuture's
1961     * completion when it completes exceptionally; otherwise, if this
1962     * CompletableFuture completes normally, then the returned
1963     * CompletableFuture also completes normally with the same value.
1964 dl 1.88 * Note: More flexible versions of this functionality are
1965     * available using methods {@code whenComplete} and {@code handle}.
1966 dl 1.28 *
1967     * @param fn the function to use to compute the value of the
1968     * returned CompletableFuture if this CompletableFuture completed
1969     * exceptionally
1970     * @return the new CompletableFuture
1971     */
1972 dl 1.104 public CompletableFuture<T> exceptionally(
1973     Function<Throwable, ? extends T> fn) {
1974     return doExceptionally(fn);
1975 dl 1.28 }
1976    
1977 dl 1.35 /* ------------- Arbitrary-arity constructions -------------- */
1978    
1979     /**
1980     * Returns a new CompletableFuture that is completed when all of
1981 jsr166 1.66 * the given CompletableFutures complete. If any of the given
1982 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
1983     * CompletableFuture also does so, with a CompletionException
1984     * holding this exception as its cause. Otherwise, the results,
1985     * if any, of the given CompletableFutures are not reflected in
1986     * the returned CompletableFuture, but may be obtained by
1987     * inspecting them individually. If no CompletableFutures are
1988     * provided, returns a CompletableFuture completed with the value
1989     * {@code null}.
1990 dl 1.35 *
1991     * <p>Among the applications of this method is to await completion
1992     * of a set of independent CompletableFutures before continuing a
1993     * program, as in: {@code CompletableFuture.allOf(c1, c2,
1994     * c3).join();}.
1995     *
1996     * @param cfs the CompletableFutures
1997 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
1998 dl 1.35 * given CompletableFutures complete
1999     * @throws NullPointerException if the array or any of its elements are
2000     * {@code null}
2001     */
2002     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2003 dl 1.104 return doAllOf(cfs, 0, cfs.length - 1);
2004 dl 1.35 }
2005    
2006     /**
2007 dl 1.76 * Returns a new CompletableFuture that is completed when any of
2008 jsr166 1.79 * the given CompletableFutures complete, with the same result.
2009     * Otherwise, if it completed exceptionally, the returned
2010 dl 1.77 * CompletableFuture also does so, with a CompletionException
2011     * holding this exception as its cause. If no CompletableFutures
2012     * are provided, returns an incomplete CompletableFuture.
2013 dl 1.35 *
2014     * @param cfs the CompletableFutures
2015 dl 1.77 * @return a new CompletableFuture that is completed with the
2016     * result or exception of any of the given CompletableFutures when
2017     * one completes
2018 dl 1.35 * @throws NullPointerException if the array or any of its elements are
2019     * {@code null}
2020     */
2021 dl 1.77 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2022 dl 1.104 CompletableFuture<Object> d = new CompletableFuture<Object>();
2023     for (int i = 0; i < cfs.length; ++i) {
2024     CompletableFuture<?> c = cfs[i];
2025     Object r = c.result; // throw NPE if null element
2026     if (d.result == null) {
2027     if (r == null)
2028     c.unipush(new DelayedCopy<Object>(d, c));
2029     else
2030     nowCopy(d, r);
2031 dl 1.77 }
2032 dl 1.35 }
2033 dl 1.104 return d;
2034 dl 1.35 }
2035    
2036     /* ------------- Control and status methods -------------- */
2037    
2038 dl 1.28 /**
2039 dl 1.37 * If not already completed, completes this CompletableFuture with
2040     * a {@link CancellationException}. Dependent CompletableFutures
2041     * that have not already completed will also complete
2042     * exceptionally, with a {@link CompletionException} caused by
2043     * this {@code CancellationException}.
2044 dl 1.28 *
2045     * @param mayInterruptIfRunning this value has no effect in this
2046     * implementation because interrupts are not used to control
2047     * processing.
2048     *
2049     * @return {@code true} if this task is now cancelled
2050     */
2051     public boolean cancel(boolean mayInterruptIfRunning) {
2052 dl 1.46 boolean cancelled = (result == null) &&
2053 dl 1.104 internalComplete(new AltResult(new CancellationException()));
2054     postComplete();
2055 dl 1.48 return cancelled || isCancelled();
2056 dl 1.28 }
2057    
2058     /**
2059     * Returns {@code true} if this CompletableFuture was cancelled
2060     * before it completed normally.
2061     *
2062     * @return {@code true} if this CompletableFuture was cancelled
2063     * before it completed normally
2064     */
2065     public boolean isCancelled() {
2066     Object r;
2067 jsr166 1.43 return ((r = result) instanceof AltResult) &&
2068     (((AltResult)r).ex instanceof CancellationException);
2069 dl 1.28 }
2070    
2071     /**
2072 dl 1.88 * Returns {@code true} if this CompletableFuture completed
2073 dl 1.91 * exceptionally, in any way. Possible causes include
2074     * cancellation, explicit invocation of {@code
2075     * completeExceptionally}, and abrupt termination of a
2076     * CompletionStage action.
2077 dl 1.88 *
2078     * @return {@code true} if this CompletableFuture completed
2079     * exceptionally
2080     */
2081     public boolean isCompletedExceptionally() {
2082 dl 1.91 Object r;
2083     return ((r = result) instanceof AltResult) && r != NIL;
2084 dl 1.88 }
2085    
2086     /**
2087 dl 1.28 * Forcibly sets or resets the value subsequently returned by
2088 jsr166 1.42 * method {@link #get()} and related methods, whether or not
2089     * already completed. This method is designed for use only in
2090     * error recovery actions, and even in such situations may result
2091     * in ongoing dependent completions using established versus
2092 dl 1.30 * overwritten outcomes.
2093 dl 1.28 *
2094     * @param value the completion value
2095     */
2096     public void obtrudeValue(T value) {
2097     result = (value == null) ? NIL : value;
2098 dl 1.104 postComplete();
2099 dl 1.28 }
2100    
2101 dl 1.30 /**
2102 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
2103     * and related methods to throw the given exception, whether or
2104     * not already completed. This method is designed for use only in
2105 dl 1.30 * recovery actions, and even in such situations may result in
2106     * ongoing dependent completions using established versus
2107     * overwritten outcomes.
2108     *
2109     * @param ex the exception
2110     */
2111     public void obtrudeException(Throwable ex) {
2112     if (ex == null) throw new NullPointerException();
2113     result = new AltResult(ex);
2114 dl 1.104 postComplete();
2115 dl 1.30 }
2116    
2117 dl 1.35 /**
2118     * Returns the estimated number of CompletableFutures whose
2119     * completions are awaiting completion of this CompletableFuture.
2120     * This method is designed for use in monitoring system state, not
2121     * for synchronization control.
2122     *
2123     * @return the number of dependent CompletableFutures
2124     */
2125     public int getNumberOfDependents() {
2126     int count = 0;
2127 dl 1.104 for (Completion<?> p = completions; p != null; p = p.next)
2128 dl 1.35 ++count;
2129     return count;
2130     }
2131    
2132     /**
2133     * Returns a string identifying this CompletableFuture, as well as
2134 jsr166 1.40 * its completion state. The state, in brackets, contains the
2135 dl 1.35 * String {@code "Completed Normally"} or the String {@code
2136     * "Completed Exceptionally"}, or the String {@code "Not
2137     * completed"} followed by the number of CompletableFutures
2138     * dependent upon its completion, if any.
2139     *
2140     * @return a string identifying this CompletableFuture, as well as its state
2141     */
2142     public String toString() {
2143     Object r = result;
2144 jsr166 1.40 int count;
2145     return super.toString() +
2146     ((r == null) ?
2147     (((count = getNumberOfDependents()) == 0) ?
2148     "[Not completed]" :
2149     "[Not completed, " + count + " dependents]") :
2150     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2151     "[Completed exceptionally]" :
2152     "[Completed normally]"));
2153 dl 1.35 }
2154    
2155 dl 1.1 // Unsafe mechanics
2156     private static final sun.misc.Unsafe UNSAFE;
2157     private static final long RESULT;
2158     private static final long COMPLETIONS;
2159     static {
2160     try {
2161     UNSAFE = sun.misc.Unsafe.getUnsafe();
2162     Class<?> k = CompletableFuture.class;
2163     RESULT = UNSAFE.objectFieldOffset
2164     (k.getDeclaredField("result"));
2165     COMPLETIONS = UNSAFE.objectFieldOffset
2166     (k.getDeclaredField("completions"));
2167 dl 1.104 } catch (Exception x) {
2168     throw new Error(x);
2169 dl 1.1 }
2170     }
2171     }