ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.111
Committed: Sat May 3 16:33:47 2014 UTC (10 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.110: +1 -1 lines
Log Message:
typo

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.88 import java.util.concurrent.CompletionException;
23     import java.util.concurrent.CompletionStage;
24 dl 1.1 import java.util.concurrent.locks.LockSupport;
25    
26     /**
27     * A {@link Future} that may be explicitly completed (setting its
28 dl 1.88 * value and status), and may be used as a {@link CompletionStage},
29     * supporting dependent functions and actions that trigger upon its
30     * completion.
31 dl 1.1 *
32 jsr166 1.50 * <p>When two or more threads attempt to
33     * {@link #complete complete},
34 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
35 jsr166 1.50 * {@link #cancel cancel}
36     * a CompletableFuture, only one of them succeeds.
37 dl 1.19 *
38 dl 1.91 * <p>In addition to these and related methods for directly
39     * manipulating status and results, CompletableFuture implements
40     * interface {@link CompletionStage} with the following policies: <ul>
41 dl 1.35 *
42 dl 1.88 * <li>Actions supplied for dependent completions of
43     * <em>non-async</em> methods may be performed by the thread that
44     * completes the current CompletableFuture, or by any other caller of
45 dl 1.96 * a completion method.</li>
46 jsr166 1.65 *
47 dl 1.88 * <li>All <em>async</em> methods without an explicit Executor
48 dl 1.96 * argument are performed using the {@link ForkJoinPool#commonPool()}
49     * (unless it does not support a parallelism level of at least two, in
50     * which case, a new Thread is used). To simplify monitoring,
51     * debugging, and tracking, all generated asynchronous tasks are
52     * instances of the marker interface {@link
53     * AsynchronousCompletionTask}. </li>
54 dl 1.35 *
55 dl 1.88 * <li>All CompletionStage methods are implemented independently of
56     * other public methods, so the behavior of one method is not impacted
57     * by overrides of others in subclasses. </li> </ul>
58     *
59 dl 1.91 * <p>CompletableFuture also implements {@link Future} with the following
60 dl 1.88 * policies: <ul>
61     *
62     * <li>Since (unlike {@link FutureTask}) this class has no direct
63 jsr166 1.55 * control over the computation that causes it to be completed,
64 dl 1.88 * cancellation is treated as just another form of exceptional
65     * completion. Method {@link #cancel cancel} has the same effect as
66     * {@code completeExceptionally(new CancellationException())}. Method
67     * {@link #isCompletedExceptionally} can be used to determine if a
68     * CompletableFuture completed in any exceptional fashion.</li>
69 jsr166 1.55 *
70 dl 1.88 * <li>In case of exceptional completion with a CompletionException,
71 jsr166 1.55 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
72     * {@link ExecutionException} with the same cause as held in the
73 dl 1.88 * corresponding CompletionException. To simplify usage in most
74     * contexts, this class also defines methods {@link #join()} and
75     * {@link #getNow} that instead throw the CompletionException directly
76     * in these cases.</li> </ul>
77 jsr166 1.80 *
78 dl 1.1 * @author Doug Lea
79     * @since 1.8
80     */
81 dl 1.88 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
82 dl 1.28
83 dl 1.1 /*
84 dl 1.20 * Overview:
85 dl 1.1 *
86 dl 1.104 * A CompletableFuture may have dependent completion actions,
87     * collected in a linked stack. It atomically completes by CASing
88     * a result field, and then pops off and runs those actions. This
89     * applies across normal vs exceptional outcomes, sync vs async
90     * actions, binary triggers, and various forms of completions.
91     *
92     * Non-nullness of field result (set via CAS) indicates done. An
93     * AltResult is used to box null as a result, as well as to hold
94     * exceptions. Using a single field makes completion simple to
95     * detect and trigger. Encoding and decoding is straightforward
96     * but adds vertical sprawl. One minor simplification relies on
97     * the (static) NIL (to box null results) being the only AltResult
98     * with a null exception field, so we don't usually need explicit
99     * comparisons with NIL. Exception propagation mechanics
100     * surrounding decoding rely on unchecked casts of decoded results
101     * really being unchecked, and user type errors being caught at
102     * point of use, as is currently the case in Java. These are
103     * highlighted by using SuppressWarnings annotated temporaries.
104     *
105     * Dependent actions are represented by Completion objects linked
106     * as Treiber stacks headed by field completions. There are four
107     * kinds of Completions: single-source (UniCompletion), two-source
108 dl 1.110 * (BiCompletion), shared (CoCompletion, used by the second
109 dl 1.104 * source of a BiCompletion), and Signallers that unblock waiters.
110     *
111     * The same patterns of methods and classes are used for each form
112     * of Completion (apply, combine, etc), and are written in a
113     * similar style. For each form X there is, when applicable:
114     *
115     * * Method nowX (for example nowApply) that immediately executes
116     * a supplied function and sets result
117     * Class AsyncX (for example AsyncApply) that calls nowX
118     * from another task,
119     * * Class DelayedX (for example DelayedApply) that holds
120 dl 1.109 * arguments and calls nowX when ready.
121 dl 1.104 *
122     * For each public CompletionStage method M* (for example
123     * thenApply{Async}), there is a method doM (for example
124     * doThenApply) that creates and/or invokes the appropriate form.
125     * Each deals with three cases that can arise when adding a
126     * dependent completion to CompletableFuture f:
127     *
128     * * f is already complete, so the dependent action is run
129     * immediately, via a "now" method, which, if async,
130     * starts the action in a new task.
131     * * f is not complete, so a Completion action is created and
132     * pushed to f's completions. It is triggered via
133     * f.postComplete when f completes.
134     * * f is not complete, but completes while adding the completion
135     * action, so we try to trigger it upon adding (see method
136     * unipush and derivatives) to cover races.
137     *
138     * Methods with two sources (for example thenCombine) must deal
139     * with races across both while pushing actions. The second
140 jsr166 1.111 * completion is a CoCompletion pointing to the first, shared
141 dl 1.104 * to ensure that at most one claims and performs the action. The
142     * multiple-arity method allOf does this pairwise to form a tree
143     * of completions. (Method anyOf just uses a depth-one Or tree.)
144     *
145     * Upon setting results, method postComplete is called unless
146     * the target is guaranteed not to be observable (i.e., not yet
147     * returned or linked). Multiple threads can call postComplete,
148     * which atomically pops each dependent action, and tries to
149     * trigger it via method tryAct. Any such action must be performed
150     * only once, even if called from several threads, so Completions
151     * maintain status via CAS, and on success run one of the "now"
152     * methods. Triggering can propagate recursively, so tryAct
153     * returns a completed dependent (if one exists) for further
154     * processing by its caller.
155     *
156     * Blocking methods get() and join() rely on Signaller Completions
157     * that wake up waiting threads. The mechanics are similar to
158     * Treiber stack wait-nodes used in FutureTask, Phaser, and
159     * SynchronousQueue. See their internal documentation for
160     * algorithmic details.
161     *
162     * Without precautions, CompletableFutures would be prone to
163     * garbage accumulation as chains of completions build up, each
164     * pointing back to its sources. So we detach (null out) most
165     * Completion fields as soon as possible. To support this,
166     * internal methods check for and harmlessly ignore null arguments
167     * that may have been obtained during races with threads nulling
168     * out fields. (Some of these checked cases cannot currently
169     * happen.) Fields of Async classes can be but currently are not
170     * fully detached, because they do not in general form cycles.
171     */
172    
// Completion state of this future. Both fields are volatile and are
// updated via CAS by internalComplete and casCompletions respectively.
173     volatile Object result; // Either the result or boxed AltResult; null until completed
174 dl 1.110 volatile Completion completions; // Treiber stack of dependent actions
175 dl 1.104
176     final boolean internalComplete(Object r) { // CAS from null to r
177     return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
178     }
179    
180 dl 1.110 final boolean casCompletions(Completion cmp, Completion val) {
181 dl 1.104 return UNSAFE.compareAndSwapObject(this, COMPLETIONS, cmp, val);
182     }
183    
184     /* ------------- Encoding and decoding outcomes -------------- */
185    
186     static final class AltResult { // See above
187     final Throwable ex; // null only for NIL
188     AltResult(Throwable x) { this.ex = x; }
189 dl 1.1 }
190    
// Shared box used to represent a null result; it is constructed with a
// null exception, which is what distinguishes it from exceptional outcomes.
191     static final AltResult NIL = new AltResult(null);
192    
193 dl 1.20 /**
194 dl 1.104 * Returns the encoding of the given (non-null) exception as a
195     * wrapped CompletionException unless it is one already.
196 dl 1.99 */
197 dl 1.104 static AltResult altThrowable(Throwable x) {
198     return new AltResult((x instanceof CompletionException) ? x :
199     new CompletionException(x));
200 dl 1.99 }
201    
202     /**
203 dl 1.104 * Returns the encoding of the given arguments: if the exception
204     * is non-null, encodes as altThrowable. Otherwise uses the given
205     * value, boxed as NIL if null.
206 dl 1.99 */
207 dl 1.104 static Object encodeOutcome(Object v, Throwable x) {
208     return (x != null) ? altThrowable(x) : (v == null) ? NIL : v;
209 dl 1.20 }
210    
211 dl 1.1 /**
212 jsr166 1.108 * Decodes outcome to return result or throw unchecked exception.
213 dl 1.1 */
214 dl 1.104 private static <T> T reportJoin(Object r) {
215     if (r instanceof AltResult) {
216     Throwable x;
217     if ((x = ((AltResult)r).ex) == null)
218     return null;
219     if (x instanceof CancellationException)
220     throw (CancellationException)x;
221     if (x instanceof CompletionException)
222     throw (CompletionException)x;
223     throw new CompletionException(x);
224 dl 1.28 }
225 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r;
226     return tr;
227 dl 1.1 }
228    
229     /**
230 jsr166 1.108 * Reports result using Future.get conventions.
231 dl 1.1 */
232 dl 1.104 private static <T> T reportGet(Object r)
233     throws InterruptedException, ExecutionException {
234     if (r == null) // by convention below, null means interrupted
235     throw new InterruptedException();
236     if (r instanceof AltResult) {
237     Throwable x, cause;
238     if ((x = ((AltResult)r).ex) == null)
239 dl 1.28 return null;
240 dl 1.104 if (x instanceof CancellationException)
241     throw (CancellationException)x;
242     if ((x instanceof CompletionException) &&
243     (cause = x.getCause()) != null)
244     x = cause;
245     throw new ExecutionException(x);
246 dl 1.28 }
247 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r;
248     return tr;
249 dl 1.1 }
250    
251 dl 1.104 /* ------------- Async Tasks -------------- */
252    
253 dl 1.1 /**
254 jsr166 1.56 * A marker interface identifying asynchronous tasks produced by
255 dl 1.28 * {@code async} methods. This may be useful for monitoring,
256     * debugging, and tracking asynchronous activities.
257 jsr166 1.57 *
258     * @since 1.8
259 dl 1.1 */
260 dl 1.28 public static interface AsynchronousCompletionTask {
261 dl 1.1 }
262    
263 dl 1.104 /**
264     * Base class for tasks that can act as either FJ or plain
265     * Runnables. Abstract method compute calls an associated "now"
266     * method. Method exec calls compute if its CompletableFuture is
267     * not already done, and runs completions if done. Fields are not
268     * in general final and can be nulled out after use (but most
269     * currently are not). Classes include serialVersionUIDs even
270     * though they are currently never serialized.
271     */
272     abstract static class Async<T> extends ForkJoinTask<Void>
273 dl 1.28 implements Runnable, AsynchronousCompletionTask {
274 dl 1.104 CompletableFuture<T> dep; // the CompletableFuture to trigger
275     Async(CompletableFuture<T> dep) { this.dep = dep; }
276    
277 jsr166 1.105 abstract void compute(); // call the associated "now" method
278 dl 1.104
279 jsr166 1.105 public final boolean exec() {
280 dl 1.104 CompletableFuture<T> d;
281     if ((d = dep) != null) {
282     if (d.result == null) // suppress if cancelled
283     compute();
284     if (d.result != null)
285     d.postComplete();
286     dep = null; // detach
287     }
288     return true;
289     }
290 dl 1.28 public final Void getRawResult() { return null; }
291     public final void setRawResult(Void v) { }
292     public final void run() { exec(); }
293 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
294 jsr166 1.26 }
295    
296 dl 1.109 /**
297     * Default executor -- ForkJoinPool.commonPool() unless it cannot
298     * support parallelism.
299     */
300     static final Executor asyncPool =
301     (ForkJoinPool.getCommonPoolParallelism() > 1) ?
302     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
303    
304     /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
305     static final class ThreadPerTaskExecutor implements Executor {
306     public void execute(Runnable r) { new Thread(r).start(); }
307     }
308    
309     /**
310     * Null-checks user executor argument, and translates uses of
311     * commonPool to asyncPool in case parallelism disabled.
312     */
313     static Executor screenExecutor(Executor e) {
314     if (e == null) throw new NullPointerException();
315     return (e == ForkJoinPool.commonPool()) ? asyncPool : e;
316     }
317    
318 dl 1.104 /* ------------- Completions -------------- */
319    
320 dl 1.110 abstract static class Completion { // See above
321     volatile Completion next; // Treiber stack link
322 dl 1.104
323     /**
324     * Performs completion action if enabled, returning a
325 jsr166 1.107 * completed dependent CompletableFuture, if one exists.
326 dl 1.104 */
327     abstract CompletableFuture<?> tryAct();
328 dl 1.96 }
329    
330 dl 1.104 /**
331 jsr166 1.107 * Triggers all reachable enabled dependents. Call only when
332 dl 1.104 * known to be done.
     *
     * Works iteratively: it repeatedly pops one Completion from the stack
     * of the future currently being processed, runs it via tryAct, and
     * moves on to the dependent that tryAct returns (or back to this
     * future when tryAct returns null).
333     */
334     final void postComplete() {
335     /*
336     * On each step, variable f holds current completions to pop
337     * and run. It is extended along only one path at a time,
338     * pushing others to avoid StackOverflowErrors on recursion.
339     */
340 dl 1.110 CompletableFuture<?> f = this; Completion h;
    // Loop while f's stack has a head; once f's stack is empty and f is a
    // dependent (f != this), switch f back to this and retry with our own stack.
341 dl 1.104 while ((h = f.completions) != null ||
342     (f != this && (h = (f = this).completions) != null)) {
343 dl 1.110 CompletableFuture<?> d; Completion t;
    // Pop h from f's stack; on CAS failure simply re-read the head and retry.
344 dl 1.104 if (f.casCompletions(h, t = h.next)) {
345     if (t != null) {
    // h came from a dependent's stack and had a successor: move it onto
    // this future's own stack instead of running it right now.
346     if (f != this) { // push
347     do {} while (!casCompletions(h.next = completions, h));
348     continue;
349     }
350     h.next = null; // detach
351 dl 1.28 }
    // Run the popped action; continue with the dependent it completed, if any.
352 dl 1.104 f = (d = h.tryAct()) == null ? this : d;
353 dl 1.19 }
354     }
355     }
356    
357 dl 1.104 /* ------------- One-source Completions -------------- */
358    
359     /**
360     * A Completion with a source and dependent. The "dep" field acts
361     * as a claim, nulled out to disable further attempts to
362     * trigger. Fields can only be observed by other threads upon
363     * successful push; and should be nulled out after claim.
364     */
365 dl 1.110 abstract static class UniCompletion<T> extends Completion {
366 dl 1.104 Executor async; // executor to use (null if none)
367     CompletableFuture<T> dep; // the dependent to complete
368     CompletableFuture<?> src; // source of value for tryAct
369    
370     UniCompletion(Executor async, CompletableFuture<T> dep,
371     CompletableFuture<?> src) {
372     this.async = async; this.dep = dep; this.src = src;
373     }
374    
375     /** Tries to claim completion action by CASing dep to null */
376     final boolean claim(CompletableFuture<T> d) {
377     return UNSAFE.compareAndSwapObject(this, DEP, d, null);
378     }
379    
380     private static final sun.misc.Unsafe UNSAFE;
381     private static final long DEP;
382     static {
383     try {
384     UNSAFE = sun.misc.Unsafe.getUnsafe();
385     Class<?> k = UniCompletion.class;
386     DEP = UNSAFE.objectFieldOffset
387     (k.getDeclaredField("dep"));
388     } catch (Exception x) {
389     throw new Error(x);
390 dl 1.1 }
391     }
392     }
393    
394 dl 1.104 /** Pushes c on to completions, and triggers c if done. */
395 dl 1.110 private void unipush(Completion c) {
    // A null completion is ignored.
396 dl 1.104 if (c != null) {
397     CompletableFuture<?> d;
    // Keep trying to CAS c in as the new stack head, but only while this
    // future is still incomplete; once result is set the push is abandoned.
398     while (result == null && !casCompletions(c.next = completions, c))
399     c.next = null; // clear on CAS failure
    // Always attempt the action ourselves: this future may have completed
    // while c was being pushed. tryAct returns a completed dependent, if any.
400     if ((d = c.tryAct()) != null) // cover races
401     d.postComplete();
    // If this future is done by now, pop and run whatever is on its stack.
402     if (result != null) // clean stack
403     postComplete();
404     }
405     }
406    
407     // Immediate, async, delayed, and routing support for Function/apply
408    
409     static <T,U> void nowApply(Executor e, CompletableFuture<U> d, Object r,
410     Function<? super T,? extends U> f) {
411     if (d != null && f != null) {
412     T t; U u; Throwable x;
413     if (r instanceof AltResult) {
414     t = null;
415     x = ((AltResult)r).ex;
416     }
417     else {
418     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
419     x = null;
420     }
421     if (x == null) {
422 dl 1.21 try {
423 dl 1.104 if (e != null) {
424     e.execute(new AsyncApply<T,U>(d, t, f));
425     return;
426     }
427     u = f.apply(t);
428     } catch (Throwable ex) {
429     x = ex;
430 dl 1.21 u = null;
431     }
432 dl 1.1 }
433 dl 1.104 else
434     u = null;
435     d.internalComplete(encodeOutcome(u, x));
436 dl 1.1 }
437     }
438    
439 dl 1.104 static final class AsyncApply<T,U> extends Async<U> {
440 jsr166 1.105 T arg; Function<? super T,? extends U> fn;
441 dl 1.104 AsyncApply(CompletableFuture<U> dep, T arg,
442     Function<? super T,? extends U> fn) {
443     super(dep); this.arg = arg; this.fn = fn;
444 dl 1.1 }
445 jsr166 1.105 final void compute() { nowApply(null, dep, arg, fn); }
446 dl 1.1 private static final long serialVersionUID = 5232453952276885070L;
447     }
448    
449 dl 1.104 static final class DelayedApply<T,U> extends UniCompletion<U> {
450     Function<? super T,? extends U> fn;
451     DelayedApply(Executor async, CompletableFuture<U> dep,
452     CompletableFuture<?> src,
453     Function<? super T,? extends U> fn) {
454     super(async, dep, src); this.fn = fn;
455     }
456     final CompletableFuture<?> tryAct() {
457     CompletableFuture<U> d; CompletableFuture<?> a; Object r;
458     if ((d = dep) != null && (a = src) != null &&
459     (r = a.result) != null && claim(d)) {
460     nowApply(async, d, r, fn);
461     src = null; fn = null;
462     if (d.result != null) return d;
463 dl 1.7 }
464 dl 1.104 return null;
465 dl 1.7 }
466     }
467    
468 dl 1.104 private <U> CompletableFuture<U> doThenApply(
469     Function<? super T,? extends U> fn, Executor e) {
470     if (fn == null) throw new NullPointerException();
471     CompletableFuture<U> d = new CompletableFuture<U>();
472     Object r = result;
473     if (r == null)
474     unipush(new DelayedApply<T,U>(e, d, this, fn));
475     else
476     nowApply(e, d, r, fn);
477     return d;
478     }
479    
480     // Consumer/accept
481    
482     static <T,U> void nowAccept(Executor e, CompletableFuture<U> d,
483     Object r, Consumer<? super T> f) {
484     if (d != null && f != null) {
485     T t; Throwable x;
486     if (r instanceof AltResult) {
487     t = null;
488     x = ((AltResult)r).ex;
489     }
490     else {
491     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
492     x = null;
493     }
494     if (x == null) {
495 dl 1.21 try {
496 dl 1.104 if (e != null) {
497     e.execute(new AsyncAccept<T,U>(d, t, f));
498     return;
499     }
500     f.accept(t);
501     } catch (Throwable ex) {
502     x = ex;
503 dl 1.21 }
504 dl 1.7 }
505 dl 1.104 d.internalComplete(encodeOutcome(null, x));
506 dl 1.7 }
507     }
508    
509 dl 1.104 static final class AsyncAccept<T,U> extends Async<U> {
510 jsr166 1.105 T arg; Consumer<? super T> fn;
511 dl 1.104 AsyncAccept(CompletableFuture<U> dep, T arg,
512     Consumer<? super T> fn) {
513     super(dep); this.arg = arg; this.fn = fn;
514 dl 1.37 }
515 jsr166 1.105 final void compute() { nowAccept(null, dep, arg, fn); }
516 dl 1.37 private static final long serialVersionUID = 5232453952276885070L;
517     }
518    
519 dl 1.104 static final class DelayedAccept<T> extends UniCompletion<Void> {
520     Consumer<? super T> fn;
521     DelayedAccept(Executor async, CompletableFuture<Void> dep,
522     CompletableFuture<?> src, Consumer<? super T> fn) {
523     super(async, dep, src); this.fn = fn;
524 dl 1.91 }
525 dl 1.104 final CompletableFuture<?> tryAct() {
526     CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
527     if ((d = dep) != null && (a = src) != null &&
528     (r = a.result) != null && claim(d)) {
529     nowAccept(async, d, r, fn);
530     src = null; fn = null;
531     if (d.result != null) return d;
532 dl 1.91 }
533 dl 1.104 return null;
534 dl 1.91 }
535     }
536    
537 dl 1.104 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
538     Executor e) {
539     if (fn == null) throw new NullPointerException();
540     CompletableFuture<Void> d = new CompletableFuture<Void>();
541     Object r = result;
542     if (r == null)
543     unipush(new DelayedAccept<T>(e, d, this, fn));
544     else
545     nowAccept(e, d, r, fn);
546     return d;
547 dl 1.28 }
548    
549 dl 1.104 // Runnable/run
550 dl 1.1
551 dl 1.104 static <T> void nowRun(Executor e, CompletableFuture<T> d, Object r,
552     Runnable f) {
553     if (d != null && f != null) {
554     Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
555     if (x == null) {
556     try {
557     if (e != null) {
558     e.execute(new AsyncRun<T>(d, f));
559     return;
560 dl 1.17 }
561 dl 1.104 f.run();
562     } catch (Throwable ex) {
563     x = ex;
564 dl 1.17 }
565 dl 1.1 }
566 dl 1.104 d.internalComplete(encodeOutcome(null, x));
567 dl 1.1 }
568     }
569    
570 dl 1.104 static final class AsyncRun<T> extends Async<T> {
571 jsr166 1.105 Runnable fn;
572 dl 1.104 AsyncRun(CompletableFuture<T> dep, Runnable fn) {
573     super(dep); this.fn = fn;
574 dl 1.7 }
575 jsr166 1.105 final void compute() { nowRun(null, dep, null, fn); }
576 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
577 dl 1.7 }
578    
579 dl 1.104 static final class DelayedRun extends UniCompletion<Void> {
580 jsr166 1.105 Runnable fn;
581 dl 1.104 DelayedRun(Executor async, CompletableFuture<Void> dep,
582     CompletableFuture<?> src, Runnable fn) {
583     super(async, dep, src); this.fn = fn;
584     }
585     final CompletableFuture<?> tryAct() {
586     CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
587     if ((d = dep) != null && (a = src) != null &&
588     (r = a.result) != null && claim(d)) {
589     nowRun(async, d, r, fn);
590     src = null; fn = null; // clear refs
591     if (d.result != null) return d;
592 dl 1.1 }
593 dl 1.104 return null;
594 dl 1.1 }
595     }
596    
597 dl 1.104 private CompletableFuture<Void> doThenRun(Runnable fn, Executor e) {
598     if (fn == null) throw new NullPointerException();
599     CompletableFuture<Void> d = new CompletableFuture<Void>();
600     Object r = result;
601     if (r == null)
602     unipush(new DelayedRun(e, d, this, fn));
603     else
604     nowRun(e, d, r, fn);
605     return d;
606     }
607    
608     // Supplier/get
609    
610     static <T> void nowSupply(CompletableFuture<T> d, Supplier<T> f) {
611     if (d != null && f != null) {
612     T t; Throwable x;
613     try {
614     t = f.get();
615     x = null;
616     } catch (Throwable ex) {
617     x = ex;
618     t = null;
619 jsr166 1.2 }
620 dl 1.104 d.internalComplete(encodeOutcome(t, x));
621 jsr166 1.2 }
622 dl 1.1 }
623    
624 dl 1.104 static final class AsyncSupply<T> extends Async<T> {
625 jsr166 1.105 Supplier<T> fn;
626 dl 1.104 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
627     super(dep); this.fn = fn;
628 dl 1.7 }
629 jsr166 1.105 final void compute() { nowSupply(dep, fn); }
630 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
631 dl 1.7 }
632    
633 dl 1.104 // WhenComplete
634    
635     static <T> void nowWhen(Executor e, CompletableFuture<T> d, Object r,
636     BiConsumer<? super T,? super Throwable> f) {
637     if (d != null && f != null) {
638     T t; Throwable x, dx;
639     if (r instanceof AltResult) {
640     t = null;
641     x = ((AltResult)r).ex;
642     }
643     else {
644     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
645     x = null;
646     }
647     try {
648     if (e != null) {
649     e.execute(new AsyncWhen<T>(d, r, f));
650     return;
651 jsr166 1.2 }
652 dl 1.104 f.accept(t, x);
653     dx = null;
654     } catch (Throwable ex) {
655     dx = ex;
656 jsr166 1.2 }
657 dl 1.104 d.internalComplete(encodeOutcome(t, x != null ? x : dx));
658 jsr166 1.2 }
659 dl 1.1 }
660    
661 dl 1.104 static final class AsyncWhen<T> extends Async<T> {
662 jsr166 1.105 Object arg; BiConsumer<? super T,? super Throwable> fn;
663 dl 1.104 AsyncWhen(CompletableFuture<T> dep, Object arg,
664     BiConsumer<? super T,? super Throwable> fn) {
665     super(dep); this.arg = arg; this.fn = fn;
666 dl 1.35 }
667 jsr166 1.105 final void compute() { nowWhen(null, dep, arg, fn); }
668 dl 1.35 private static final long serialVersionUID = 5232453952276885070L;
669     }
670    
671 dl 1.104 static final class DelayedWhen<T> extends UniCompletion<T> {
672     BiConsumer<? super T, ? super Throwable> fn;
673     DelayedWhen(Executor async, CompletableFuture<T> dep,
674     CompletableFuture<?> src,
675     BiConsumer<? super T, ? super Throwable> fn) {
676     super(async, dep, src); this.fn = fn;
677     }
678     final CompletableFuture<?> tryAct() {
679     CompletableFuture<T> d; CompletableFuture<?> a; Object r;
680     if ((d = dep) != null && (a = src) != null &&
681     (r = a.result) != null && claim(d)) {
682     nowWhen(async, d, r, fn);
683     src = null; fn = null;
684     if (d.result != null) return d;
685 jsr166 1.2 }
686 dl 1.104 return null;
687 jsr166 1.2 }
688 dl 1.1 }
689    
690 dl 1.104 private CompletableFuture<T> doWhenComplete(
691     BiConsumer<? super T, ? super Throwable> fn, Executor e) {
692     if (fn == null) throw new NullPointerException();
693     CompletableFuture<T> d = new CompletableFuture<T>();
694     Object r = result;
695     if (r == null)
696     unipush(new DelayedWhen<T>(e, d, this, fn));
697     else
698     nowWhen(e, d, r, fn);
699     return d;
700 dl 1.7 }
701    
702 dl 1.104 // Handle
703    
704     static <T,U> void nowHandle(Executor e, CompletableFuture<U> d, Object r,
705     BiFunction<? super T, Throwable, ? extends U> f) {
706     if (d != null && f != null) {
707     T t; U u; Throwable x, dx;
708     if (r instanceof AltResult) {
709     t = null;
710     x = ((AltResult)r).ex;
711     }
712     else {
713     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
714     x = null;
715     }
716     try {
717     if (e != null) {
718     e.execute(new AsyncCombine<T,Throwable,U>(d, t, x, f));
719     return;
720 dl 1.1 }
721 dl 1.104 u = f.apply(t, x);
722     dx = null;
723     } catch (Throwable ex) {
724     dx = ex;
725     u = null;
726 jsr166 1.2 }
727 dl 1.104 d.internalComplete(encodeOutcome(u, dx));
728 jsr166 1.2 }
729 dl 1.1 }
730    
731 dl 1.104 static final class DelayedHandle<T,U> extends UniCompletion<U> {
732     BiFunction<? super T, Throwable, ? extends U> fn;
733     DelayedHandle(Executor async, CompletableFuture<U> dep,
734     CompletableFuture<?> src,
735     BiFunction<? super T, Throwable, ? extends U> fn) {
736     super(async, dep, src); this.fn = fn;
737     }
738     final CompletableFuture<?> tryAct() {
739     CompletableFuture<U> d; CompletableFuture<?> a; Object r;
740     if ((d = dep) != null && (a = src) != null &&
741     (r = a.result) != null && claim(d)) {
742     nowHandle(async, d, r, fn);
743     src = null; fn = null;
744     if (d.result != null) return d;
745 dl 1.35 }
746 dl 1.104 return null;
747 dl 1.35 }
748     }
749    
750 dl 1.104 private <U> CompletableFuture<U> doHandle(
751     BiFunction<? super T, Throwable, ? extends U> fn,
752     Executor e) {
753     if (fn == null) throw new NullPointerException();
754     CompletableFuture<U> d = new CompletableFuture<U>();
755     Object r = result;
756     if (r == null)
757     unipush(new DelayedHandle<T,U>(e, d, this, fn));
758     else
759     nowHandle(e, d, r, fn);
760     return d;
761 dl 1.1 }
762    
763 dl 1.104 // Exceptionally
764    
765     static <T> void nowExceptionally(CompletableFuture<T> d, Object r,
766     Function<? super Throwable, ? extends T> f) {
767     if (d != null && f != null) {
768     T t; Throwable x, dx;
769     if ((r instanceof AltResult) && (x = ((AltResult)r).ex) != null) {
770     try {
771     t = f.apply(x);
772     dx = null;
773     } catch (Throwable ex) {
774     dx = ex;
775 dl 1.88 t = null;
776     }
777     }
778 dl 1.104 else {
779     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
780     dx = null;
781     }
782     d.internalComplete(encodeOutcome(t, dx));
783 dl 1.88 }
784     }
785    
786 dl 1.104 static final class DelayedExceptionally<T> extends UniCompletion<T> {
787     Function<? super Throwable, ? extends T> fn;
788     DelayedExceptionally(CompletableFuture<T> dep, CompletableFuture<?> src,
789     Function<? super Throwable, ? extends T> fn) {
790     super(null, dep, src); this.fn = fn;
791     }
792     final CompletableFuture<?> tryAct() {
793     CompletableFuture<T> d; CompletableFuture<?> a; Object r;
794     if ((d = dep) != null && (a = src) != null &&
795     (r = a.result) != null && claim(d)) {
796     nowExceptionally(d, r, fn);
797     src = null; fn = null;
798     if (d.result != null) return d;
799 dl 1.17 }
800 dl 1.104 return null;
801 dl 1.17 }
802     }
803    
804 dl 1.104 private CompletableFuture<T> doExceptionally(
805     Function<Throwable, ? extends T> fn) {
806     if (fn == null) throw new NullPointerException();
807     CompletableFuture<T> d = new CompletableFuture<T>();
808     Object r = result;
809     if (r == null)
810     unipush(new DelayedExceptionally<T>(d, this, fn));
811     else
812     nowExceptionally(d, r, fn);
813     return d;
814 dl 1.75 }
815    
816 dl 1.104 // Identity function used by nowCompose and anyOf
817    
818     static <T> void nowCopy(CompletableFuture<T> d, Object r) {
819     if (d != null && d.result == null) {
820     Throwable x;
821     d.internalComplete(((r instanceof AltResult) &&
822     (x = ((AltResult)r).ex) != null &&
823     !(x instanceof CompletionException)) ?
824     new AltResult(new CompletionException(x)): r);
825 dl 1.17 }
826     }
827    
828 dl 1.104 static final class DelayedCopy<T> extends UniCompletion<T> {
829     DelayedCopy(CompletableFuture<T> dep, CompletableFuture<?> src) {
830     super(null, dep, src);
831     }
832     final CompletableFuture<?> tryAct() {
833     CompletableFuture<T> d; CompletableFuture<?> a; Object r;
834     if ((d = dep) != null && (a = src) != null &&
835     (r = a.result) != null && claim(d)) {
836     nowCopy(d, r);
837     src = null;
838     if (d.result != null) return d;
839 dl 1.28 }
840 dl 1.104 return null;
841 dl 1.28 }
842     }
843    
844 dl 1.104 // Compose
845 dl 1.28
846 dl 1.104 static <T,U> void nowCompose(Executor e, CompletableFuture<U> d, Object r,
847     Function<? super T, ? extends CompletionStage<U>> f) {
848     if (d != null && f != null) {
849     T t; Throwable x;
850 dl 1.88 if (r instanceof AltResult) {
851     t = null;
852 dl 1.104 x = ((AltResult)r).ex;
853 dl 1.88 }
854     else {
855 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
856     x = null;
857 dl 1.88 }
858 dl 1.104 if (x == null) {
859 dl 1.88 try {
860     if (e != null)
861 dl 1.104 e.execute(new AsyncCompose<T,U>(d, t, f));
862     else {
863     CompletableFuture<U> c =
864     f.apply(t).toCompletableFuture();
865     Object s = c.result;
866     if (s == null)
867     c.unipush(new DelayedCopy<U>(d, c));
868     else
869     nowCopy(d, s);
870     }
871     return;
872     } catch (Throwable ex) {
873     x = ex;
874 dl 1.88 }
875     }
876 dl 1.104 d.internalComplete(encodeOutcome(null, x));
877 dl 1.88 }
878 dl 1.28 }
879    
880 dl 1.104 static final class AsyncCompose<T,U> extends Async<U> {
881 jsr166 1.105 T arg; Function<? super T, ? extends CompletionStage<U>> fn;
882 dl 1.104 AsyncCompose(CompletableFuture<U> dep, T arg,
883     Function<? super T, ? extends CompletionStage<U>> fn) {
884     super(dep); this.arg = arg; this.fn = fn;
885     }
886 jsr166 1.105 final void compute() { nowCompose(null, dep, arg, fn); }
887 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
888     }
889    
890     static final class DelayedCompose<T,U> extends UniCompletion<U> {
891     Function<? super T, ? extends CompletionStage<U>> fn;
892     DelayedCompose(Executor async, CompletableFuture<U> dep,
893     CompletableFuture<?> src,
894     Function<? super T, ? extends CompletionStage<U>> fn) {
895     super(async, dep, src); this.fn = fn;
896     }
897     final CompletableFuture<?> tryAct() {
898     CompletableFuture<U> d; CompletableFuture<?> a; Object r;
899     if ((d = dep) != null && (a = src) != null &&
900     (r = a.result) != null && claim(d)) {
901     nowCompose(async, d, r, fn);
902     src = null; fn = null;
903     if (d.result != null) return d;
904     }
905     return null;
906     }
907     }
908    
909     private <U> CompletableFuture<U> doThenCompose(
910     Function<? super T, ? extends CompletionStage<U>> fn, Executor e) {
911 dl 1.88 if (fn == null) throw new NullPointerException();
912 dl 1.104 Object r = result;
913     if (r == null || e != null) {
914     CompletableFuture<U> d = new CompletableFuture<U>();
915     if (r == null)
916     unipush(new DelayedCompose<T,U>(e, d, this, fn));
917     else
918     nowCompose(e, d, r, fn);
919     return d;
920 dl 1.88 }
921 dl 1.104 else { // try to return function result
922     T t; Throwable x;
923 dl 1.88 if (r instanceof AltResult) {
924     t = null;
925 dl 1.104 x = ((AltResult)r).ex;
926 dl 1.88 }
927     else {
928 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
929     x = null;
930 dl 1.88 }
931 dl 1.104 if (x == null) {
932 dl 1.88 try {
933 dl 1.104 return fn.apply(t).toCompletableFuture();
934     } catch (Throwable ex) {
935     x = ex;
936 dl 1.88 }
937     }
938 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
939     d.result = encodeOutcome(null, x);
940     return d;
941 dl 1.88 }
942 dl 1.28 }
943    
944 dl 1.104 /* ------------- Two-source Completions -------------- */
945    
946     /** A Completion with two sources */
947 jsr166 1.106 abstract static class BiCompletion<T> extends UniCompletion<T> {
948 dl 1.104 CompletableFuture<?> snd; // second source for tryAct
949     BiCompletion(Executor async, CompletableFuture<T> dep,
950     CompletableFuture<?> src, CompletableFuture<?> snd) {
951     super(async, dep, src); this.snd = snd;
952     }
953     }
954    
955 dl 1.110 /** A Completion delegating to another Completion */
956     static final class CoCompletion extends Completion {
957     Completion completion;
958     CoCompletion(Completion completion) {
959 dl 1.104 this.completion = completion;
960 dl 1.88 }
961 dl 1.104 final CompletableFuture<?> tryAct() {
962 dl 1.110 Completion c; CompletableFuture<?> d;
963     if ((c = completion) == null || (d = c.tryAct()) == null)
964     return null;
965     completion = null; // detach
966     return d;
967 dl 1.88 }
968     }
969    
970 dl 1.104 /* ------------- Two-source Anded -------------- */
971    
972     /* Pushes c on to completions and o's completions unless both done. */
        // Steps, in order:
        //  1. CAS-push c onto this future's completions while this future
        //     has no result; r keeps the last value read from result.
        //  2. If o has no result, CAS-push onto o's completions either c
        //     itself (when step 1 saw a result, r != null) or a
        //     CoCompletion wrapping c.
        //     NOTE(review): the wrapper is presumably needed because c.next
        //     may already serve as the link in this future's stack -- confirm.
        //  3. Try the action once; if it yields a future, postComplete it.
        //  4. Call postComplete on whichever of o / this now has a result.
        // Does nothing when c or o is null.
