ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.92
Committed: Fri Jul 5 21:01:29 2013 UTC (10 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.91: +1 -1 lines
Log Message:
whitespace

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.88 import java.util.concurrent.CompletionException;
23     import java.util.concurrent.CompletionStage;
24 dl 1.1 import java.util.concurrent.atomic.AtomicInteger;
25     import java.util.concurrent.locks.LockSupport;
26    
27     /**
28     * A {@link Future} that may be explicitly completed (setting its
29 dl 1.88 * value and status), and may be used as a {@link CompletionStage},
30     * supporting dependent functions and actions that trigger upon its
31     * completion.
32 dl 1.1 *
33 jsr166 1.50 * <p>When two or more threads attempt to
34     * {@link #complete complete},
35 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
36 jsr166 1.50 * {@link #cancel cancel}
37     * a CompletableFuture, only one of them succeeds.
38 dl 1.19 *
39 dl 1.91 * <p>In addition to these and related methods for directly
40     * manipulating status and results, CompletableFuture implements
41     * interface {@link CompletionStage} with the following policies: <ul>
42 dl 1.35 *
43 dl 1.88 * <li>Actions supplied for dependent completions of
44     * <em>non-async</em> methods may be performed by the thread that
45     * completes the current CompletableFuture, or by any other caller of
46     * these methods.</li>
47 jsr166 1.65 *
48 dl 1.88 * <li>All <em>async</em> methods without an explicit Executor
49     * argument are performed using the {@link
50     * ForkJoinPool#commonPool()}. To simplify monitoring, debugging, and
51     * tracking, all generated asynchronous tasks are instances of the
52     * marker interface {@link AsynchronousCompletionTask}. </li>
53 dl 1.35 *
54 dl 1.88 * <li>All CompletionStage methods are implemented independently of
55     * other public methods, so the behavior of one method is not impacted
56     * by overrides of others in subclasses. </li> </ul>
57     *
58 dl 1.91 * <p>CompletableFuture also implements {@link Future} with the following
59 dl 1.88 * policies: <ul>
60     *
61     * <li>Since (unlike {@link FutureTask}) this class has no direct
62 jsr166 1.55 * control over the computation that causes it to be completed,
63 dl 1.88 * cancellation is treated as just another form of exceptional
64     * completion. Method {@link #cancel cancel} has the same effect as
65     * {@code completeExceptionally(new CancellationException())}. Method
66     * {@link #isCompletedExceptionally} can be used to determine if a
67     * CompletableFuture completed in any exceptional fashion.</li>
68 jsr166 1.55 *
69 dl 1.88 * <li>In case of exceptional completion with a CompletionException,
70 jsr166 1.55 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
71     * {@link ExecutionException} with the same cause as held in the
72 dl 1.88 * corresponding CompletionException. To simplify usage in most
73     * contexts, this class also defines methods {@link #join()} and
74     * {@link #getNow} that instead throw the CompletionException directly
75     * in these cases.</li> </ul>
76 jsr166 1.80 *
77 dl 1.1 * @author Doug Lea
78     * @since 1.8
79     */
80 dl 1.88 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
81 dl 1.28
82 dl 1.1 /*
83 dl 1.20 * Overview:
84 dl 1.1 *
85 jsr166 1.32 * 1. Non-nullness of field result (set via CAS) indicates done.
86     * An AltResult is used to box null as a result, as well as to
87     * hold exceptions. Using a single field makes completion fast
88 dl 1.20 * and simple to detect and trigger, at the expense of a lot of
89     * encoding and decoding that infiltrates many methods. One minor
90     * simplification relies on the (static) NIL (to box null results)
91     * being the only AltResult with a null exception field, so we
92 dl 1.28 * don't usually need explicit comparisons with NIL. The CF
93     * exception propagation mechanics surrounding decoding rely on
94     * unchecked casts of decoded results really being unchecked,
95     * where user type errors are caught at point of use, as is
96     * currently the case in Java. These are highlighted by using
97     * SuppressWarnings-annotated temporaries.
98 dl 1.1 *
99     * 2. Waiters are held in a Treiber stack similar to the one used
100 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
101 dl 1.28 * internal documentation for algorithmic details.
102 dl 1.1 *
103     * 3. Completions are also kept in a list/stack, and pulled off
104 dl 1.28 * and run when completion is triggered. (We could even use the
105 jsr166 1.24 * same stack as for waiters, but would give up the potential
106 dl 1.20 * parallelism obtained because woken waiters help release/run
107 dl 1.28 * others -- see method postComplete). Because post-processing
108     * may race with direct calls, class Completion opportunistically
109     * extends AtomicInteger so callers can claim the action via
110     * compareAndSet(0, 1). The Completion.run methods are all
111     * written in a boringly similar uniform way (that sometimes includes
112 jsr166 1.55 * unnecessary-looking checks, kept to maintain uniformity).
113     * There are enough dimensions upon which they differ that
114     * attempts to factor commonalities while maintaining efficiency
115     * require more lines of code than they would save.
116 dl 1.20 *
117     * 4. The exported then/and/or methods do support a bit of
118 dl 1.28 * factoring (see doThenApply etc). They must cope with the
119     * intrinsic races surrounding addition of a dependent action
120     * versus performing the action directly because the task is
121     * already complete. For example, a CF may not be complete upon
122     * entry, so a dependent completion is added, but by the time it
123     * is added, the target CF is complete, so must be directly
124 dl 1.20 * executed. This is all done while avoiding unnecessary object
125     * construction in safe-bypass cases.
126 dl 1.1 */
127    
128 dl 1.28 // preliminaries
129 dl 1.20
130 dl 1.1 static final class AltResult {
131     final Throwable ex; // null only for NIL
132 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
133 dl 1.1 }
134    
/**
 * Shared AltResult whose exception field is null; internalComplete
 * stores it in place of a null result value so that a non-null
 * {@code result} field always means "done".
 */
static final AltResult NIL = new AltResult(null);
136    
// Fields

// All three fields are volatile; the code in this file updates them
// through UNSAFE.compareAndSwapObject (offsets RESULT, WAITERS,
// COMPLETIONS) in addition to plain volatile reads.
volatile Object result; // Either the result or boxed AltResult
volatile WaitNode waiters; // Treiber stack of threads blocked on get()
volatile CompletionNode completions; // list (Treiber stack) of completions
142    
143 dl 1.20 // Basic utilities for triggering and processing completions
144    
/**
 * Removes and signals all waiting threads and runs all completions.
 *
 * <p>Both stacks are drained by repeatedly CASing the head to its
 * successor; a failed CAS simply re-reads the head and retries, so
 * several threads may drain concurrently.
 */
final void postComplete() {
    WaitNode q; Thread t;
    // Pop each wait node; only unpark nodes that still record a thread
    // (a null thread means the waiter already left or was cancelled).
    while ((q = waiters) != null) {
        if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
            (t = q.thread) != null) {
            q.thread = null;
            LockSupport.unpark(t);
        }
    }

    CompletionNode h; Completion c;
    // Pop each completion node and run its completion in this thread.
    while ((h = completions) != null) {
        if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
            (c = h.completion) != null)
            c.run();
    }
}
165    
166     /**
167     * Triggers completion with the encoding of the given arguments:
168     * if the exception is non-null, encodes it as a wrapped
169     * CompletionException unless it is one already. Otherwise uses
170     * the given result, boxed as NIL if null.
171     */
172 dl 1.75 final void internalComplete(T v, Throwable ex) {
173 dl 1.20 if (result == null)
174     UNSAFE.compareAndSwapObject
175     (this, RESULT, null,
176     (ex == null) ? (v == null) ? NIL : v :
177     new AltResult((ex instanceof CompletionException) ? ex :
178     new CompletionException(ex)));
179     postComplete(); // help out even if not triggered
180     }
181    
182     /**
183 jsr166 1.25 * If triggered, helps release and/or process completions.
184 dl 1.20 */
185     final void helpPostComplete() {
186     if (result != null)
187     postComplete();
188     }
189    
190 dl 1.28 /* ------------- waiting for completions -------------- */
191 dl 1.20
/** Number of processors, for spin control */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
 * Heuristic spin value for waitingGet() before blocking on
 * multiprocessors: 1 << 8 (256) when more than one processor is
 * reported, and 0 otherwise so that waitingGet() skips its spin
 * phase entirely.
 */
static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
200 dl 1.1
201     /**
202 dl 1.28 * Linked nodes to record waiting threads in a Treiber stack. See
203     * other classes such as Phaser and SynchronousQueue for more
204     * detailed explanation. This class implements ManagedBlocker to
205     * avoid starvation when blocking actions pile up in
206     * ForkJoinPools.
207     */
208     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
209     long nanos; // wait time if timed
210     final long deadline; // non-zero if timed
211     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
212     volatile Thread thread;
213     volatile WaitNode next;
214     WaitNode(boolean interruptible, long nanos, long deadline) {
215     this.thread = Thread.currentThread();
216     this.interruptControl = interruptible ? 1 : 0;
217     this.nanos = nanos;
218     this.deadline = deadline;
219     }
220     public boolean isReleasable() {
221     if (thread == null)
222     return true;
223     if (Thread.interrupted()) {
224     int i = interruptControl;
225     interruptControl = -1;
226     if (i > 0)
227     return true;
228     }
229     if (deadline != 0L &&
230     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
231     thread = null;
232     return true;
233     }
234     return false;
235     }
236     public boolean block() {
237     if (isReleasable())
238     return true;
239     else if (deadline == 0L)
240     LockSupport.park(this);
241     else if (nanos > 0L)
242     LockSupport.parkNanos(this, nanos);
243     return isReleasable();
244     }
245 dl 1.1 }
246    
/**
 * Returns raw result after waiting, or null if interruptible and
 * interrupted.
 *
 * <p>Each pass of the loop takes exactly one step: return if done,
 * spin, create a wait node, push it onto the waiters stack, bail
 * out on interrupt, or block.
 *
 * @param interruptible whether an interrupt should abort the wait
 * @return the raw (still encoded) result, or null if aborted by
 *         an interrupt
 */
private Object waitingGet(boolean interruptible) {
    WaitNode q = null;
    boolean queued = false;
    int spins = SPINS;
    for (Object r;;) {
        if ((r = result) != null) {
            if (q != null) { // suppress unpark
                q.thread = null;
                if (q.interruptControl < 0) {
                    if (interruptible) {
                        removeWaiter(q);
                        return null;
                    }
                    // Not interruptible: restore the interrupt status
                    // that WaitNode.isReleasable() consumed.
                    Thread.currentThread().interrupt();
                }
            }
            postComplete(); // help release others
            return r;
        }
        else if (spins > 0) {
            // Randomized spin: the counter only drops when the random
            // value is non-negative.
            int rnd = ThreadLocalRandom.nextSecondarySeed();
            if (rnd == 0)
                rnd = ThreadLocalRandom.current().nextInt();
            if (rnd >= 0)
                --spins;
        }
        else if (q == null)
            q = new WaitNode(interruptible, 0L, 0L);
        else if (!queued)
            // Push q onto the stack; retried on the next pass if the
            // CAS loses a race.
            queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                 q.next = waiters, q);
        else if (interruptible && q.interruptControl < 0) {
            removeWaiter(q);
            return null;
        }
        else if (q.thread != null && result == null) {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ex) {
                // Record the interrupt; it is acted upon on a later pass.
                q.interruptControl = -1;
            }
        }
    }
}
295    
/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * @param nanos time to wait
 * @return raw result
 * @throws InterruptedException if the wait node recorded an
 *         interrupt (checked both before blocking and once a
 *         result is seen)
 * @throws TimeoutException if {@code nanos} is not positive on
 *         entry, or the wait time runs out with no result available
 */
private Object timedAwaitDone(long nanos)
    throws InterruptedException, TimeoutException {
    WaitNode q = null;
    boolean queued = false;
    for (Object r;;) {
        if ((r = result) != null) {
            if (q != null) {
                q.thread = null; // suppress unpark
                if (q.interruptControl < 0) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
            }
            postComplete();
            return r;
        }
        else if (q == null) {
            if (nanos <= 0L)
                throw new TimeoutException();
            long d = System.nanoTime() + nanos;
            // A deadline of 0 means "untimed" to WaitNode, so nudge it.
            q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
        }
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                 q.next = waiters, q);
        else if (q.interruptControl < 0) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        else if (q.nanos <= 0L) {
            // Out of time: re-check result so a completion that just
            // arrived is returned by the next pass instead of timing out.
            if (result == null) {
                removeWaiter(q);
                throw new TimeoutException();
            }
        }
        else if (q.thread != null && result == null) {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ex) {
                q.interruptControl = -1;
            }
        }
    }
}
346    
/**
 * Tries to unlink a timed-out or interrupted wait node to avoid
 * accumulating garbage. Internal nodes are simply unspliced
 * without CAS since it is harmless if they are traversed anyway
 * by releasers. To avoid effects of unsplicing from already
 * removed nodes, the list is retraversed in case of an apparent
 * race. This is slow when there are a lot of nodes, but we don't
 * expect lists to be long enough to outweigh higher-overhead
 * schemes.
 *
 * @param node the node to cancel; ignored if null
 */
private void removeWaiter(WaitNode node) {
    if (node != null) {
        // Mark the node as cancelled; the traversal below unlinks every
        // node whose thread field is null, not just this one.
        node.thread = null;
        retry:
        for (;;) { // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                // q is the head: it must be removed with a CAS on waiters.
                else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
                    continue retry;
            }
            break;
        }
    }
}
378    
379 dl 1.28 /* ------------- Async tasks -------------- */
380    
/**
 * A marker interface identifying asynchronous tasks produced by
 * {@code async} methods. This may be useful for monitoring,
 * debugging, and tracking asynchronous activities.
 *
 * <p>The interface declares no methods. It is a member interface
 * and therefore implicitly static, so no explicit {@code static}
 * modifier is needed.
 *
 * @since 1.8
 */
