ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.106
Committed: Fri May 2 23:29:33 2014 UTC (10 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.105: +4 -4 lines
Log Message:
coding style

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.88 import java.util.concurrent.CompletionException;
23     import java.util.concurrent.CompletionStage;
24 dl 1.1 import java.util.concurrent.locks.LockSupport;
25    
26     /**
27     * A {@link Future} that may be explicitly completed (setting its
28 dl 1.88 * value and status), and may be used as a {@link CompletionStage},
29     * supporting dependent functions and actions that trigger upon its
30     * completion.
31 dl 1.1 *
32 jsr166 1.50 * <p>When two or more threads attempt to
33     * {@link #complete complete},
34 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
35 jsr166 1.50 * {@link #cancel cancel}
36     * a CompletableFuture, only one of them succeeds.
37 dl 1.19 *
38 dl 1.91 * <p>In addition to these and related methods for directly
39     * manipulating status and results, CompletableFuture implements
40     * interface {@link CompletionStage} with the following policies: <ul>
41 dl 1.35 *
42 dl 1.88 * <li>Actions supplied for dependent completions of
43     * <em>non-async</em> methods may be performed by the thread that
44     * completes the current CompletableFuture, or by any other caller of
45 dl 1.96 * a completion method.</li>
46 jsr166 1.65 *
47 dl 1.88 * <li>All <em>async</em> methods without an explicit Executor
48 dl 1.96 * argument are performed using the {@link ForkJoinPool#commonPool()}
49     * (unless it does not support a parallelism level of at least two, in
50     * which case, a new Thread is used). To simplify monitoring,
51     * debugging, and tracking, all generated asynchronous tasks are
52     * instances of the marker interface {@link
53     * AsynchronousCompletionTask}. </li>
54 dl 1.35 *
55 dl 1.88 * <li>All CompletionStage methods are implemented independently of
56     * other public methods, so the behavior of one method is not impacted
57     * by overrides of others in subclasses. </li> </ul>
58     *
59 dl 1.91 * <p>CompletableFuture also implements {@link Future} with the following
60 dl 1.88 * policies: <ul>
61     *
62     * <li>Since (unlike {@link FutureTask}) this class has no direct
63 jsr166 1.55 * control over the computation that causes it to be completed,
64 dl 1.88 * cancellation is treated as just another form of exceptional
65     * completion. Method {@link #cancel cancel} has the same effect as
66     * {@code completeExceptionally(new CancellationException())}. Method
67     * {@link #isCompletedExceptionally} can be used to determine if a
68     * CompletableFuture completed in any exceptional fashion.</li>
69 jsr166 1.55 *
70 dl 1.88 * <li>In case of exceptional completion with a CompletionException,
71 jsr166 1.55 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
72     * {@link ExecutionException} with the same cause as held in the
73 dl 1.88 * corresponding CompletionException. To simplify usage in most
74     * contexts, this class also defines methods {@link #join()} and
75     * {@link #getNow} that instead throw the CompletionException directly
76     * in these cases.</li> </ul>
77 jsr166 1.80 *
78 dl 1.1 * @author Doug Lea
79     * @since 1.8
80     */
81 dl 1.88 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
82 dl 1.28
83 dl 1.1 /*
84 dl 1.20 * Overview:
85 dl 1.1 *
86 dl 1.104 * A CompletableFuture may have dependent completion actions,
87     * collected in a linked stack. It atomically completes by CASing
88     * a result field, and then pops off and runs those actions. This
89     * applies across normal vs exceptional outcomes, sync vs async
90     * actions, binary triggers, and various forms of completions.
91     *
92     * Non-nullness of field result (set via CAS) indicates done. An
93     * AltResult is used to box null as a result, as well as to hold
94     * exceptions. Using a single field makes completion simple to
95     * detect and trigger. Encoding and decoding is straightforward
96     * but adds vertical sprawl. One minor simplification relies on
97     * the (static) NIL (to box null results) being the only AltResult
98     * with a null exception field, so we don't usually need explicit
99     * comparisons with NIL. Exception propagation mechanics
100     * surrounding decoding rely on unchecked casts of decoded results
101     * really being unchecked, and user type errors being caught at
102     * point of use, as is currently the case in Java. These are
103     * highlighted by using SuppressWarnings annotated temporaries.
104     *
105     * Dependent actions are represented by Completion objects linked
106     * as Treiber stacks headed by field completions. There are four
107     * kinds of Completions: single-source (UniCompletion), two-source
108     * (BiCompletion), shared (CoBiCompletion, used by the second
109     * source of a BiCompletion), and Signallers that unblock waiters.
110     *
111     * The same patterns of methods and classes are used for each form
112     * of Completion (apply, combine, etc), and are written in a
113     * similar style. For each form X there is, when applicable:
114     *
115     * * Method nowX (for example nowApply) that immediately executes
116     * a supplied function and sets result
117     * * Class AsyncX class (for example AsyncApply) that calls nowX
118     * from another task,
119     * * Class DelayedX (for example DelayedApply) that holds
120     * arguments and calls Xnow when ready.
121     *
122     * For each public CompletionStage method M* (for example
123     * thenApply{Async}), there is a method doM (for example
124     * doThenApply) that creates and/or invokes the appropriate form.
125     * Each deals with three cases that can arise when adding a
126     * dependent completion to CompletableFuture f:
127     *
128     * * f is already complete, so the dependent action is run
129     * immediately, via a "now" method, which, if async,
130     * starts the action in a new task.
131     * * f is not complete, so a Completion action is created and
132     * pushed to f's completions. It is triggered via
133     * f.postComplete when f completes.
134     * * f is not complete, but completes while adding the completion
135     * action, so we try to trigger it upon adding (see method
136     * unipush and derivatives) to cover races.
137     *
138     * Methods with two sources (for example thenCombine) must deal
139     * with races across both while pushing actions. The second
140     * completion is an CoBiCompletion pointing to the first, shared
141     * to ensure that at most one claims and performs the action. The
142     * multiple-arity method allOf does this pairwise to form a tree
143     * of completions. (Method anyOf just uses a depth-one Or tree.)
144     *
145     * Upon setting results, method postComplete is called unless
146     * the target is guaranteed not to be observable (i.e., not yet
147     * returned or linked). Multiple threads can call postComplete,
148     * which atomically pops each dependent action, and tries to
149     * trigger it via method tryAct. Any such action must be performed
150     * only once, even if called from several threads, so Completions
151     * maintain status via CAS, and on success run one of the "now"
152     * methods. Triggering can propagate recursively, so tryAct
153     * returns a completed dependent (if one exists) for further
154     * processing by its caller.
155     *
156     * Blocking methods get() and join() rely on Signaller Completions
157     * that wake up waiting threads. The mechanics are similar to
158     * Treiber stack wait-nodes used in FutureTask, Phaser, and
159     * SynchronousQueue. See their internal documentation for
160     * algorithmic details.
161     *
162     * Without precautions, CompletableFutures would be prone to
163     * garbage accumulation as chains of completions build up, each
164     * pointing back to its sources. So we detach (null out) most
165     * Completion fields as soon as possible. To support this,
166     * internal methods check for and harmlessly ignore null arguments
167     * that may have been obtained during races with threads nulling
168     * out fields. (Some of these checked cases cannot currently
169     * happen.) Fields of Async classes can be but currently are not
170     * fully detached, because they do not in general form cycles.
171     */
172    
173     volatile Object result; // Either the result or boxed AltResult
174     volatile Completion<?> completions; // Treiber stack of dependent actions
175    
176     final boolean internalComplete(Object r) { // CAS from null to r
177     return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
178     }
179    
180     final boolean casCompletions(Completion<?> cmp, Completion<?> val) {
181     return UNSAFE.compareAndSwapObject(this, COMPLETIONS, cmp, val);
182     }
183    
184     /* ------------- Encoding and decoding outcomes -------------- */
185    
186     static final class AltResult { // See above
187     final Throwable ex; // null only for NIL
188     AltResult(Throwable x) { this.ex = x; }
189 dl 1.1 }
190    
191     static final AltResult NIL = new AltResult(null);
192    
193 dl 1.20 /**
194 dl 1.104 * Returns the encoding of the given (non-null) exception as a
195     * wrapped CompletionException unless it is one already.
196 dl 1.99 */
197 dl 1.104 static AltResult altThrowable(Throwable x) {
198     return new AltResult((x instanceof CompletionException) ? x :
199     new CompletionException(x));
200 dl 1.99 }
201    
202     /**
203 dl 1.104 * Returns the encoding of the given arguments: if the exception
204     * is non-null, encodes as altThrowable. Otherwise uses the given
205     * value, boxed as NIL if null.
206 dl 1.99 */
207 dl 1.104 static Object encodeOutcome(Object v, Throwable x) {
208     return (x != null) ? altThrowable(x) : (v == null) ? NIL : v;
209 dl 1.20 }
210    
211 dl 1.1 /**
212 dl 1.104 * Decodes outcome to return result or throw unchecked exception
213 dl 1.1 */
214 dl 1.104 private static <T> T reportJoin(Object r) {
215     if (r instanceof AltResult) {
216     Throwable x;
217     if ((x = ((AltResult)r).ex) == null)
218     return null;
219     if (x instanceof CancellationException)
220     throw (CancellationException)x;
221     if (x instanceof CompletionException)
222     throw (CompletionException)x;
223     throw new CompletionException(x);
224 dl 1.28 }
225 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r;
226     return tr;
227 dl 1.1 }
228    
229     /**
230 dl 1.104 * Reports result using Future.get conventions
231 dl 1.1 */
232 dl 1.104 private static <T> T reportGet(Object r)
233     throws InterruptedException, ExecutionException {
234     if (r == null) // by convention below, null means interrupted
235     throw new InterruptedException();
236     if (r instanceof AltResult) {
237     Throwable x, cause;
238     if ((x = ((AltResult)r).ex) == null)
239 dl 1.28 return null;
240 dl 1.104 if (x instanceof CancellationException)
241     throw (CancellationException)x;
242     if ((x instanceof CompletionException) &&
243     (cause = x.getCause()) != null)
244     x = cause;
245     throw new ExecutionException(x);
246 dl 1.28 }
247 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r;
248     return tr;
249 dl 1.1 }
250    
251 dl 1.104 /* ------------- Async Tasks -------------- */
252    
253 dl 1.1 /**
254 dl 1.104 * Default executor -- ForkJoinPool.commonPool() unless it cannot
255     * support parallelism.
256 dl 1.1 */
257 dl 1.104 static final Executor asyncPool =
258     (ForkJoinPool.getCommonPoolParallelism() > 1) ?
259     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
260 dl 1.1
261 dl 1.104 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */
262     static final class ThreadPerTaskExecutor implements Executor {
263     public void execute(Runnable r) { new Thread(r).start(); }
264 dl 1.1 }
265    
266     /**
267 jsr166 1.56 * A marker interface identifying asynchronous tasks produced by
268 dl 1.28 * {@code async} methods. This may be useful for monitoring,
269     * debugging, and tracking asynchronous activities.
270 jsr166 1.57 *
271     * @since 1.8
272 dl 1.1 */
273 dl 1.28 public static interface AsynchronousCompletionTask {
274 dl 1.1 }
275    
276 dl 1.104 /**
277     * Base class for tasks that can act as either FJ or plain
278     * Runnables. Abstract method compute calls an associated "now"
279     * method. Method exec calls compute if its CompletableFuture is
280     * not already done, and runs completions if done. Fields are not
281     * in general final and can be nulled out after use (but most
282     * currently are not). Classes include serialVersionUIDs even
283     * though they are currently never serialized.
284     */
285     abstract static class Async<T> extends ForkJoinTask<Void>
286 dl 1.28 implements Runnable, AsynchronousCompletionTask {
287 dl 1.104 CompletableFuture<T> dep; // the CompletableFuture to trigger
288     Async(CompletableFuture<T> dep) { this.dep = dep; }
289    
290 jsr166 1.105 abstract void compute(); // call the associated "now" method
291 dl 1.104
292 jsr166 1.105 public final boolean exec() {
293 dl 1.104 CompletableFuture<T> d;
294     if ((d = dep) != null) {
295     if (d.result == null) // suppress if cancelled
296     compute();
297     if (d.result != null)
298     d.postComplete();
299     dep = null; // detach
300     }
301     return true;
302     }
303 dl 1.28 public final Void getRawResult() { return null; }
304     public final void setRawResult(Void v) { }
305     public final void run() { exec(); }
306 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
307 jsr166 1.26 }
308    
309 dl 1.104 /* ------------- Completions -------------- */
310    
311 jsr166 1.106 abstract static class Completion<T> { // See above
312 dl 1.104 volatile Completion<?> next; // Treiber stack link
313    
314     /**
315     * Performs completion action if enabled, returning a
316     * completed dependent Completablefuture, if one exists.
317     */
318     abstract CompletableFuture<?> tryAct();
319 dl 1.96 }
320    
321 dl 1.104 /**
322     * Triggers all reaachble enabled dependents. Call only when
323     * known to be done.
324     */
325     final void postComplete() {
326     /*
327     * On each step, variable f holds current completions to pop
328     * and run. It is extended along only one path at a time,
329     * pushing others to avoid StackOverflowErrors on recursion.
330     */
331     CompletableFuture<?> f = this; Completion<?> h;
332     while ((h = f.completions) != null ||
333     (f != this && (h = (f = this).completions) != null)) {
334     CompletableFuture<?> d; Completion<?> t;
335     if (f.casCompletions(h, t = h.next)) {
336     if (t != null) {
337     if (f != this) { // push
338     do {} while (!casCompletions(h.next = completions, h));
339     continue;
340     }
341     h.next = null; // detach
342 dl 1.28 }
343 dl 1.104 f = (d = h.tryAct()) == null ? this : d;
344 dl 1.19 }
345     }
346     }
347    
348 dl 1.104 /* ------------- One-source Completions -------------- */
349    
350     /**
351     * A Completion with a source and dependent. The "dep" field acts
352     * as a claim, nulled out to disable further attempts to
353     * trigger. Fields can only be observed by other threads upon
354     * successful push; and should be nulled out after claim.
355     */
356 jsr166 1.106 abstract static class UniCompletion<T> extends Completion<T> {
357 dl 1.104 Executor async; // executor to use (null if none)
358     CompletableFuture<T> dep; // the dependent to complete
359     CompletableFuture<?> src; // source of value for tryAct
360    
361     UniCompletion(Executor async, CompletableFuture<T> dep,
362     CompletableFuture<?> src) {
363     this.async = async; this.dep = dep; this.src = src;
364     }
365    
366     /** Tries to claim completion action by CASing dep to null */
367     final boolean claim(CompletableFuture<T> d) {
368     return UNSAFE.compareAndSwapObject(this, DEP, d, null);
369     }
370    
371     private static final sun.misc.Unsafe UNSAFE;
372     private static final long DEP;
373     static {
374     try {
375     UNSAFE = sun.misc.Unsafe.getUnsafe();
376     Class<?> k = UniCompletion.class;
377     DEP = UNSAFE.objectFieldOffset
378     (k.getDeclaredField("dep"));
379     } catch (Exception x) {
380     throw new Error(x);
381 dl 1.1 }
382     }
383     }
384    
385 dl 1.104 /** Pushes c on to completions, and triggers c if done. */
386     private void unipush(UniCompletion<?> c) {
387     if (c != null) {
388     CompletableFuture<?> d;
389     while (result == null && !casCompletions(c.next = completions, c))
390     c.next = null; // clear on CAS failure
391     if ((d = c.tryAct()) != null) // cover races
392     d.postComplete();
393     if (result != null) // clean stack
394     postComplete();
395     }
396     }
397    
398     // Immediate, async, delayed, and routing support for Function/apply
399    
400     static <T,U> void nowApply(Executor e, CompletableFuture<U> d, Object r,
401     Function<? super T,? extends U> f) {
402     if (d != null && f != null) {
403     T t; U u; Throwable x;
404     if (r instanceof AltResult) {
405     t = null;
406     x = ((AltResult)r).ex;
407     }
408     else {
409     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
410     x = null;
411     }
412     if (x == null) {
413 dl 1.21 try {
414 dl 1.104 if (e != null) {
415     e.execute(new AsyncApply<T,U>(d, t, f));
416     return;
417     }
418     u = f.apply(t);
419     } catch (Throwable ex) {
420     x = ex;
421 dl 1.21 u = null;
422     }
423 dl 1.1 }
424 dl 1.104 else
425     u = null;
426     d.internalComplete(encodeOutcome(u, x));
427 dl 1.1 }
428     }
429    
430 dl 1.104 static final class AsyncApply<T,U> extends Async<U> {
431 jsr166 1.105 T arg; Function<? super T,? extends U> fn;
432 dl 1.104 AsyncApply(CompletableFuture<U> dep, T arg,
433     Function<? super T,? extends U> fn) {
434     super(dep); this.arg = arg; this.fn = fn;
435 dl 1.1 }
436 jsr166 1.105 final void compute() { nowApply(null, dep, arg, fn); }
437 dl 1.1 private static final long serialVersionUID = 5232453952276885070L;
438     }
439    
440 dl 1.104 static final class DelayedApply<T,U> extends UniCompletion<U> {
441     Function<? super T,? extends U> fn;
442     DelayedApply(Executor async, CompletableFuture<U> dep,
443     CompletableFuture<?> src,
444     Function<? super T,? extends U> fn) {
445     super(async, dep, src); this.fn = fn;
446     }
447     final CompletableFuture<?> tryAct() {
448     CompletableFuture<U> d; CompletableFuture<?> a; Object r;
449     if ((d = dep) != null && (a = src) != null &&
450     (r = a.result) != null && claim(d)) {
451     nowApply(async, d, r, fn);
452     src = null; fn = null;
453     if (d.result != null) return d;
454 dl 1.7 }
455 dl 1.104 return null;
456 dl 1.7 }
457     }
458    
459 dl 1.104 private <U> CompletableFuture<U> doThenApply(
460     Function<? super T,? extends U> fn, Executor e) {
461     if (fn == null) throw new NullPointerException();
462     CompletableFuture<U> d = new CompletableFuture<U>();
463     Object r = result;
464     if (r == null)
465     unipush(new DelayedApply<T,U>(e, d, this, fn));
466     else
467     nowApply(e, d, r, fn);
468     return d;
469     }
470    
471     // Consumer/accept
472    
473     static <T,U> void nowAccept(Executor e, CompletableFuture<U> d,
474     Object r, Consumer<? super T> f) {
475     if (d != null && f != null) {
476     T t; Throwable x;
477     if (r instanceof AltResult) {
478     t = null;
479     x = ((AltResult)r).ex;
480     }
481     else {
482     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
483     x = null;
484     }
485     if (x == null) {
486 dl 1.21 try {
487 dl 1.104 if (e != null) {
488     e.execute(new AsyncAccept<T,U>(d, t, f));
489     return;
490     }
491     f.accept(t);
492     } catch (Throwable ex) {
493     x = ex;
494 dl 1.21 }
495 dl 1.7 }
496 dl 1.104 d.internalComplete(encodeOutcome(null, x));
497 dl 1.7 }
498     }
499    
500 dl 1.104 static final class AsyncAccept<T,U> extends Async<U> {
501 jsr166 1.105 T arg; Consumer<? super T> fn;
502 dl 1.104 AsyncAccept(CompletableFuture<U> dep, T arg,
503     Consumer<? super T> fn) {
504     super(dep); this.arg = arg; this.fn = fn;
505 dl 1.37 }
506 jsr166 1.105 final void compute() { nowAccept(null, dep, arg, fn); }
507 dl 1.37 private static final long serialVersionUID = 5232453952276885070L;
508     }
509    
510 dl 1.104 static final class DelayedAccept<T> extends UniCompletion<Void> {
511     Consumer<? super T> fn;
512     DelayedAccept(Executor async, CompletableFuture<Void> dep,
513     CompletableFuture<?> src, Consumer<? super T> fn) {
514     super(async, dep, src); this.fn = fn;
515 dl 1.91 }
516 dl 1.104 final CompletableFuture<?> tryAct() {
517     CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
518     if ((d = dep) != null && (a = src) != null &&
519     (r = a.result) != null && claim(d)) {
520     nowAccept(async, d, r, fn);
521     src = null; fn = null;
522     if (d.result != null) return d;
523 dl 1.91 }
524 dl 1.104 return null;
525 dl 1.91 }
526     }
527    
528 dl 1.104 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
529     Executor e) {
530     if (fn == null) throw new NullPointerException();
531     CompletableFuture<Void> d = new CompletableFuture<Void>();
532     Object r = result;
533     if (r == null)
534     unipush(new DelayedAccept<T>(e, d, this, fn));
535     else
536     nowAccept(e, d, r, fn);
537     return d;
538 dl 1.28 }
539    
540 dl 1.104 // Runnable/run
541 dl 1.1
542 dl 1.104 static <T> void nowRun(Executor e, CompletableFuture<T> d, Object r,
543     Runnable f) {
544     if (d != null && f != null) {
545     Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
546     if (x == null) {
547     try {
548     if (e != null) {
549     e.execute(new AsyncRun<T>(d, f));
550     return;
551 dl 1.17 }
552 dl 1.104 f.run();
553     } catch (Throwable ex) {
554     x = ex;
555 dl 1.17 }
556 dl 1.1 }
557 dl 1.104 d.internalComplete(encodeOutcome(null, x));
558 dl 1.1 }
559     }
560    
561 dl 1.104 static final class AsyncRun<T> extends Async<T> {
562 jsr166 1.105 Runnable fn;
563 dl 1.104 AsyncRun(CompletableFuture<T> dep, Runnable fn) {
564     super(dep); this.fn = fn;
565 dl 1.7 }
566 jsr166 1.105 final void compute() { nowRun(null, dep, null, fn); }
567 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
568 dl 1.7 }
569    
570 dl 1.104 static final class DelayedRun extends UniCompletion<Void> {
571 jsr166 1.105 Runnable fn;
572 dl 1.104 DelayedRun(Executor async, CompletableFuture<Void> dep,
573     CompletableFuture<?> src, Runnable fn) {
574     super(async, dep, src); this.fn = fn;
575     }
576     final CompletableFuture<?> tryAct() {
577     CompletableFuture<Void> d; CompletableFuture<?> a; Object r;
578     if ((d = dep) != null && (a = src) != null &&
579     (r = a.result) != null && claim(d)) {
580     nowRun(async, d, r, fn);
581     src = null; fn = null; // clear refs
582     if (d.result != null) return d;
583 dl 1.1 }
584 dl 1.104 return null;
585 dl 1.1 }
586     }
587    
588 dl 1.104 private CompletableFuture<Void> doThenRun(Runnable fn, Executor e) {
589     if (fn == null) throw new NullPointerException();
590     CompletableFuture<Void> d = new CompletableFuture<Void>();
591     Object r = result;
592     if (r == null)
593     unipush(new DelayedRun(e, d, this, fn));
594     else
595     nowRun(e, d, r, fn);
596     return d;
597     }
598    
599     // Supplier/get
600    
601     static <T> void nowSupply(CompletableFuture<T> d, Supplier<T> f) {
602     if (d != null && f != null) {
603     T t; Throwable x;
604     try {
605     t = f.get();
606     x = null;
607     } catch (Throwable ex) {
608     x = ex;
609     t = null;
610 jsr166 1.2 }
611 dl 1.104 d.internalComplete(encodeOutcome(t, x));
612 jsr166 1.2 }
613 dl 1.1 }
614    
615 dl 1.104 static final class AsyncSupply<T> extends Async<T> {
616 jsr166 1.105 Supplier<T> fn;
617 dl 1.104 AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
618     super(dep); this.fn = fn;
619 dl 1.7 }
620 jsr166 1.105 final void compute() { nowSupply(dep, fn); }
621 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
622 dl 1.7 }
623    
624 dl 1.104 // WhenComplete
625    
626     static <T> void nowWhen(Executor e, CompletableFuture<T> d, Object r,
627     BiConsumer<? super T,? super Throwable> f) {
628     if (d != null && f != null) {
629     T t; Throwable x, dx;
630     if (r instanceof AltResult) {
631     t = null;
632     x = ((AltResult)r).ex;
633     }
634     else {
635     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
636     x = null;
637     }
638     try {
639     if (e != null) {
640     e.execute(new AsyncWhen<T>(d, r, f));
641     return;
642 jsr166 1.2 }
643 dl 1.104 f.accept(t, x);
644     dx = null;
645     } catch (Throwable ex) {
646     dx = ex;
647 jsr166 1.2 }
648 dl 1.104 d.internalComplete(encodeOutcome(t, x != null ? x : dx));
649 jsr166 1.2 }
650 dl 1.1 }
651    
652 dl 1.104 static final class AsyncWhen<T> extends Async<T> {
653 jsr166 1.105 Object arg; BiConsumer<? super T,? super Throwable> fn;
654 dl 1.104 AsyncWhen(CompletableFuture<T> dep, Object arg,
655     BiConsumer<? super T,? super Throwable> fn) {
656     super(dep); this.arg = arg; this.fn = fn;
657 dl 1.35 }
658 jsr166 1.105 final void compute() { nowWhen(null, dep, arg, fn); }
659 dl 1.35 private static final long serialVersionUID = 5232453952276885070L;
660     }
661    
662 dl 1.104 static final class DelayedWhen<T> extends UniCompletion<T> {
663     BiConsumer<? super T, ? super Throwable> fn;
664     DelayedWhen(Executor async, CompletableFuture<T> dep,
665     CompletableFuture<?> src,
666     BiConsumer<? super T, ? super Throwable> fn) {
667     super(async, dep, src); this.fn = fn;
668     }
669     final CompletableFuture<?> tryAct() {
670     CompletableFuture<T> d; CompletableFuture<?> a; Object r;
671     if ((d = dep) != null && (a = src) != null &&
672     (r = a.result) != null && claim(d)) {
673     nowWhen(async, d, r, fn);
674     src = null; fn = null;
675     if (d.result != null) return d;
676 jsr166 1.2 }
677 dl 1.104 return null;
678 jsr166 1.2 }
679 dl 1.1 }
680    
681 dl 1.104 private CompletableFuture<T> doWhenComplete(
682     BiConsumer<? super T, ? super Throwable> fn, Executor e) {
683     if (fn == null) throw new NullPointerException();
684     CompletableFuture<T> d = new CompletableFuture<T>();
685     Object r = result;
686     if (r == null)
687     unipush(new DelayedWhen<T>(e, d, this, fn));
688     else
689     nowWhen(e, d, r, fn);
690     return d;
691 dl 1.7 }
692    
693 dl 1.104 // Handle
694    
695     static <T,U> void nowHandle(Executor e, CompletableFuture<U> d, Object r,
696     BiFunction<? super T, Throwable, ? extends U> f) {
697     if (d != null && f != null) {
698     T t; U u; Throwable x, dx;
699     if (r instanceof AltResult) {
700     t = null;
701     x = ((AltResult)r).ex;
702     }
703     else {
704     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
705     x = null;
706     }
707     try {
708     if (e != null) {
709     e.execute(new AsyncCombine<T,Throwable,U>(d, t, x, f));
710     return;
711 dl 1.1 }
712 dl 1.104 u = f.apply(t, x);
713     dx = null;
714     } catch (Throwable ex) {
715     dx = ex;
716     u = null;
717 jsr166 1.2 }
718 dl 1.104 d.internalComplete(encodeOutcome(u, dx));
719 jsr166 1.2 }
720 dl 1.1 }
721    
722 dl 1.104 static final class DelayedHandle<T,U> extends UniCompletion<U> {
723     BiFunction<? super T, Throwable, ? extends U> fn;
724     DelayedHandle(Executor async, CompletableFuture<U> dep,
725     CompletableFuture<?> src,
726     BiFunction<? super T, Throwable, ? extends U> fn) {
727     super(async, dep, src); this.fn = fn;
728     }
729     final CompletableFuture<?> tryAct() {
730     CompletableFuture<U> d; CompletableFuture<?> a; Object r;
731     if ((d = dep) != null && (a = src) != null &&
732     (r = a.result) != null && claim(d)) {
733     nowHandle(async, d, r, fn);
734     src = null; fn = null;
735     if (d.result != null) return d;
736 dl 1.35 }
737 dl 1.104 return null;
738 dl 1.35 }
739     }
740    
741 dl 1.104 private <U> CompletableFuture<U> doHandle(
742     BiFunction<? super T, Throwable, ? extends U> fn,
743     Executor e) {
744     if (fn == null) throw new NullPointerException();
745     CompletableFuture<U> d = new CompletableFuture<U>();
746     Object r = result;
747     if (r == null)
748     unipush(new DelayedHandle<T,U>(e, d, this, fn));
749     else
750     nowHandle(e, d, r, fn);
751     return d;
752 dl 1.1 }
753    
754 dl 1.104 // Exceptionally
755    
756     static <T> void nowExceptionally(CompletableFuture<T> d, Object r,
757     Function<? super Throwable, ? extends T> f) {
758     if (d != null && f != null) {
759     T t; Throwable x, dx;
760     if ((r instanceof AltResult) && (x = ((AltResult)r).ex) != null) {
761     try {
762     t = f.apply(x);
763     dx = null;
764     } catch (Throwable ex) {
765     dx = ex;
766 dl 1.88 t = null;
767     }
768     }
769 dl 1.104 else {
770     @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
771     dx = null;
772     }
773     d.internalComplete(encodeOutcome(t, dx));
774 dl 1.88 }
775     }
776    
777 dl 1.104 static final class DelayedExceptionally<T> extends UniCompletion<T> {
778     Function<? super Throwable, ? extends T> fn;
779     DelayedExceptionally(CompletableFuture<T> dep, CompletableFuture<?> src,
780     Function<? super Throwable, ? extends T> fn) {
781     super(null, dep, src); this.fn = fn;
782     }
783     final CompletableFuture<?> tryAct() {
784     CompletableFuture<T> d; CompletableFuture<?> a; Object r;
785     if ((d = dep) != null && (a = src) != null &&
786     (r = a.result) != null && claim(d)) {
787     nowExceptionally(d, r, fn);
788     src = null; fn = null;
789     if (d.result != null) return d;
790 dl 1.17 }
791 dl 1.104 return null;
792 dl 1.17 }
793     }
794    
795 dl 1.104 private CompletableFuture<T> doExceptionally(
796     Function<Throwable, ? extends T> fn) {
797     if (fn == null) throw new NullPointerException();
798     CompletableFuture<T> d = new CompletableFuture<T>();
799     Object r = result;
800     if (r == null)
801     unipush(new DelayedExceptionally<T>(d, this, fn));
802     else
803     nowExceptionally(d, r, fn);
804     return d;
805 dl 1.75 }
806    
807 dl 1.104 // Identity function used by nowCompose and anyOf
808    
809     static <T> void nowCopy(CompletableFuture<T> d, Object r) {
810     if (d != null && d.result == null) {
811     Throwable x;
812     d.internalComplete(((r instanceof AltResult) &&
813     (x = ((AltResult)r).ex) != null &&
814     !(x instanceof CompletionException)) ?
815     new AltResult(new CompletionException(x)): r);
816 dl 1.17 }
817     }
818    
819 dl 1.104 static final class DelayedCopy<T> extends UniCompletion<T> {
820     DelayedCopy(CompletableFuture<T> dep, CompletableFuture<?> src) {
821     super(null, dep, src);
822     }
823     final CompletableFuture<?> tryAct() {
824     CompletableFuture<T> d; CompletableFuture<?> a; Object r;
825     if ((d = dep) != null && (a = src) != null &&
826     (r = a.result) != null && claim(d)) {
827     nowCopy(d, r);
828     src = null;
829     if (d.result != null) return d;
830 dl 1.28 }
831 dl 1.104 return null;
832 dl 1.28 }
833     }
834    
835 dl 1.104 // Compose
836 dl 1.28
837 dl 1.104 static <T,U> void nowCompose(Executor e, CompletableFuture<U> d, Object r,
838     Function<? super T, ? extends CompletionStage<U>> f) {
839     if (d != null && f != null) {
840     T t; Throwable x;
841 dl 1.88 if (r instanceof AltResult) {
842     t = null;
843 dl 1.104 x = ((AltResult)r).ex;
844 dl 1.88 }
845     else {
846 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
847     x = null;
848 dl 1.88 }
849 dl 1.104 if (x == null) {
850 dl 1.88 try {
851     if (e != null)
852 dl 1.104 e.execute(new AsyncCompose<T,U>(d, t, f));
853     else {
854     CompletableFuture<U> c =
855     f.apply(t).toCompletableFuture();
856     Object s = c.result;
857     if (s == null)
858     c.unipush(new DelayedCopy<U>(d, c));
859     else
860     nowCopy(d, s);
861     }
862     return;
863     } catch (Throwable ex) {
864     x = ex;
865 dl 1.88 }
866     }
867 dl 1.104 d.internalComplete(encodeOutcome(null, x));
868 dl 1.88 }
869 dl 1.28 }
870    
871 dl 1.104 static final class AsyncCompose<T,U> extends Async<U> {
872 jsr166 1.105 T arg; Function<? super T, ? extends CompletionStage<U>> fn;
873 dl 1.104 AsyncCompose(CompletableFuture<U> dep, T arg,
874     Function<? super T, ? extends CompletionStage<U>> fn) {
875     super(dep); this.arg = arg; this.fn = fn;
876     }
877 jsr166 1.105 final void compute() { nowCompose(null, dep, arg, fn); }
878 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
879     }
880    
881     static final class DelayedCompose<T,U> extends UniCompletion<U> {
882     Function<? super T, ? extends CompletionStage<U>> fn;
883     DelayedCompose(Executor async, CompletableFuture<U> dep,
884     CompletableFuture<?> src,
885     Function<? super T, ? extends CompletionStage<U>> fn) {
886     super(async, dep, src); this.fn = fn;
887     }
888     final CompletableFuture<?> tryAct() {
889     CompletableFuture<U> d; CompletableFuture<?> a; Object r;
890     if ((d = dep) != null && (a = src) != null &&
891     (r = a.result) != null && claim(d)) {
892     nowCompose(async, d, r, fn);
893     src = null; fn = null;
894     if (d.result != null) return d;
895     }
896     return null;
897     }
898     }
899    
900     private <U> CompletableFuture<U> doThenCompose(
901     Function<? super T, ? extends CompletionStage<U>> fn, Executor e) {
902 dl 1.88 if (fn == null) throw new NullPointerException();
903 dl 1.104 Object r = result;
904     if (r == null || e != null) {
905     CompletableFuture<U> d = new CompletableFuture<U>();
906     if (r == null)
907     unipush(new DelayedCompose<T,U>(e, d, this, fn));
908     else
909     nowCompose(e, d, r, fn);
910     return d;
911 dl 1.88 }
912 dl 1.104 else { // try to return function result
913     T t; Throwable x;
914 dl 1.88 if (r instanceof AltResult) {
915     t = null;
916 dl 1.104 x = ((AltResult)r).ex;
917 dl 1.88 }
918     else {
919 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
920     x = null;
921 dl 1.88 }
922 dl 1.104 if (x == null) {
923 dl 1.88 try {
924 dl 1.104 return fn.apply(t).toCompletableFuture();
925     } catch (Throwable ex) {
926     x = ex;
927 dl 1.88 }
928     }
929 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
930     d.result = encodeOutcome(null, x);
931     return d;
932 dl 1.88 }
933 dl 1.28 }
934    
935 dl 1.104 /* ------------- Two-source Completions -------------- */
936    
937     /** A Completion with two sources */
938 jsr166 1.106 abstract static class BiCompletion<T> extends UniCompletion<T> {
939 dl 1.104 CompletableFuture<?> snd; // second source for tryAct
940     BiCompletion(Executor async, CompletableFuture<T> dep,
941     CompletableFuture<?> src, CompletableFuture<?> snd) {
942     super(async, dep, src); this.snd = snd;
943     }
944     }
945    
946     /** A Completion delegating to a shared BiCompletion */
947     static final class CoBiCompletion<T> extends Completion<T> {
948     BiCompletion<T> completion;
949     CoBiCompletion(BiCompletion<T> completion) {
950     this.completion = completion;
951 dl 1.88 }
952 dl 1.104 final CompletableFuture<?> tryAct() {
953     BiCompletion<T> c;
954     return (c = completion) == null ? null : c.tryAct();
955 dl 1.88 }
956     }
957    
958 dl 1.104 /* ------------- Two-source Anded -------------- */
959    
960     /* Pushes c on to completions and o's completions unless both done. */
961     private <U> void bipushAnded(CompletableFuture<?> o, BiCompletion<U> c) {
962     if (c != null && o != null) {
963     Object r; CompletableFuture<?> d;
964     while ((r = result) == null &&
965     !casCompletions(c.next = completions, c))
966     c.next = null;
967     if (o.result == null) {
968     Completion<U> q = (r != null) ? c : new CoBiCompletion<U>(c);
969     while (o.result == null &&
970     !o.casCompletions(q.next = o.completions, q))
971     q.next = null;
972 dl 1.88 }
973 dl 1.104 if ((d = c.tryAct()) != null)
974     d.postComplete();
975     if (o.result != null)
976     o.postComplete();
977     if (result != null)
978     postComplete();
979 dl 1.88 }
980 dl 1.104 }
981    
982     // BiFunction/combine
983    
984     static <T,U,V> void nowCombine(Executor e, CompletableFuture<V> d,
985     Object r, Object s,
986     BiFunction<? super T,? super U,? extends V> f) {
987     if (d != null && f != null) {
988     T t; U u; V v; Throwable x;
989 dl 1.88 if (r instanceof AltResult) {
990     t = null;
991 dl 1.104 x = ((AltResult)r).ex;
992 dl 1.88 }
993     else {
994 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
995     x = null;
996 dl 1.88 }
997 dl 1.104 if (x != null)
998 dl 1.88 u = null;
999     else if (s instanceof AltResult) {
1000 dl 1.104 x = ((AltResult)s).ex;
1001 dl 1.88 u = null;
1002     }
1003     else {
1004 dl 1.104 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1005 dl 1.88 }
1006 dl 1.104 if (x == null) {
1007 dl 1.88 try {
1008 dl 1.104 if (e != null) {
1009     e.execute(new AsyncCombine<T,U,V>(d, t, u, f));
1010     return;
1011     }
1012     v = f.apply(t, u);
1013     } catch (Throwable ex) {
1014     x = ex;
1015     v = null;
1016 dl 1.88 }
1017     }
1018 dl 1.104 else
1019     v = null;
1020     d.internalComplete(encodeOutcome(v, x));
1021 dl 1.88 }
1022     }
1023    
1024 dl 1.104 static final class AsyncCombine<T,U,V> extends Async<V> {
1025 jsr166 1.105 T arg1; U arg2; BiFunction<? super T,? super U,? extends V> fn;
1026 dl 1.104 AsyncCombine(CompletableFuture<V> dep, T arg1, U arg2,
1027     BiFunction<? super T,? super U,? extends V> fn) {
1028     super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1029     }
1030 jsr166 1.105 final void compute() { nowCombine(null, dep, arg1, arg2, fn); }
1031 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
1032     }
1033    
1034     static final class DelayedCombine<T,U,V> extends BiCompletion<V> {
1035     BiFunction<? super T,? super U,? extends V> fn;
1036     DelayedCombine(Executor async, CompletableFuture<V> dep,
1037     CompletableFuture<?> src, CompletableFuture<?> snd,
1038     BiFunction<? super T,? super U,? extends V> fn) {
1039     super(async, dep, src, snd); this.fn = fn;
1040     }
1041     final CompletableFuture<?> tryAct() {
1042     CompletableFuture<V> d; CompletableFuture<?> a, b; Object r, s;
1043     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1044     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1045     nowCombine(async, d, r, s, fn);
1046     src = null; snd = null; fn = null;
1047     if (d.result != null) return d;
1048 dl 1.88 }
1049 dl 1.104 return null;
1050 dl 1.88 }
1051 dl 1.104 }
1052    
1053     private <U,V> CompletableFuture<V> doThenCombine(
1054     CompletableFuture<? extends U> o,
1055     BiFunction<? super T,? super U,? extends V> fn,
1056     Executor e) {
1057     if (o == null || fn == null) throw new NullPointerException();
1058     CompletableFuture<V> d = new CompletableFuture<V>();
1059     Object r = result, s = o.result;
1060     if (r == null || s == null)
1061     bipushAnded(o, new DelayedCombine<T,U,V>(e, d, this, o, fn));
1062     else
1063     nowCombine(e, d, r, s, fn);
1064     return d;
1065     }
1066    
1067     // BiConsumer/AcceptBoth
1068    
1069     static <T,U,V> void nowAcceptBoth(Executor e, CompletableFuture<V> d,
1070     Object r, Object s,
1071     BiConsumer<? super T,? super U> f) {
1072     if (d != null && f != null) {
1073     T t; U u; Throwable x;
1074 dl 1.88 if (r instanceof AltResult) {
1075     t = null;
1076 dl 1.104 x = ((AltResult)r).ex;
1077 dl 1.88 }
1078     else {
1079 dl 1.104 @SuppressWarnings("unchecked") T tr = (T) r; t = tr;
1080     x = null;
1081 dl 1.88 }
1082 dl 1.104 if (x != null)
1083 dl 1.88 u = null;
1084     else if (s instanceof AltResult) {
1085 dl 1.104 x = ((AltResult)s).ex;
1086 dl 1.88 u = null;
1087     }
1088     else {
1089 dl 1.104 @SuppressWarnings("unchecked") U us = (U) s; u = us;
1090 dl 1.88 }
1091 dl 1.104 if (x == null) {
1092 dl 1.88 try {
1093 dl 1.104 if (e != null) {
1094     e.execute(new AsyncAcceptBoth<T,U,V>(d, t, u, f));
1095     return;
1096     }
1097     f.accept(t, u);
1098     } catch (Throwable ex) {
1099     x = ex;
1100 dl 1.88 }
1101     }
1102 dl 1.104 d.internalComplete(encodeOutcome(null, x));
1103     }
1104     }
1105    
1106     static final class AsyncAcceptBoth<T,U,V> extends Async<V> {
1107 jsr166 1.105 T arg1; U arg2; BiConsumer<? super T,? super U> fn;
1108 dl 1.104 AsyncAcceptBoth(CompletableFuture<V> dep, T arg1, U arg2,
1109     BiConsumer<? super T,? super U> fn) {
1110     super(dep); this.arg1 = arg1; this.arg2 = arg2; this.fn = fn;
1111     }
1112 jsr166 1.105 final void compute() { nowAcceptBoth(null, dep, arg1, arg2, fn); }
1113 dl 1.104 private static final long serialVersionUID = 5232453952276885070L;
1114     }
1115    
1116     static final class DelayedAcceptBoth<T,U> extends BiCompletion<Void> {
1117     BiConsumer<? super T,? super U> fn;
1118     DelayedAcceptBoth(Executor async, CompletableFuture<Void> dep,
1119     CompletableFuture<?> src, CompletableFuture<?> snd,
1120     BiConsumer<? super T,? super U> fn) {
1121     super(async, dep, src, snd); this.fn = fn;
1122     }
1123     final CompletableFuture<?> tryAct() {
1124     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1125     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1126     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1127     nowAcceptBoth(async, d, r, s, fn);
1128     src = null; snd = null; fn = null;
1129     if (d.result != null) return d;
1130     }
1131     return null;
1132 dl 1.88 }
1133     }
1134    
1135 dl 1.104 private <U> CompletableFuture<Void> doThenAcceptBoth(
1136     CompletableFuture<? extends U> o,
1137     BiConsumer<? super T, ? super U> fn,
1138     Executor e) {
1139     if (o == null || fn == null) throw new NullPointerException();
1140     CompletableFuture<Void> d = new CompletableFuture<Void>();
1141     Object r = result, s = o.result;
1142     if (r == null || s == null)
1143     bipushAnded(o, new DelayedAcceptBoth<T,U>(e, d, this, o, fn));
1144     else
1145     nowAcceptBoth(e, d, r, s, fn);
1146     return d;
1147     }
1148    
1149     // Runnable/both
1150    
1151     static final class DelayedRunAfterBoth extends BiCompletion<Void> {
1152     Runnable fn;
1153     DelayedRunAfterBoth(Executor async, CompletableFuture<Void> dep,
1154     CompletableFuture<?> src, CompletableFuture<?> snd,
1155     Runnable fn) {
1156     super(async, dep, src, snd); this.fn = fn;
1157     }
1158     final CompletableFuture<?> tryAct() {
1159     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1160     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1161     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1162     Throwable x = (r instanceof AltResult) ?
1163     ((AltResult)r).ex : null;
1164     nowRun(async, d, (x == null) ? s : r, fn);
1165     src = null; snd = null; fn = null;
1166     if (d.result != null) return d;
1167 dl 1.88 }
1168 dl 1.104 return null;
1169 dl 1.88 }
1170 dl 1.104 }
1171    
1172     private CompletableFuture<Void> doRunAfterBoth(
1173     CompletableFuture<?> o, Runnable fn, Executor e) {
1174     if (o == null || fn == null) throw new NullPointerException();
1175     CompletableFuture<Void> d = new CompletableFuture<Void>();
1176     Object r = result, s = o.result;
1177     if (r == null || s == null)
1178     bipushAnded(o, new DelayedRunAfterBoth(e, d, this, o, fn));
1179 dl 1.101 else {
1180 dl 1.104 Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1181     nowRun(e, d, (x == null) ? s : r, fn);
1182     }
1183     return d;
1184     }
1185    
1186     // allOf
1187    
1188     static <T> void nowAnd(CompletableFuture<T> d, Object r, Object s) {
1189     if (d != null) {
1190     Throwable x = (r instanceof AltResult) ? ((AltResult)r).ex : null;
1191     if (x == null && (s instanceof AltResult))
1192     x = ((AltResult)s).ex;
1193     d.internalComplete(encodeOutcome(null, x));
1194 dl 1.88 }
1195     }
1196    
1197 dl 1.104 static final class DelayedAnd extends BiCompletion<Void> {
1198     DelayedAnd(CompletableFuture<Void> dep,
1199     CompletableFuture<?> src, CompletableFuture<?> snd) {
1200     super(null, dep, src, snd);
1201     }
1202     final CompletableFuture<?> tryAct() {
1203     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r, s;
1204     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1205     (r = a.result) != null && (s = b.result) != null && claim(d)) {
1206     nowAnd(d, r, s);
1207     src = null; snd = null;
1208     if (d.result != null) return d;
1209 dl 1.88 }
1210 dl 1.104 return null;
1211 dl 1.88 }
1212 dl 1.104 }
1213    
1214     /** Recursively constructs a tree of And completions */
1215     private static CompletableFuture<Void> doAllOf(CompletableFuture<?>[] cfs,
1216     int lo, int hi) {
1217     CompletableFuture<Void> d = new CompletableFuture<Void>();
1218     if (lo > hi) // empty
1219     d.result = NIL;
1220 dl 1.101 else {
1221 dl 1.104 int mid = (lo + hi) >>> 1;
1222     CompletableFuture<?> fst = (lo == mid ? cfs[lo] :
1223     doAllOf(cfs, lo, mid));
1224     CompletableFuture<?> snd = (lo == hi ? fst : // and fst with self
1225     (hi == mid+1) ? cfs[hi] :
1226     doAllOf(cfs, mid+1, hi));
1227     Object r = fst.result, s = snd.result; // throw NPE if null elements
1228     if (r == null || s == null) {
1229     DelayedAnd a = new DelayedAnd(d, fst, snd);
1230     if (fst == snd)
1231     fst.unipush(a);
1232     else
1233     fst.bipushAnded(snd, a);
1234 dl 1.88 }
1235 dl 1.104 else
1236     nowAnd(d, r, s);
1237 dl 1.88 }
1238 dl 1.104 return d;
1239 dl 1.88 }
1240    
1241 dl 1.104 /* ------------- Two-source Ored -------------- */
1242    
1243     /* Pushes c on to completions and o's completions unless either done. */
1244     private <U> void bipushOred(CompletableFuture<?> o, BiCompletion<U> c) {
1245     if (c != null && o != null) {
1246     CompletableFuture<?> d;
1247     while (o.result == null && result == null) {
1248     if (casCompletions(c.next = completions, c)) {
1249     CoBiCompletion<U> q = new CoBiCompletion<U>(c);
1250     while (result == null && o.result == null &&
1251     !o.casCompletions(q.next = o.completions, q))
1252     q.next = null;
1253     break;
1254 dl 1.88 }
1255 dl 1.104 c.next = null;
1256 dl 1.88 }
1257 dl 1.104 if ((d = c.tryAct()) != null)
1258     d.postComplete();
1259     if (o.result != null)
1260     o.postComplete();
1261     if (result != null)
1262     postComplete();
1263 dl 1.88 }
1264 dl 1.104 }
1265    
1266     // Function/applyEither
1267    
1268     static final class DelayedApplyToEither<T,U> extends BiCompletion<U> {
1269     Function<? super T,? extends U> fn;
1270     DelayedApplyToEither(Executor async, CompletableFuture<U> dep,
1271     CompletableFuture<?> src, CompletableFuture<?> snd,
1272     Function<? super T,? extends U> fn) {
1273     super(async, dep, src, snd); this.fn = fn;
1274     }
1275     final CompletableFuture<?> tryAct() {
1276     CompletableFuture<U> d; CompletableFuture<?> a, b; Object r;
1277     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1278     ((r = a.result) != null || (r = b.result) != null) &&
1279     claim(d)) {
1280     nowApply(async, d, r, fn);
1281     src = null; snd = null; fn = null;
1282     if (d.result != null) return d;
1283 dl 1.88 }
1284 dl 1.104 return null;
1285 dl 1.88 }
1286     }
1287    
1288 dl 1.104 private <U> CompletableFuture<U> doApplyToEither(
1289     CompletableFuture<? extends T> o,
1290     Function<? super T, U> fn, Executor e) {
1291     if (o == null || fn == null) throw new NullPointerException();
1292     CompletableFuture<U> d = new CompletableFuture<U>();
1293     Object r = result;
1294     if (r == null && (r = o.result) == null)
1295     bipushOred(o, new DelayedApplyToEither<T,U>(e, d, this, o, fn));
1296     else
1297     nowApply(e, d, r, fn);
1298     return d;
1299     }
1300    
1301     // Consumer/acceptEither
1302    
1303     static final class DelayedAcceptEither<T> extends BiCompletion<Void> {
1304     Consumer<? super T> fn;
1305     DelayedAcceptEither(Executor async, CompletableFuture<Void> dep,
1306     CompletableFuture<?> src, CompletableFuture<?> snd,
1307     Consumer<? super T> fn) {
1308     super(async, dep, src, snd); this.fn = fn;
1309     }
1310     final CompletableFuture<?> tryAct() {
1311     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1312     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1313     ((r = a.result) != null || (r = b.result) != null) &&
1314     claim(d)) {
1315     nowAccept(async, d, r, fn);
1316     src = null; snd = null; fn = null;
1317     if (d.result != null) return d;
1318 dl 1.88 }
1319 dl 1.104 return null;
1320 dl 1.88 }
1321 dl 1.104 }
1322    
1323     private CompletableFuture<Void> doAcceptEither(
1324     CompletableFuture<? extends T> o,
1325     Consumer<? super T> fn, Executor e) {
1326     if (o == null || fn == null) throw new NullPointerException();
1327     CompletableFuture<Void> d = new CompletableFuture<Void>();
1328     Object r = result;
1329     if (r == null && (r = o.result) == null)
1330     bipushOred(o, new DelayedAcceptEither<T>(e, d, this, o, fn));
1331     else
1332     nowAccept(e, d, r, fn);
1333     return d;
1334     }
1335    
1336     // Runnable/runEither
1337    
1338     static final class DelayedRunAfterEither extends BiCompletion<Void> {
1339     Runnable fn;
1340     DelayedRunAfterEither(Executor async, CompletableFuture<Void> dep,
1341     CompletableFuture<?> src,
1342     CompletableFuture<?> snd, Runnable fn) {
1343     super(async, dep, src, snd); this.fn = fn;
1344     }
1345     final CompletableFuture<?> tryAct() {
1346     CompletableFuture<Void> d; CompletableFuture<?> a, b; Object r;
1347     if ((d = dep) != null && (a = src) != null && (b = snd) != null &&
1348     ((r = a.result) != null || (r = b.result) != null) &&
1349     claim(d)) {
1350     nowRun(async, d, r, fn);
1351     src = null; snd = null; fn = null;
1352     if (d.result != null) return d;
1353 dl 1.88 }
1354 dl 1.104 return null;
1355 dl 1.88 }
1356     }
1357    
1358 dl 1.104 private CompletableFuture<Void> doRunAfterEither(
1359     CompletableFuture<?> o, Runnable fn, Executor e) {
1360     if (o == null || fn == null) throw new NullPointerException();
1361     CompletableFuture<Void> d = new CompletableFuture<Void>();
1362     Object r = result;
1363     if (r == null && (r = o.result) == null)
1364     bipushOred(o, new DelayedRunAfterEither(e, d, this, o, fn));
1365     else
1366     nowRun(e, d, r, fn);
1367     return d;
1368     }
1369    
1370     /* ------------- Signallers -------------- */
1371    
1372     /**
1373     * Heuristic spin value for waitingGet() before blocking on
1374     * multiprocessors
1375     */
1376     static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ?
1377     1 << 8 : 0);
1378    
1379     /**
1380     * Completion for recording and releasing a waiting thread. See
1381     * other classes such as Phaser and SynchronousQueue for more
1382     * detailed explanation. This class implements ManagedBlocker to
1383     * avoid starvation when blocking actions pile up in
1384     * ForkJoinPools.
1385     */
1386     static final class Signaller extends Completion<Void>
1387     implements ForkJoinPool.ManagedBlocker {
1388     long nanos; // wait time if timed
1389     final long deadline; // non-zero if timed
1390     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
1391     volatile Thread thread;
1392     Signaller(boolean interruptible, long nanos, long deadline) {
1393     this.thread = Thread.currentThread();
1394     this.interruptControl = interruptible ? 1 : 0;
1395     this.nanos = nanos;
1396     this.deadline = deadline;
1397     }
1398     final CompletableFuture<?> tryAct() {
1399     Thread w = thread;
1400     if (w != null) {
1401     thread = null; // no need to CAS
1402     LockSupport.unpark(w);
1403 dl 1.88 }
1404 dl 1.104 return null;
1405 dl 1.88 }
1406 dl 1.104 public boolean isReleasable() {
1407     if (thread == null)
1408     return true;
1409     if (Thread.interrupted()) {
1410     int i = interruptControl;
1411     interruptControl = -1;
1412     if (i > 0)
1413     return true;
1414 dl 1.88 }
1415 dl 1.104 if (deadline != 0L &&
1416     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
1417     thread = null;
1418     return true;
1419 dl 1.88 }
1420 dl 1.104 return false;
1421     }
1422     public boolean block() {
1423     if (isReleasable())
1424     return true;
1425     else if (deadline == 0L)
1426     LockSupport.park(this);
1427     else if (nanos > 0L)
1428     LockSupport.parkNanos(this, nanos);
1429     return isReleasable();
1430 dl 1.88 }
1431     }
1432    
1433 dl 1.104 /**
1434     * Returns raw result after waiting, or null if interruptible and
1435     * interrupted.
1436     */
1437     private Object waitingGet(boolean interruptible) {
1438     Signaller q = null;
1439     boolean queued = false;
1440     int spins = SPINS;
1441 dl 1.88 Object r;
1442 dl 1.104 while ((r = result) == null) {
1443     if (spins > 0) {
1444     if (ThreadLocalRandom.nextSecondarySeed() >= 0)
1445     --spins;
1446 dl 1.88 }
1447 dl 1.104 else if (q == null)
1448     q = new Signaller(interruptible, 0L, 0L);
1449     else if (!queued)
1450     queued = casCompletions(q.next = completions, q);
1451     else if (interruptible && q.interruptControl < 0) {
1452     q.thread = null;
1453     removeCancelledSignallers();
1454     return null;
1455 dl 1.88 }
1456 dl 1.104 else if (q.thread != null && result == null) {
1457     try {
1458     ForkJoinPool.managedBlock(q);
1459     } catch (InterruptedException ie) {
1460     q.interruptControl = -1;
1461     }
1462 dl 1.88 }
1463 dl 1.104 }
1464     if (q != null) {
1465     q.thread = null;
1466     if (q.interruptControl < 0) {
1467     if (interruptible)
1468     r = null; // report interruption
1469 dl 1.88 else
1470 dl 1.104 Thread.currentThread().interrupt();
1471 dl 1.88 }
1472     }
1473 dl 1.104 postComplete();
1474     return r;
1475 dl 1.88 }
1476    
1477 dl 1.104 /**
1478     * Returns raw result after waiting, or null if interrupted, or
1479     * throws TimeoutException on timeout.
1480     */
1481     private Object timedGet(long nanos) throws TimeoutException {
1482     if (Thread.interrupted())
1483     return null;
1484     if (nanos <= 0L)
1485     throw new TimeoutException();
1486     long d = System.nanoTime() + nanos;
1487     Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
1488     boolean queued = false;
1489 dl 1.88 Object r;
1490 dl 1.104 while ((r = result) == null) {
1491     if (!queued)
1492     queued = casCompletions(q.next = completions, q);
1493     else if (q.interruptControl < 0 || q.nanos <= 0L) {
1494     q.thread = null;
1495     removeCancelledSignallers();
1496     if (q.interruptControl < 0)
1497     return null;
1498     throw new TimeoutException();
1499     }
1500     else if (q.thread != null && result == null) {
1501     try {
1502     ForkJoinPool.managedBlock(q);
1503     } catch (InterruptedException ie) {
1504     q.interruptControl = -1;
1505     }
1506 dl 1.88 }
1507     }
1508 dl 1.104 q.thread = null;
1509     postComplete();
1510     return (q.interruptControl < 0) ? null : r;
1511     }
1512    
1513     /**
1514     * Unlinks cancelled Signallers to avoid accumulating garbage.
1515     * Internal nodes are simply unspliced without CAS since it is
1516     * harmless if they are traversed anyway. To avoid effects of
1517     * unsplicing from already removed nodes, the list is retraversed
1518     * in case of an apparent race.
1519     */
1520     private void removeCancelledSignallers() {
1521     for (Completion<?> p = null, q = completions; q != null;) {
1522     Completion<?> s = q.next;
1523     if ((q instanceof Signaller) && ((Signaller)q).thread == null) {
1524     if (p != null) {
1525     p.next = s;
1526     if (!(p instanceof Signaller) ||
1527     ((Signaller)p).thread != null)
1528     break;
1529     }
1530     else if (casCompletions(q, s))
1531     break;
1532     p = null; // restart
1533     q = completions;
1534 dl 1.88 }
1535     else {
1536 dl 1.104 p = q;
1537     q = s;
1538 dl 1.88 }
1539     }
1540     }
1541    
1542 dl 1.104 /* ------------- public methods -------------- */
1543 dl 1.88
1544     /**
1545     * Creates a new incomplete CompletableFuture.
1546     */
1547     public CompletableFuture() {
1548     }
1549    
1550     /**
1551     * Returns a new CompletableFuture that is asynchronously completed
1552     * by a task running in the {@link ForkJoinPool#commonPool()} with
1553     * the value obtained by calling the given Supplier.
1554     *
1555     * @param supplier a function returning the value to be used
1556     * to complete the returned CompletableFuture
1557 jsr166 1.95 * @param <U> the function's return type
1558 dl 1.88 * @return the new CompletableFuture
1559     */
1560     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1561     if (supplier == null) throw new NullPointerException();
1562 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
1563     asyncPool.execute(new AsyncSupply<U>(d, supplier));
1564     return d;
1565 dl 1.88 }
1566    
1567     /**
1568     * Returns a new CompletableFuture that is asynchronously completed
1569     * by a task running in the given executor with the value obtained
1570     * by calling the given Supplier.
1571     *
1572     * @param supplier a function returning the value to be used
1573     * to complete the returned CompletableFuture
1574     * @param executor the executor to use for asynchronous execution
1575 jsr166 1.95 * @param <U> the function's return type
1576 dl 1.88 * @return the new CompletableFuture
1577     */
1578     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1579     Executor executor) {
1580     if (executor == null || supplier == null)
1581     throw new NullPointerException();
1582 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
1583     executor.execute(new AsyncSupply<U>(d, supplier));
1584     return d;
1585 dl 1.28 }
1586    
1587     /**
1588 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1589     * by a task running in the {@link ForkJoinPool#commonPool()} after
1590     * it runs the given action.
1591 dl 1.28 *
1592     * @param runnable the action to run before completing the
1593     * returned CompletableFuture
1594 jsr166 1.58 * @return the new CompletableFuture
1595 dl 1.28 */
1596     public static CompletableFuture<Void> runAsync(Runnable runnable) {
1597     if (runnable == null) throw new NullPointerException();
1598 dl 1.104 CompletableFuture<Void> d = new CompletableFuture<Void>();
1599     asyncPool.execute(new AsyncRun<Void>(d, runnable));
1600     return d;
1601 dl 1.28 }
1602    
1603     /**
1604 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1605     * by a task running in the given executor after it runs the given
1606     * action.
1607 dl 1.28 *
1608     * @param runnable the action to run before completing the
1609     * returned CompletableFuture
1610     * @param executor the executor to use for asynchronous execution
1611 jsr166 1.58 * @return the new CompletableFuture
1612 dl 1.28 */
1613     public static CompletableFuture<Void> runAsync(Runnable runnable,
1614     Executor executor) {
1615     if (executor == null || runnable == null)
1616     throw new NullPointerException();
1617 dl 1.104 CompletableFuture<Void> d = new CompletableFuture<Void>();
1618     executor.execute(new AsyncRun<Void>(d, runnable));
1619     return d;
1620 dl 1.28 }
1621    
1622     /**
1623 dl 1.77 * Returns a new CompletableFuture that is already completed with
1624     * the given value.
1625     *
1626     * @param value the value
1627 jsr166 1.95 * @param <U> the type of the value
1628 dl 1.77 * @return the completed CompletableFuture
1629     */
1630     public static <U> CompletableFuture<U> completedFuture(U value) {
1631 dl 1.104 CompletableFuture<U> d = new CompletableFuture<U>();
1632     d.result = (value == null) ? NIL : value;
1633     return d;
1634 dl 1.77 }
1635    
1636     /**
1637 dl 1.28 * Returns {@code true} if completed in any fashion: normally,
1638     * exceptionally, or via cancellation.
1639     *
1640     * @return {@code true} if completed
1641     */
1642     public boolean isDone() {
1643     return result != null;
1644     }
1645    
1646     /**
1647 dl 1.49 * Waits if necessary for this future to complete, and then
1648 dl 1.48 * returns its result.
1649 dl 1.28 *
1650 dl 1.48 * @return the result value
1651     * @throws CancellationException if this future was cancelled
1652     * @throws ExecutionException if this future completed exceptionally
1653 dl 1.28 * @throws InterruptedException if the current thread was interrupted
1654     * while waiting
1655     */
1656     public T get() throws InterruptedException, ExecutionException {
1657 jsr166 1.105 Object r;
1658 dl 1.104 return reportGet((r = result) == null ? waitingGet(true) : r);
1659 dl 1.28 }
1660    
1661     /**
1662 dl 1.49 * Waits if necessary for at most the given time for this future
1663     * to complete, and then returns its result, if available.
1664 dl 1.28 *
1665     * @param timeout the maximum time to wait
1666     * @param unit the time unit of the timeout argument
1667 dl 1.48 * @return the result value
1668     * @throws CancellationException if this future was cancelled
1669     * @throws ExecutionException if this future completed exceptionally
1670 dl 1.28 * @throws InterruptedException if the current thread was interrupted
1671     * while waiting
1672     * @throws TimeoutException if the wait timed out
1673     */
1674     public T get(long timeout, TimeUnit unit)
1675     throws InterruptedException, ExecutionException, TimeoutException {
1676 jsr166 1.105 Object r;
1677 dl 1.28 long nanos = unit.toNanos(timeout);
1678 dl 1.104 return reportGet((r = result) == null ? timedGet(nanos) : r);
1679 dl 1.28 }
1680    
1681     /**
1682     * Returns the result value when complete, or throws an
1683     * (unchecked) exception if completed exceptionally. To better
1684     * conform with the use of common functional forms, if a
1685     * computation involved in the completion of this
1686     * CompletableFuture threw an exception, this method throws an
1687     * (unchecked) {@link CompletionException} with the underlying
1688     * exception as its cause.
1689     *
1690     * @return the result value
1691     * @throws CancellationException if the computation was cancelled
1692 jsr166 1.55 * @throws CompletionException if this future completed
1693     * exceptionally or a completion computation threw an exception
1694 dl 1.28 */
1695     public T join() {
1696 dl 1.104 Object r;
1697 jsr166 1.105 return reportJoin((r = result) == null ? waitingGet(false) : r);
1698 dl 1.28 }
1699    
1700     /**
1701     * Returns the result value (or throws any encountered exception)
1702     * if completed, else returns the given valueIfAbsent.
1703     *
1704     * @param valueIfAbsent the value to return if not completed
1705     * @return the result value, if completed, else the given valueIfAbsent
1706     * @throws CancellationException if the computation was cancelled
1707 jsr166 1.55 * @throws CompletionException if this future completed
1708     * exceptionally or a completion computation threw an exception
1709 dl 1.28 */
1710     public T getNow(T valueIfAbsent) {
1711 dl 1.104 Object r;
1712 jsr166 1.106 return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
1713 dl 1.28 }
1714    
1715     /**
1716     * If not already completed, sets the value returned by {@link
1717     * #get()} and related methods to the given value.
1718     *
1719     * @param value the result value
1720     * @return {@code true} if this invocation caused this CompletableFuture
1721     * to transition to a completed state, else {@code false}
1722     */
1723     public boolean complete(T value) {
1724 dl 1.104 boolean triggered = internalComplete(value == null ? NIL : value);
1725     postComplete();
1726 dl 1.28 return triggered;
1727     }
1728    
1729     /**
1730     * If not already completed, causes invocations of {@link #get()}
1731     * and related methods to throw the given exception.
1732     *
1733     * @param ex the exception
1734     * @return {@code true} if this invocation caused this CompletableFuture
1735     * to transition to a completed state, else {@code false}
1736     */
1737     public boolean completeExceptionally(Throwable ex) {
1738     if (ex == null) throw new NullPointerException();
1739 dl 1.104 boolean triggered = internalComplete(new AltResult(ex));
1740     postComplete();
1741 dl 1.28 return triggered;
1742     }
1743    
1744 dl 1.104 public <U> CompletableFuture<U> thenApply(
1745     Function<? super T,? extends U> fn) {
1746 dl 1.28 return doThenApply(fn, null);
1747     }
1748    
1749 dl 1.104 public <U> CompletableFuture<U> thenApplyAsync(
1750     Function<? super T,? extends U> fn) {
1751     return doThenApply(fn, asyncPool);
1752 dl 1.17 }
1753    
1754 dl 1.104 public <U> CompletableFuture<U> thenApplyAsync(
1755     Function<? super T,? extends U> fn, Executor executor) {
1756 dl 1.28 if (executor == null) throw new NullPointerException();
1757     return doThenApply(fn, executor);
1758     }
1759 dl 1.1
1760 dl 1.104 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
1761 dl 1.88 return doThenAccept(action, null);
1762 dl 1.28 }
1763    
1764 dl 1.104 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
1765     return doThenAccept(action, asyncPool);
1766 dl 1.28 }
1767    
1768 dl 1.104 public CompletableFuture<Void> thenAcceptAsync(
1769     Consumer<? super T> action, Executor executor) {
1770 dl 1.28 if (executor == null) throw new NullPointerException();
1771 dl 1.88 return doThenAccept(action, executor);
1772 dl 1.7 }
1773    
1774 dl 1.104 public CompletableFuture<Void> thenRun(Runnable action) {
1775 dl 1.28 return doThenRun(action, null);
1776     }
1777    
1778 dl 1.104 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1779     return doThenRun(action, asyncPool);
1780 dl 1.28 }
1781    
1782 dl 1.104 public CompletableFuture<Void> thenRunAsync(
1783     Runnable action, Executor executor) {
1784 dl 1.28 if (executor == null) throw new NullPointerException();
1785     return doThenRun(action, executor);
1786     }
1787    
1788 dl 1.104 public <U,V> CompletableFuture<V> thenCombine(
1789     CompletionStage<? extends U> other,
1790     BiFunction<? super T,? super U,? extends V> fn) {
1791 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, null);
1792 dl 1.28 }
1793    
1794 dl 1.104 public <U,V> CompletableFuture<V> thenCombineAsync(
1795     CompletionStage<? extends U> other,
1796     BiFunction<? super T,? super U,? extends V> fn) {
1797     return doThenCombine(other.toCompletableFuture(), fn, asyncPool);
1798 dl 1.28 }
1799    
1800 dl 1.104 public <U,V> CompletableFuture<V> thenCombineAsync(
1801     CompletionStage<? extends U> other,
1802     BiFunction<? super T,? super U,? extends V> fn,
1803     Executor executor) {
1804 dl 1.28 if (executor == null) throw new NullPointerException();
1805 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, executor);
1806 dl 1.1 }
1807    
1808 dl 1.104 public <U> CompletableFuture<Void> thenAcceptBoth(
1809     CompletionStage<? extends U> other,
1810     BiConsumer<? super T, ? super U> action) {
1811 dl 1.88 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
1812 dl 1.28 }
1813    
1814 dl 1.104 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1815     CompletionStage<? extends U> other,
1816     BiConsumer<? super T, ? super U> action) {
1817     return doThenAcceptBoth(other.toCompletableFuture(), action, asyncPool);
1818 dl 1.28 }
1819    
1820 dl 1.104 public <U> CompletableFuture<Void> thenAcceptBothAsync(
1821     CompletionStage<? extends U> other,
1822     BiConsumer<? super T, ? super U> action,
1823     Executor executor) {
1824 dl 1.28 if (executor == null) throw new NullPointerException();
1825 dl 1.88 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
1826 dl 1.28 }
1827    
1828 dl 1.104 public CompletableFuture<Void> runAfterBoth(
1829     CompletionStage<?> other, Runnable action) {
1830 dl 1.88 return doRunAfterBoth(other.toCompletableFuture(), action, null);
1831 dl 1.7 }
1832    
1833 dl 1.104 public CompletableFuture<Void> runAfterBothAsync(
1834     CompletionStage<?> other, Runnable action) {
1835     return doRunAfterBoth(other.toCompletableFuture(), action, asyncPool);
1836 dl 1.28 }
1837    
1838 dl 1.104 public CompletableFuture<Void> runAfterBothAsync(
1839     CompletionStage<?> other, Runnable action, Executor executor) {
1840 dl 1.28 if (executor == null) throw new NullPointerException();
1841 dl 1.88 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
1842 dl 1.28 }
1843    
1844 dl 1.104 public <U> CompletableFuture<U> applyToEither(
1845     CompletionStage<? extends T> other, Function<? super T, U> fn) {
1846 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, null);
1847 dl 1.28 }
1848    
1849 dl 1.104 public <U> CompletableFuture<U> applyToEitherAsync(
1850     CompletionStage<? extends T> other, Function<? super T, U> fn) {
1851     return doApplyToEither(other.toCompletableFuture(), fn, asyncPool);
1852 dl 1.28 }
1853    
1854 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
1855 dl 1.104 (CompletionStage<? extends T> other, Function<? super T, U> fn,
1856 dl 1.48 Executor executor) {
1857 dl 1.28 if (executor == null) throw new NullPointerException();
1858 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, executor);
1859 dl 1.1 }
1860    
1861 dl 1.104 public CompletableFuture<Void> acceptEither(
1862     CompletionStage<? extends T> other, Consumer<? super T> action) {
1863 dl 1.88 return doAcceptEither(other.toCompletableFuture(), action, null);
1864 dl 1.28 }
1865    
1866 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
1867 dl 1.104 (CompletionStage<? extends T> other, Consumer<? super T> action) {
1868     return doAcceptEither(other.toCompletableFuture(), action, asyncPool);
1869 dl 1.28 }
1870    
1871 dl 1.104 public CompletableFuture<Void> acceptEitherAsync(
1872     CompletionStage<? extends T> other, Consumer<? super T> action,
1873     Executor executor) {
1874 dl 1.28 if (executor == null) throw new NullPointerException();
1875 dl 1.88 return doAcceptEither(other.toCompletableFuture(), action, executor);
1876 dl 1.7 }
1877    
1878 dl 1.104 public CompletableFuture<Void> runAfterEither(
1879     CompletionStage<?> other, Runnable action) {
1880 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, null);
1881 dl 1.28 }
1882    
1883 dl 1.104 public CompletableFuture<Void> runAfterEitherAsync(
1884     CompletionStage<?> other, Runnable action) {
1885     return doRunAfterEither(other.toCompletableFuture(), action, asyncPool);
1886 dl 1.28 }
1887    
1888 dl 1.104 public CompletableFuture<Void> runAfterEitherAsync(
1889     CompletionStage<?> other, Runnable action, Executor executor) {
1890 dl 1.28 if (executor == null) throw new NullPointerException();
1891 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, executor);
1892 dl 1.1 }
1893    
1894 dl 1.48 public <U> CompletableFuture<U> thenCompose
1895 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
1896 jsr166 1.81 return doThenCompose(fn, null);
1897 dl 1.37 }
1898    
1899 dl 1.104 public <U> CompletableFuture<U> thenComposeAsync(
1900     Function<? super T, ? extends CompletionStage<U>> fn) {
1901     return doThenCompose(fn, asyncPool);
1902 dl 1.37 }
1903    
1904 dl 1.104 public <U> CompletableFuture<U> thenComposeAsync(
1905     Function<? super T, ? extends CompletionStage<U>> fn,
1906     Executor executor) {
1907 dl 1.37 if (executor == null) throw new NullPointerException();
1908 jsr166 1.81 return doThenCompose(fn, executor);
1909 dl 1.37 }
1910    
1911 dl 1.104 public CompletableFuture<T> whenComplete(
1912     BiConsumer<? super T, ? super Throwable> action) {
1913 dl 1.88 return doWhenComplete(action, null);
1914     }
1915    
1916 dl 1.104 public CompletableFuture<T> whenCompleteAsync(
1917     BiConsumer<? super T, ? super Throwable> action) {
1918     return doWhenComplete(action, asyncPool);
1919 dl 1.88 }
1920    
1921 dl 1.104 public CompletableFuture<T> whenCompleteAsync(
1922     BiConsumer<? super T, ? super Throwable> action, Executor executor) {
1923 dl 1.88 if (executor == null) throw new NullPointerException();
1924     return doWhenComplete(action, executor);
1925     }
1926    
1927 dl 1.104 public <U> CompletableFuture<U> handle(
1928     BiFunction<? super T, Throwable, ? extends U> fn) {
1929 dl 1.88 return doHandle(fn, null);
1930     }
1931    
1932 dl 1.104 public <U> CompletableFuture<U> handleAsync(
1933     BiFunction<? super T, Throwable, ? extends U> fn) {
1934     return doHandle(fn, asyncPool);
1935 dl 1.88 }
1936    
1937 dl 1.104 public <U> CompletableFuture<U> handleAsync(
1938     BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
1939 dl 1.88 if (executor == null) throw new NullPointerException();
1940     return doHandle(fn, executor);
1941     }
1942    
1943     /**
1944     * Returns this CompletableFuture
1945     *
1946     * @return this CompletableFuture
1947     */
1948     public CompletableFuture<T> toCompletableFuture() {
1949     return this;
1950 dl 1.28 }
1951    
1952 dl 1.88 // not in interface CompletionStage
1953    
1954 dl 1.28 /**
1955 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
1956     * CompletableFuture completes, with the result of the given
1957     * function of the exception triggering this CompletableFuture's
1958     * completion when it completes exceptionally; otherwise, if this
1959     * CompletableFuture completes normally, then the returned
1960     * CompletableFuture also completes normally with the same value.
1961 dl 1.88 * Note: More flexible versions of this functionality are
1962     * available using methods {@code whenComplete} and {@code handle}.
1963 dl 1.28 *
1964     * @param fn the function to use to compute the value of the
1965     * returned CompletableFuture if this CompletableFuture completed
1966     * exceptionally
1967     * @return the new CompletableFuture
1968     */
1969 dl 1.104 public CompletableFuture<T> exceptionally(
1970     Function<Throwable, ? extends T> fn) {
1971     return doExceptionally(fn);
1972 dl 1.28 }
1973    
1974 dl 1.35 /* ------------- Arbitrary-arity constructions -------------- */
1975    
1976     /**
1977     * Returns a new CompletableFuture that is completed when all of
1978 jsr166 1.66 * the given CompletableFutures complete. If any of the given
1979 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
1980     * CompletableFuture also does so, with a CompletionException
1981     * holding this exception as its cause. Otherwise, the results,
1982     * if any, of the given CompletableFutures are not reflected in
1983     * the returned CompletableFuture, but may be obtained by
1984     * inspecting them individually. If no CompletableFutures are
1985     * provided, returns a CompletableFuture completed with the value
1986     * {@code null}.
1987 dl 1.35 *
1988     * <p>Among the applications of this method is to await completion
1989     * of a set of independent CompletableFutures before continuing a
1990     * program, as in: {@code CompletableFuture.allOf(c1, c2,
1991     * c3).join();}.
1992     *
1993     * @param cfs the CompletableFutures
1994 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
1995 dl 1.35 * given CompletableFutures complete
1996     * @throws NullPointerException if the array or any of its elements are
1997     * {@code null}
1998     */
1999     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2000 dl 1.104 return doAllOf(cfs, 0, cfs.length - 1);
2001 dl 1.35 }
2002    
2003     /**
2004 dl 1.76 * Returns a new CompletableFuture that is completed when any of
2005 jsr166 1.79 * the given CompletableFutures complete, with the same result.
2006     * Otherwise, if it completed exceptionally, the returned
2007 dl 1.77 * CompletableFuture also does so, with a CompletionException
2008     * holding this exception as its cause. If no CompletableFutures
2009     * are provided, returns an incomplete CompletableFuture.
2010 dl 1.35 *
2011     * @param cfs the CompletableFutures
2012 dl 1.77 * @return a new CompletableFuture that is completed with the
2013     * result or exception of any of the given CompletableFutures when
2014     * one completes
2015 dl 1.35 * @throws NullPointerException if the array or any of its elements are
2016     * {@code null}
2017     */
2018 dl 1.77 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2019 dl 1.104 CompletableFuture<Object> d = new CompletableFuture<Object>();
2020     for (int i = 0; i < cfs.length; ++i) {
2021     CompletableFuture<?> c = cfs[i];
2022     Object r = c.result; // throw NPE if null element
2023     if (d.result == null) {
2024     if (r == null)
2025     c.unipush(new DelayedCopy<Object>(d, c));
2026     else
2027     nowCopy(d, r);
2028 dl 1.77 }
2029 dl 1.35 }
2030 dl 1.104 return d;
2031 dl 1.35 }
2032    
2033     /* ------------- Control and status methods -------------- */
2034    
2035 dl 1.28 /**
2036 dl 1.37 * If not already completed, completes this CompletableFuture with
2037     * a {@link CancellationException}. Dependent CompletableFutures
2038     * that have not already completed will also complete
2039     * exceptionally, with a {@link CompletionException} caused by
2040     * this {@code CancellationException}.
2041 dl 1.28 *
2042     * @param mayInterruptIfRunning this value has no effect in this
2043     * implementation because interrupts are not used to control
2044     * processing.
2045     *
2046     * @return {@code true} if this task is now cancelled
2047     */
2048     public boolean cancel(boolean mayInterruptIfRunning) {
2049 dl 1.46 boolean cancelled = (result == null) &&
2050 dl 1.104 internalComplete(new AltResult(new CancellationException()));
2051     postComplete();
2052 dl 1.48 return cancelled || isCancelled();
2053 dl 1.28 }
2054    
2055     /**
2056     * Returns {@code true} if this CompletableFuture was cancelled
2057     * before it completed normally.
2058     *
2059     * @return {@code true} if this CompletableFuture was cancelled
2060     * before it completed normally
2061     */
2062     public boolean isCancelled() {
2063     Object r;
2064 jsr166 1.43 return ((r = result) instanceof AltResult) &&
2065     (((AltResult)r).ex instanceof CancellationException);
2066 dl 1.28 }
2067    
2068     /**
2069 dl 1.88 * Returns {@code true} if this CompletableFuture completed
2070 dl 1.91 * exceptionally, in any way. Possible causes include
2071     * cancellation, explicit invocation of {@code
2072     * completeExceptionally}, and abrupt termination of a
2073     * CompletionStage action.
2074 dl 1.88 *
2075     * @return {@code true} if this CompletableFuture completed
2076     * exceptionally
2077     */
2078     public boolean isCompletedExceptionally() {
2079 dl 1.91 Object r;
2080     return ((r = result) instanceof AltResult) && r != NIL;
2081 dl 1.88 }
2082    
2083     /**
2084 dl 1.28 * Forcibly sets or resets the value subsequently returned by
2085 jsr166 1.42 * method {@link #get()} and related methods, whether or not
2086     * already completed. This method is designed for use only in
2087     * error recovery actions, and even in such situations may result
2088     * in ongoing dependent completions using established versus
2089 dl 1.30 * overwritten outcomes.
2090 dl 1.28 *
2091     * @param value the completion value
2092     */
2093     public void obtrudeValue(T value) {
2094     result = (value == null) ? NIL : value;
2095 dl 1.104 postComplete();
2096 dl 1.28 }
2097    
2098 dl 1.30 /**
2099 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
2100     * and related methods to throw the given exception, whether or
2101     * not already completed. This method is designed for use only in
2102 dl 1.30 * recovery actions, and even in such situations may result in
2103     * ongoing dependent completions using established versus
2104     * overwritten outcomes.
2105     *
2106     * @param ex the exception
2107     */
2108     public void obtrudeException(Throwable ex) {
2109     if (ex == null) throw new NullPointerException();
2110     result = new AltResult(ex);
2111 dl 1.104 postComplete();
2112 dl 1.30 }
2113    
2114 dl 1.35 /**
2115     * Returns the estimated number of CompletableFutures whose
2116     * completions are awaiting completion of this CompletableFuture.
2117     * This method is designed for use in monitoring system state, not
2118     * for synchronization control.
2119     *
2120     * @return the number of dependent CompletableFutures
2121     */
2122     public int getNumberOfDependents() {
2123     int count = 0;
2124 dl 1.104 for (Completion<?> p = completions; p != null; p = p.next)
2125 dl 1.35 ++count;
2126     return count;
2127     }
2128    
2129     /**
2130     * Returns a string identifying this CompletableFuture, as well as
2131 jsr166 1.40 * its completion state. The state, in brackets, contains the
2132 dl 1.35 * String {@code "Completed Normally"} or the String {@code
2133     * "Completed Exceptionally"}, or the String {@code "Not
2134     * completed"} followed by the number of CompletableFutures
2135     * dependent upon its completion, if any.
2136     *
2137     * @return a string identifying this CompletableFuture, as well as its state
2138     */
2139     public String toString() {
2140     Object r = result;
2141 jsr166 1.40 int count;
2142     return super.toString() +
2143     ((r == null) ?
2144     (((count = getNumberOfDependents()) == 0) ?
2145     "[Not completed]" :
2146     "[Not completed, " + count + " dependents]") :
2147     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2148     "[Completed exceptionally]" :
2149     "[Completed normally]"));
2150 dl 1.35 }
2151    
2152 dl 1.1 // Unsafe mechanics
2153     private static final sun.misc.Unsafe UNSAFE;
2154     private static final long RESULT;
2155     private static final long COMPLETIONS;
2156     static {
2157     try {
2158     UNSAFE = sun.misc.Unsafe.getUnsafe();
2159     Class<?> k = CompletableFuture.class;
2160     RESULT = UNSAFE.objectFieldOffset
2161     (k.getDeclaredField("result"));
2162     COMPLETIONS = UNSAFE.objectFieldOffset
2163     (k.getDeclaredField("completions"));
2164 dl 1.104 } catch (Exception x) {
2165     throw new Error(x);
2166 dl 1.1 }
2167     }
2168     }