973 dl 1.110 private void bipushAnded(CompletableFuture<?> o, Completion c) {
974 dl 1.104 if (c != null && o != null) {
975     Object r; CompletableFuture<?> d;
        // retry the push until it succeeds or a result appears
976     while ((r = result) == null &&
977     !casCompletions(c.next = completions, c))
978     c.next = null;
979     if (o.result == null) {
980 dl 1.110 Completion q = (r != null) ? c : new CoCompletion(c);
        // same retry loop against o's completion stack
981 dl 1.104 while (o.result == null &&
982     !o.casCompletions(q.next = o.completions, q))
983     q.next = null;
984 dl 1.88 }
        // a source may have completed while pushing, so attempt the action here
985 dl 1.104 if ((d = c.tryAct()) != null)
986     d.postComplete();
987     if (o.result != null)
988     o.postComplete();
989     if (result != null)
990     postComplete();
991 dl 1.88 }
992 dl 1.104 }
993    
994     // BiFunction/combine
995    
996     static <T,U,V> void nowCombine(Executor e, CompletableFuture<V> d,
997     Object r, Object s,
998     BiFunction<? super T,? super U,? extends V> f) {
999     if (d != null && f != null) {
1000     T t; U u; V v; Throwable x;
1001 dl 1.88 if (r instanceof AltResult) {
1002     t = null;
1003 dl 1.104 x = ((AltResult)r).ex;
1004 dl 1.88 }
1005     else {
1006 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1007     x = null;
1008 dl 1.88 }
1009 dl 1.104 if (x != null)
1010 dl 1.88 u = null;
1011     else if (s instanceof AltResult) {
1012 dl 1.104 x = ((AltResult)s).ex;
1013 dl 1.88 u = null;
1014     }
1015     else {
1016 dl 1.104 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1017 dl 1.88 }
1018 dl 1.104 if (x == null) {
1019 dl 1.88 try {
1020 dl 1.104 if (e != null) {
1021     e.execute(new AsyncCombine<T,U,V>(d, t, u, f));
1022     return;
1023     }
1024     v = f.apply(t, u);
1025     } catch (Throwable ex) {
1026     x = ex;
1027     v = null;
1028 dl 1.88 }
1029     }
1030 dl 1.104 else
1031     v = null;
1032     d.internalComplete(encodeOutcome(v, x));
1033 dl 1.88 }
1034     }
1035    
1036 dl 1.104 static final class AsyncCombine<T,U,V> extends Async<V> {
1037 jsr166 1.105 T arg1; U arg2; BiFunction<? super T,? super U,? extends V> fn;
1038 dl 1.104 AsyncCombine(CompletableFuture<V> dep, T arg1, U arg2,
1039     BiFunction<? super T,? super U,? extends V> fn) {
1040     super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1041     }
1042 jsr166 1.105 final void compute() { nowCombine(null, dep, arg1, arg2, fn); }
1043 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
1044     }
1045    
1046     static final class DelayedCombine<T,U,V> extends BiCompletion<V> {
1047     BiFunction<? super T,? super U,? extends V> fn;
1048     DelayedCombine(Executor async, CompletableFuture<V> dep,
1049     CompletableFuture<?> src, CompletableFuture<?> snd,
1050     BiFunction<? super T,? super U,? extends V> fn) {
1051     super(async, dep, src, snd); this.fn = fn;
1052     }
1053     final CompletableFuture<?> tryAct() {
1054     CompletableFuture<V> d; CompletableFuture<?> a, b; Object r, s;
1055     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1056     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1057     nowCombine(async, d, r, s, fn);
1058     src = null; snd = null; fn = null;
1059     if (d.result != null) return d;
1060 dl 1.88 }
1061 dl 1.104 return null;
1062 dl 1.88 }
1063 dl 1.104 }
1064    
1065     private <U,V> CompletableFuture<V> doThenCombine(
1066     CompletableFuture<? extends U> o,
1067     BiFunction<? super T,? super U,? extends V> fn,
1068     Executor e) {
1069     if (o == null || fn == null) throw new NullPointerException();
1070     CompletableFuture<V> d = new CompletableFuture<V>();
1071     Object r = result, s = o.result;
1072     if (r == null || s == null)
1073     bipushAnded(o, new DelayedCombine<T,U,V>(e, d, this, o, fn));
1074     else
1075     nowCombine(e, d, r, s, fn);
1076     return d;
1077     }
1078    
1079     // BiConsumer/AcceptBoth
1080    
1081     static <T,U,V> void nowAcceptBoth(Executor e, CompletableFuture<V> d,
1082     Object r, Object s,
1083     BiConsumer<? super T,? super U> f) {
1084     if (d != null && f != null) {
1085     T t; U u; Throwable x;
1086 dl 1.88 if (r instanceof AltResult) {
1087     t = null;
1088 dl 1.104 x = ((AltResult)r).ex;
1089 dl 1.88 }
1090     else {
1091 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1092     x = null;
1093 dl 1.88 }
1094 dl 1.104 if (x != null)
1095 dl 1.88 u = null;
1096     else if (s instanceof AltResult) {
1097 dl 1.104 x = ((AltResult)s).ex;
1098 dl 1.88 u = null;
1099     }
1100     else {
1101 dl 1.104 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1102 dl 1.88 }
1103 dl 1.104 if (x == null) {
1104 dl 1.88 try {
1105 dl 1.104 if (e != null) {
1106     e.execute(new AsyncAcceptBoth<T,U,V>(d, t, u, f));
1107     return;
1108     }
1109     f.accept(t, u);
1110     } catch (Throwable ex) {
1111     x = ex;
1112 dl 1.88 }
1113     }
1114 dl 1.104 d.internalComplete(encodeOutcome(null, x));
1115     }
1116     }
1117    
1118     static final class AsyncAcceptBoth<T,U,V> extends Async<V> {
1119 jsr166 1.105 T arg1; U arg2; BiConsumer<? super T,? super U> fn;
1120 dl 1.104 AsyncAcceptBoth(CompletableFuture<V> dep, T arg1, U arg2,
1121     BiConsumer<? super T,? super U> fn) {
1122     super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1123     }
1124 jsr166 1.105 final void compute() { nowAcceptBoth(null, dep, arg1, arg2, fn); }
1125 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
1126     }
1127    
1128     static final class DelayedAcceptBoth<T,U> extends BiCompletion<Void> {
1129     BiConsumer<? super T,? super U> fn;
1130     DelayedAcceptBoth(Executor async, CompletableFuture<Void> dep,
1131     CompletableFuture<?> src, CompletableFuture<?> snd,
1132     BiConsumer<? super T,? super U> fn) {
1133     super(async, dep, src, snd); this.fn = fn;
1134     }
1135     final CompletableFuture<?> tryAct() {
1136     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1137     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1138     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1139     nowAcceptBoth(async, d, r, s, fn);
1140     src = null; snd = null; fn = null;
1141     if (d.result != null) return d;
1142     }
1143     return null;
1144 dl 1.88 }
1145     }
1146    
1147 dl 1.104 private <U> CompletableFuture<Void> doThenAcceptBoth(
1148     CompletableFuture<? extends U> o,
1149     BiConsumer<? super T, ? super U> fn,
1150     Executor e) {
1151     if (o == null || fn == null) throw new NullPointerException();
1152     CompletableFuture<Void> d = new CompletableFuture<Void>();
1153     Object r = result, s = o.result;
1154     if (r == null || s == null)
1155     bipushAnded(o, new DelayedAcceptBoth<T,U>(e, d, this, o, fn));
1156     else
1157     nowAcceptBoth(e, d, r, s, fn);
1158     return d;
1159     }
1160    
1161     // Runnable/both
1162    
1163     static final class DelayedRunAfterBoth extends BiCompletion<Void> {
1164     Runnable fn;
1165     DelayedRunAfterBoth(Executor async, CompletableFuture<Void> dep,
1166     CompletableFuture<?> src, CompletableFuture<?> snd,
1167     Runnable fn) {
1168     super(async, dep, src, snd); this.fn = fn;
1169     }
1170     final CompletableFuture<?> tryAct() {
1171     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1172     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1173     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1174     Throwable x = (r instanceof AltResult) ?
1175     ((AltResult)r).ex : null;
1176     nowRun(async, d, (x == null) ? s : r, fn);
1177     src = null; snd = null; fn = null;
1178     if (d.result != null) return d;
1179 dl 1.88 }
1180 dl 1.104 return null;
1181 dl 1.88 }
1182 dl 1.104 }
1183    
1184     private CompletableFuture<Void> doRunAfterBoth(
1185     CompletableFuture<?> o, Runnable fn, Executor e) {
1186     if (o == null || fn == null) throw new NullPointerException();
1187     CompletableFuture<Void> d = new CompletableFuture<Void>();
1188     Object r = result, s = o.result;
1189     if (r == null || s == null)
1190     bipushAnded(o, new DelayedRunAfterBoth(e, d, this, o, fn));
1191 dl 1.101 else {
1192 dl 1.104 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1193     nowRun(e, d, (x == null) ? s : r, fn);
1194     }
1195     return d;
1196     }
1197    
1198     // allOf
1199    
1200     static <T> void nowAnd(CompletableFuture<T> d, Object r, Object s) {
1201     if (d != null) {
1202     Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1203     if (x == null && (s instanceof AltResult))
1204     x = ((AltResult)s).ex;
1205     d.internalComplete(encodeOutcome(null, x));
1206 dl 1.88 }
1207     }
1208    
1209 dl 1.104 static final class DelayedAnd extends BiCompletion<Void> {
1210     DelayedAnd(CompletableFuture<Void> dep,
1211     CompletableFuture<?> src, CompletableFuture<?> snd) {
1212     super(null, dep, src, snd);
1213     }
1214     final CompletableFuture<?> tryAct() {
1215     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1216     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1217     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1218     nowAnd(d, r, s);
1219     src = null; snd = null;
1220     if (d.result != null) return d;
1221 dl 1.88 }
1222 dl 1.104 return null;
1223 dl 1.88 }
1224 dl 1.104 }
1225    
1226     /** Recursively constructs a tree of And completions */
1227     private static CompletableFuture<Void> doAllOf(CompletableFuture<?>[] cfs,
1228     int lo, int hi) {
1229     CompletableFuture<Void> d = new CompletableFuture<Void>();
1230     if (lo > hi) // empty
1231     d.result = NIL;
1232 dl 1.101 else {
1233 dl 1.104 int mid = (lo + hi) >>> 1;
1234     CompletableFuture<?> fst = (lo == mid ? cfs[lo] :
1235     doAllOf(cfs, lo, mid));
1236     CompletableFuture<?> snd = (lo == hi ? fst : // and fst with self
1237     (hi == mid+1) ? cfs[hi] :
1238     doAllOf(cfs, mid+1, hi));
1239     Object r = fst.result, s = snd.result; // throw NPE if null elements
1240     if (r == null || s == null) {
1241     DelayedAnd a = new DelayedAnd(d, fst, snd);
1242     if (fst == snd)
1243     fst.unipush(a);
1244     else
1245     fst.bipushAnded(snd, a);
1246 dl 1.88 }
1247 dl 1.104 else
1248     nowAnd(d, r, s);
1249 dl 1.88 }
1250 dl 1.104 return d;
1251 dl 1.88 }
1252    
1253 dl 1.104 /* ------------- Two-source Ored -------------- */
1254    
1255     /* Pushes c on to completions and o's completions unless either done. */
        // Steps, in order:
        //  1. While neither future has a result, try to CAS-push c onto this
        //     future's completions; on a failed CAS reset c.next and retry.
        //  2. After a successful push, CAS-push a CoCompletion wrapping c
        //     onto o's completions, giving up as soon as either future has
        //     a result; then leave the outer loop.
        //  3. Try the action once; if it yields a future, postComplete it.
        //  4. Call postComplete on whichever of o / this now has a result.
        // Does nothing when c or o is null.