public interface AsynchronousCompletionTask {
}
390    
391 dl 1.28 /** Base class can act as either FJ or plain Runnable */
392 jsr166 1.33 abstract static class Async extends ForkJoinTask<Void>
393 dl 1.28 implements Runnable, AsynchronousCompletionTask {
394     public final Void getRawResult() { return null; }
395     public final void setRawResult(Void v) { }
396     public final void run() { exec(); }
397 jsr166 1.26 }
398    
399 dl 1.28 static final class AsyncRun extends Async {
400     final Runnable fn;
401     final CompletableFuture<Void> dst;
402     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
403     this.fn = fn; this.dst = dst;
404     }
405     public final boolean exec() {
406     CompletableFuture<Void> d; Throwable ex;
407 dl 1.29 if ((d = this.dst) != null && d.result == null) {
408 dl 1.28 try {
409     fn.run();
410     ex = null;
411     } catch (Throwable rex) {
412     ex = rex;
413     }
414     d.internalComplete(null, ex);
415 dl 1.19 }
416 dl 1.28 return true;
417 dl 1.19 }
418 dl 1.28 private static final long serialVersionUID = 5232453952276885070L;
419 dl 1.19 }
420    
421 dl 1.28 static final class AsyncSupply<U> extends Async {
422     final Supplier<U> fn;
423     final CompletableFuture<U> dst;
424     AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
425     this.fn = fn; this.dst = dst;
426     }
427     public final boolean exec() {
428     CompletableFuture<U> d; U u; Throwable ex;
429 dl 1.29 if ((d = this.dst) != null && d.result == null) {
430 dl 1.28 try {
431     u = fn.get();
432     ex = null;
433     } catch (Throwable rex) {
434     ex = rex;
435     u = null;
436     }
437     d.internalComplete(u, ex);
438 dl 1.1 }
439     return true;
440     }
441     private static final long serialVersionUID = 5232453952276885070L;
442     }
443    
444 dl 1.28 static final class AsyncApply<T,U> extends Async {
445 jsr166 1.63 final T arg;
446 dl 1.28 final Function<? super T,? extends U> fn;
447 dl 1.1 final CompletableFuture<U> dst;
448 dl 1.28 AsyncApply(T arg, Function<? super T,? extends U> fn,
449 dl 1.35 CompletableFuture<U> dst) {
450 dl 1.1 this.arg = arg; this.fn = fn; this.dst = dst;
451     }
452     public final boolean exec() {
453 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
454 dl 1.29 if ((d = this.dst) != null && d.result == null) {
455 dl 1.21 try {
456     u = fn.apply(arg);
457     ex = null;
458     } catch (Throwable rex) {
459     ex = rex;
460     u = null;
461     }
462     d.internalComplete(u, ex);
463 dl 1.1 }
464     return true;
465     }
466     private static final long serialVersionUID = 5232453952276885070L;
467     }
468    
469 jsr166 1.81 static final class AsyncCombine<T,U,V> extends Async {
470 dl 1.1 final T arg1;
471     final U arg2;
472 jsr166 1.63 final BiFunction<? super T,? super U,? extends V> fn;
473 dl 1.1 final CompletableFuture<V> dst;
474 jsr166 1.81 AsyncCombine(T arg1, U arg2,
475 dl 1.35 BiFunction<? super T,? super U,? extends V> fn,
476     CompletableFuture<V> dst) {
477 dl 1.1 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
478     }
479     public final boolean exec() {
480 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
481 dl 1.29 if ((d = this.dst) != null && d.result == null) {
482 dl 1.21 try {
483     v = fn.apply(arg1, arg2);
484     ex = null;
485     } catch (Throwable rex) {
486     ex = rex;
487     v = null;
488     }
489     d.internalComplete(v, ex);
490 dl 1.1 }
491     return true;
492     }
493     private static final long serialVersionUID = 5232453952276885070L;
494     }
495    
496 dl 1.28 static final class AsyncAccept<T> extends Async {
497 jsr166 1.63 final T arg;
498 dl 1.34 final Consumer<? super T> fn;
499 dl 1.88 final CompletableFuture<?> dst;
500 dl 1.34 AsyncAccept(T arg, Consumer<? super T> fn,
501 dl 1.88 CompletableFuture<?> dst) {
502 dl 1.7 this.arg = arg; this.fn = fn; this.dst = dst;
503     }
504     public final boolean exec() {
505 dl 1.88 CompletableFuture<?> d; Throwable ex;
506 dl 1.29 if ((d = this.dst) != null && d.result == null) {
507 dl 1.21 try {
508     fn.accept(arg);
509     ex = null;
510     } catch (Throwable rex) {
511     ex = rex;
512     }
513     d.internalComplete(null, ex);
514 dl 1.7 }
515     return true;
516     }
517     private static final long serialVersionUID = 5232453952276885070L;
518     }
519    
520 jsr166 1.81 static final class AsyncAcceptBoth<T,U> extends Async {
521 dl 1.7 final T arg1;
522     final U arg2;
523 jsr166 1.63 final BiConsumer<? super T,? super U> fn;
524 dl 1.88 final CompletableFuture<?> dst;
525 jsr166 1.81 AsyncAcceptBoth(T arg1, U arg2,
526     BiConsumer<? super T,? super U> fn,
527 dl 1.88 CompletableFuture<?> dst) {
528 dl 1.7 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
529     }
530     public final boolean exec() {
531 dl 1.88 CompletableFuture<?> d; Throwable ex;
532 dl 1.29 if ((d = this.dst) != null && d.result == null) {
533 dl 1.21 try {
534     fn.accept(arg1, arg2);
535     ex = null;
536     } catch (Throwable rex) {
537     ex = rex;
538     }
539     d.internalComplete(null, ex);
540 dl 1.7 }
541     return true;
542     }
543     private static final long serialVersionUID = 5232453952276885070L;
544     }
545    
546 dl 1.37 static final class AsyncCompose<T,U> extends Async {
547 jsr166 1.63 final T arg;
548 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
549 dl 1.37 final CompletableFuture<U> dst;
550     AsyncCompose(T arg,
551 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
552 dl 1.37 CompletableFuture<U> dst) {
553     this.arg = arg; this.fn = fn; this.dst = dst;
554     }
555     public final boolean exec() {
556     CompletableFuture<U> d, fr; U u; Throwable ex;
557     if ((d = this.dst) != null && d.result == null) {
558     try {
559 dl 1.88 CompletionStage<U> cs = fn.apply(arg);
560     fr = (cs == null) ? null : cs.toCompletableFuture();
561 jsr166 1.61 ex = (fr == null) ? new NullPointerException() : null;
562 dl 1.37 } catch (Throwable rex) {
563     ex = rex;
564     fr = null;
565     }
566     if (ex != null)
567     u = null;
568     else {
569     Object r = fr.result;
570 dl 1.67 if (r == null)
571     r = fr.waitingGet(false);
572 dl 1.37 if (r instanceof AltResult) {
573     ex = ((AltResult)r).ex;
574     u = null;
575     }
576     else {
577     @SuppressWarnings("unchecked") U ur = (U) r;
578     u = ur;
579     }
580     }
581     d.internalComplete(u, ex);
582     }
583     return true;
584     }
585     private static final long serialVersionUID = 5232453952276885070L;
586     }
587    
588 dl 1.91 static final class AsyncWhenComplete<T> extends Async {
589     final T arg1;
590     final Throwable arg2;
591     final BiConsumer<? super T,? super Throwable> fn;
592     final CompletableFuture<T> dst;
593     AsyncWhenComplete(T arg1, Throwable arg2,
594     BiConsumer<? super T,? super Throwable> fn,
595     CompletableFuture<T> dst) {
596     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
597     }
598     public final boolean exec() {
599 jsr166 1.92 CompletableFuture<T> d;
600 dl 1.91 if ((d = this.dst) != null && d.result == null) {
601     Throwable ex = arg2;
602     try {
603     fn.accept(arg1, ex);
604     } catch (Throwable rex) {
605     if (ex == null)
606     ex = rex;
607     }
608     d.internalComplete(arg1, ex);
609     }
610     return true;
611     }
612     private static final long serialVersionUID = 5232453952276885070L;
613     }
614    
615 dl 1.1 /* ------------- Completions -------------- */
616    
617 dl 1.28 /**
618     * Simple linked list nodes to record completions, used in
619     * basically the same way as WaitNodes. (We separate nodes from
620     * the Completions themselves mainly because for the And and Or
621     * methods, the same Completion object resides in two lists.)
622     */
623     static final class CompletionNode {
624     final Completion completion;
625     volatile CompletionNode next;
626     CompletionNode(Completion completion) { this.completion = completion; }
627     }
628    
// Opportunistically subclass AtomicInteger to use compareAndSet to claim.
/**
 * Base class of the dependent actions stored in the completions
 * stack. The inherited integer serves as a claim flag: the run
 * methods of the subclasses in this file proceed only after a
 * successful {@code compareAndSet(0, 1)}, so an action that races
 * with another caller is performed at most once.
 */