1256 dl 1.110 private void bipushOred(CompletableFuture<?> o, Completion c) {
1257 dl 1.104 if (c != null && o != null) {
1258     CompletableFuture<?> d;
1259     while (o.result == null && result == null) {
1260     if (casCompletions(c.next = completions, c)) {
        // c is linked into this stack; o gets a separate wrapper node
1261 dl 1.110 CoCompletion q = new CoCompletion(c);
1262 dl 1.104 while (result == null && o.result == null &&
1263     !o.casCompletions(q.next = o.completions, q))
1264     q.next = null;
1265     break;
1266 dl 1.88 }
        // CAS on this stack failed: clear the tentative link before retrying
1267 dl 1.104 c.next = null;
1268 dl 1.88 }
        // a source may have completed while pushing, so attempt the action here
1269 dl 1.104 if ((d = c.tryAct()) != null)
1270     d.postComplete();
1271     if (o.result != null)
1272     o.postComplete();
1273     if (result != null)
1274     postComplete();
1275 dl 1.88 }
1276 dl 1.104 }
1277    
1278     // Function/applyEither
1279    
1280     static final class DelayedApplyToEither<T,U> extends BiCompletion<U> {
1281     Function<? super T,? extends U> fn;
1282     DelayedApplyToEither(Executor async, CompletableFuture<U> dep,
1283     CompletableFuture<?> src, CompletableFuture<?> snd,
1284     Function<? super T,? extends U> fn) {
1285     super(async, dep, src, snd); this.fn = fn;
1286     }
1287     final CompletableFuture<?> tryAct() {
1288     CompletableFuture<U> d; CompletableFuture<?> a, b; Object r;
1289     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1290     ((r = a.result) != null || (r = b.result) != null) &&
1291     claim(d)) {
1292     nowApply(async, d, r, fn);
1293     src = null; snd = null; fn = null;
1294     if (d.result != null) return d;
1295 dl 1.88 }
1296 dl 1.104 return null;
1297 dl 1.88 }
1298     }
1299    
1300 dl 1.104 private <U> CompletableFuture<U> doApplyToEither(
1301     CompletableFuture<? extends T> o,
1302     Function<? super T, U> fn, Executor e) {
1303     if (o == null || fn == null) throw new NullPointerException();
1304     CompletableFuture<U> d = new CompletableFuture<U>();
1305     Object r = result;
1306     if (r == null && (r = o.result) == null)
1307     bipushOred(o, new DelayedApplyToEither<T,U>(e, d, this, o, fn));
1308     else
1309     nowApply(e, d, r, fn);
1310     return d;
1311     }
1312    
1313     // Consumer/acceptEither
1314    
1315     static final class DelayedAcceptEither<T> extends BiCompletion<Void> {
1316     Consumer<? super T> fn;
1317     DelayedAcceptEither(Executor async, CompletableFuture<Void> dep,
1318     CompletableFuture<?> src, CompletableFuture<?> snd,
1319     Consumer<? super T> fn) {
1320     super(async, dep, src, snd); this.fn = fn;
1321     }
1322     final CompletableFuture<?> tryAct() {
1323     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1324     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1325     ((r = a.result) != null || (r = b.result) != null) &&
1326     claim(d)) {
1327     nowAccept(async, d, r, fn);
1328     src = null; snd = null; fn = null;
1329     if (d.result != null) return d;
1330 dl 1.88 }
1331 dl 1.104 return null;
1332 dl 1.88 }
1333 dl 1.104 }
1334    
1335     private CompletableFuture<Void> doAcceptEither(
1336     CompletableFuture<? extends T> o,
1337     Consumer<? super T> fn, Executor e) {
1338     if (o == null || fn == null) throw new NullPointerException();
1339     CompletableFuture<Void> d = new CompletableFuture<Void>();
1340     Object r = result;
1341     if (r == null && (r = o.result) == null)
1342     bipushOred(o, new DelayedAcceptEither<T>(e, d, this, o, fn));
1343     else
1344     nowAccept(e, d, r, fn);
1345     return d;
1346     }
1347    
1348     // Runnable/runEither
1349    
1350     static final class DelayedRunAfterEither extends BiCompletion<Void> {
1351     Runnable fn;
1352     DelayedRunAfterEither(Executor async, CompletableFuture<Void> dep,
1353     CompletableFuture<?> src,
1354     CompletableFuture<?> snd, Runnable fn) {
1355     super(async, dep, src, snd); this.fn = fn;
1356     }
1357     final CompletableFuture<?> tryAct() {
1358     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1359     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1360     ((r = a.result) != null || (r = b.result) != null) &&
1361     claim(d)) {
1362     nowRun(async, d, r, fn);
1363     src = null; snd = null; fn = null;
1364     if (d.result != null) return d;
1365 dl 1.88 }
1366 dl 1.104 return null;
1367 dl 1.88 }
1368     }
1369    
1370 dl 1.104 private CompletableFuture<Void> doRunAfterEither(
1371     CompletableFuture<?> o, Runnable fn, Executor e) {
1372     if (o == null || fn == null) throw new NullPointerException();
1373     CompletableFuture<Void> d = new CompletableFuture<Void>();
1374     Object r = result;
1375     if (r == null && (r = o.result) == null)
1376     bipushOred(o, new DelayedRunAfterEither(e, d, this, o, fn));
1377     else
1378     nowRun(e, d, r, fn);
1379     return d;
1380     }
1381    
1382     /* ------------- Signallers -------------- */
1383    
1384     /**
1385     * Heuristic spin value for waitingGet() before blocking on
1386     * multiprocessors
1387     */
1388     static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
1389     1 << 8 : 0);
1390    
1391     /**
1392     * Completion for recording and releasing a waiting thread. See
1393     * other classes such as Phaser and SynchronousQueue for more
1394     * detailed explanation. This class implements ManagedBlocker to
1395     * avoid starvation when blocking actions pile up in
1396     * ForkJoinPools.
1397     */
1398 dl 1.110 static final class Signaller extends Completion
1399 dl 1.104 implements ForkJoinPool.ManagedBlocker {
1400     long nanos; // wait time if timed
1401     final long deadline; // non-zero if timed
1402     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
        // the waiting thread; set to null once released, timed out or abandoned
1403     volatile Thread thread;
        // Records the constructing thread as the waiter. interruptControl
        // starts at 1 for an interruptible wait and 0 otherwise.
1404     Signaller(boolean interruptible, long nanos, long deadline) {
1405     this.thread = Thread.currentThread();
1406     this.interruptControl = interruptible ? 1 : 0;
1407     this.nanos = nanos;
1408     this.deadline = deadline;
1409     }
        // Wakes the recorded waiter, if any, and clears the field so later
        // calls are no-ops. Always returns null.
1410     final CompletableFuture<?> tryAct() {
1411     Thread w = thread;
1412     if (w != null) {
1413     thread = null; // no need to CAS
1414     LockSupport.unpark(w);
1415 dl 1.88 }
1416 dl 1.104 return null;
1417 dl 1.88 }
        // ManagedBlocker hook. Returns true when blocking is unnecessary:
        // the waiter was already released, an interruptible wait was
        // interrupted, or a timed wait has run out.
1418 dl 1.104 public boolean isReleasable() {
1419     if (thread == null)
1420     return true;
        // Thread.interrupted() clears the interrupt status, so the
        // interrupt is remembered as interruptControl = -1
1421     if (Thread.interrupted()) {
1422     int i = interruptControl;
1423     interruptControl = -1;
        // only an interruptible wait (previous value > 0) stops waiting here
1424     if (i > 0)
1425     return true;
1426 dl 1.88 }
        // timed wait: refresh the remaining nanos from the deadline and
        // give up once none are left
1427 dl 1.104 if (deadline != 0L &&
1428     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
1429     thread = null;
1430     return true;
1431 dl 1.88 }
1432 dl 1.104 return false;
1433     }
        // ManagedBlocker hook. Parks (untimed when deadline == 0, otherwise
        // for the remaining nanos) unless already releasable, then reports
        // whether the wait is over.
1434     public boolean block() {
1435     if (isReleasable())
1436     return true;
1437     else if (deadline == 0L)
1438     LockSupport.park(this);
1439     else if (nanos > 0L)
1440     LockSupport.parkNanos(this, nanos);
1441     return isReleasable();
1442 dl 1.88 }
1443     }
1444    
1445 dl 1.104 /**
1446     * Returns raw result after waiting, or null if interruptible and
1447     * interrupted.
1448     */
1449     private Object waitingGet(boolean interruptible) {
        // Each loop iteration performs one step, in this order: spin,
        // create the Signaller, enqueue it, handle an interrupt, block.
1450     Signaller q = null;
1451     boolean queued = false;
1452     int spins = SPINS;
1453 dl 1.88 Object r;
1454 dl 1.104 while ((r = result) == null) {
1455     if (spins > 0) {
        // the countdown advances only when the secondary seed is
        // non-negative, so the spin length varies
1456     if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1457     --spins;
1458 dl 1.88 }
1459 dl 1.104 else if (q == null)
        // untimed Signaller: nanos and deadline are both 0
1460     q = new Signaller(interruptible, 0L, 0L);
1461     else if (!queued)
        // a failed CAS leaves queued false and is retried next iteration
1462     queued = casCompletions(q.next = completions, q);
1463     else if (interruptible && q.interruptControl < 0) {
        // interrupted: mark the node dead, unlink it, report null
1464     q.thread = null;
1465     removeCancelledSignallers();
1466     return null;
1467 dl 1.88 }
1468 dl 1.104 else if (q.thread != null && result == null) {
1469     try {
1470     ForkJoinPool.managedBlock(q);
1471     } catch (InterruptedException ie) {
        // record the interrupt; handled on a later iteration or below
1472     q.interruptControl = -1;
1473     }
1474 dl 1.88 }
1475 dl 1.104 }
        // a result is available here
1476     if (q != null) {
1477     q.thread = null;
1478     if (q.interruptControl < 0) {
1479     if (interruptible)
1480     r = null; // report interruption
1481 dl 1.88 else
        // non-interruptible wait: re-assert the interrupt for the caller
1482 dl 1.104 Thread.currentThread().interrupt();
1483 dl 1.88 }
1484     }
1485 dl 1.104 postComplete();
1486     return r;
1487 dl 1.88 }
1488    
1489 dl 1.104 /**
1490     * Returns raw result after waiting, or null if interrupted, or
1491     * throws TimeoutException on timeout.
1492     */
1493     private Object timedGet(long nanos) throws TimeoutException {
        // a pending interrupt (cleared by this check) is reported as null
1494     if (Thread.interrupted())
1495     return null;
1496     if (nanos <= 0L)
1497     throw new TimeoutException();
1498     long d = System.nanoTime() + nanos;
        // Signaller treats deadline 0 as "untimed", so a computed 0 is bumped to 1
1499     Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
1500     boolean queued = false;
1501 dl 1.88 Object r;
1502 dl 1.104 while ((r = result) == null) {
1503     if (!queued)
        // a failed CAS leaves queued false and is retried next iteration
1504     queued = casCompletions(q.next = completions, q);
1505     else if (q.interruptControl < 0 || q.nanos <= 0L) {
        // interrupted or out of time: mark the node dead and unlink
        // it, then report null (interrupt) or throw (timeout)
1506     q.thread = null;
1507     removeCancelledSignallers();
1508     if (q.interruptControl < 0)
1509     return null;
1510     throw new TimeoutException();
1511     }
1512     else if (q.thread != null && result == null) {
1513     try {
1514     ForkJoinPool.managedBlock(q);
1515     } catch (InterruptedException ie) {
        // record the interrupt; handled on the next iteration
1516     q.interruptControl = -1;
1517     }
1518 dl 1.88 }
1519     }
        // a result is available here; an interrupt seen meanwhile still wins
1520 dl 1.104 q.thread = null;
1521     postComplete();
1522     return (q.interruptControl < 0) ? null : r;
1523     }
1524    
1525     /**
1526     * Unlinks cancelled Signallers to avoid accumulating garbage.
1527     * Internal nodes are simply unspliced without CAS since it is
1528     * harmless if they are traversed anyway. To avoid effects of
1529     * unsplicing from already removed nodes, the list is retraversed
1530     * in case of an apparent race.
1531     */
1532     private void removeCancelledSignallers() {
        // p = predecessor of q (null while q is the head), s = successor of q
1533 dl 1.110 for (Completion p = null, q = completions; q != null;) {
1534     Completion s = q.next;
        // a Signaller whose thread field is null counts as cancelled
1535 dl 1.104 if ((q instanceof Signaller) && ((Signaller)q).thread == null) {
1536     if (p != null) {
        // internal node: plain unsplice
1537     p.next = s;
        // stop if the predecessor is still live; if it is itself a
        // cancelled Signaller, fall through and restart from the head
1538     if (!(p instanceof Signaller) ||
1539     ((Signaller)p).thread != null)
1540     break;
1541     }
        // head node: removal needs a CAS on completions
1542     else if (casCompletions(q, s))
1543     break;
1544     p = null; // restart
1545     q = completions;
1546 dl 1.88 }
1547     else {
1548 dl 1.104 p = q;
1549     q = s;
1550 dl 1.88 }
1551     }
1552     }
1553    
1554 dl 1.104 /* ------------- public methods -------------- */
1555 dl 1.88
1556     /**
1557     * Creates a new incomplete CompletableFuture.
1558     */
1559     public CompletableFuture() {
1560     }
1561    
1562     /**
1563     * Returns a new CompletableFuture that is asynchronously completed
1564     * by a task running in the {@link ForkJoinPool#commonPool()} with
1565     * the value obtained by calling the given Supplier.
1566     *
1567     * @param supplier a function returning the value to be used
1568     * to complete the returned CompletableFuture
1569 jsr166 1.95 * @param <U> the function's return type
1570 dl 1.88 * @return the new CompletableFuture
1571     */
1572     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1573     if (supplier == null) throw new NullPointerException();
1574 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
1575     asyncPool.execute(new AsyncSupply<U>(d, supplier));
1576     return d;
1577 dl 1.88 }
1578    
1579     /**
1580     * Returns a new CompletableFuture that is asynchronously completed
1581     * by a task running in the given executor with the value obtained
1582     * by calling the given Supplier.
1583     *
1584     * @param supplier a function returning the value to be used
1585     * to complete the returned CompletableFuture
1586     * @param executor the executor to use for asynchronous execution
1587 jsr166 1.95 * @param <U> the function's return type
1588 dl 1.88 * @return the new CompletableFuture
1589     */
1590     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1591     Executor executor) {
1592 dl 1.109 if (supplier == null) throw new NullPointerException();
1593     Executor e = screenExecutor(executor);
1594 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
1595 dl 1.109 e.execute(new AsyncSupply<U>(d, supplier));
1596 dl 1.104 return d;
1597 dl 1.28 }
1598    
1599     /**
1600 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1601     * by a task running in the {@link ForkJoinPool#commonPool()} after
1602     * it runs the given action.
1603 dl 1.28 *
1604     * @param runnable the action to run before completing the
1605     * returned CompletableFuture
1606 jsr166 1.58 * @return the new CompletableFuture
1607 dl 1.28 */
1608     public static CompletableFuture<Void> runAsync(Runnable runnable) {
1609     if (runnable == null) throw new NullPointerException();
1610 dl 1.104 CompletableFuture<Void> d = new CompletableFuture<Void>();
1611     asyncPool.execute(new AsyncRun<Void>(d, runnable));
1612     return d;
1613 dl 1.28 }
1614    
1615     /**
1616 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1617     * by a task running in the given executor after it runs the given
1618     * action.
1619 dl 1.28 *
1620     * @param runnable the action to run before completing the
1621     * returned CompletableFuture
1622     * @param executor the executor to use for asynchronous execution
1623 jsr166 1.58 * @return the new CompletableFuture
1624 dl 1.28 */
1625     public static CompletableFuture<Void> runAsync(Runnable runnable,
1626     Executor executor) {
1627 dl 1.109 if (runnable == null) throw new NullPointerException();
1628     Executor e = screenExecutor(executor);
1629 dl 1.104 CompletableFuture<Void> d = new CompletableFuture<Void>();
1630 dl 1.109 e.execute(new AsyncRun<Void>(d, runnable));
1631 dl 1.104 return d;
1632 dl 1.28 }
1633    
1634     /**
1635 dl 1.77 * Returns a new CompletableFuture that is already completed with
1636     * the given value.
1637     *
1638     * @param value the value
1639 jsr166 1.95 * @param <U> the type of the value
1640 dl 1.77 * @return the completed CompletableFuture
1641     */
1642     public static <U> CompletableFuture<U> completedFuture(U value) {
1643 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
1644     d.result = (value == null) ? NIL : value;
1645     return d;
1646 dl 1.77 }
1647    
1648     /**
1649 dl 1.28 * Returns {@code true} if completed in any fashion: normally,
1650     * exceptionally, or via cancellation.
1651     *
1652     * @return {@code true} if completed
1653     */
1654     public boolean isDone() {
1655     return result != null;
1656     }
1657    
1658     /**
1659 dl 1.49 * Waits if necessary for this future to complete, and then
1660 dl 1.48 * returns its result.
1661 dl 1.28 *
1662 dl 1.48 * @return the result value
1663     * @throws CancellationException if this future was cancelled
1664     * @throws ExecutionException if this future completed exceptionally
1665 dl 1.28 * @throws InterruptedException if the current thread was interrupted
1666     * while waiting
1667     */
1668     public T get() throws InterruptedException, ExecutionException {
1669 jsr166 1.105 Object r;
1670 dl 1.104 return reportGet((r = result) == null ? waitingGet(true) : r);
1671 dl 1.28 }
1672    
1673     /**
1674 dl 1.49 * Waits if necessary for at most the given time for this future
1675     * to complete, and then returns its result, if available.
1676 dl 1.28 *
1677     * @param timeout the maximum time to wait
1678     * @param unit the time unit of the timeout argument
1679 dl 1.48 * @return the result value
1680     * @throws CancellationException if this future was cancelled
1681     * @throws ExecutionException if this future completed exceptionally
1682 dl 1.28 * @throws InterruptedException if the current thread was interrupted
1683     * while waiting
1684     * @throws TimeoutException if the wait timed out
1685     */
1686     public T get(long timeout, TimeUnit unit)
1687     throws InterruptedException, ExecutionException, TimeoutException {
1688 jsr166 1.105 Object r;
1689 dl 1.28 long nanos = unit.toNanos(timeout);
1690 dl 1.104 return reportGet((r = result) == null ? timedGet(nanos) : r);
1691 dl 1.28 }
1692    
1693     /**
1694     * Returns the result value when complete, or throws an
1695     * (unchecked) exception if completed exceptionally. To better
1696     * conform with the use of common functional forms, if a
1697     * computation involved in the completion of this
1698     * CompletableFuture threw an exception, this method throws an
1699     * (unchecked) {@link CompletionException} with the underlying
1700     * exception as its cause.
1701     *
1702     * @return the result value
1703     * @throws CancellationException if the computation was cancelled
1704 jsr166 1.55 * @throws CompletionException if this future completed
1705     * exceptionally or a completion computation threw an exception
1706 dl 1.28 */
1707     public T join() {
1708 dl 1.104 Object r;
1709 jsr166 1.105 return reportJoin((r = result) == null ? waitingGet(false) : r);
1710 dl 1.28 }
1711    
1712     /**
1713     * Returns the result value (or throws any encountered exception)
1714     * if completed, else returns the given valueIfAbsent.
1715     *
1716     * @param valueIfAbsent the value to return if not completed
1717     * @return the result value, if completed, else the given valueIfAbsent
1718     * @throws CancellationException if the computation was cancelled
1719 jsr166 1.55 * @throws CompletionException if this future completed
1720     * exceptionally or a completion computation threw an exception
1721 dl 1.28 */
1722     public T getNow(T valueIfAbsent) {
1723 dl 1.104 Object r;
1724 jsr166 1.106 return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
1725 dl 1.28 }
1726    
1727     /**
1728     * If not already completed, sets the value returned by {@link
1729     * #get()} and related methods to the given value.
1730     *
1731     * @param value the result value
1732     * @return {@code true} if this invocation caused this CompletableFuture
1733     * to transition to a completed state, else {@code false}
1734     */
1735     public boolean complete(T value) {
1736 dl 1.104 boolean triggered = internalComplete(value == null ? NIL : value);
1737     postComplete();
1738 dl 1.28 return triggered;
1739     }
1740    
1741     /**
1742     * If not already completed, causes invocations of {@link #get()}
1743     * and related methods to throw the given exception.
1744     *
1745     * @param ex the exception
1746     * @return {@code true} if this invocation caused this CompletableFuture
1747     * to transition to a completed state, else {@code false}
1748     */
1749     public boolean completeExceptionally(Throwable ex) {
1750     if (ex == null) throw new NullPointerException();
1751 dl 1.104 boolean triggered = internalComplete(new AltResult(ex));
1752     postComplete();
1753 dl 1.28 return triggered;
1754     }
1755    
1756 dl 1.104 public <U> CompletableFuture<U> thenApply(
1757     Function<? super T,? extends U> fn) {
1758 dl 1.28 return doThenApply(fn, null);
1759     }
1760    
1761 dl 1.104 public <U> CompletableFuture<U> thenApplyAsync(
1762     Function<? super T,? extends U> fn) {
1763     return doThenApply(fn, asyncPool);
1764 dl 1.17 }
1765    
1766 dl 1.104 public <U> CompletableFuture<U> thenApplyAsync(
1767     Function<? super T,? extends U> fn, Executor executor) {
1768 dl 1.109 return doThenApply(fn, screenExecutor(executor));
1769 dl 1.28 }
1770 dl 1.1
1771 dl 1.104 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
1772 dl 1.88 return doThenAccept(action, null);
1773 dl 1.28 }
1774    
1775 dl 1.104 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
1776     return doThenAccept(action, asyncPool);
1777 dl 1.28 }
1778    
1779 dl 1.104 public CompletableFuture<Void> thenAcceptAsync(
1780     Consumer<? super T> action, Executor executor) {
1781 dl 1.109 return doThenAccept(action, screenExecutor(executor));
1782 dl 1.7 }
1783    
1784 dl 1.104 public CompletableFuture<Void> thenRun(Runnable action) {
1785 dl 1.28 return doThenRun(action, null);
1786     }
1787    
1788 dl 1.104 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1789     return doThenRun(action, asyncPool);
1790 dl 1.28 }
1791    
1792 dl 1.104 public CompletableFuture<Void> thenRunAsync(
1793     Runnable action, Executor executor) {
1794 dl 1.109 return doThenRun(action, screenExecutor(executor));
1795 dl 1.28 }
1796    
1797 dl 1.104 public <U,V> CompletableFuture<V> thenCombine(
1798     CompletionStage<? extends U> other,
1799     BiFunction<? super T,? super U,? extends V> fn) {
1800 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, null);
1801 dl 1.28 }
1802    
1803 dl 1.104 public <U,V> CompletableFuture<V> thenCombineAsync(
1804     CompletionStage<? extends U> other,
1805     BiFunction<? super T,? super U,? extends V> fn) {
1806     return doThenCombine(other.toCompletableFuture(), fn, asyncPool);
1807 dl 1.28 }
1808    
1809 dl 1.104 public <U,V> CompletableFuture<V> thenCombineAsync(
1810     CompletionStage<? extends U> other,
1811     BiFunction<? super T,? super U,? extends V> fn,
1812     Executor executor) {
1813 dl 1.109 return doThenCombine(other.toCompletableFuture(), fn,
1814     screenExecutor(executor));
1815 dl 1.1 }
1816    
1817 dl 1.104 public <U> CompletableFuture<Void> thenAcceptBoth(
1818     CompletionStage<? extends U> other,
1819     BiConsumer<? super T, ? super U> action) {
1820 dl 1.88 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
1821 dl 1.28 }
1822    
1823 dl 1.104 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1824     CompletionStage<? extends U> other,
1825     BiConsumer<? super T, ? super U> action) {
1826     return doThenAcceptBoth(other.toCompletableFuture(), action, asyncPool);
1827 dl 1.28 }
1828    
1829 dl 1.104 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1830     CompletionStage<? extends U> other,
1831     BiConsumer<? super T, ? super U> action,
1832     Executor executor) {
1833 dl 1.109 return doThenAcceptBoth(other.toCompletableFuture(), action,
1834     screenExecutor(executor));
1835 dl 1.28 }
1836    
1837 dl 1.104 public CompletableFuture<Void> runAfterBoth(
1838     CompletionStage<?> other, Runnable action) {
1839 dl 1.88 return doRunAfterBoth(other.toCompletableFuture(), action, null);
1840 dl 1.7 }
1841    
1842 dl 1.104 public CompletableFuture<Void> runAfterBothAsync(
1843     CompletionStage<?> other, Runnable action) {
1844     return doRunAfterBoth(other.toCompletableFuture(), action, asyncPool);
1845 dl 1.28 }
1846    
1847 dl 1.104 public CompletableFuture<Void> runAfterBothAsync(
1848     CompletionStage<?> other, Runnable action, Executor executor) {
1849 dl 1.109 return doRunAfterBoth(other.toCompletableFuture(), action,
1850     screenExecutor(executor));
1851 dl 1.28 }
1852    
1853 dl 1.104 public <U> CompletableFuture<U> applyToEither(
1854     CompletionStage<? extends T> other, Function<? super T, U> fn) {
1855 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, null);
1856 dl 1.28 }
1857    
1858 dl 1.104 public <U> CompletableFuture<U> applyToEitherAsync(
1859     CompletionStage<? extends T> other, Function<? super T, U> fn) {
1860     return doApplyToEither(other.toCompletableFuture(), fn, asyncPool);
1861 dl 1.28 }
1862    
1863 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
1864 dl 1.104 (CompletionStage<? extends T> other, Function<? super T, U> fn,
1865 dl 1.48 Executor executor) {
1866 dl 1.109 return doApplyToEither(other.toCompletableFuture(), fn,
1867     screenExecutor(executor));
1868 dl 1.1 }
1869    
1870 dl 1.104 public CompletableFuture<Void> acceptEither(
1871     CompletionStage<? extends T> other, Consumer<? super T> action) {
1872 dl 1.88 return doAcceptEither(other.toCompletableFuture(), action, null);
1873 dl 1.28 }
1874    
1875 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
1876 dl 1.104 (CompletionStage<? extends T> other, Consumer<? super T> action) {
1877     return doAcceptEither(other.toCompletableFuture(), action, asyncPool);
1878 dl 1.28 }
1879    
1880 dl 1.104 public CompletableFuture<Void> acceptEitherAsync(
1881     CompletionStage<? extends T> other, Consumer<? super T> action,
1882     Executor executor) {
1883 dl 1.109 return doAcceptEither(other.toCompletableFuture(), action,
1884     screenExecutor(executor));
1885 dl 1.7 }
1886    
1887 dl 1.104 public CompletableFuture<Void> runAfterEither(
1888     CompletionStage<?> other, Runnable action) {
1889 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, null);
1890 dl 1.28 }
1891    
1892 dl 1.104 public CompletableFuture<Void> runAfterEitherAsync(
1893     CompletionStage<?> other, Runnable action) {
1894     return doRunAfterEither(other.toCompletableFuture(), action, asyncPool);
1895 dl 1.28 }
1896    
1897 dl 1.104 public CompletableFuture<Void> runAfterEitherAsync(
1898     CompletionStage<?> other, Runnable action, Executor executor) {
1899 dl 1.109 return doRunAfterEither(other.toCompletableFuture(), action,
1900     screenExecutor(executor));
1901 dl 1.1 }
1902    
1903 dl 1.48 public <U> CompletableFuture<U> thenCompose
1904 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
1905 jsr166 1.81 return doThenCompose(fn, null);
1906 dl 1.37 }
1907    
1908 dl 1.104 public <U> CompletableFuture<U> thenComposeAsync(
1909     Function<? super T, ? extends CompletionStage<U>> fn) {
1910     return doThenCompose(fn, asyncPool);
1911 dl 1.37 }
1912    
1913 dl 1.104 public <U> CompletableFuture<U> thenComposeAsync(
1914     Function<? super T, ? extends CompletionStage<U>> fn,
1915     Executor executor) {
1916 dl 1.109 return doThenCompose(fn, screenExecutor(executor));
1917 dl 1.37 }
1918    
1919 dl 1.104 public CompletableFuture<T> whenComplete(
1920     BiConsumer<? super T, ? super Throwable> action) {
1921 dl 1.88 return doWhenComplete(action, null);
1922     }
1923    
1924 dl 1.104 public CompletableFuture<T> whenCompleteAsync(
1925     BiConsumer<? super T, ? super Throwable> action) {
1926     return doWhenComplete(action, asyncPool);
1927 dl 1.88 }
1928    
1929 dl 1.104 public CompletableFuture<T> whenCompleteAsync(
1930     BiConsumer<? super T, ? super Throwable> action, Executor executor) {
1931 dl 1.109 return doWhenComplete(action, screenExecutor(executor));
1932 dl 1.88 }
1933    
1934 dl 1.104 public <U> CompletableFuture<U> handle(
1935     BiFunction<? super T, Throwable, ? extends U> fn) {
1936 dl 1.88 return doHandle(fn, null);
1937     }
1938    
1939 dl 1.104 public <U> CompletableFuture<U> handleAsync(
1940     BiFunction<? super T, Throwable, ? extends U> fn) {
1941     return doHandle(fn, asyncPool);
1942 dl 1.88 }
1943    
1944 dl 1.104 public <U> CompletableFuture<U> handleAsync(
1945     BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
1946 dl 1.109 return doHandle(fn, screenExecutor(executor));
1947 dl 1.88 }
1948    
1949     /**
1950 jsr166 1.108 * Returns this CompletableFuture.
1951 dl 1.88 *
1952     * @return this CompletableFuture
1953     */
1954     public CompletableFuture<T> toCompletableFuture() {
1955     return this;
1956 dl 1.28 }
1957    
1958 dl 1.88 // not in interface CompletionStage
1959    
1960 dl 1.28 /**
1961 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
1962     * CompletableFuture completes, with the result of the given
1963     * function of the exception triggering this CompletableFuture's
1964     * completion when it completes exceptionally; otherwise, if this
1965     * CompletableFuture completes normally, then the returned
1966     * CompletableFuture also completes normally with the same value.
1967 dl 1.88 * Note: More flexible versions of this functionality are
1968     * available using methods {@code whenComplete} and {@code handle}.
1969 dl 1.28 *
1970     * @param fn the function to use to compute the value of the
1971     * returned CompletableFuture if this CompletableFuture completed
1972     * exceptionally
1973     * @return the new CompletableFuture
1974     */
1975 dl 1.104 public CompletableFuture<T> exceptionally(
1976     Function<Throwable, ? extends T> fn) {
1977     return doExceptionally(fn);
1978 dl 1.28 }
1979    
1980 dl 1.35 /* ------------- Arbitrary-arity constructions -------------- */
1981    
1982     /**
1983     * Returns a new CompletableFuture that is completed when all of
1984 jsr166 1.66 * the given CompletableFutures complete. If any of the given
1985 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
1986     * CompletableFuture also does so, with a CompletionException
1987     * holding this exception as its cause. Otherwise, the results,
1988     * if any, of the given CompletableFutures are not reflected in
1989     * the returned CompletableFuture, but may be obtained by
1990     * inspecting them individually. If no CompletableFutures are
1991     * provided, returns a CompletableFuture completed with the value
1992     * {@code null}.
1993 dl 1.35 *
1994     * <p>Among the applications of this method is to await completion
1995     * of a set of independent CompletableFutures before continuing a
1996     * program, as in: {@code CompletableFuture.allOf(c1, c2,
1997     * c3).join();}.
1998     *
1999     * @param cfs the CompletableFutures
2000 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
2001 dl 1.35 * given CompletableFutures complete
2002     * @throws NullPointerException if the array or any of its elements are
2003     * {@code null}
2004     */
2005     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2006 dl 1.104 return doAllOf(cfs, 0, cfs.length - 1);
2007 dl 1.35 }
2008    
2009     /**
2010 dl 1.76 * Returns a new CompletableFuture that is completed when any of
2011 jsr166 1.79 * the given CompletableFutures complete, with the same result.
2012     * Otherwise, if it completed exceptionally, the returned
2013 dl 1.77 * CompletableFuture also does so, with a CompletionException
2014     * holding this exception as its cause. If no CompletableFutures
2015     * are provided, returns an incomplete CompletableFuture.
2016 dl 1.35 *
2017     * @param cfs the CompletableFutures
2018 dl 1.77 * @return a new CompletableFuture that is completed with the
2019     * result or exception of any of the given CompletableFutures when
2020     * one completes
2021 dl 1.35 * @throws NullPointerException if the array or any of its elements are
2022     * {@code null}
2023     */
2024 dl 1.77 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2025 dl 1.104 CompletableFuture<Object> d = new CompletableFuture<Object>();
2026     for (int i = 0; i < cfs.length; ++i) {
2027     CompletableFuture<?> c = cfs[i];
2028     Object r = c.result; // throw NPE if null element
2029     if (d.result == null) {
2030     if (r == null)
2031     c.unipush(new DelayedCopy<Object>(d, c));
2032     else
2033     nowCopy(d, r);
2034 dl 1.77 }
2035 dl 1.35 }
2036 dl 1.104 return d;
2037 dl 1.35 }
2038    
2039     /* ------------- Control and status methods -------------- */
2040    
2041 dl 1.28 /**
2042 dl 1.37 * If not already completed, completes this CompletableFuture with
2043     * a {@link CancellationException}. Dependent CompletableFutures
2044     * that have not already completed will also complete
2045     * exceptionally, with a {@link CompletionException} caused by
2046     * this {@code CancellationException}.
2047 dl 1.28 *
2048     * @param mayInterruptIfRunning this value has no effect in this
2049     * implementation because interrupts are not used to control
2050     * processing.
2051     *
2052     * @return {@code true} if this task is now cancelled
2053     */
2054     public boolean cancel(boolean mayInterruptIfRunning) {
2055 dl 1.46 boolean cancelled = (result == null) &&
2056 dl 1.104 internalComplete(new AltResult(new CancellationException()));
2057     postComplete();
2058 dl 1.48 return cancelled || isCancelled();
2059 dl 1.28 }
2060    
2061     /**
2062     * Returns {@code true} if this CompletableFuture was cancelled
2063     * before it completed normally.
2064     *
2065     * @return {@code true} if this CompletableFuture was cancelled
2066     * before it completed normally
2067     */
2068     public boolean isCancelled() {
2069     Object r;
2070 jsr166 1.43 return ((r = result) instanceof AltResult) &&
2071     (((AltResult)r).ex instanceof CancellationException);
2072 dl 1.28 }
2073    
2074     /**
2075 dl 1.88 * Returns {@code true} if this CompletableFuture completed
2076 dl 1.91 * exceptionally, in any way. Possible causes include
2077     * cancellation, explicit invocation of {@code
2078     * completeExceptionally}, and abrupt termination of a
2079     * CompletionStage action.
2080 dl 1.88 *
2081     * @return {@code true} if this CompletableFuture completed
2082     * exceptionally
2083     */
2084     public boolean isCompletedExceptionally() {
2085 dl 1.91 Object r;
2086     return ((r = result) instanceof AltResult) && r != NIL;
2087 dl 1.88 }
2088    
2089     /**
2090 dl 1.28 * Forcibly sets or resets the value subsequently returned by
2091 jsr166 1.42 * method {@link #get()} and related methods, whether or not
2092     * already completed. This method is designed for use only in
2093     * error recovery actions, and even in such situations may result
2094     * in ongoing dependent completions using established versus
2095 dl 1.30 * overwritten outcomes.
2096 dl 1.28 *
2097     * @param value the completion value
2098     */
2099     public void obtrudeValue(T value) {
2100     result = (value == null) ? NIL : value;
2101 dl 1.104 postComplete();
2102 dl 1.28 }
2103    
2104 dl 1.30 /**
2105 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
2106     * and related methods to throw the given exception, whether or
2107     * not already completed. This method is designed for use only in
2108 dl 1.30 * recovery actions, and even in such situations may result in
2109     * ongoing dependent completions using established versus
2110     * overwritten outcomes.
2111     *
2112     * @param ex the exception
2113     */
2114     public void obtrudeException(Throwable ex) {
2115     if (ex == null) throw new NullPointerException();
2116     result = new AltResult(ex);
2117 dl 1.104 postComplete();
2118 dl 1.30 }
2119    
2120 dl 1.35 /**
2121     * Returns the estimated number of CompletableFutures whose
2122     * completions are awaiting completion of this CompletableFuture.
2123     * This method is designed for use in monitoring system state, not
2124     * for synchronization control.
2125     *
2126     * @return the number of dependent CompletableFutures
2127     */
2128     public int getNumberOfDependents() {
2129     int count = 0;
2130 dl 1.110 for (Completion p = completions; p != null; p = p.next)
2131 dl 1.35 ++count;
2132     return count;
2133     }
2134    
2135     /**
2136     * Returns a string identifying this CompletableFuture, as well as
2137 jsr166 1.40 * its completion state. The state, in brackets, contains the
2138 dl 1.35 * String {@code "Completed Normally"} or the String {@code
2139     * "Completed Exceptionally"}, or the String {@code "Not
2140     * completed"} followed by the number of CompletableFutures
2141     * dependent upon its completion, if any.
2142     *
2143     * @return a string identifying this CompletableFuture, as well as its state
2144     */
2145     public String toString() {
2146     Object r = result;
2147 jsr166 1.40 int count;
2148     return super.toString() +
2149     ((r == null) ?
2150     (((count = getNumberOfDependents()) == 0) ?
2151     "[Not completed]" :
2152     "[Not completed, " + count + " dependents]") :
2153     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2154     "[Completed exceptionally]" :
2155     "[Completed normally]"));
2156 dl 1.35 }
2157    
2158 dl 1.1 // Unsafe mechanics
2159     private static final sun.misc.Unsafe UNSAFE;
2160     private static final long RESULT;
2161     private static final long COMPLETIONS;
2162     static {
2163     try {
2164     UNSAFE = sun.misc.Unsafe.getUnsafe();
2165     Class<?> k = CompletableFuture.class;
2166     RESULT = UNSAFE.objectFieldOffset
2167     (k.getDeclaredField("result"));
2168     COMPLETIONS = UNSAFE.objectFieldOffset
2169     (k.getDeclaredField("completions"));
2170 dl 1.104 } catch (Exception x) {
2171     throw new Error(x);
2172 dl 1.1 }
2173     }
2174     }