abstract static class Completion extends AtomicInteger implements Runnable {
}
632    
633 jsr166 1.81 static final class ThenApply<T,U> extends Completion {
634 jsr166 1.2 final CompletableFuture<? extends T> src;
635     final Function<? super T,? extends U> fn;
636     final CompletableFuture<U> dst;
637 dl 1.1 final Executor executor;
638 jsr166 1.81 ThenApply(CompletableFuture<? extends T> src,
639     Function<? super T,? extends U> fn,
640     CompletableFuture<U> dst,
641     Executor executor) {
642 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
643     this.executor = executor;
644     }
645 jsr166 1.26 public final void run() {
646 dl 1.28 final CompletableFuture<? extends T> a;
647     final Function<? super T,? extends U> fn;
648     final CompletableFuture<U> dst;
649 jsr166 1.2 Object r; T t; Throwable ex;
650     if ((dst = this.dst) != null &&
651 dl 1.1 (fn = this.fn) != null &&
652     (a = this.src) != null &&
653     (r = a.result) != null &&
654     compareAndSet(0, 1)) {
655 jsr166 1.2 if (r instanceof AltResult) {
656 dl 1.19 ex = ((AltResult)r).ex;
657 dl 1.1 t = null;
658     }
659 dl 1.17 else {
660     ex = null;
661 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
662     t = tr;
663 dl 1.1 }
664 dl 1.20 Executor e = executor;
665     U u = null;
666 dl 1.17 if (ex == null) {
667     try {
668 dl 1.20 if (e != null)
669 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
670 dl 1.17 else
671 dl 1.20 u = fn.apply(t);
672 dl 1.17 } catch (Throwable rex) {
673     ex = rex;
674     }
675     }
676 dl 1.20 if (e == null || ex != null)
677     dst.internalComplete(u, ex);
678 dl 1.1 }
679     }
680 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
681 dl 1.1 }
682    
683 jsr166 1.81 static final class ThenAccept<T> extends Completion {
684 dl 1.7 final CompletableFuture<? extends T> src;
685 dl 1.34 final Consumer<? super T> fn;
686 dl 1.88 final CompletableFuture<?> dst;
687 dl 1.7 final Executor executor;
688 jsr166 1.81 ThenAccept(CompletableFuture<? extends T> src,
689     Consumer<? super T> fn,
690 dl 1.88 CompletableFuture<?> dst,
691 jsr166 1.81 Executor executor) {
692 dl 1.7 this.src = src; this.fn = fn; this.dst = dst;
693     this.executor = executor;
694     }
695 jsr166 1.26 public final void run() {
696 dl 1.28 final CompletableFuture<? extends T> a;
697 dl 1.34 final Consumer<? super T> fn;
698 dl 1.88 final CompletableFuture<?> dst;
699 dl 1.7 Object r; T t; Throwable ex;
700     if ((dst = this.dst) != null &&
701     (fn = this.fn) != null &&
702     (a = this.src) != null &&
703     (r = a.result) != null &&
704     compareAndSet(0, 1)) {
705     if (r instanceof AltResult) {
706 dl 1.19 ex = ((AltResult)r).ex;
707 dl 1.7 t = null;
708     }
709 dl 1.17 else {
710     ex = null;
711 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
712     t = tr;
713 dl 1.17 }
714 dl 1.20 Executor e = executor;
715 dl 1.17 if (ex == null) {
716     try {
717 dl 1.20 if (e != null)
718 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
719 dl 1.20 else
720 dl 1.17 fn.accept(t);
721     } catch (Throwable rex) {
722     ex = rex;
723 dl 1.7 }
724     }
725 dl 1.20 if (e == null || ex != null)
726     dst.internalComplete(null, ex);
727 dl 1.7 }
728     }
729 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
730 dl 1.7 }
731    
732 jsr166 1.82 static final class ThenRun extends Completion {
733     final CompletableFuture<?> src;
734 jsr166 1.2 final Runnable fn;
735     final CompletableFuture<Void> dst;
736 dl 1.1 final Executor executor;
737 jsr166 1.82 ThenRun(CompletableFuture<?> src,
738 jsr166 1.81 Runnable fn,
739     CompletableFuture<Void> dst,
740     Executor executor) {
741 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
742     this.executor = executor;
743     }
744 jsr166 1.26 public final void run() {
745 jsr166 1.82 final CompletableFuture<?> a;
746 dl 1.28 final Runnable fn;
747     final CompletableFuture<Void> dst;
748 jsr166 1.2 Object r; Throwable ex;
749     if ((dst = this.dst) != null &&
750 dl 1.1 (fn = this.fn) != null &&
751     (a = this.src) != null &&
752     (r = a.result) != null &&
753     compareAndSet(0, 1)) {
754 dl 1.19 if (r instanceof AltResult)
755     ex = ((AltResult)r).ex;
756 dl 1.17 else
757     ex = null;
758 dl 1.20 Executor e = executor;
759 dl 1.17 if (ex == null) {
760     try {
761 dl 1.20 if (e != null)
762 dl 1.28 e.execute(new AsyncRun(fn, dst));
763 dl 1.20 else
764 dl 1.17 fn.run();
765     } catch (Throwable rex) {
766     ex = rex;
767 dl 1.1 }
768     }
769 dl 1.20 if (e == null || ex != null)
770     dst.internalComplete(null, ex);
771 dl 1.1 }
772     }
773 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
774 dl 1.1 }
775    
776 jsr166 1.81 static final class ThenCombine<T,U,V> extends Completion {
777 jsr166 1.2 final CompletableFuture<? extends T> src;
778     final CompletableFuture<? extends U> snd;
779     final BiFunction<? super T,? super U,? extends V> fn;
780     final CompletableFuture<V> dst;
781 dl 1.1 final Executor executor;
782 jsr166 1.81 ThenCombine(CompletableFuture<? extends T> src,
783     CompletableFuture<? extends U> snd,
784     BiFunction<? super T,? super U,? extends V> fn,
785     CompletableFuture<V> dst,
786     Executor executor) {
787 dl 1.1 this.src = src; this.snd = snd;
788     this.fn = fn; this.dst = dst;
789     this.executor = executor;
790     }
791 jsr166 1.26 public final void run() {
792 dl 1.28 final CompletableFuture<? extends T> a;
793     final CompletableFuture<? extends U> b;
794     final BiFunction<? super T,? super U,? extends V> fn;
795     final CompletableFuture<V> dst;
796 dl 1.85 Object r, s; T t; U u; Throwable ex;
797     if ((dst = this.dst) != null &&
798     (fn = this.fn) != null &&
799     (a = this.src) != null &&
800     (r = a.result) != null &&
801     (b = this.snd) != null &&
802     (s = b.result) != null &&
803     compareAndSet(0, 1)) {
804     if (r instanceof AltResult) {
805     ex = ((AltResult)r).ex;
806     t = null;
807     }
808     else {
809     ex = null;
810     @SuppressWarnings("unchecked") T tr = (T) r;
811     t = tr;
812     }
813     if (ex != null)
814     u = null;
815     else if (s instanceof AltResult) {
816     ex = ((AltResult)s).ex;
817     u = null;
818     }
819     else {
820     @SuppressWarnings("unchecked") U us = (U) s;
821     u = us;
822     }
823 dl 1.20 Executor e = executor;
824     V v = null;
825 dl 1.19 if (ex == null) {
826     try {
827 dl 1.20 if (e != null)
828 jsr166 1.81 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
829 dl 1.19 else
830 dl 1.20 v = fn.apply(t, u);
831 dl 1.19 } catch (Throwable rex) {
832     ex = rex;
833     }
834 dl 1.1 }
835 dl 1.20 if (e == null || ex != null)
836     dst.internalComplete(v, ex);
837 jsr166 1.2 }
838     }
839 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
840 dl 1.1 }
841    
842 jsr166 1.81 static final class ThenAcceptBoth<T,U> extends Completion {
843 dl 1.7 final CompletableFuture<? extends T> src;
844     final CompletableFuture<? extends U> snd;
845 dl 1.34 final BiConsumer<? super T,? super U> fn;
846 dl 1.7 final CompletableFuture<Void> dst;
847     final Executor executor;
848 jsr166 1.81 ThenAcceptBoth(CompletableFuture<? extends T> src,
849     CompletableFuture<? extends U> snd,
850     BiConsumer<? super T,? super U> fn,
851     CompletableFuture<Void> dst,
852     Executor executor) {
853 dl 1.7 this.src = src; this.snd = snd;
854     this.fn = fn; this.dst = dst;
855     this.executor = executor;
856     }
857 jsr166 1.26 public final void run() {
858 dl 1.28 final CompletableFuture<? extends T> a;
859     final CompletableFuture<? extends U> b;
860 dl 1.34 final BiConsumer<? super T,? super U> fn;
861 dl 1.28 final CompletableFuture<Void> dst;
862 dl 1.85 Object r, s; T t; U u; Throwable ex;
863     if ((dst = this.dst) != null &&
864     (fn = this.fn) != null &&
865     (a = this.src) != null &&
866     (r = a.result) != null &&
867     (b = this.snd) != null &&
868     (s = b.result) != null &&
869     compareAndSet(0, 1)) {
870     if (r instanceof AltResult) {
871     ex = ((AltResult)r).ex;
872     t = null;
873     }
874     else {
875     ex = null;
876     @SuppressWarnings("unchecked") T tr = (T) r;
877     t = tr;
878     }
879     if (ex != null)
880     u = null;
881     else if (s instanceof AltResult) {
882     ex = ((AltResult)s).ex;
883     u = null;
884     }
885     else {
886     @SuppressWarnings("unchecked") U us = (U) s;
887     u = us;
888     }
889 dl 1.20 Executor e = executor;
890 dl 1.19 if (ex == null) {
891     try {
892 dl 1.20 if (e != null)
893 jsr166 1.81 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
894 dl 1.20 else
895 dl 1.19 fn.accept(t, u);
896     } catch (Throwable rex) {
897     ex = rex;
898 dl 1.7 }
899     }
900 dl 1.20 if (e == null || ex != null)
901     dst.internalComplete(null, ex);
902 dl 1.7 }
903     }
904 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
905 dl 1.7 }
906    
907 jsr166 1.82 static final class RunAfterBoth extends Completion {
908     final CompletableFuture<?> src;
909 jsr166 1.2 final CompletableFuture<?> snd;
910     final Runnable fn;
911     final CompletableFuture<Void> dst;
912 dl 1.1 final Executor executor;
913 jsr166 1.82 RunAfterBoth(CompletableFuture<?> src,
914 jsr166 1.81 CompletableFuture<?> snd,
915     Runnable fn,
916     CompletableFuture<Void> dst,
917     Executor executor) {
918 dl 1.1 this.src = src; this.snd = snd;
919     this.fn = fn; this.dst = dst;
920     this.executor = executor;
921     }
922 jsr166 1.26 public final void run() {
923 jsr166 1.82 final CompletableFuture<?> a;
924 dl 1.1 final CompletableFuture<?> b;
925     final Runnable fn;
926     final CompletableFuture<Void> dst;
927 dl 1.85 Object r, s; Throwable ex;
928     if ((dst = this.dst) != null &&
929     (fn = this.fn) != null &&
930     (a = this.src) != null &&
931     (r = a.result) != null &&
932     (b = this.snd) != null &&
933     (s = b.result) != null &&
934     compareAndSet(0, 1)) {
935     if (r instanceof AltResult)
936     ex = ((AltResult)r).ex;
937     else
938     ex = null;
939     if (ex == null && (s instanceof AltResult))
940     ex = ((AltResult)s).ex;
941 dl 1.20 Executor e = executor;
942 dl 1.19 if (ex == null) {
943     try {
944 dl 1.20 if (e != null)
945 dl 1.28 e.execute(new AsyncRun(fn, dst));
946 dl 1.20 else
947 dl 1.19 fn.run();
948     } catch (Throwable rex) {
949     ex = rex;
950 dl 1.1 }
951 jsr166 1.2 }
952 dl 1.20 if (e == null || ex != null)
953     dst.internalComplete(null, ex);
954 jsr166 1.2 }
955     }
956 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
957 dl 1.1 }
958    
959 dl 1.35 static final class AndCompletion extends Completion {
960     final CompletableFuture<?> src;
961     final CompletableFuture<?> snd;
962     final CompletableFuture<Void> dst;
963     AndCompletion(CompletableFuture<?> src,
964     CompletableFuture<?> snd,
965     CompletableFuture<Void> dst) {
966     this.src = src; this.snd = snd; this.dst = dst;
967     }
968     public final void run() {
969     final CompletableFuture<?> a;
970     final CompletableFuture<?> b;
971     final CompletableFuture<Void> dst;
972     Object r, s; Throwable ex;
973     if ((dst = this.dst) != null &&
974     (a = this.src) != null &&
975     (r = a.result) != null &&
976     (b = this.snd) != null &&
977     (s = b.result) != null &&
978     compareAndSet(0, 1)) {
979     if (r instanceof AltResult)
980     ex = ((AltResult)r).ex;
981     else
982     ex = null;
983     if (ex == null && (s instanceof AltResult))
984     ex = ((AltResult)s).ex;
985     dst.internalComplete(null, ex);
986     }
987     }
988     private static final long serialVersionUID = 5232453952276885070L;
989     }
990    
991 jsr166 1.81 static final class ApplyToEither<T,U> extends Completion {
992 jsr166 1.2 final CompletableFuture<? extends T> src;
993     final CompletableFuture<? extends T> snd;
994     final Function<? super T,? extends U> fn;
995     final CompletableFuture<U> dst;
996 dl 1.1 final Executor executor;
997 jsr166 1.81 ApplyToEither(CompletableFuture<? extends T> src,
998     CompletableFuture<? extends T> snd,
999     Function<? super T,? extends U> fn,
1000     CompletableFuture<U> dst,
1001     Executor executor) {
1002 dl 1.1 this.src = src; this.snd = snd;
1003     this.fn = fn; this.dst = dst;
1004     this.executor = executor;
1005     }
1006 jsr166 1.26 public final void run() {
1007 dl 1.28 final CompletableFuture<? extends T> a;
1008     final CompletableFuture<? extends T> b;
1009     final Function<? super T,? extends U> fn;
1010     final CompletableFuture<U> dst;
1011 jsr166 1.2 Object r; T t; Throwable ex;
1012     if ((dst = this.dst) != null &&
1013 dl 1.1 (fn = this.fn) != null &&
1014     (((a = this.src) != null && (r = a.result) != null) ||
1015     ((b = this.snd) != null && (r = b.result) != null)) &&
1016     compareAndSet(0, 1)) {
1017 jsr166 1.2 if (r instanceof AltResult) {
1018 dl 1.19 ex = ((AltResult)r).ex;
1019 jsr166 1.2 t = null;
1020     }
1021 dl 1.19 else {
1022     ex = null;
1023 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1024     t = tr;
1025 dl 1.1 }
1026 dl 1.20 Executor e = executor;
1027     U u = null;
1028 dl 1.19 if (ex == null) {
1029     try {
1030 dl 1.20 if (e != null)
1031 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
1032 dl 1.19 else
1033 dl 1.20 u = fn.apply(t);
1034 dl 1.19 } catch (Throwable rex) {
1035     ex = rex;
1036     }
1037     }
1038 dl 1.20 if (e == null || ex != null)
1039     dst.internalComplete(u, ex);
1040 jsr166 1.2 }
1041     }
1042 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1043 dl 1.1 }
1044    
1045 jsr166 1.81 static final class AcceptEither<T> extends Completion {
1046 dl 1.7 final CompletableFuture<? extends T> src;
1047     final CompletableFuture<? extends T> snd;
1048 dl 1.34 final Consumer<? super T> fn;
1049 dl 1.7 final CompletableFuture<Void> dst;
1050     final Executor executor;
1051 jsr166 1.81 AcceptEither(CompletableFuture<? extends T> src,
1052     CompletableFuture<? extends T> snd,
1053     Consumer<? super T> fn,
1054     CompletableFuture<Void> dst,
1055     Executor executor) {
1056 dl 1.7 this.src = src; this.snd = snd;
1057     this.fn = fn; this.dst = dst;
1058     this.executor = executor;
1059     }
1060 jsr166 1.26 public final void run() {
1061 dl 1.28 final CompletableFuture<? extends T> a;
1062     final CompletableFuture<? extends T> b;
1063 dl 1.34 final Consumer<? super T> fn;
1064 dl 1.28 final CompletableFuture<Void> dst;
1065 dl 1.7 Object r; T t; Throwable ex;
1066     if ((dst = this.dst) != null &&
1067     (fn = this.fn) != null &&
1068     (((a = this.src) != null && (r = a.result) != null) ||
1069     ((b = this.snd) != null && (r = b.result) != null)) &&
1070     compareAndSet(0, 1)) {
1071     if (r instanceof AltResult) {
1072 dl 1.19 ex = ((AltResult)r).ex;
1073 dl 1.7 t = null;
1074     }
1075 dl 1.19 else {
1076     ex = null;
1077 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1078     t = tr;
1079 dl 1.19 }
1080 dl 1.20 Executor e = executor;
1081 dl 1.19 if (ex == null) {
1082     try {
1083 dl 1.20 if (e != null)
1084 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
1085 dl 1.20 else
1086 dl 1.19 fn.accept(t);
1087     } catch (Throwable rex) {
1088     ex = rex;
1089 dl 1.7 }
1090     }
1091 dl 1.20 if (e == null || ex != null)
1092     dst.internalComplete(null, ex);
1093 dl 1.7 }
1094     }
1095 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1096 dl 1.7 }
1097    
1098 jsr166 1.82 static final class RunAfterEither extends Completion {
1099     final CompletableFuture<?> src;
1100 jsr166 1.2 final CompletableFuture<?> snd;
1101     final Runnable fn;
1102     final CompletableFuture<Void> dst;
1103 dl 1.1 final Executor executor;
1104 jsr166 1.82 RunAfterEither(CompletableFuture<?> src,
1105 jsr166 1.81 CompletableFuture<?> snd,
1106     Runnable fn,
1107     CompletableFuture<Void> dst,
1108     Executor executor) {
1109 dl 1.1 this.src = src; this.snd = snd;
1110     this.fn = fn; this.dst = dst;
1111     this.executor = executor;
1112     }
1113 jsr166 1.26 public final void run() {
1114 jsr166 1.82 final CompletableFuture<?> a;
1115 dl 1.1 final CompletableFuture<?> b;
1116     final Runnable fn;
1117     final CompletableFuture<Void> dst;
1118 dl 1.28 Object r; Throwable ex;
1119 jsr166 1.2 if ((dst = this.dst) != null &&
1120 dl 1.1 (fn = this.fn) != null &&
1121     (((a = this.src) != null && (r = a.result) != null) ||
1122     ((b = this.snd) != null && (r = b.result) != null)) &&
1123     compareAndSet(0, 1)) {
1124 dl 1.19 if (r instanceof AltResult)
1125     ex = ((AltResult)r).ex;
1126     else
1127     ex = null;
1128 dl 1.20 Executor e = executor;
1129 dl 1.19 if (ex == null) {
1130 dl 1.1 try {
1131 dl 1.20 if (e != null)
1132 dl 1.28 e.execute(new AsyncRun(fn, dst));
1133 dl 1.20 else
1134 dl 1.1 fn.run();
1135     } catch (Throwable rex) {
1136 dl 1.19 ex = rex;
1137 dl 1.1 }
1138     }
1139 dl 1.20 if (e == null || ex != null)
1140     dst.internalComplete(null, ex);
1141 jsr166 1.2 }
1142     }
1143 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1144 dl 1.1 }
1145    
1146 dl 1.35 static final class OrCompletion extends Completion {
1147     final CompletableFuture<?> src;
1148     final CompletableFuture<?> snd;
1149 dl 1.77 final CompletableFuture<Object> dst;
1150 dl 1.35 OrCompletion(CompletableFuture<?> src,
1151     CompletableFuture<?> snd,
1152 dl 1.77 CompletableFuture<Object> dst) {
1153 dl 1.35 this.src = src; this.snd = snd; this.dst = dst;
1154     }
1155     public final void run() {
1156     final CompletableFuture<?> a;
1157     final CompletableFuture<?> b;
1158 dl 1.77 final CompletableFuture<Object> dst;
1159     Object r, t; Throwable ex;
1160 dl 1.35 if ((dst = this.dst) != null &&
1161     (((a = this.src) != null && (r = a.result) != null) ||
1162     ((b = this.snd) != null && (r = b.result) != null)) &&
1163     compareAndSet(0, 1)) {
1164 dl 1.77 if (r instanceof AltResult) {
1165 dl 1.35 ex = ((AltResult)r).ex;
1166 dl 1.77 t = null;
1167     }
1168     else {
1169 dl 1.35 ex = null;
1170 dl 1.77 t = r;
1171     }
1172     dst.internalComplete(t, ex);
1173 dl 1.35 }
1174     }
1175     private static final long serialVersionUID = 5232453952276885070L;
1176     }
1177    
1178 dl 1.28 static final class ExceptionCompletion<T> extends Completion {
1179 jsr166 1.2 final CompletableFuture<? extends T> src;
1180 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1181     final CompletableFuture<T> dst;
1182 dl 1.28 ExceptionCompletion(CompletableFuture<? extends T> src,
1183     Function<? super Throwable, ? extends T> fn,
1184     CompletableFuture<T> dst) {
1185 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1186     }
1187 jsr166 1.26 public final void run() {
1188 dl 1.28 final CompletableFuture<? extends T> a;
1189     final Function<? super Throwable, ? extends T> fn;
1190     final CompletableFuture<T> dst;
1191 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1192 jsr166 1.2 if ((dst = this.dst) != null &&
1193 dl 1.1 (fn = this.fn) != null &&
1194     (a = this.src) != null &&
1195     (r = a.result) != null &&
1196     compareAndSet(0, 1)) {
1197 dl 1.20 if ((r instanceof AltResult) &&
1198 jsr166 1.87 (ex = ((AltResult)r).ex) != null) {
1199 dl 1.20 try {
1200     t = fn.apply(ex);
1201     } catch (Throwable rex) {
1202     dx = rex;
1203 dl 1.1 }
1204     }
1205 dl 1.28 else {
1206     @SuppressWarnings("unchecked") T tr = (T) r;
1207     t = tr;
1208     }
1209 dl 1.20 dst.internalComplete(t, dx);
1210 dl 1.1 }
1211     }
1212 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1213 dl 1.1 }
1214    
1215 dl 1.88 static final class WhenCompleteCompletion<T> extends Completion {
1216     final CompletableFuture<? extends T> src;
1217     final BiConsumer<? super T, ? super Throwable> fn;
1218     final CompletableFuture<T> dst;
1219     final Executor executor;
1220     WhenCompleteCompletion(CompletableFuture<? extends T> src,
1221     BiConsumer<? super T, ? super Throwable> fn,
1222     CompletableFuture<T> dst,
1223     Executor executor) {
1224     this.src = src; this.fn = fn; this.dst = dst;
1225     this.executor = executor;
1226     }
1227     public final void run() {
1228     final CompletableFuture<? extends T> a;
1229     final BiConsumer<? super T, ? super Throwable> fn;
1230     final CompletableFuture<T> dst;
1231     Object r; T t; Throwable ex;
1232     if ((dst = this.dst) != null &&
1233     (fn = this.fn) != null &&
1234     (a = this.src) != null &&
1235     (r = a.result) != null &&
1236     compareAndSet(0, 1)) {
1237     if (r instanceof AltResult) {
1238     ex = ((AltResult)r).ex;
1239     t = null;
1240     }
1241     else {
1242     ex = null;
1243     @SuppressWarnings("unchecked") T tr = (T) r;
1244     t = tr;
1245     }
1246     Executor e = executor;
1247     Throwable dx = null;
1248     try {
1249     if (e != null)
1250 dl 1.91 e.execute(new AsyncWhenComplete<T>(t, ex, fn, dst));
1251 dl 1.88 else
1252     fn.accept(t, ex);
1253     } catch (Throwable rex) {
1254     dx = rex;
1255     }
1256     if (e == null || dx != null)
1257     dst.internalComplete(t, ex != null ? ex : dx);
1258     }
1259     }
1260     private static final long serialVersionUID = 5232453952276885070L;
1261     }
1262    
1263 dl 1.75 static final class ThenCopy<T> extends Completion {
1264 dl 1.77 final CompletableFuture<?> src;
1265 dl 1.75 final CompletableFuture<T> dst;
1266 dl 1.77 ThenCopy(CompletableFuture<?> src,
1267 dl 1.75 CompletableFuture<T> dst) {
1268 dl 1.17 this.src = src; this.dst = dst;
1269     }
1270 jsr166 1.26 public final void run() {
1271 dl 1.77 final CompletableFuture<?> a;
1272 dl 1.75 final CompletableFuture<T> dst;
1273     Object r; T t; Throwable ex;
1274 dl 1.17 if ((dst = this.dst) != null &&
1275     (a = this.src) != null &&
1276     (r = a.result) != null &&
1277     compareAndSet(0, 1)) {
1278 dl 1.20 if (r instanceof AltResult) {
1279     ex = ((AltResult)r).ex;
1280     t = null;
1281     }
1282     else {
1283     ex = null;
1284 dl 1.75 @SuppressWarnings("unchecked") T tr = (T) r;
1285     t = tr;
1286 dl 1.20 }
1287     dst.internalComplete(t, ex);
1288 dl 1.17 }
1289     }
1290 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1291 dl 1.17 }
1292    
1293 dl 1.75 // version of ThenCopy for CompletableFuture<Void> dst
1294     static final class ThenPropagate extends Completion {
1295     final CompletableFuture<?> src;
1296     final CompletableFuture<Void> dst;
1297     ThenPropagate(CompletableFuture<?> src,
1298     CompletableFuture<Void> dst) {
1299     this.src = src; this.dst = dst;
1300     }
1301     public final void run() {
1302     final CompletableFuture<?> a;
1303     final CompletableFuture<Void> dst;
1304     Object r; Throwable ex;
1305     if ((dst = this.dst) != null &&
1306     (a = this.src) != null &&
1307     (r = a.result) != null &&
1308     compareAndSet(0, 1)) {
1309     if (r instanceof AltResult)
1310     ex = ((AltResult)r).ex;
1311     else
1312     ex = null;
1313     dst.internalComplete(null, ex);
1314     }
1315     }
1316     private static final long serialVersionUID = 5232453952276885070L;
1317     }
1318    
1319 dl 1.28 static final class HandleCompletion<T,U> extends Completion {
1320 dl 1.17 final CompletableFuture<? extends T> src;
1321     final BiFunction<? super T, Throwable, ? extends U> fn;
1322     final CompletableFuture<U> dst;
1323 dl 1.88 final Executor executor;
1324 dl 1.28 HandleCompletion(CompletableFuture<? extends T> src,
1325     BiFunction<? super T, Throwable, ? extends U> fn,
1326 dl 1.88 CompletableFuture<U> dst,
1327     Executor executor) {
1328 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1329 dl 1.88 this.executor = executor;
1330 dl 1.17 }
1331 jsr166 1.26 public final void run() {
1332 dl 1.28 final CompletableFuture<? extends T> a;
1333     final BiFunction<? super T, Throwable, ? extends U> fn;
1334     final CompletableFuture<U> dst;
1335 dl 1.17 Object r; T t; Throwable ex;
1336     if ((dst = this.dst) != null &&
1337     (fn = this.fn) != null &&
1338     (a = this.src) != null &&
1339     (r = a.result) != null &&
1340     compareAndSet(0, 1)) {
1341     if (r instanceof AltResult) {
1342     ex = ((AltResult)r).ex;
1343     t = null;
1344     }
1345     else {
1346     ex = null;
1347 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1348     t = tr;
1349 dl 1.17 }
1350 dl 1.88 Executor e = executor;
1351     U u = null;
1352     Throwable dx = null;
1353 dl 1.17 try {
1354 dl 1.88 if (e != null)
1355     e.execute(new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1356     else
1357     u = fn.apply(t, ex);
1358 dl 1.17 } catch (Throwable rex) {
1359 dl 1.20 dx = rex;
1360 dl 1.17 }
1361 dl 1.88 if (e == null || dx != null)
1362     dst.internalComplete(u, dx);
1363 dl 1.17 }
1364     }
1365 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1366 dl 1.17 }
1367    
1368 jsr166 1.81 static final class ThenCompose<T,U> extends Completion {
1369 dl 1.17 final CompletableFuture<? extends T> src;
1370 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1371 dl 1.17 final CompletableFuture<U> dst;
1372 dl 1.37 final Executor executor;
1373 jsr166 1.81 ThenCompose(CompletableFuture<? extends T> src,
1374 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
1375 jsr166 1.81 CompletableFuture<U> dst,
1376     Executor executor) {
1377 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1378 dl 1.37 this.executor = executor;
1379 dl 1.17 }
1380 jsr166 1.26 public final void run() {
1381 dl 1.28 final CompletableFuture<? extends T> a;
1382 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1383 dl 1.28 final CompletableFuture<U> dst;
1384 dl 1.37 Object r; T t; Throwable ex; Executor e;
1385 dl 1.17 if ((dst = this.dst) != null &&
1386     (fn = this.fn) != null &&
1387     (a = this.src) != null &&
1388     (r = a.result) != null &&
1389     compareAndSet(0, 1)) {
1390     if (r instanceof AltResult) {
1391     ex = ((AltResult)r).ex;
1392     t = null;
1393     }
1394     else {
1395     ex = null;
1396 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1397     t = tr;
1398 dl 1.17 }
1399     CompletableFuture<U> c = null;
1400 dl 1.20 U u = null;
1401     boolean complete = false;
1402 dl 1.17 if (ex == null) {
1403 dl 1.37 if ((e = executor) != null)
1404     e.execute(new AsyncCompose<T,U>(t, fn, dst));
1405     else {
1406     try {
1407 dl 1.88 CompletionStage<U> cs = fn.apply(t);
1408     c = (cs == null) ? null : cs.toCompletableFuture();
1409     if (c == null)
1410 dl 1.37 ex = new NullPointerException();
1411     } catch (Throwable rex) {
1412     ex = rex;
1413     }
1414 dl 1.17 }
1415     }
1416 dl 1.37 if (c != null) {
1417 dl 1.75 ThenCopy<U> d = null;
1418 dl 1.18 Object s;
1419     if ((s = c.result) == null) {
1420     CompletionNode p = new CompletionNode
1421 dl 1.75 (d = new ThenCopy<U>(c, dst));
1422 dl 1.18 while ((s = c.result) == null) {
1423     if (UNSAFE.compareAndSwapObject
1424     (c, COMPLETIONS, p.next = c.completions, p))
1425     break;
1426     }
1427 dl 1.17 }
1428 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1429 dl 1.20 complete = true;
1430 dl 1.18 if (s instanceof AltResult) {
1431     ex = ((AltResult)s).ex; // no rewrap
1432     u = null;
1433 dl 1.28 }
1434     else {
1435     @SuppressWarnings("unchecked") U us = (U) s;
1436     u = us;
1437     }
1438     }
1439     }
1440     if (complete || ex != null)
1441     dst.internalComplete(u, ex);
1442     if (c != null)
1443     c.helpPostComplete();
1444     }
1445     }
1446     private static final long serialVersionUID = 5232453952276885070L;
1447     }
1448    
1449 dl 1.88 // Implementations of stage methods with (plain, async, Executor) forms
1450 dl 1.28
1451 dl 1.88 private <U> CompletableFuture<U> doThenApply
1452     (Function<? super T,? extends U> fn,
1453     Executor e) {
1454     if (fn == null) throw new NullPointerException();
1455     CompletableFuture<U> dst = new CompletableFuture<U>();
1456     ThenApply<T,U> d = null;
1457     Object r;
1458     if ((r = result) == null) {
1459     CompletionNode p = new CompletionNode
1460     (d = new ThenApply<T,U>(this, fn, dst, e));
1461     while ((r = result) == null) {
1462     if (UNSAFE.compareAndSwapObject
1463     (this, COMPLETIONS, p.next = completions, p))
1464     break;
1465     }
1466     }
1467     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1468     T t; Throwable ex;
1469     if (r instanceof AltResult) {
1470     ex = ((AltResult)r).ex;
1471     t = null;
1472     }
1473     else {
1474     ex = null;
1475     @SuppressWarnings("unchecked") T tr = (T) r;
1476     t = tr;
1477     }
1478     U u = null;
1479     if (ex == null) {
1480     try {
1481     if (e != null)
1482     e.execute(new AsyncApply<T,U>(t, fn, dst));
1483     else
1484     u = fn.apply(t);
1485     } catch (Throwable rex) {
1486     ex = rex;
1487     }
1488     }
1489     if (e == null || ex != null)
1490     dst.internalComplete(u, ex);
1491     }
1492     helpPostComplete();
1493     return dst;
1494 dl 1.28 }
1495    
1496 dl 1.88 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1497     Executor e) {
1498     if (fn == null) throw new NullPointerException();
1499     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1500     ThenAccept<T> d = null;
1501     Object r;
1502     if ((r = result) == null) {
1503     CompletionNode p = new CompletionNode
1504     (d = new ThenAccept<T>(this, fn, dst, e));
1505     while ((r = result) == null) {
1506     if (UNSAFE.compareAndSwapObject
1507     (this, COMPLETIONS, p.next = completions, p))
1508     break;
1509     }
1510     }
1511     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1512     T t; Throwable ex;
1513     if (r instanceof AltResult) {
1514     ex = ((AltResult)r).ex;
1515     t = null;
1516     }
1517     else {
1518     ex = null;
1519     @SuppressWarnings("unchecked") T tr = (T) r;
1520     t = tr;
1521     }
1522     if (ex == null) {
1523     try {
1524     if (e != null)
1525     e.execute(new AsyncAccept<T>(t, fn, dst));
1526     else
1527     fn.accept(t);
1528     } catch (Throwable rex) {
1529     ex = rex;
1530     }
1531     }
1532     if (e == null || ex != null)
1533     dst.internalComplete(null, ex);
1534     }
1535     helpPostComplete();
1536     return dst;
1537 dl 1.28 }
1538    
1539 dl 1.88 private CompletableFuture<Void> doThenRun(Runnable action,
1540     Executor e) {
1541     if (action == null) throw new NullPointerException();
1542     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1543     ThenRun d = null;
1544     Object r;
1545     if ((r = result) == null) {
1546     CompletionNode p = new CompletionNode
1547     (d = new ThenRun(this, action, dst, e));
1548     while ((r = result) == null) {
1549     if (UNSAFE.compareAndSwapObject
1550     (this, COMPLETIONS, p.next = completions, p))
1551     break;
1552     }
1553     }
1554     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1555     Throwable ex;
1556     if (r instanceof AltResult)
1557     ex = ((AltResult)r).ex;
1558     else
1559     ex = null;
1560     if (ex == null) {
1561     try {
1562     if (e != null)
1563     e.execute(new AsyncRun(action, dst));
1564     else
1565     action.run();
1566     } catch (Throwable rex) {
1567     ex = rex;
1568     }
1569     }
1570     if (e == null || ex != null)
1571     dst.internalComplete(null, ex);
1572     }
1573     helpPostComplete();
1574     return dst;
1575     }
1576    
1577     private <U,V> CompletableFuture<V> doThenCombine
1578     (CompletableFuture<? extends U> other,
1579     BiFunction<? super T,? super U,? extends V> fn,
1580     Executor e) {
1581     if (other == null || fn == null) throw new NullPointerException();
1582     CompletableFuture<V> dst = new CompletableFuture<V>();
1583     ThenCombine<T,U,V> d = null;
1584     Object r, s = null;
1585     if ((r = result) == null || (s = other.result) == null) {
1586     d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
1587     CompletionNode q = null, p = new CompletionNode(d);
1588     while ((r == null && (r = result) == null) ||
1589     (s == null && (s = other.result) == null)) {
1590     if (q != null) {
1591     if (s != null ||
1592     UNSAFE.compareAndSwapObject
1593     (other, COMPLETIONS, q.next = other.completions, q))
1594     break;
1595     }
1596     else if (r != null ||
1597     UNSAFE.compareAndSwapObject
1598     (this, COMPLETIONS, p.next = completions, p)) {
1599     if (s != null)
1600     break;
1601     q = new CompletionNode(d);
1602     }
1603     }
1604     }
1605     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1606     T t; U u; Throwable ex;
1607     if (r instanceof AltResult) {
1608     ex = ((AltResult)r).ex;
1609     t = null;
1610     }
1611     else {
1612     ex = null;
1613     @SuppressWarnings("unchecked") T tr = (T) r;
1614     t = tr;
1615     }
1616     if (ex != null)
1617     u = null;
1618     else if (s instanceof AltResult) {
1619     ex = ((AltResult)s).ex;
1620     u = null;
1621     }
1622     else {
1623     @SuppressWarnings("unchecked") U us = (U) s;
1624     u = us;
1625     }
1626     V v = null;
1627     if (ex == null) {
1628     try {
1629     if (e != null)
1630     e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
1631     else
1632     v = fn.apply(t, u);
1633     } catch (Throwable rex) {
1634     ex = rex;
1635     }
1636     }
1637     if (e == null || ex != null)
1638     dst.internalComplete(v, ex);
1639     }
1640     helpPostComplete();
1641     other.helpPostComplete();
1642     return dst;
1643     }
1644    
1645     private <U> CompletableFuture<Void> doThenAcceptBoth
1646     (CompletableFuture<? extends U> other,
1647     BiConsumer<? super T,? super U> fn,
1648     Executor e) {
1649     if (other == null || fn == null) throw new NullPointerException();
1650     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1651     ThenAcceptBoth<T,U> d = null;
1652     Object r, s = null;
1653     if ((r = result) == null || (s = other.result) == null) {
1654     d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1655     CompletionNode q = null, p = new CompletionNode(d);
1656     while ((r == null && (r = result) == null) ||
1657     (s == null && (s = other.result) == null)) {
1658     if (q != null) {
1659     if (s != null ||
1660     UNSAFE.compareAndSwapObject
1661     (other, COMPLETIONS, q.next = other.completions, q))
1662     break;
1663     }
1664     else if (r != null ||
1665     UNSAFE.compareAndSwapObject
1666     (this, COMPLETIONS, p.next = completions, p)) {
1667     if (s != null)
1668     break;
1669     q = new CompletionNode(d);
1670     }
1671     }
1672     }
1673     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1674     T t; U u; Throwable ex;
1675     if (r instanceof AltResult) {
1676     ex = ((AltResult)r).ex;
1677     t = null;
1678     }
1679     else {
1680     ex = null;
1681     @SuppressWarnings("unchecked") T tr = (T) r;
1682     t = tr;
1683     }
1684     if (ex != null)
1685     u = null;
1686     else if (s instanceof AltResult) {
1687     ex = ((AltResult)s).ex;
1688     u = null;
1689     }
1690     else {
1691     @SuppressWarnings("unchecked") U us = (U) s;
1692     u = us;
1693     }
1694     if (ex == null) {
1695     try {
1696     if (e != null)
1697     e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1698     else
1699     fn.accept(t, u);
1700     } catch (Throwable rex) {
1701     ex = rex;
1702     }
1703     }
1704     if (e == null || ex != null)
1705     dst.internalComplete(null, ex);
1706     }
1707     helpPostComplete();
1708     other.helpPostComplete();
1709     return dst;
1710     }
1711    
1712     private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1713     Runnable action,
1714     Executor e) {
1715     if (other == null || action == null) throw new NullPointerException();
1716     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1717     RunAfterBoth d = null;
1718     Object r, s = null;
1719     if ((r = result) == null || (s = other.result) == null) {
1720     d = new RunAfterBoth(this, other, action, dst, e);
1721     CompletionNode q = null, p = new CompletionNode(d);
1722     while ((r == null && (r = result) == null) ||
1723     (s == null && (s = other.result) == null)) {
1724     if (q != null) {
1725     if (s != null ||
1726     UNSAFE.compareAndSwapObject
1727     (other, COMPLETIONS, q.next = other.completions, q))
1728     break;
1729     }
1730     else if (r != null ||
1731     UNSAFE.compareAndSwapObject
1732     (this, COMPLETIONS, p.next = completions, p)) {
1733     if (s != null)
1734     break;
1735     q = new CompletionNode(d);
1736     }
1737     }
1738     }
1739     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1740     Throwable ex;
1741     if (r instanceof AltResult)
1742     ex = ((AltResult)r).ex;
1743     else
1744     ex = null;
1745     if (ex == null && (s instanceof AltResult))
1746     ex = ((AltResult)s).ex;
1747     if (ex == null) {
1748     try {
1749     if (e != null)
1750     e.execute(new AsyncRun(action, dst));
1751     else
1752     action.run();
1753     } catch (Throwable rex) {
1754     ex = rex;
1755     }
1756     }
1757     if (e == null || ex != null)
1758     dst.internalComplete(null, ex);
1759     }
1760     helpPostComplete();
1761     other.helpPostComplete();
1762     return dst;
1763     }
1764    
1765     private <U> CompletableFuture<U> doApplyToEither
1766     (CompletableFuture<? extends T> other,
1767     Function<? super T, U> fn,
1768     Executor e) {
1769     if (other == null || fn == null) throw new NullPointerException();
1770     CompletableFuture<U> dst = new CompletableFuture<U>();
1771     ApplyToEither<T,U> d = null;
1772     Object r;
1773     if ((r = result) == null && (r = other.result) == null) {
1774     d = new ApplyToEither<T,U>(this, other, fn, dst, e);
1775     CompletionNode q = null, p = new CompletionNode(d);
1776     while ((r = result) == null && (r = other.result) == null) {
1777     if (q != null) {
1778     if (UNSAFE.compareAndSwapObject
1779     (other, COMPLETIONS, q.next = other.completions, q))
1780     break;
1781     }
1782     else if (UNSAFE.compareAndSwapObject
1783     (this, COMPLETIONS, p.next = completions, p))
1784     q = new CompletionNode(d);
1785     }
1786     }
1787     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1788     T t; Throwable ex;
1789     if (r instanceof AltResult) {
1790     ex = ((AltResult)r).ex;
1791     t = null;
1792     }
1793     else {
1794     ex = null;
1795     @SuppressWarnings("unchecked") T tr = (T) r;
1796     t = tr;
1797     }
1798     U u = null;
1799     if (ex == null) {
1800     try {
1801     if (e != null)
1802     e.execute(new AsyncApply<T,U>(t, fn, dst));
1803     else
1804     u = fn.apply(t);
1805     } catch (Throwable rex) {
1806     ex = rex;
1807     }
1808     }
1809     if (e == null || ex != null)
1810     dst.internalComplete(u, ex);
1811     }
1812     helpPostComplete();
1813     other.helpPostComplete();
1814     return dst;
1815     }
1816    
1817     private CompletableFuture<Void> doAcceptEither
1818     (CompletableFuture<? extends T> other,
1819     Consumer<? super T> fn,
1820     Executor e) {
1821     if (other == null || fn == null) throw new NullPointerException();
1822     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1823     AcceptEither<T> d = null;
1824     Object r;
1825     if ((r = result) == null && (r = other.result) == null) {
1826     d = new AcceptEither<T>(this, other, fn, dst, e);
1827     CompletionNode q = null, p = new CompletionNode(d);
1828     while ((r = result) == null && (r = other.result) == null) {
1829     if (q != null) {
1830     if (UNSAFE.compareAndSwapObject
1831     (other, COMPLETIONS, q.next = other.completions, q))
1832     break;
1833     }
1834     else if (UNSAFE.compareAndSwapObject
1835     (this, COMPLETIONS, p.next = completions, p))
1836     q = new CompletionNode(d);
1837     }
1838     }
1839     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1840     T t; Throwable ex;
1841     if (r instanceof AltResult) {
1842     ex = ((AltResult)r).ex;
1843     t = null;
1844     }
1845     else {
1846     ex = null;
1847     @SuppressWarnings("unchecked") T tr = (T) r;
1848     t = tr;
1849     }
1850     if (ex == null) {
1851     try {
1852     if (e != null)
1853     e.execute(new AsyncAccept<T>(t, fn, dst));
1854     else
1855     fn.accept(t);
1856     } catch (Throwable rex) {
1857     ex = rex;
1858     }
1859     }
1860     if (e == null || ex != null)
1861     dst.internalComplete(null, ex);
1862     }
1863     helpPostComplete();
1864     other.helpPostComplete();
1865     return dst;
1866     }
1867    
1868     private CompletableFuture<Void> doRunAfterEither
1869     (CompletableFuture<?> other,
1870     Runnable action,
1871     Executor e) {
1872     if (other == null || action == null) throw new NullPointerException();
1873     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1874     RunAfterEither d = null;
1875     Object r;
1876     if ((r = result) == null && (r = other.result) == null) {
1877     d = new RunAfterEither(this, other, action, dst, e);
1878     CompletionNode q = null, p = new CompletionNode(d);
1879     while ((r = result) == null && (r = other.result) == null) {
1880     if (q != null) {
1881     if (UNSAFE.compareAndSwapObject
1882     (other, COMPLETIONS, q.next = other.completions, q))
1883     break;
1884     }
1885     else if (UNSAFE.compareAndSwapObject
1886     (this, COMPLETIONS, p.next = completions, p))
1887     q = new CompletionNode(d);
1888     }
1889     }
1890     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1891     Throwable ex;
1892     if (r instanceof AltResult)
1893     ex = ((AltResult)r).ex;
1894     else
1895     ex = null;
1896     if (ex == null) {
1897     try {
1898     if (e != null)
1899     e.execute(new AsyncRun(action, dst));
1900     else
1901     action.run();
1902     } catch (Throwable rex) {
1903     ex = rex;
1904     }
1905     }
1906     if (e == null || ex != null)
1907     dst.internalComplete(null, ex);
1908     }
1909     helpPostComplete();
1910     other.helpPostComplete();
1911     return dst;
1912     }
1913    
1914     private <U> CompletableFuture<U> doThenCompose
1915     (Function<? super T, ? extends CompletionStage<U>> fn,
1916     Executor e) {
1917     if (fn == null) throw new NullPointerException();
1918     CompletableFuture<U> dst = null;
1919     ThenCompose<T,U> d = null;
1920     Object r;
1921     if ((r = result) == null) {
1922     dst = new CompletableFuture<U>();
1923     CompletionNode p = new CompletionNode
1924     (d = new ThenCompose<T,U>(this, fn, dst, e));
1925     while ((r = result) == null) {
1926     if (UNSAFE.compareAndSwapObject
1927     (this, COMPLETIONS, p.next = completions, p))
1928     break;
1929     }
1930     }
1931     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1932     T t; Throwable ex;
1933     if (r instanceof AltResult) {
1934     ex = ((AltResult)r).ex;
1935     t = null;
1936     }
1937     else {
1938     ex = null;
1939     @SuppressWarnings("unchecked") T tr = (T) r;
1940     t = tr;
1941     }
1942     if (ex == null) {
1943     if (e != null) {
1944     if (dst == null)
1945     dst = new CompletableFuture<U>();
1946     e.execute(new AsyncCompose<T,U>(t, fn, dst));
1947     }
1948     else {
1949     try {
1950     CompletionStage<U> cs = fn.apply(t);
1951     if (cs == null ||
1952     (dst = cs.toCompletableFuture()) == null)
1953     ex = new NullPointerException();
1954     } catch (Throwable rex) {
1955     ex = rex;
1956     }
1957     }
1958     }
1959     if (dst == null)
1960     dst = new CompletableFuture<U>();
1961     if (e == null || ex != null)
1962     dst.internalComplete(null, ex);
1963     }
1964     helpPostComplete();
1965     dst.helpPostComplete();
1966     return dst;
1967     }
1968    
1969     private CompletableFuture<T> doWhenComplete
1970     (BiConsumer<? super T, ? super Throwable> fn,
1971     Executor e) {
1972     if (fn == null) throw new NullPointerException();
1973     CompletableFuture<T> dst = new CompletableFuture<T>();
1974     WhenCompleteCompletion<T> d = null;
1975     Object r;
1976     if ((r = result) == null) {
1977     CompletionNode p =
1978     new CompletionNode(d = new WhenCompleteCompletion<T>
1979     (this, fn, dst, e));
1980     while ((r = result) == null) {
1981     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1982     p.next = completions, p))
1983     break;
1984     }
1985     }
1986     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1987     T t; Throwable ex;
1988     if (r instanceof AltResult) {
1989     ex = ((AltResult)r).ex;
1990     t = null;
1991     }
1992     else {
1993     ex = null;
1994     @SuppressWarnings("unchecked") T tr = (T) r;
1995     t = tr;
1996     }
1997     Throwable dx = null;
1998     try {
1999     if (e != null)
2000 dl 1.91 e.execute(new AsyncWhenComplete<T>(t, ex, fn, dst));
2001 dl 1.88 else
2002     fn.accept(t, ex);
2003     } catch (Throwable rex) {
2004     dx = rex;
2005     }
2006     if (e == null || dx != null)
2007     dst.internalComplete(t, ex != null ? ex : dx);
2008     }
2009     helpPostComplete();
2010     return dst;
2011     }
2012    
2013     private <U> CompletableFuture<U> doHandle
2014     (BiFunction<? super T, Throwable, ? extends U> fn,
2015     Executor e) {
2016     if (fn == null) throw new NullPointerException();
2017     CompletableFuture<U> dst = new CompletableFuture<U>();
2018     HandleCompletion<T,U> d = null;
2019     Object r;
2020     if ((r = result) == null) {
2021     CompletionNode p =
2022     new CompletionNode(d = new HandleCompletion<T,U>
2023     (this, fn, dst, e));
2024     while ((r = result) == null) {
2025     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2026     p.next = completions, p))
2027     break;
2028     }
2029     }
2030     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2031     T t; Throwable ex;
2032     if (r instanceof AltResult) {
2033     ex = ((AltResult)r).ex;
2034     t = null;
2035     }
2036     else {
2037     ex = null;
2038     @SuppressWarnings("unchecked") T tr = (T) r;
2039     t = tr;
2040     }
2041 jsr166 1.90 U u = null;
2042 dl 1.88 Throwable dx = null;
2043     try {
2044     if (e != null)
2045     e.execute(new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2046     else {
2047     u = fn.apply(t, ex);
2048     dx = null;
2049     }
2050     } catch (Throwable rex) {
2051     dx = rex;
2052     u = null;
2053     }
2054     if (e == null || dx != null)
2055     dst.internalComplete(u, dx);
2056     }
2057     helpPostComplete();
2058     return dst;
2059     }
2060    
2061    
2062     // public methods
2063    
2064     /**
2065     * Creates a new incomplete CompletableFuture.
2066     */
2067     public CompletableFuture() {
2068     }
2069    
2070     /**
2071     * Returns a new CompletableFuture that is asynchronously completed
2072     * by a task running in the {@link ForkJoinPool#commonPool()} with
2073     * the value obtained by calling the given Supplier.
2074     *
2075     * @param supplier a function returning the value to be used
2076     * to complete the returned CompletableFuture
2077     * @return the new CompletableFuture
2078     */
2079     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2080     if (supplier == null) throw new NullPointerException();
2081     CompletableFuture<U> f = new CompletableFuture<U>();
2082     ForkJoinPool.commonPool().
2083     execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
2084     return f;
2085     }
2086    
2087     /**
2088     * Returns a new CompletableFuture that is asynchronously completed
2089     * by a task running in the given executor with the value obtained
2090     * by calling the given Supplier.
2091     *
2092     * @param supplier a function returning the value to be used
2093     * to complete the returned CompletableFuture
2094     * @param executor the executor to use for asynchronous execution
2095     * @return the new CompletableFuture
2096     */
2097     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2098     Executor executor) {
2099     if (executor == null || supplier == null)
2100     throw new NullPointerException();
2101     CompletableFuture<U> f = new CompletableFuture<U>();
2102     executor.execute(new AsyncSupply<U>(supplier, f));
2103     return f;
2104 dl 1.28 }
2105    
2106     /**
2107 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2108     * by a task running in the {@link ForkJoinPool#commonPool()} after
2109     * it runs the given action.
2110 dl 1.28 *
2111     * @param runnable the action to run before completing the
2112     * returned CompletableFuture
2113 jsr166 1.58 * @return the new CompletableFuture
2114 dl 1.28 */
2115     public static CompletableFuture<Void> runAsync(Runnable runnable) {
2116     if (runnable == null) throw new NullPointerException();
2117     CompletableFuture<Void> f = new CompletableFuture<Void>();
2118     ForkJoinPool.commonPool().
2119     execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
2120     return f;
2121     }
2122    
2123     /**
2124 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2125     * by a task running in the given executor after it runs the given
2126     * action.
2127 dl 1.28 *
2128     * @param runnable the action to run before completing the
2129     * returned CompletableFuture
2130     * @param executor the executor to use for asynchronous execution
2131 jsr166 1.58 * @return the new CompletableFuture
2132 dl 1.28 */
2133     public static CompletableFuture<Void> runAsync(Runnable runnable,
2134     Executor executor) {
2135     if (executor == null || runnable == null)
2136     throw new NullPointerException();
2137     CompletableFuture<Void> f = new CompletableFuture<Void>();
2138     executor.execute(new AsyncRun(runnable, f));
2139     return f;
2140     }
2141    
2142     /**
2143 dl 1.77 * Returns a new CompletableFuture that is already completed with
2144     * the given value.
2145     *
2146     * @param value the value
2147     * @return the completed CompletableFuture
2148     */
2149     public static <U> CompletableFuture<U> completedFuture(U value) {
2150     CompletableFuture<U> f = new CompletableFuture<U>();
2151 jsr166 1.78 f.result = (value == null) ? NIL : value;
2152 dl 1.77 return f;
2153     }
2154    
2155     /**
2156 dl 1.28 * Returns {@code true} if completed in any fashion: normally,
2157     * exceptionally, or via cancellation.
2158     *
2159     * @return {@code true} if completed
2160     */
2161     public boolean isDone() {
2162     return result != null;
2163     }
2164    
2165     /**
2166 dl 1.49 * Waits if necessary for this future to complete, and then
2167 dl 1.48 * returns its result.
2168 dl 1.28 *
2169 dl 1.48 * @return the result value
2170     * @throws CancellationException if this future was cancelled
2171     * @throws ExecutionException if this future completed exceptionally
2172 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2173     * while waiting
2174     */
2175     public T get() throws InterruptedException, ExecutionException {
2176     Object r; Throwable ex, cause;
2177     if ((r = result) == null && (r = waitingGet(true)) == null)
2178     throw new InterruptedException();
2179 jsr166 1.45 if (!(r instanceof AltResult)) {
2180     @SuppressWarnings("unchecked") T tr = (T) r;
2181     return tr;
2182     }
2183     if ((ex = ((AltResult)r).ex) == null)
2184 dl 1.28 return null;
2185 jsr166 1.45 if (ex instanceof CancellationException)
2186     throw (CancellationException)ex;
2187     if ((ex instanceof CompletionException) &&
2188     (cause = ex.getCause()) != null)
2189     ex = cause;
2190     throw new ExecutionException(ex);
2191 dl 1.28 }
2192    
2193     /**
2194 dl 1.49 * Waits if necessary for at most the given time for this future
2195     * to complete, and then returns its result, if available.
2196 dl 1.28 *
2197     * @param timeout the maximum time to wait
2198     * @param unit the time unit of the timeout argument
2199 dl 1.48 * @return the result value
2200     * @throws CancellationException if this future was cancelled
2201     * @throws ExecutionException if this future completed exceptionally
2202 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2203     * while waiting
2204     * @throws TimeoutException if the wait timed out
2205     */
2206     public T get(long timeout, TimeUnit unit)
2207     throws InterruptedException, ExecutionException, TimeoutException {
2208     Object r; Throwable ex, cause;
2209     long nanos = unit.toNanos(timeout);
2210     if (Thread.interrupted())
2211     throw new InterruptedException();
2212     if ((r = result) == null)
2213     r = timedAwaitDone(nanos);
2214 jsr166 1.45 if (!(r instanceof AltResult)) {
2215     @SuppressWarnings("unchecked") T tr = (T) r;
2216     return tr;
2217     }
2218     if ((ex = ((AltResult)r).ex) == null)
2219 dl 1.28 return null;
2220 jsr166 1.45 if (ex instanceof CancellationException)
2221     throw (CancellationException)ex;
2222     if ((ex instanceof CompletionException) &&
2223     (cause = ex.getCause()) != null)
2224     ex = cause;
2225     throw new ExecutionException(ex);
2226 dl 1.28 }
2227    
2228     /**
2229     * Returns the result value when complete, or throws an
2230     * (unchecked) exception if completed exceptionally. To better
2231     * conform with the use of common functional forms, if a
2232     * computation involved in the completion of this
2233     * CompletableFuture threw an exception, this method throws an
2234     * (unchecked) {@link CompletionException} with the underlying
2235     * exception as its cause.
2236     *
2237     * @return the result value
2238     * @throws CancellationException if the computation was cancelled
2239 jsr166 1.55 * @throws CompletionException if this future completed
2240     * exceptionally or a completion computation threw an exception
2241 dl 1.28 */
2242     public T join() {
2243     Object r; Throwable ex;
2244     if ((r = result) == null)
2245     r = waitingGet(false);
2246 jsr166 1.45 if (!(r instanceof AltResult)) {
2247     @SuppressWarnings("unchecked") T tr = (T) r;
2248     return tr;
2249     }
2250     if ((ex = ((AltResult)r).ex) == null)
2251 dl 1.28 return null;
2252 jsr166 1.45 if (ex instanceof CancellationException)
2253     throw (CancellationException)ex;
2254     if (ex instanceof CompletionException)
2255     throw (CompletionException)ex;
2256     throw new CompletionException(ex);
2257 dl 1.28 }
2258    
2259     /**
2260     * Returns the result value (or throws any encountered exception)
2261     * if completed, else returns the given valueIfAbsent.
2262     *
2263     * @param valueIfAbsent the value to return if not completed
2264     * @return the result value, if completed, else the given valueIfAbsent
2265     * @throws CancellationException if the computation was cancelled
2266 jsr166 1.55 * @throws CompletionException if this future completed
2267     * exceptionally or a completion computation threw an exception
2268 dl 1.28 */
2269     public T getNow(T valueIfAbsent) {
2270     Object r; Throwable ex;
2271     if ((r = result) == null)
2272     return valueIfAbsent;
2273 jsr166 1.45 if (!(r instanceof AltResult)) {
2274     @SuppressWarnings("unchecked") T tr = (T) r;
2275     return tr;
2276     }
2277     if ((ex = ((AltResult)r).ex) == null)
2278 dl 1.28 return null;
2279 jsr166 1.45 if (ex instanceof CancellationException)
2280     throw (CancellationException)ex;
2281     if (ex instanceof CompletionException)
2282     throw (CompletionException)ex;
2283     throw new CompletionException(ex);
2284 dl 1.28 }
2285    
2286     /**
2287     * If not already completed, sets the value returned by {@link
2288     * #get()} and related methods to the given value.
2289     *
2290     * @param value the result value
2291     * @return {@code true} if this invocation caused this CompletableFuture
2292     * to transition to a completed state, else {@code false}
2293     */
2294     public boolean complete(T value) {
2295     boolean triggered = result == null &&
2296     UNSAFE.compareAndSwapObject(this, RESULT, null,
2297     value == null ? NIL : value);
2298     postComplete();
2299     return triggered;
2300     }
2301    
2302     /**
2303     * If not already completed, causes invocations of {@link #get()}
2304     * and related methods to throw the given exception.
2305     *
2306     * @param ex the exception
2307     * @return {@code true} if this invocation caused this CompletableFuture
2308     * to transition to a completed state, else {@code false}
2309     */
2310     public boolean completeExceptionally(Throwable ex) {
2311     if (ex == null) throw new NullPointerException();
2312     boolean triggered = result == null &&
2313     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2314     postComplete();
2315     return triggered;
2316     }
2317    
2318 dl 1.88 // CompletionStage methods
2319    
2320     public <U> CompletableFuture<U> thenApply
2321     (Function<? super T,? extends U> fn) {
2322 dl 1.28 return doThenApply(fn, null);
2323     }
2324    
2325 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2326     (Function<? super T,? extends U> fn) {
2327 dl 1.28 return doThenApply(fn, ForkJoinPool.commonPool());
2328 dl 1.17 }
2329    
2330 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2331     (Function<? super T,? extends U> fn,
2332     Executor executor) {
2333 dl 1.28 if (executor == null) throw new NullPointerException();
2334     return doThenApply(fn, executor);
2335     }
2336 dl 1.1
2337 dl 1.88 public CompletableFuture<Void> thenAccept
2338     (Consumer<? super T> action) {
2339     return doThenAccept(action, null);
2340 dl 1.28 }
2341    
2342 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2343     (Consumer<? super T> action) {
2344     return doThenAccept(action, ForkJoinPool.commonPool());
2345 dl 1.28 }
2346    
2347 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2348     (Consumer<? super T> action,
2349     Executor executor) {
2350 dl 1.28 if (executor == null) throw new NullPointerException();
2351 dl 1.88 return doThenAccept(action, executor);
2352 dl 1.7 }
2353    
2354 dl 1.88 public CompletableFuture<Void> thenRun
2355     (Runnable action) {
2356 dl 1.28 return doThenRun(action, null);
2357     }
2358    
2359 dl 1.88 public CompletableFuture<Void> thenRunAsync
2360     (Runnable action) {
2361 dl 1.28 return doThenRun(action, ForkJoinPool.commonPool());
2362     }
2363    
2364 dl 1.88 public CompletableFuture<Void> thenRunAsync
2365     (Runnable action,
2366     Executor executor) {
2367 dl 1.28 if (executor == null) throw new NullPointerException();
2368     return doThenRun(action, executor);
2369     }
2370    
2371 dl 1.48 public <U,V> CompletableFuture<V> thenCombine
2372 dl 1.88 (CompletionStage<? extends U> other,
2373 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2374 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, null);
2375 dl 1.28 }
2376    
2377 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2378 dl 1.88 (CompletionStage<? extends U> other,
2379 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2380 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn,
2381     ForkJoinPool.commonPool());
2382 dl 1.28 }
2383    
2384 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2385 dl 1.88 (CompletionStage<? extends U> other,
2386 dl 1.48 BiFunction<? super T,? super U,? extends V> fn,
2387     Executor executor) {
2388 dl 1.28 if (executor == null) throw new NullPointerException();
2389 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, executor);
2390 dl 1.1 }
2391    
2392 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBoth
2393 dl 1.88 (CompletionStage<? extends U> other,
2394     BiConsumer<? super T, ? super U> action) {
2395     return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2396 dl 1.28 }
2397    
2398 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2399 dl 1.88 (CompletionStage<? extends U> other,
2400     BiConsumer<? super T, ? super U> action) {
2401     return doThenAcceptBoth(other.toCompletableFuture(), action,
2402     ForkJoinPool.commonPool());
2403 dl 1.28 }
2404    
2405 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2406 dl 1.88 (CompletionStage<? extends U> other,
2407     BiConsumer<? super T, ? super U> action,
2408 dl 1.48 Executor executor) {
2409 dl 1.28 if (executor == null) throw new NullPointerException();
2410 dl 1.88 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2411 dl 1.28 }
2412    
2413 dl 1.88 public CompletableFuture<Void> runAfterBoth
2414     (CompletionStage<?> other,
2415     Runnable action) {
2416     return doRunAfterBoth(other.toCompletableFuture(), action, null);
2417 dl 1.7 }
2418    
2419 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2420     (CompletionStage<?> other,
2421     Runnable action) {
2422     return doRunAfterBoth(other.toCompletableFuture(), action,
2423     ForkJoinPool.commonPool());
2424 dl 1.28 }
2425    
2426 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2427     (CompletionStage<?> other,
2428     Runnable action,
2429     Executor executor) {
2430 dl 1.28 if (executor == null) throw new NullPointerException();
2431 dl 1.88 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2432 dl 1.28 }
2433    
2434 dl 1.1
2435 dl 1.48 public <U> CompletableFuture<U> applyToEither
2436 dl 1.88 (CompletionStage<? extends T> other,
2437 dl 1.48 Function<? super T, U> fn) {
2438 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, null);
2439 dl 1.28 }
2440    
2441 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2442 dl 1.88 (CompletionStage<? extends T> other,
2443 dl 1.48 Function<? super T, U> fn) {
2444 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn,
2445     ForkJoinPool.commonPool());
2446 dl 1.28 }
2447    
2448 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2449 dl 1.88 (CompletionStage<? extends T> other,
2450 dl 1.48 Function<? super T, U> fn,
2451     Executor executor) {
2452 dl 1.28 if (executor == null) throw new NullPointerException();
2453 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2454 dl 1.1 }
2455    
2456 dl 1.48 public CompletableFuture<Void> acceptEither
2457 dl 1.88 (CompletionStage<? extends T> other,
2458     Consumer<? super T> action) {
2459     return doAcceptEither(other.toCompletableFuture(), action, null);
2460 dl 1.28 }
2461    
2462 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2463 dl 1.88 (CompletionStage<? extends T> other,
2464     Consumer<? super T> action) {
2465     return doAcceptEither(other.toCompletableFuture(), action,
2466     ForkJoinPool.commonPool());
2467 dl 1.28 }
2468    
2469 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2470 dl 1.88 (CompletionStage<? extends T> other,
2471     Consumer<? super T> action,
2472 dl 1.48 Executor executor) {
2473 dl 1.28 if (executor == null) throw new NullPointerException();
2474 dl 1.88 return doAcceptEither(other.toCompletableFuture(), action, executor);
2475 dl 1.7 }
2476    
2477 dl 1.88 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2478 dl 1.28 Runnable action) {
2479 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, null);
2480 dl 1.28 }
2481    
2482 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2483 dl 1.88 (CompletionStage<?> other,
2484 dl 1.48 Runnable action) {
2485 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action,
2486     ForkJoinPool.commonPool());
2487 dl 1.28 }
2488    
2489 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2490 dl 1.88 (CompletionStage<?> other,
2491 dl 1.48 Runnable action,
2492     Executor executor) {
2493 dl 1.28 if (executor == null) throw new NullPointerException();
2494 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2495 dl 1.1 }
2496    
2497 dl 1.48 public <U> CompletableFuture<U> thenCompose
2498 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2499 jsr166 1.81 return doThenCompose(fn, null);
2500 dl 1.37 }
2501    
2502 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2503 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2504 jsr166 1.81 return doThenCompose(fn, ForkJoinPool.commonPool());
2505 dl 1.37 }
2506    
2507 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2508 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn,
2509 dl 1.48 Executor executor) {
2510 dl 1.37 if (executor == null) throw new NullPointerException();
2511 jsr166 1.81 return doThenCompose(fn, executor);
2512 dl 1.37 }
2513    
2514 dl 1.88 public CompletableFuture<T> whenComplete
2515     (BiConsumer<? super T, ? super Throwable> action) {
2516     return doWhenComplete(action, null);
2517     }
2518    
2519     public CompletableFuture<T> whenCompleteAsync
2520     (BiConsumer<? super T, ? super Throwable> action) {
2521     return doWhenComplete(action, ForkJoinPool.commonPool());
2522     }
2523    
2524     public CompletableFuture<T> whenCompleteAsync
2525     (BiConsumer<? super T, ? super Throwable> action,
2526     Executor executor) {
2527     if (executor == null) throw new NullPointerException();
2528     return doWhenComplete(action, executor);
2529     }
2530    
2531     public <U> CompletableFuture<U> handle
2532     (BiFunction<? super T, Throwable, ? extends U> fn) {
2533     return doHandle(fn, null);
2534     }
2535    
2536     public <U> CompletableFuture<U> handleAsync
2537     (BiFunction<? super T, Throwable, ? extends U> fn) {
2538     return doHandle(fn, ForkJoinPool.commonPool());
2539     }
2540    
2541     public <U> CompletableFuture<U> handleAsync
2542     (BiFunction<? super T, Throwable, ? extends U> fn,
2543     Executor executor) {
2544     if (executor == null) throw new NullPointerException();
2545     return doHandle(fn, executor);
2546     }
2547    
2548     /**
2549     * Returns this CompletableFuture
2550     *
2551     * @return this CompletableFuture
2552     */
2553     public CompletableFuture<T> toCompletableFuture() {
2554     return this;
2555 dl 1.28 }
2556    
2557 dl 1.88 // not in interface CompletionStage
2558    
2559 dl 1.28 /**
2560 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
2561     * CompletableFuture completes, with the result of the given
2562     * function of the exception triggering this CompletableFuture's
2563     * completion when it completes exceptionally; otherwise, if this
2564     * CompletableFuture completes normally, then the returned
2565     * CompletableFuture also completes normally with the same value.
2566 dl 1.88 * Note: More flexible versions of this functionality are
2567     * available using methods {@code whenComplete} and {@code handle}.
2568 dl 1.28 *
2569     * @param fn the function to use to compute the value of the
2570     * returned CompletableFuture if this CompletableFuture completed
2571     * exceptionally
2572     * @return the new CompletableFuture
2573     */
2574 dl 1.48 public CompletableFuture<T> exceptionally
2575     (Function<Throwable, ? extends T> fn) {
2576 dl 1.28 if (fn == null) throw new NullPointerException();
2577     CompletableFuture<T> dst = new CompletableFuture<T>();
2578     ExceptionCompletion<T> d = null;
2579     Object r;
2580     if ((r = result) == null) {
2581     CompletionNode p =
2582 dl 1.88 new CompletionNode(d = new ExceptionCompletion<T>
2583     (this, fn, dst));
2584 dl 1.28 while ((r = result) == null) {
2585     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2586     p.next = completions, p))
2587     break;
2588     }
2589     }
2590     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2591     T t = null; Throwable ex, dx = null;
2592     if (r instanceof AltResult) {
2593 jsr166 1.87 if ((ex = ((AltResult)r).ex) != null) {
2594 dl 1.28 try {
2595     t = fn.apply(ex);
2596     } catch (Throwable rex) {
2597     dx = rex;
2598     }
2599     }
2600     }
2601     else {
2602     @SuppressWarnings("unchecked") T tr = (T) r;
2603     t = tr;
2604     }
2605     dst.internalComplete(t, dx);
2606     }
2607     helpPostComplete();
2608     return dst;
2609     }
2610    
2611 dl 1.35 /* ------------- Arbitrary-arity constructions -------------- */
2612    
2613     /*
2614     * The basic plan of attack is to recursively form binary
2615     * completion trees of elements. This can be overkill for small
2616     * sets, but scales nicely. The And/All vs Or/Any forms use the
2617     * same idea, but details differ.
2618     */
2619    
2620     /**
2621     * Returns a new CompletableFuture that is completed when all of
2622 jsr166 1.66 * the given CompletableFutures complete. If any of the given
2623 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
2624     * CompletableFuture also does so, with a CompletionException
2625     * holding this exception as its cause. Otherwise, the results,
2626     * if any, of the given CompletableFutures are not reflected in
2627     * the returned CompletableFuture, but may be obtained by
2628     * inspecting them individually. If no CompletableFutures are
2629     * provided, returns a CompletableFuture completed with the value
2630     * {@code null}.
2631 dl 1.35 *
2632     * <p>Among the applications of this method is to await completion
2633     * of a set of independent CompletableFutures before continuing a
2634     * program, as in: {@code CompletableFuture.allOf(c1, c2,
2635     * c3).join();}.
2636     *
2637     * @param cfs the CompletableFutures
2638 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
2639 dl 1.35 * given CompletableFutures complete
2640     * @throws NullPointerException if the array or any of its elements are
2641     * {@code null}
2642     */
2643     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2644     int len = cfs.length; // Directly handle empty and singleton cases
2645     if (len > 1)
2646     return allTree(cfs, 0, len - 1); // two or more inputs: And'ed tree over cfs[0..len-1]
2647     else {
2648     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2649     CompletableFuture<?> f;
2650     if (len == 0)
2651     dst.result = NIL; // no inputs: dst is returned already completed
2652     else if ((f = cfs[0]) == null)
2653     throw new NullPointerException();
2654     else {
         // Single input f. Each pass of the loop below re-reads f.result
         // and, while it is still null, performs exactly one step:
         // allocate the ThenPropagate d, then allocate the node p that
         // wraps d, then try to CAS p onto the head of f.completions.
         // The loop ends when f has a result or when that CAS succeeds;
         // a failed CAS is retried on the next pass.
2655 dl 1.75 ThenPropagate d = null;
2656 dl 1.35 CompletionNode p = null;
2657     Object r;
2658     while ((r = f.result) == null) {
2659     if (d == null)
2660 dl 1.75 d = new ThenPropagate(f, dst);
2661 dl 1.35 else if (p == null)
2662     p = new CompletionNode(d);
2663     else if (UNSAFE.compareAndSwapObject
2664     (f, COMPLETIONS, p.next = f.completions, p))
2665     break;
2666     }
         // r != null means the loop ended because f has a result (not via
         // break). dst is then completed here; if d had already been
         // created, only after d.compareAndSet(0, 1) succeeds.
         // NOTE(review): presumably that CAS is the claim that keeps dst
         // from being completed both here and through a registered d --
         // confirm against the completion classes defined earlier.
2667     if (r != null && (d == null || d.compareAndSet(0, 1)))
2668     dst.internalComplete(null, (r instanceof AltResult) ?
2669     ((AltResult)r).ex : null); // null value; f's exception only when r is an AltResult
2670     f.helpPostComplete();
2671     }
2672     return dst;
2673     }
2674     }
2675    
2676     /**
2677     * Recursively constructs an And'ed tree of CompletableFutures.
2678     * Called only when array known to have at least two elements.
     *
     * <p>The range {@code cfs[lo..hi]} is split at its midpoint. A half
     * that holds a single element uses that element directly; a larger
     * half is first reduced to one CompletableFuture by a recursive call.
     *
     * @param cfs the CompletableFutures
     * @param lo index of the first element of the range, inclusive
     * @param hi index of the last element of the range, inclusive
     * @return a new CompletableFuture standing for the whole range
     * @throws NullPointerException if the source chosen for either half
     * is {@code null}
2679     */
2680     private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2681     int lo, int hi) {
2682     CompletableFuture<?> fst, snd;
2683     int mid = (lo + hi) >>> 1;
         // The || short-circuits: once the left source is found to be null
         // the right half is neither read nor recursively built.
2684     if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2685     (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2686     throw new NullPointerException();
2687     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2688     AndCompletion d = null;
2689     CompletionNode p = null, q = null;
2690     Object r = null, s = null;
         // One step per pass while a result is still missing: create the
         // AndCompletion d; create node p; CAS p onto the head of
         // fst.completions and, on success, create node q; CAS q onto the
         // head of snd.completions and, on success, leave the loop.
         // A failed CAS is retried on a later pass.
2691     while ((r = fst.result) == null || (s = snd.result) == null) {
2692     if (d == null)
2693     d = new AndCompletion(fst, snd, dst);
2694     else if (p == null)
2695     p = new CompletionNode(d);
2696     else if (q == null) {
2697     if (UNSAFE.compareAndSwapObject
2698     (fst, COMPLETIONS, p.next = fst.completions, p))
2699     q = new CompletionNode(d);
2700     }
2701     else if (UNSAFE.compareAndSwapObject
2702     (snd, COMPLETIONS, q.next = snd.completions, q))
2703     break;
2704     }
         // After a break r and/or s may still be null, so each missing one
         // is re-read once. dst is completed here only when both results
         // are present and, if d exists, d.compareAndSet(0, 1) succeeds.
         // NOTE(review): presumably that CAS arbitrates against d being
         // run through the registered nodes -- confirm in AndCompletion.
2705     if ((r != null || (r = fst.result) != null) &&
2706     (s != null || (s = snd.result) != null) &&
2707     (d == null || d.compareAndSet(0, 1))) {
2708     Throwable ex;
         // fst's exception takes precedence; snd's is consulted only when
         // fst contributed none.
2709     if (r instanceof AltResult)
2710     ex = ((AltResult)r).ex;
2711     else
2712     ex = null;
2713     if (ex == null && (s instanceof AltResult))
2714     ex = ((AltResult)s).ex;
2715     dst.internalComplete(null, ex);
2716     }
2717     fst.helpPostComplete();
2718     snd.helpPostComplete();
2719     return dst;
2720     }
2721    
2722     /**
2723 dl 1.76 * Returns a new CompletableFuture that is completed when any of
2724 jsr166 1.79 * the given CompletableFutures complete, with the same result.
2725     * Otherwise, if it completed exceptionally, the returned
2726 dl 1.77 * CompletableFuture also does so, with a CompletionException
2727     * holding this exception as its cause. If no CompletableFutures
2728     * are provided, returns an incomplete CompletableFuture.
2729 dl 1.35 *
2730     * @param cfs the CompletableFutures
2731 dl 1.77 * @return a new CompletableFuture that is completed with the
2732     * result or exception of any of the given CompletableFutures when
2733     * one completes
2734 dl 1.35 * @throws NullPointerException if the array or any of its elements are
2735     * {@code null}
2736     */
2737 dl 1.77 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2738 dl 1.35 int len = cfs.length; // Same idea as allOf
2739     if (len > 1)
2740     return anyTree(cfs, 0, len - 1);
2741     else {
2742 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2743 dl 1.35 CompletableFuture<?> f;
2744     if (len == 0)
2745 dl 1.48 ; // skip
2746 dl 1.35 else if ((f = cfs[0]) == null)
2747     throw new NullPointerException();
2748     else {
2749 dl 1.77 ThenCopy<Object> d = null;
2750 dl 1.35 CompletionNode p = null;
2751     Object r;
2752     while ((r = f.result) == null) {
2753     if (d == null)
2754 dl 1.77 d = new ThenCopy<Object>(f, dst);
2755 dl 1.35 else if (p == null)
2756     p = new CompletionNode(d);
2757     else if (UNSAFE.compareAndSwapObject
2758     (f, COMPLETIONS, p.next = f.completions, p))
2759     break;
2760     }
2761     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2762     Throwable ex; Object t;
2763 dl 1.77 if (r instanceof AltResult) {
2764 dl 1.35 ex = ((AltResult)r).ex;
2765 dl 1.77 t = null;
2766     }
2767     else {
2768     ex = null;
2769     t = r;
2770     }
2771     dst.internalComplete(t, ex);
2772 dl 1.35 }
2773     f.helpPostComplete();
2774     }
2775     return dst;
2776     }
2777     }
2778    
2779     /**
2780 jsr166 1.44 * Recursively constructs an Or'ed tree of CompletableFutures.
     *
     * <p>The range {@code cfs[lo..hi]} is split at its midpoint. A half
     * that holds a single element uses that element directly; a larger
     * half is first reduced to one CompletableFuture by a recursive call.
     * The only caller visible in this section, {@code anyOf}, invokes it
     * with at least two elements.
     *
     * @param cfs the CompletableFutures
     * @param lo index of the first element of the range, inclusive
     * @param hi index of the last element of the range, inclusive
     * @return a new CompletableFuture standing for the whole range
     * @throws NullPointerException if the source chosen for either half
     * is {@code null}
2781 dl 1.35 */
2782 dl 1.77 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
2783 jsr166 1.79 int lo, int hi) {
2784 dl 1.35 CompletableFuture<?> fst, snd;
2785     int mid = (lo + hi) >>> 1;
         // The || short-circuits: once the left source is found to be null
         // the right half is neither read nor recursively built.
2786     if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2787     (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2788     throw new NullPointerException();
2789 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2790 dl 1.35 OrCompletion d = null;
2791     CompletionNode p = null, q = null;
2792     Object r;
         // fst is examined before snd, so r holds fst's result whenever
         // fst has one. While neither source has a result, one step is
         // taken per pass: create the OrCompletion d; create node p; CAS p
         // onto the head of fst.completions and, on success, create node
         // q; CAS q onto the head of snd.completions and, on success,
         // leave the loop. A failed CAS is retried on a later pass.
2793     while ((r = fst.result) == null && (r = snd.result) == null) {
2794     if (d == null)
2795     d = new OrCompletion(fst, snd, dst);
2796     else if (p == null)
2797     p = new CompletionNode(d);
2798     else if (q == null) {
2799     if (UNSAFE.compareAndSwapObject
2800     (fst, COMPLETIONS, p.next = fst.completions, p))
2801     q = new CompletionNode(d);
2802     }
2803     else if (UNSAFE.compareAndSwapObject
2804     (snd, COMPLETIONS, q.next = snd.completions, q))
2805     break;
2806     }
         // After a break r is null, so both sources are re-read once (fst
         // first). dst is completed here only when some result is present
         // and, if d exists, d.compareAndSet(0, 1) succeeds.
         // NOTE(review): presumably that CAS arbitrates against d being
         // run through the registered nodes -- confirm in OrCompletion.
2807     if ((r != null || (r = fst.result) != null ||
2808     (r = snd.result) != null) &&
2809     (d == null || d.compareAndSet(0, 1))) {
2810 dl 1.77 Throwable ex; Object t;
         // An AltResult contributes only its exception field; any other
         // result object is passed on as the value.
2811 dl 1.35 if (r instanceof AltResult) {
2812     ex = ((AltResult)r).ex;
2813 dl 1.77 t = null;
2814 dl 1.35 }
2815 dl 1.77 else {
2816 dl 1.35 ex = null;
2817 dl 1.77 t = r;
2818     }
2819     dst.internalComplete(t, ex);
2820 dl 1.35 }
2821     fst.helpPostComplete();
2822     snd.helpPostComplete();
2823     return dst;
2824     }
2825    
2826     /* ------------- Control and status methods -------------- */
2827    
2828 dl 1.28 /**
2829 dl 1.37 * If not already completed, completes this CompletableFuture with
2830     * a {@link CancellationException}. Dependent CompletableFutures
2831     * that have not already completed will also complete
2832     * exceptionally, with a {@link CompletionException} caused by
2833     * this {@code CancellationException}.
2834 dl 1.28 *
2835     * @param mayInterruptIfRunning this value has no effect in this
2836     * implementation because interrupts are not used to control
2837     * processing.
2838     *
2839     * @return {@code true} if this task is now cancelled
2840     */
2841     public boolean cancel(boolean mayInterruptIfRunning) {
2842 dl 1.46 boolean cancelled = (result == null) &&
2843     UNSAFE.compareAndSwapObject
2844     (this, RESULT, null, new AltResult(new CancellationException()));
2845     postComplete();
2846 dl 1.48 return cancelled || isCancelled();
2847 dl 1.28 }
2848    
2849     /**
2850     * Returns {@code true} if this CompletableFuture was cancelled
2851     * before it completed normally.
2852     *
2853     * @return {@code true} if this CompletableFuture was cancelled
2854     * before it completed normally
2855     */
2856     public boolean isCancelled() {
2857     Object r;
2858 jsr166 1.43 return ((r = result) instanceof AltResult) &&
2859     (((AltResult)r).ex instanceof CancellationException);
2860 dl 1.28 }
2861    
2862     /**
2863 dl 1.88 * Returns {@code true} if this CompletableFuture completed
2864 dl 1.91 * exceptionally, in any way. Possible causes include
2865     * cancellation, explicit invocation of {@code
2866     * completeExceptionally}, and abrupt termination of a
2867     * CompletionStage action.
2868 dl 1.88 *
2869     * @return {@code true} if this CompletableFuture completed
2870     * exceptionally
2871     */
2872     public boolean isCompletedExceptionally() {
2873 dl 1.91 Object r;
2874     return ((r = result) instanceof AltResult) && r != NIL;
2875 dl 1.88 }
2876    
2877     /**
2878 dl 1.28 * Forcibly sets or resets the value subsequently returned by
2879 jsr166 1.42 * method {@link #get()} and related methods, whether or not
2880     * already completed. This method is designed for use only in
2881     * error recovery actions, and even in such situations may result
2882     * in ongoing dependent completions using established versus
2883 dl 1.30 * overwritten outcomes.
2884 dl 1.28 *
2885     * @param value the completion value
2886     */
2887     public void obtrudeValue(T value) {
2888     result = (value == null) ? NIL : value;
2889     postComplete();
2890     }
2891    
2892 dl 1.30 /**
2893 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
2894     * and related methods to throw the given exception, whether or
2895     * not already completed. This method is designed for use only in
2896 dl 1.30 * recovery actions, and even in such situations may result in
2897     * ongoing dependent completions using established versus
2898     * overwritten outcomes.
2899     *
2900     * @param ex the exception
2901     */
2902     public void obtrudeException(Throwable ex) {
2903     if (ex == null) throw new NullPointerException();
2904     result = new AltResult(ex);
2905     postComplete();
2906     }
2907    
2908 dl 1.35 /**
2909     * Returns the estimated number of CompletableFutures whose
2910     * completions are awaiting completion of this CompletableFuture.
2911     * This method is designed for use in monitoring system state, not
2912     * for synchronization control.
2913     *
2914     * @return the number of dependent CompletableFutures
2915     */
2916     public int getNumberOfDependents() {
2917     int count = 0;
2918     for (CompletionNode p = completions; p != null; p = p.next)
2919     ++count;
2920     return count;
2921     }
2922    
2923     /**
2924     * Returns a string identifying this CompletableFuture, as well as
2925 jsr166 1.40 * its completion state. The state, in brackets, contains the
2926 dl 1.35 * String {@code "Completed Normally"} or the String {@code
2927     * "Completed Exceptionally"}, or the String {@code "Not
2928     * completed"} followed by the number of CompletableFutures
2929     * dependent upon its completion, if any.
2930     *
2931     * @return a string identifying this CompletableFuture, as well as its state
2932     */
2933     public String toString() {
2934     Object r = result;
2935 jsr166 1.40 int count;
2936     return super.toString() +
2937     ((r == null) ?
2938     (((count = getNumberOfDependents()) == 0) ?
2939     "[Not completed]" :
2940     "[Not completed, " + count + " dependents]") :
2941     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2942     "[Completed exceptionally]" :
2943     "[Completed normally]"));
2944 dl 1.35 }
2945    
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" within CompletableFuture.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    sun.misc.Unsafe unsafe;
    long resultOffset, waitersOffset, completionsOffset;
    try {
        unsafe = sun.misc.Unsafe.getUnsafe();
        Class<?> self = CompletableFuture.class;
        resultOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("result"));
        waitersOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        completionsOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
    } catch (Exception failure) {
        // Any lookup failure aborts class initialization.
        throw new Error(failure);
    }
    UNSAFE = unsafe;
    RESULT = resultOffset;
    WAITERS = waitersOffset;
    COMPLETIONS = completionsOffset;
}
2965     }