ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.95
Committed: Thu Jul 18 17:13:42 2013 UTC (10 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.94: +3 -3 lines
Log Message:
doclint warning fixes

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.88 import java.util.concurrent.CompletionException;
23     import java.util.concurrent.CompletionStage;
24 dl 1.1 import java.util.concurrent.atomic.AtomicInteger;
25     import java.util.concurrent.locks.LockSupport;
26    
27     /**
28     * A {@link Future} that may be explicitly completed (setting its
29 dl 1.88 * value and status), and may be used as a {@link CompletionStage},
30     * supporting dependent functions and actions that trigger upon its
31     * completion.
32 dl 1.1 *
33 jsr166 1.50 * <p>When two or more threads attempt to
34     * {@link #complete complete},
35 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
36 jsr166 1.50 * {@link #cancel cancel}
37     * a CompletableFuture, only one of them succeeds.
38 dl 1.19 *
39 dl 1.91 * <p>In addition to these and related methods for directly
40     * manipulating status and results, CompletableFuture implements
41     * interface {@link CompletionStage} with the following policies: <ul>
42 dl 1.35 *
43 dl 1.88 * <li>Actions supplied for dependent completions of
44     * <em>non-async</em> methods may be performed by the thread that
45     * completes the current CompletableFuture, or by any other caller of
46     * these methods.</li>
47 jsr166 1.65 *
48 dl 1.88 * <li>All <em>async</em> methods without an explicit Executor
49     * argument are performed using the {@link
50     * ForkJoinPool#commonPool()}. To simplify monitoring, debugging, and
51     * tracking, all generated asynchronous tasks are instances of the
52     * marker interface {@link AsynchronousCompletionTask}. </li>
53 dl 1.35 *
54 dl 1.88 * <li>All CompletionStage methods are implemented independently of
55     * other public methods, so the behavior of one method is not impacted
56     * by overrides of others in subclasses. </li> </ul>
57     *
58 dl 1.91 * <p>CompletableFuture also implements {@link Future} with the following
59 dl 1.88 * policies: <ul>
60     *
61     * <li>Since (unlike {@link FutureTask}) this class has no direct
62 jsr166 1.55 * control over the computation that causes it to be completed,
63 dl 1.88 * cancellation is treated as just another form of exceptional
64     * completion. Method {@link #cancel cancel} has the same effect as
65     * {@code completeExceptionally(new CancellationException())}. Method
66     * {@link #isCompletedExceptionally} can be used to determine if a
67     * CompletableFuture completed in any exceptional fashion.</li>
68 jsr166 1.55 *
69 dl 1.88 * <li>In case of exceptional completion with a CompletionException,
70 jsr166 1.55 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
71     * {@link ExecutionException} with the same cause as held in the
72 dl 1.88 * corresponding CompletionException. To simplify usage in most
73     * contexts, this class also defines methods {@link #join()} and
74     * {@link #getNow} that instead throw the CompletionException directly
75     * in these cases.</li> </ul>
76 jsr166 1.80 *
77 dl 1.1 * @author Doug Lea
78     * @since 1.8
79     */
80 dl 1.88 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
81 dl 1.28
82 dl 1.1 /*
83 dl 1.20 * Overview:
84 dl 1.1 *
85 jsr166 1.32 * 1. Non-nullness of field result (set via CAS) indicates done.
86     * An AltResult is used to box null as a result, as well as to
87     * hold exceptions. Using a single field makes completion fast
88 dl 1.20 * and simple to detect and trigger, at the expense of a lot of
89     * encoding and decoding that infiltrates many methods. One minor
90     * simplification relies on the (static) NIL (to box null results)
91     * being the only AltResult with a null exception field, so we
92 dl 1.28 * don't usually need explicit comparisons with NIL. The CF
93     * exception propagation mechanics surrounding decoding rely on
94     * unchecked casts of decoded results really being unchecked,
95     * where user type errors are caught at point of use, as is
96     * currently the case in Java. These are highlighted by using
97     * SuppressWarnings-annotated temporaries.
98 dl 1.1 *
99     * 2. Waiters are held in a Treiber stack similar to the one used
100 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
101 dl 1.28 * internal documentation for algorithmic details.
102 dl 1.1 *
103     * 3. Completions are also kept in a list/stack, and pulled off
104 dl 1.28 * and run when completion is triggered. (We could even use the
105 jsr166 1.24 * same stack as for waiters, but would give up the potential
106 dl 1.20 * parallelism obtained because woken waiters help release/run
107 dl 1.28 * others -- see method postComplete). Because post-processing
108     * may race with direct calls, class Completion opportunistically
109     * extends AtomicInteger so callers can claim the action via
110     * compareAndSet(0, 1). The Completion.run methods are all
111     * written a boringly similar uniform way (that sometimes includes
112 jsr166 1.55 * unnecessary-looking checks, kept to maintain uniformity).
113     * There are enough dimensions upon which they differ that
114     * attempts to factor commonalities while maintaining efficiency
115     * require more lines of code than they would save.
116 dl 1.20 *
117     * 4. The exported then/and/or methods do support a bit of
118 dl 1.28 * factoring (see doThenApply etc). They must cope with the
119     * intrinsic races surrounding addition of a dependent action
120     * versus performing the action directly because the task is
121     * already complete. For example, a CF may not be complete upon
122     * entry, so a dependent completion is added, but by the time it
123     * is added, the target CF is complete, so must be directly
124 dl 1.20 * executed. This is all done while avoiding unnecessary object
125     * construction in safe-bypass cases.
126 dl 1.1 */
127    
128 dl 1.28 // preliminaries
129 dl 1.20
130 dl 1.1 static final class AltResult {
131     final Throwable ex; // null only for NIL
132 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
133 dl 1.1 }
134    
135     static final AltResult NIL = new AltResult(null);
136    
137 dl 1.20 // Fields
138    
139     volatile Object result; // Either the result or boxed AltResult
140 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
141     volatile CompletionNode completions; // list (Treiber stack) of completions
142    
143 dl 1.20 // Basic utilities for triggering and processing completions
144    
145     /**
146     * Removes and signals all waiting threads and runs all completions.
147     */
148     final void postComplete() {
149     WaitNode q; Thread t;
150     while ((q = waiters) != null) {
151     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
152     (t = q.thread) != null) {
153     q.thread = null;
154     LockSupport.unpark(t);
155     }
156     }
157    
158     CompletionNode h; Completion c;
159     while ((h = completions) != null) {
160     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
161     (c = h.completion) != null)
162     c.run();
163     }
164     }
165    
166     /**
167     * Triggers completion with the encoding of the given arguments:
168     * if the exception is non-null, encodes it as a wrapped
169     * CompletionException unless it is one already. Otherwise uses
170     * the given result, boxed as NIL if null.
171     */
172 dl 1.75 final void internalComplete(T v, Throwable ex) {
173 dl 1.20 if (result == null)
174     UNSAFE.compareAndSwapObject
175     (this, RESULT, null,
176     (ex == null) ? (v == null) ? NIL : v :
177     new AltResult((ex instanceof CompletionException) ? ex :
178     new CompletionException(ex)));
179     postComplete(); // help out even if not triggered
180     }
181    
182     /**
183 jsr166 1.25 * If triggered, helps release and/or process completions.
184 dl 1.20 */
185     final void helpPostComplete() {
186     if (result != null)
187     postComplete();
188     }
189    
190 dl 1.28 /* ------------- waiting for completions -------------- */
191 dl 1.20
192 dl 1.35 /** Number of processors, for spin control */
193     static final int NCPU = Runtime.getRuntime().availableProcessors();
194    
195 dl 1.1 /**
196 dl 1.28 * Heuristic spin value for waitingGet() before blocking on
197     * multiprocessors
198 dl 1.1 */
199 dl 1.35 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
200 dl 1.1
201     /**
202 dl 1.28 * Linked nodes to record waiting threads in a Treiber stack. See
203     * other classes such as Phaser and SynchronousQueue for more
204     * detailed explanation. This class implements ManagedBlocker to
205     * avoid starvation when blocking actions pile up in
206     * ForkJoinPools.
207     */
208     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
209     long nanos; // wait time if timed
210     final long deadline; // non-zero if timed
211     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
212     volatile Thread thread;
213     volatile WaitNode next;
214     WaitNode(boolean interruptible, long nanos, long deadline) {
215     this.thread = Thread.currentThread();
216     this.interruptControl = interruptible ? 1 : 0;
217     this.nanos = nanos;
218     this.deadline = deadline;
219     }
220     public boolean isReleasable() {
221     if (thread == null)
222     return true;
223     if (Thread.interrupted()) {
224     int i = interruptControl;
225     interruptControl = -1;
226     if (i > 0)
227     return true;
228     }
229     if (deadline != 0L &&
230     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
231     thread = null;
232     return true;
233     }
234     return false;
235     }
236     public boolean block() {
237     if (isReleasable())
238     return true;
239     else if (deadline == 0L)
240     LockSupport.park(this);
241     else if (nanos > 0L)
242     LockSupport.parkNanos(this, nanos);
243     return isReleasable();
244     }
245 dl 1.1 }
246    
247     /**
248 dl 1.28 * Returns raw result after waiting, or null if interruptible and
249     * interrupted.
250 dl 1.1 */
251 dl 1.28 private Object waitingGet(boolean interruptible) {
252     WaitNode q = null;
253     boolean queued = false;
254 dl 1.35 int spins = SPINS;
255 dl 1.28 for (Object r;;) {
256     if ((r = result) != null) {
257     if (q != null) { // suppress unpark
258     q.thread = null;
259     if (q.interruptControl < 0) {
260     if (interruptible) {
261     removeWaiter(q);
262     return null;
263     }
264     Thread.currentThread().interrupt();
265     }
266     }
267     postComplete(); // help release others
268     return r;
269     }
270     else if (spins > 0) {
271 dl 1.35 int rnd = ThreadLocalRandom.nextSecondarySeed();
272     if (rnd == 0)
273     rnd = ThreadLocalRandom.current().nextInt();
274     if (rnd >= 0)
275 dl 1.28 --spins;
276     }
277     else if (q == null)
278     q = new WaitNode(interruptible, 0L, 0L);
279     else if (!queued)
280     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
281     q.next = waiters, q);
282     else if (interruptible && q.interruptControl < 0) {
283     removeWaiter(q);
284     return null;
285     }
286     else if (q.thread != null && result == null) {
287     try {
288     ForkJoinPool.managedBlock(q);
289 jsr166 1.31 } catch (InterruptedException ex) {
290 dl 1.28 q.interruptControl = -1;
291     }
292     }
293     }
294 dl 1.1 }
295    
296     /**
297 dl 1.28 * Awaits completion or aborts on interrupt or timeout.
298 dl 1.1 *
299 dl 1.28 * @param nanos time to wait
300     * @return raw result
301 dl 1.1 */
302 dl 1.28 private Object timedAwaitDone(long nanos)
303     throws InterruptedException, TimeoutException {
304     WaitNode q = null;
305     boolean queued = false;
306     for (Object r;;) {
307     if ((r = result) != null) {
308     if (q != null) {
309     q.thread = null;
310     if (q.interruptControl < 0) {
311     removeWaiter(q);
312     throw new InterruptedException();
313     }
314     }
315     postComplete();
316     return r;
317     }
318     else if (q == null) {
319     if (nanos <= 0L)
320     throw new TimeoutException();
321     long d = System.nanoTime() + nanos;
322 jsr166 1.31 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
323 dl 1.28 }
324     else if (!queued)
325     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
326     q.next = waiters, q);
327     else if (q.interruptControl < 0) {
328     removeWaiter(q);
329     throw new InterruptedException();
330     }
331     else if (q.nanos <= 0L) {
332     if (result == null) {
333     removeWaiter(q);
334     throw new TimeoutException();
335     }
336     }
337     else if (q.thread != null && result == null) {
338     try {
339     ForkJoinPool.managedBlock(q);
340 jsr166 1.31 } catch (InterruptedException ex) {
341 dl 1.28 q.interruptControl = -1;
342     }
343     }
344     }
345 dl 1.1 }
346    
347     /**
348 dl 1.28 * Tries to unlink a timed-out or interrupted wait node to avoid
349     * accumulating garbage. Internal nodes are simply unspliced
350     * without CAS since it is harmless if they are traversed anyway
351     * by releasers. To avoid effects of unsplicing from already
352     * removed nodes, the list is retraversed in case of an apparent
353     * race. This is slow when there are a lot of nodes, but we don't
354     * expect lists to be long enough to outweigh higher-overhead
355     * schemes.
356 dl 1.1 */
357 dl 1.28 private void removeWaiter(WaitNode node) {
358     if (node != null) {
359     node.thread = null;
360     retry:
361     for (;;) { // restart on removeWaiter race
362     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
363     s = q.next;
364     if (q.thread != null)
365     pred = q;
366     else if (pred != null) {
367     pred.next = s;
368     if (pred.thread == null) // check for race
369     continue retry;
370     }
371     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
372     continue retry;
373     }
374     break;
375     }
376     }
377 dl 1.1 }
378    
379 dl 1.28 /* ------------- Async tasks -------------- */
380    
381 dl 1.1 /**
382 jsr166 1.56 * A marker interface identifying asynchronous tasks produced by
383 dl 1.28 * {@code async} methods. This may be useful for monitoring,
384     * debugging, and tracking asynchronous activities.
385 jsr166 1.57 *
386     * @since 1.8
387 dl 1.1 */
388 dl 1.28 public static interface AsynchronousCompletionTask {
389 dl 1.1 }
390    
391 dl 1.28 /** Base class can act as either FJ or plain Runnable */
392 jsr166 1.33 abstract static class Async extends ForkJoinTask<Void>
393 dl 1.28 implements Runnable, AsynchronousCompletionTask {
394     public final Void getRawResult() { return null; }
395     public final void setRawResult(Void v) { }
396     public final void run() { exec(); }
397 jsr166 1.26 }
398    
399 dl 1.28 static final class AsyncRun extends Async {
400     final Runnable fn;
401     final CompletableFuture<Void> dst;
402     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
403     this.fn = fn; this.dst = dst;
404     }
405     public final boolean exec() {
406     CompletableFuture<Void> d; Throwable ex;
407 dl 1.29 if ((d = this.dst) != null && d.result == null) {
408 dl 1.28 try {
409     fn.run();
410     ex = null;
411     } catch (Throwable rex) {
412     ex = rex;
413     }
414     d.internalComplete(null, ex);
415 dl 1.19 }
416 dl 1.28 return true;
417 dl 1.19 }
418 dl 1.28 private static final long serialVersionUID = 5232453952276885070L;
419 dl 1.19 }
420    
421 dl 1.28 static final class AsyncSupply<U> extends Async {
422     final Supplier<U> fn;
423     final CompletableFuture<U> dst;
424     AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
425     this.fn = fn; this.dst = dst;
426     }
427     public final boolean exec() {
428     CompletableFuture<U> d; U u; Throwable ex;
429 dl 1.29 if ((d = this.dst) != null && d.result == null) {
430 dl 1.28 try {
431     u = fn.get();
432     ex = null;
433     } catch (Throwable rex) {
434     ex = rex;
435     u = null;
436     }
437     d.internalComplete(u, ex);
438 dl 1.1 }
439     return true;
440     }
441     private static final long serialVersionUID = 5232453952276885070L;
442     }
443    
444 dl 1.28 static final class AsyncApply<T,U> extends Async {
445 jsr166 1.63 final T arg;
446 dl 1.28 final Function<? super T,? extends U> fn;
447 dl 1.1 final CompletableFuture<U> dst;
448 dl 1.28 AsyncApply(T arg, Function<? super T,? extends U> fn,
449 dl 1.35 CompletableFuture<U> dst) {
450 dl 1.1 this.arg = arg; this.fn = fn; this.dst = dst;
451     }
452     public final boolean exec() {
453 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
454 dl 1.29 if ((d = this.dst) != null && d.result == null) {
455 dl 1.21 try {
456     u = fn.apply(arg);
457     ex = null;
458     } catch (Throwable rex) {
459     ex = rex;
460     u = null;
461     }
462     d.internalComplete(u, ex);
463 dl 1.1 }
464     return true;
465     }
466     private static final long serialVersionUID = 5232453952276885070L;
467     }
468    
469 jsr166 1.81 static final class AsyncCombine<T,U,V> extends Async {
470 dl 1.1 final T arg1;
471     final U arg2;
472 jsr166 1.63 final BiFunction<? super T,? super U,? extends V> fn;
473 dl 1.1 final CompletableFuture<V> dst;
474 jsr166 1.81 AsyncCombine(T arg1, U arg2,
475 dl 1.35 BiFunction<? super T,? super U,? extends V> fn,
476     CompletableFuture<V> dst) {
477 dl 1.1 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
478     }
479     public final boolean exec() {
480 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
481 dl 1.29 if ((d = this.dst) != null && d.result == null) {
482 dl 1.21 try {
483     v = fn.apply(arg1, arg2);
484     ex = null;
485     } catch (Throwable rex) {
486     ex = rex;
487     v = null;
488     }
489     d.internalComplete(v, ex);
490 dl 1.1 }
491     return true;
492     }
493     private static final long serialVersionUID = 5232453952276885070L;
494     }
495    
496 dl 1.28 static final class AsyncAccept<T> extends Async {
497 jsr166 1.63 final T arg;
498 dl 1.34 final Consumer<? super T> fn;
499 dl 1.88 final CompletableFuture<?> dst;
500 dl 1.34 AsyncAccept(T arg, Consumer<? super T> fn,
501 dl 1.88 CompletableFuture<?> dst) {
502 dl 1.7 this.arg = arg; this.fn = fn; this.dst = dst;
503     }
504     public final boolean exec() {
505 dl 1.88 CompletableFuture<?> d; Throwable ex;
506 dl 1.29 if ((d = this.dst) != null && d.result == null) {
507 dl 1.21 try {
508     fn.accept(arg);
509     ex = null;
510     } catch (Throwable rex) {
511     ex = rex;
512     }
513     d.internalComplete(null, ex);
514 dl 1.7 }
515     return true;
516     }
517     private static final long serialVersionUID = 5232453952276885070L;
518     }
519    
520 jsr166 1.81 static final class AsyncAcceptBoth<T,U> extends Async {
521 dl 1.7 final T arg1;
522     final U arg2;
523 jsr166 1.63 final BiConsumer<? super T,? super U> fn;
524 dl 1.88 final CompletableFuture<?> dst;
525 jsr166 1.81 AsyncAcceptBoth(T arg1, U arg2,
526     BiConsumer<? super T,? super U> fn,
527 dl 1.88 CompletableFuture<?> dst) {
528 dl 1.7 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
529     }
530     public final boolean exec() {
531 dl 1.88 CompletableFuture<?> d; Throwable ex;
532 dl 1.29 if ((d = this.dst) != null && d.result == null) {
533 dl 1.21 try {
534     fn.accept(arg1, arg2);
535     ex = null;
536     } catch (Throwable rex) {
537     ex = rex;
538     }
539     d.internalComplete(null, ex);
540 dl 1.7 }
541     return true;
542     }
543     private static final long serialVersionUID = 5232453952276885070L;
544     }
545    
546 dl 1.37 static final class AsyncCompose<T,U> extends Async {
547 jsr166 1.63 final T arg;
548 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
549 dl 1.37 final CompletableFuture<U> dst;
550     AsyncCompose(T arg,
551 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
552 dl 1.37 CompletableFuture<U> dst) {
553     this.arg = arg; this.fn = fn; this.dst = dst;
554     }
555     public final boolean exec() {
556     CompletableFuture<U> d, fr; U u; Throwable ex;
557     if ((d = this.dst) != null && d.result == null) {
558     try {
559 dl 1.88 CompletionStage<U> cs = fn.apply(arg);
560     fr = (cs == null) ? null : cs.toCompletableFuture();
561 jsr166 1.61 ex = (fr == null) ? new NullPointerException() : null;
562 dl 1.37 } catch (Throwable rex) {
563     ex = rex;
564     fr = null;
565     }
566     if (ex != null)
567     u = null;
568     else {
569     Object r = fr.result;
570 dl 1.67 if (r == null)
571     r = fr.waitingGet(false);
572 dl 1.37 if (r instanceof AltResult) {
573     ex = ((AltResult)r).ex;
574     u = null;
575     }
576     else {
577     @SuppressWarnings("unchecked") U ur = (U) r;
578     u = ur;
579     }
580     }
581     d.internalComplete(u, ex);
582     }
583     return true;
584     }
585     private static final long serialVersionUID = 5232453952276885070L;
586     }
587    
588 dl 1.91 static final class AsyncWhenComplete<T> extends Async {
589     final T arg1;
590     final Throwable arg2;
591     final BiConsumer<? super T,? super Throwable> fn;
592     final CompletableFuture<T> dst;
593     AsyncWhenComplete(T arg1, Throwable arg2,
594     BiConsumer<? super T,? super Throwable> fn,
595     CompletableFuture<T> dst) {
596     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
597     }
598     public final boolean exec() {
599 jsr166 1.92 CompletableFuture<T> d;
600 dl 1.91 if ((d = this.dst) != null && d.result == null) {
601     Throwable ex = arg2;
602     try {
603     fn.accept(arg1, ex);
604     } catch (Throwable rex) {
605     if (ex == null)
606     ex = rex;
607     }
608     d.internalComplete(arg1, ex);
609     }
610     return true;
611     }
612     private static final long serialVersionUID = 5232453952276885070L;
613     }
614    
615 dl 1.1 /* ------------- Completions -------------- */
616    
617 dl 1.28 /**
618     * Simple linked list nodes to record completions, used in
619     * basically the same way as WaitNodes. (We separate nodes from
620     * the Completions themselves mainly because for the And and Or
621     * methods, the same Completion object resides in two lists.)
622     */
623     static final class CompletionNode {
624     final Completion completion;
625     volatile CompletionNode next;
626     CompletionNode(Completion completion) { this.completion = completion; }
627     }
628    
629 dl 1.1 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
630 jsr166 1.33 abstract static class Completion extends AtomicInteger implements Runnable {
631 dl 1.1 }
632    
633 jsr166 1.81 static final class ThenApply<T,U> extends Completion {
634 jsr166 1.2 final CompletableFuture<? extends T> src;
635     final Function<? super T,? extends U> fn;
636     final CompletableFuture<U> dst;
637 dl 1.1 final Executor executor;
638 jsr166 1.81 ThenApply(CompletableFuture<? extends T> src,
639     Function<? super T,? extends U> fn,
640     CompletableFuture<U> dst,
641     Executor executor) {
642 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
643     this.executor = executor;
644     }
645 jsr166 1.26 public final void run() {
646 dl 1.28 final CompletableFuture<? extends T> a;
647     final Function<? super T,? extends U> fn;
648     final CompletableFuture<U> dst;
649 jsr166 1.2 Object r; T t; Throwable ex;
650     if ((dst = this.dst) != null &&
651 dl 1.1 (fn = this.fn) != null &&
652     (a = this.src) != null &&
653     (r = a.result) != null &&
654     compareAndSet(0, 1)) {
655 jsr166 1.2 if (r instanceof AltResult) {
656 dl 1.19 ex = ((AltResult)r).ex;
657 dl 1.1 t = null;
658     }
659 dl 1.17 else {
660     ex = null;
661 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
662     t = tr;
663 dl 1.1 }
664 dl 1.20 Executor e = executor;
665     U u = null;
666 dl 1.17 if (ex == null) {
667     try {
668 dl 1.20 if (e != null)
669 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
670 dl 1.17 else
671 dl 1.20 u = fn.apply(t);
672 dl 1.17 } catch (Throwable rex) {
673     ex = rex;
674     }
675     }
676 dl 1.20 if (e == null || ex != null)
677     dst.internalComplete(u, ex);
678 dl 1.1 }
679     }
680 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
681 dl 1.1 }
682    
683 jsr166 1.81 static final class ThenAccept<T> extends Completion {
684 dl 1.7 final CompletableFuture<? extends T> src;
685 dl 1.34 final Consumer<? super T> fn;
686 dl 1.88 final CompletableFuture<?> dst;
687 dl 1.7 final Executor executor;
688 jsr166 1.81 ThenAccept(CompletableFuture<? extends T> src,
689     Consumer<? super T> fn,
690 dl 1.88 CompletableFuture<?> dst,
691 jsr166 1.81 Executor executor) {
692 dl 1.7 this.src = src; this.fn = fn; this.dst = dst;
693     this.executor = executor;
694     }
695 jsr166 1.26 public final void run() {
696 dl 1.28 final CompletableFuture<? extends T> a;
697 dl 1.34 final Consumer<? super T> fn;
698 dl 1.88 final CompletableFuture<?> dst;
699 dl 1.7 Object r; T t; Throwable ex;
700     if ((dst = this.dst) != null &&
701     (fn = this.fn) != null &&
702     (a = this.src) != null &&
703     (r = a.result) != null &&
704     compareAndSet(0, 1)) {
705     if (r instanceof AltResult) {
706 dl 1.19 ex = ((AltResult)r).ex;
707 dl 1.7 t = null;
708     }
709 dl 1.17 else {
710     ex = null;
711 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
712     t = tr;
713 dl 1.17 }
714 dl 1.20 Executor e = executor;
715 dl 1.17 if (ex == null) {
716     try {
717 dl 1.20 if (e != null)
718 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
719 dl 1.20 else
720 dl 1.17 fn.accept(t);
721     } catch (Throwable rex) {
722     ex = rex;
723 dl 1.7 }
724     }
725 dl 1.20 if (e == null || ex != null)
726     dst.internalComplete(null, ex);
727 dl 1.7 }
728     }
729 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
730 dl 1.7 }
731    
732 jsr166 1.82 static final class ThenRun extends Completion {
733     final CompletableFuture<?> src;
734 jsr166 1.2 final Runnable fn;
735     final CompletableFuture<Void> dst;
736 dl 1.1 final Executor executor;
737 jsr166 1.82 ThenRun(CompletableFuture<?> src,
738 jsr166 1.81 Runnable fn,
739     CompletableFuture<Void> dst,
740     Executor executor) {
741 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
742     this.executor = executor;
743     }
744 jsr166 1.26 public final void run() {
745 jsr166 1.82 final CompletableFuture<?> a;
746 dl 1.28 final Runnable fn;
747     final CompletableFuture<Void> dst;
748 jsr166 1.2 Object r; Throwable ex;
749     if ((dst = this.dst) != null &&
750 dl 1.1 (fn = this.fn) != null &&
751     (a = this.src) != null &&
752     (r = a.result) != null &&
753     compareAndSet(0, 1)) {
754 dl 1.19 if (r instanceof AltResult)
755     ex = ((AltResult)r).ex;
756 dl 1.17 else
757     ex = null;
758 dl 1.20 Executor e = executor;
759 dl 1.17 if (ex == null) {
760     try {
761 dl 1.20 if (e != null)
762 dl 1.28 e.execute(new AsyncRun(fn, dst));
763 dl 1.20 else
764 dl 1.17 fn.run();
765     } catch (Throwable rex) {
766     ex = rex;
767 dl 1.1 }
768     }
769 dl 1.20 if (e == null || ex != null)
770     dst.internalComplete(null, ex);
771 dl 1.1 }
772     }
773 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
774 dl 1.1 }
775    
776 jsr166 1.81 static final class ThenCombine<T,U,V> extends Completion {
777 jsr166 1.2 final CompletableFuture<? extends T> src;
778     final CompletableFuture<? extends U> snd;
779     final BiFunction<? super T,? super U,? extends V> fn;
780     final CompletableFuture<V> dst;
781 dl 1.1 final Executor executor;
782 jsr166 1.81 ThenCombine(CompletableFuture<? extends T> src,
783     CompletableFuture<? extends U> snd,
784     BiFunction<? super T,? super U,? extends V> fn,
785     CompletableFuture<V> dst,
786     Executor executor) {
787 dl 1.1 this.src = src; this.snd = snd;
788     this.fn = fn; this.dst = dst;
789     this.executor = executor;
790     }
791 jsr166 1.26 public final void run() {
792 dl 1.28 final CompletableFuture<? extends T> a;
793     final CompletableFuture<? extends U> b;
794     final BiFunction<? super T,? super U,? extends V> fn;
795     final CompletableFuture<V> dst;
796 dl 1.85 Object r, s; T t; U u; Throwable ex;
797     if ((dst = this.dst) != null &&
798     (fn = this.fn) != null &&
799     (a = this.src) != null &&
800     (r = a.result) != null &&
801     (b = this.snd) != null &&
802     (s = b.result) != null &&
803     compareAndSet(0, 1)) {
804     if (r instanceof AltResult) {
805     ex = ((AltResult)r).ex;
806     t = null;
807     }
808     else {
809     ex = null;
810     @SuppressWarnings("unchecked") T tr = (T) r;
811     t = tr;
812     }
813     if (ex != null)
814     u = null;
815     else if (s instanceof AltResult) {
816     ex = ((AltResult)s).ex;
817     u = null;
818     }
819     else {
820     @SuppressWarnings("unchecked") U us = (U) s;
821     u = us;
822     }
823 dl 1.20 Executor e = executor;
824     V v = null;
825 dl 1.19 if (ex == null) {
826     try {
827 dl 1.20 if (e != null)
828 jsr166 1.81 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
829 dl 1.19 else
830 dl 1.20 v = fn.apply(t, u);
831 dl 1.19 } catch (Throwable rex) {
832     ex = rex;
833     }
834 dl 1.1 }
835 dl 1.20 if (e == null || ex != null)
836     dst.internalComplete(v, ex);
837 jsr166 1.2 }
838     }
839 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
840 dl 1.1 }
841    
842 jsr166 1.81 static final class ThenAcceptBoth<T,U> extends Completion {
843 dl 1.7 final CompletableFuture<? extends T> src;
844     final CompletableFuture<? extends U> snd;
845 dl 1.34 final BiConsumer<? super T,? super U> fn;
846 dl 1.7 final CompletableFuture<Void> dst;
847     final Executor executor;
848 jsr166 1.81 ThenAcceptBoth(CompletableFuture<? extends T> src,
849     CompletableFuture<? extends U> snd,
850     BiConsumer<? super T,? super U> fn,
851     CompletableFuture<Void> dst,
852     Executor executor) {
853 dl 1.7 this.src = src; this.snd = snd;
854     this.fn = fn; this.dst = dst;
855     this.executor = executor;
856     }
857 jsr166 1.26 public final void run() {
858 dl 1.28 final CompletableFuture<? extends T> a;
859     final CompletableFuture<? extends U> b;
860 dl 1.34 final BiConsumer<? super T,? super U> fn;
861 dl 1.28 final CompletableFuture<Void> dst;
862 dl 1.85 Object r, s; T t; U u; Throwable ex;
863     if ((dst = this.dst) != null &&
864     (fn = this.fn) != null &&
865     (a = this.src) != null &&
866     (r = a.result) != null &&
867     (b = this.snd) != null &&
868     (s = b.result) != null &&
869     compareAndSet(0, 1)) {
870     if (r instanceof AltResult) {
871     ex = ((AltResult)r).ex;
872     t = null;
873     }
874     else {
875     ex = null;
876     @SuppressWarnings("unchecked") T tr = (T) r;
877     t = tr;
878     }
879     if (ex != null)
880     u = null;
881     else if (s instanceof AltResult) {
882     ex = ((AltResult)s).ex;
883     u = null;
884     }
885     else {
886     @SuppressWarnings("unchecked") U us = (U) s;
887     u = us;
888     }
889 dl 1.20 Executor e = executor;
890 dl 1.19 if (ex == null) {
891     try {
892 dl 1.20 if (e != null)
893 jsr166 1.81 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
894 dl 1.20 else
895 dl 1.19 fn.accept(t, u);
896     } catch (Throwable rex) {
897     ex = rex;
898 dl 1.7 }
899     }
900 dl 1.20 if (e == null || ex != null)
901     dst.internalComplete(null, ex);
902 dl 1.7 }
903     }
904 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
905 dl 1.7 }
906    
907 jsr166 1.82 static final class RunAfterBoth extends Completion {
908     final CompletableFuture<?> src;
909 jsr166 1.2 final CompletableFuture<?> snd;
910     final Runnable fn;
911     final CompletableFuture<Void> dst;
912 dl 1.1 final Executor executor;
913 jsr166 1.82 RunAfterBoth(CompletableFuture<?> src,
914 jsr166 1.81 CompletableFuture<?> snd,
915     Runnable fn,
916     CompletableFuture<Void> dst,
917     Executor executor) {
918 dl 1.1 this.src = src; this.snd = snd;
919     this.fn = fn; this.dst = dst;
920     this.executor = executor;
921     }
922 jsr166 1.26 public final void run() {
923 jsr166 1.82 final CompletableFuture<?> a;
924 dl 1.1 final CompletableFuture<?> b;
925     final Runnable fn;
926     final CompletableFuture<Void> dst;
927 dl 1.85 Object r, s; Throwable ex;
928     if ((dst = this.dst) != null &&
929     (fn = this.fn) != null &&
930     (a = this.src) != null &&
931     (r = a.result) != null &&
932     (b = this.snd) != null &&
933     (s = b.result) != null &&
934     compareAndSet(0, 1)) {
935     if (r instanceof AltResult)
936     ex = ((AltResult)r).ex;
937     else
938     ex = null;
939     if (ex == null && (s instanceof AltResult))
940     ex = ((AltResult)s).ex;
941 dl 1.20 Executor e = executor;
942 dl 1.19 if (ex == null) {
943     try {
944 dl 1.20 if (e != null)
945 dl 1.28 e.execute(new AsyncRun(fn, dst));
946 dl 1.20 else
947 dl 1.19 fn.run();
948     } catch (Throwable rex) {
949     ex = rex;
950 dl 1.1 }
951 jsr166 1.2 }
952 dl 1.20 if (e == null || ex != null)
953     dst.internalComplete(null, ex);
954 jsr166 1.2 }
955     }
956 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
957 dl 1.1 }
958    
959 dl 1.35 static final class AndCompletion extends Completion {
960     final CompletableFuture<?> src;
961     final CompletableFuture<?> snd;
962     final CompletableFuture<Void> dst;
963     AndCompletion(CompletableFuture<?> src,
964     CompletableFuture<?> snd,
965     CompletableFuture<Void> dst) {
966     this.src = src; this.snd = snd; this.dst = dst;
967     }
968     public final void run() {
969     final CompletableFuture<?> a;
970     final CompletableFuture<?> b;
971     final CompletableFuture<Void> dst;
972     Object r, s; Throwable ex;
973     if ((dst = this.dst) != null &&
974     (a = this.src) != null &&
975     (r = a.result) != null &&
976     (b = this.snd) != null &&
977     (s = b.result) != null &&
978     compareAndSet(0, 1)) {
979     if (r instanceof AltResult)
980     ex = ((AltResult)r).ex;
981     else
982     ex = null;
983     if (ex == null && (s instanceof AltResult))
984     ex = ((AltResult)s).ex;
985     dst.internalComplete(null, ex);
986     }
987     }
988     private static final long serialVersionUID = 5232453952276885070L;
989     }
990    
991 jsr166 1.81 static final class ApplyToEither<T,U> extends Completion {
992 jsr166 1.2 final CompletableFuture<? extends T> src;
993     final CompletableFuture<? extends T> snd;
994     final Function<? super T,? extends U> fn;
995     final CompletableFuture<U> dst;
996 dl 1.1 final Executor executor;
997 jsr166 1.81 ApplyToEither(CompletableFuture<? extends T> src,
998     CompletableFuture<? extends T> snd,
999     Function<? super T,? extends U> fn,
1000     CompletableFuture<U> dst,
1001     Executor executor) {
1002 dl 1.1 this.src = src; this.snd = snd;
1003     this.fn = fn; this.dst = dst;
1004     this.executor = executor;
1005     }
1006 jsr166 1.26 public final void run() {
1007 dl 1.28 final CompletableFuture<? extends T> a;
1008     final CompletableFuture<? extends T> b;
1009     final Function<? super T,? extends U> fn;
1010     final CompletableFuture<U> dst;
1011 jsr166 1.2 Object r; T t; Throwable ex;
1012     if ((dst = this.dst) != null &&
1013 dl 1.1 (fn = this.fn) != null &&
1014     (((a = this.src) != null && (r = a.result) != null) ||
1015     ((b = this.snd) != null && (r = b.result) != null)) &&
1016     compareAndSet(0, 1)) {
1017 jsr166 1.2 if (r instanceof AltResult) {
1018 dl 1.19 ex = ((AltResult)r).ex;
1019 jsr166 1.2 t = null;
1020     }
1021 dl 1.19 else {
1022     ex = null;
1023 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1024     t = tr;
1025 dl 1.1 }
1026 dl 1.20 Executor e = executor;
1027     U u = null;
1028 dl 1.19 if (ex == null) {
1029     try {
1030 dl 1.20 if (e != null)
1031 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
1032 dl 1.19 else
1033 dl 1.20 u = fn.apply(t);
1034 dl 1.19 } catch (Throwable rex) {
1035     ex = rex;
1036     }
1037     }
1038 dl 1.20 if (e == null || ex != null)
1039     dst.internalComplete(u, ex);
1040 jsr166 1.2 }
1041     }
1042 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1043 dl 1.1 }
1044    
1045 jsr166 1.81 static final class AcceptEither<T> extends Completion {
1046 dl 1.7 final CompletableFuture<? extends T> src;
1047     final CompletableFuture<? extends T> snd;
1048 dl 1.34 final Consumer<? super T> fn;
1049 dl 1.7 final CompletableFuture<Void> dst;
1050     final Executor executor;
1051 jsr166 1.81 AcceptEither(CompletableFuture<? extends T> src,
1052     CompletableFuture<? extends T> snd,
1053     Consumer<? super T> fn,
1054     CompletableFuture<Void> dst,
1055     Executor executor) {
1056 dl 1.7 this.src = src; this.snd = snd;
1057     this.fn = fn; this.dst = dst;
1058     this.executor = executor;
1059     }
1060 jsr166 1.26 public final void run() {
1061 dl 1.28 final CompletableFuture<? extends T> a;
1062     final CompletableFuture<? extends T> b;
1063 dl 1.34 final Consumer<? super T> fn;
1064 dl 1.28 final CompletableFuture<Void> dst;
1065 dl 1.7 Object r; T t; Throwable ex;
1066     if ((dst = this.dst) != null &&
1067     (fn = this.fn) != null &&
1068     (((a = this.src) != null && (r = a.result) != null) ||
1069     ((b = this.snd) != null && (r = b.result) != null)) &&
1070     compareAndSet(0, 1)) {
1071     if (r instanceof AltResult) {
1072 dl 1.19 ex = ((AltResult)r).ex;
1073 dl 1.7 t = null;
1074     }
1075 dl 1.19 else {
1076     ex = null;
1077 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1078     t = tr;
1079 dl 1.19 }
1080 dl 1.20 Executor e = executor;
1081 dl 1.19 if (ex == null) {
1082     try {
1083 dl 1.20 if (e != null)
1084 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
1085 dl 1.20 else
1086 dl 1.19 fn.accept(t);
1087     } catch (Throwable rex) {
1088     ex = rex;
1089 dl 1.7 }
1090     }
1091 dl 1.20 if (e == null || ex != null)
1092     dst.internalComplete(null, ex);
1093 dl 1.7 }
1094     }
1095 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1096 dl 1.7 }
1097    
1098 jsr166 1.82 static final class RunAfterEither extends Completion {
1099     final CompletableFuture<?> src;
1100 jsr166 1.2 final CompletableFuture<?> snd;
1101     final Runnable fn;
1102     final CompletableFuture<Void> dst;
1103 dl 1.1 final Executor executor;
1104 jsr166 1.82 RunAfterEither(CompletableFuture<?> src,
1105 jsr166 1.81 CompletableFuture<?> snd,
1106     Runnable fn,
1107     CompletableFuture<Void> dst,
1108     Executor executor) {
1109 dl 1.1 this.src = src; this.snd = snd;
1110     this.fn = fn; this.dst = dst;
1111     this.executor = executor;
1112     }
1113 jsr166 1.26 public final void run() {
1114 jsr166 1.82 final CompletableFuture<?> a;
1115 dl 1.1 final CompletableFuture<?> b;
1116     final Runnable fn;
1117     final CompletableFuture<Void> dst;
1118 dl 1.28 Object r; Throwable ex;
1119 jsr166 1.2 if ((dst = this.dst) != null &&
1120 dl 1.1 (fn = this.fn) != null &&
1121     (((a = this.src) != null && (r = a.result) != null) ||
1122     ((b = this.snd) != null && (r = b.result) != null)) &&
1123     compareAndSet(0, 1)) {
1124 dl 1.19 if (r instanceof AltResult)
1125     ex = ((AltResult)r).ex;
1126     else
1127     ex = null;
1128 dl 1.20 Executor e = executor;
1129 dl 1.19 if (ex == null) {
1130 dl 1.1 try {
1131 dl 1.20 if (e != null)
1132 dl 1.28 e.execute(new AsyncRun(fn, dst));
1133 dl 1.20 else
1134 dl 1.1 fn.run();
1135     } catch (Throwable rex) {
1136 dl 1.19 ex = rex;
1137 dl 1.1 }
1138     }
1139 dl 1.20 if (e == null || ex != null)
1140     dst.internalComplete(null, ex);
1141 jsr166 1.2 }
1142     }
1143 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1144 dl 1.1 }
1145    
1146 dl 1.35 static final class OrCompletion extends Completion {
1147     final CompletableFuture<?> src;
1148     final CompletableFuture<?> snd;
1149 dl 1.77 final CompletableFuture<Object> dst;
1150 dl 1.35 OrCompletion(CompletableFuture<?> src,
1151     CompletableFuture<?> snd,
1152 dl 1.77 CompletableFuture<Object> dst) {
1153 dl 1.35 this.src = src; this.snd = snd; this.dst = dst;
1154     }
1155     public final void run() {
1156     final CompletableFuture<?> a;
1157     final CompletableFuture<?> b;
1158 dl 1.77 final CompletableFuture<Object> dst;
1159     Object r, t; Throwable ex;
1160 dl 1.35 if ((dst = this.dst) != null &&
1161     (((a = this.src) != null && (r = a.result) != null) ||
1162     ((b = this.snd) != null && (r = b.result) != null)) &&
1163     compareAndSet(0, 1)) {
1164 dl 1.77 if (r instanceof AltResult) {
1165 dl 1.35 ex = ((AltResult)r).ex;
1166 dl 1.77 t = null;
1167     }
1168     else {
1169 dl 1.35 ex = null;
1170 dl 1.77 t = r;
1171     }
1172     dst.internalComplete(t, ex);
1173 dl 1.35 }
1174     }
1175     private static final long serialVersionUID = 5232453952276885070L;
1176     }
1177    
1178 dl 1.28 static final class ExceptionCompletion<T> extends Completion {
1179 jsr166 1.2 final CompletableFuture<? extends T> src;
1180 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1181     final CompletableFuture<T> dst;
1182 dl 1.28 ExceptionCompletion(CompletableFuture<? extends T> src,
1183     Function<? super Throwable, ? extends T> fn,
1184     CompletableFuture<T> dst) {
1185 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1186     }
1187 jsr166 1.26 public final void run() {
1188 dl 1.28 final CompletableFuture<? extends T> a;
1189     final Function<? super Throwable, ? extends T> fn;
1190     final CompletableFuture<T> dst;
1191 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1192 jsr166 1.2 if ((dst = this.dst) != null &&
1193 dl 1.1 (fn = this.fn) != null &&
1194     (a = this.src) != null &&
1195     (r = a.result) != null &&
1196     compareAndSet(0, 1)) {
1197 dl 1.20 if ((r instanceof AltResult) &&
1198 jsr166 1.87 (ex = ((AltResult)r).ex) != null) {
1199 dl 1.20 try {
1200     t = fn.apply(ex);
1201     } catch (Throwable rex) {
1202     dx = rex;
1203 dl 1.1 }
1204     }
1205 dl 1.28 else {
1206     @SuppressWarnings("unchecked") T tr = (T) r;
1207     t = tr;
1208     }
1209 dl 1.20 dst.internalComplete(t, dx);
1210 dl 1.1 }
1211     }
1212 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1213 dl 1.1 }
1214    
1215 dl 1.88 static final class WhenCompleteCompletion<T> extends Completion {
1216     final CompletableFuture<? extends T> src;
1217     final BiConsumer<? super T, ? super Throwable> fn;
1218     final CompletableFuture<T> dst;
1219     final Executor executor;
1220     WhenCompleteCompletion(CompletableFuture<? extends T> src,
1221     BiConsumer<? super T, ? super Throwable> fn,
1222     CompletableFuture<T> dst,
1223     Executor executor) {
1224     this.src = src; this.fn = fn; this.dst = dst;
1225     this.executor = executor;
1226     }
1227     public final void run() {
1228     final CompletableFuture<? extends T> a;
1229     final BiConsumer<? super T, ? super Throwable> fn;
1230     final CompletableFuture<T> dst;
1231     Object r; T t; Throwable ex;
1232     if ((dst = this.dst) != null &&
1233     (fn = this.fn) != null &&
1234     (a = this.src) != null &&
1235     (r = a.result) != null &&
1236     compareAndSet(0, 1)) {
1237     if (r instanceof AltResult) {
1238     ex = ((AltResult)r).ex;
1239     t = null;
1240     }
1241     else {
1242     ex = null;
1243     @SuppressWarnings("unchecked") T tr = (T) r;
1244     t = tr;
1245     }
1246     Executor e = executor;
1247     Throwable dx = null;
1248     try {
1249     if (e != null)
1250 dl 1.91 e.execute(new AsyncWhenComplete<T>(t, ex, fn, dst));
1251 dl 1.88 else
1252     fn.accept(t, ex);
1253     } catch (Throwable rex) {
1254     dx = rex;
1255     }
1256     if (e == null || dx != null)
1257     dst.internalComplete(t, ex != null ? ex : dx);
1258     }
1259     }
1260     private static final long serialVersionUID = 5232453952276885070L;
1261     }
1262    
1263 dl 1.75 static final class ThenCopy<T> extends Completion {
1264 dl 1.77 final CompletableFuture<?> src;
1265 dl 1.75 final CompletableFuture<T> dst;
1266 dl 1.77 ThenCopy(CompletableFuture<?> src,
1267 dl 1.75 CompletableFuture<T> dst) {
1268 dl 1.17 this.src = src; this.dst = dst;
1269     }
1270 jsr166 1.26 public final void run() {
1271 dl 1.77 final CompletableFuture<?> a;
1272 dl 1.75 final CompletableFuture<T> dst;
1273     Object r; T t; Throwable ex;
1274 dl 1.17 if ((dst = this.dst) != null &&
1275     (a = this.src) != null &&
1276     (r = a.result) != null &&
1277     compareAndSet(0, 1)) {
1278 dl 1.20 if (r instanceof AltResult) {
1279     ex = ((AltResult)r).ex;
1280     t = null;
1281     }
1282     else {
1283     ex = null;
1284 dl 1.75 @SuppressWarnings("unchecked") T tr = (T) r;
1285     t = tr;
1286 dl 1.20 }
1287     dst.internalComplete(t, ex);
1288 dl 1.17 }
1289     }
1290 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1291 dl 1.17 }
1292    
1293 dl 1.75 // version of ThenCopy for CompletableFuture<Void> dst
1294     static final class ThenPropagate extends Completion {
1295     final CompletableFuture<?> src;
1296     final CompletableFuture<Void> dst;
1297     ThenPropagate(CompletableFuture<?> src,
1298     CompletableFuture<Void> dst) {
1299     this.src = src; this.dst = dst;
1300     }
1301     public final void run() {
1302     final CompletableFuture<?> a;
1303     final CompletableFuture<Void> dst;
1304     Object r; Throwable ex;
1305     if ((dst = this.dst) != null &&
1306     (a = this.src) != null &&
1307     (r = a.result) != null &&
1308     compareAndSet(0, 1)) {
1309     if (r instanceof AltResult)
1310     ex = ((AltResult)r).ex;
1311     else
1312     ex = null;
1313     dst.internalComplete(null, ex);
1314     }
1315     }
1316     private static final long serialVersionUID = 5232453952276885070L;
1317     }
1318    
1319 dl 1.28 static final class HandleCompletion<T,U> extends Completion {
1320 dl 1.17 final CompletableFuture<? extends T> src;
1321     final BiFunction<? super T, Throwable, ? extends U> fn;
1322     final CompletableFuture<U> dst;
1323 dl 1.88 final Executor executor;
1324 dl 1.28 HandleCompletion(CompletableFuture<? extends T> src,
1325     BiFunction<? super T, Throwable, ? extends U> fn,
1326 dl 1.88 CompletableFuture<U> dst,
1327     Executor executor) {
1328 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1329 dl 1.88 this.executor = executor;
1330 dl 1.17 }
1331 jsr166 1.26 public final void run() {
1332 dl 1.28 final CompletableFuture<? extends T> a;
1333     final BiFunction<? super T, Throwable, ? extends U> fn;
1334     final CompletableFuture<U> dst;
1335 dl 1.17 Object r; T t; Throwable ex;
1336     if ((dst = this.dst) != null &&
1337     (fn = this.fn) != null &&
1338     (a = this.src) != null &&
1339     (r = a.result) != null &&
1340     compareAndSet(0, 1)) {
1341     if (r instanceof AltResult) {
1342     ex = ((AltResult)r).ex;
1343     t = null;
1344     }
1345     else {
1346     ex = null;
1347 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1348     t = tr;
1349 dl 1.17 }
1350 dl 1.88 Executor e = executor;
1351     U u = null;
1352     Throwable dx = null;
1353 dl 1.17 try {
1354 dl 1.88 if (e != null)
1355     e.execute(new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1356     else
1357     u = fn.apply(t, ex);
1358 dl 1.17 } catch (Throwable rex) {
1359 dl 1.20 dx = rex;
1360 dl 1.17 }
1361 dl 1.88 if (e == null || dx != null)
1362     dst.internalComplete(u, dx);
1363 dl 1.17 }
1364     }
1365 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1366 dl 1.17 }
1367    
1368 jsr166 1.81 static final class ThenCompose<T,U> extends Completion {
1369 dl 1.17 final CompletableFuture<? extends T> src;
1370 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1371 dl 1.17 final CompletableFuture<U> dst;
1372 dl 1.37 final Executor executor;
1373 jsr166 1.81 ThenCompose(CompletableFuture<? extends T> src,
1374 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
1375 jsr166 1.81 CompletableFuture<U> dst,
1376     Executor executor) {
1377 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1378 dl 1.37 this.executor = executor;
1379 dl 1.17 }
1380 jsr166 1.26 public final void run() {
1381 dl 1.28 final CompletableFuture<? extends T> a;
1382 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1383 dl 1.28 final CompletableFuture<U> dst;
1384 dl 1.37 Object r; T t; Throwable ex; Executor e;
1385 dl 1.17 if ((dst = this.dst) != null &&
1386     (fn = this.fn) != null &&
1387     (a = this.src) != null &&
1388     (r = a.result) != null &&
1389     compareAndSet(0, 1)) {
1390     if (r instanceof AltResult) {
1391     ex = ((AltResult)r).ex;
1392     t = null;
1393     }
1394     else {
1395     ex = null;
1396 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1397     t = tr;
1398 dl 1.17 }
1399     CompletableFuture<U> c = null;
1400 dl 1.20 U u = null;
1401     boolean complete = false;
1402 dl 1.17 if (ex == null) {
1403 dl 1.37 if ((e = executor) != null)
1404     e.execute(new AsyncCompose<T,U>(t, fn, dst));
1405     else {
1406     try {
1407 dl 1.88 CompletionStage<U> cs = fn.apply(t);
1408     c = (cs == null) ? null : cs.toCompletableFuture();
1409     if (c == null)
1410 dl 1.37 ex = new NullPointerException();
1411     } catch (Throwable rex) {
1412     ex = rex;
1413     }
1414 dl 1.17 }
1415     }
1416 dl 1.37 if (c != null) {
1417 dl 1.75 ThenCopy<U> d = null;
1418 dl 1.18 Object s;
1419     if ((s = c.result) == null) {
1420     CompletionNode p = new CompletionNode
1421 dl 1.75 (d = new ThenCopy<U>(c, dst));
1422 dl 1.18 while ((s = c.result) == null) {
1423     if (UNSAFE.compareAndSwapObject
1424     (c, COMPLETIONS, p.next = c.completions, p))
1425     break;
1426     }
1427 dl 1.17 }
1428 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1429 dl 1.20 complete = true;
1430 dl 1.18 if (s instanceof AltResult) {
1431     ex = ((AltResult)s).ex; // no rewrap
1432     u = null;
1433 dl 1.28 }
1434     else {
1435     @SuppressWarnings("unchecked") U us = (U) s;
1436     u = us;
1437     }
1438     }
1439     }
1440     if (complete || ex != null)
1441     dst.internalComplete(u, ex);
1442     if (c != null)
1443     c.helpPostComplete();
1444     }
1445     }
1446     private static final long serialVersionUID = 5232453952276885070L;
1447     }
1448    
1449 dl 1.88 // Implementations of stage methods with (plain, async, Executor) forms
1450 dl 1.28
1451 dl 1.88 private <U> CompletableFuture<U> doThenApply
1452     (Function<? super T,? extends U> fn,
1453     Executor e) {
1454     if (fn == null) throw new NullPointerException();
1455     CompletableFuture<U> dst = new CompletableFuture<U>();
1456     ThenApply<T,U> d = null;
1457     Object r;
1458     if ((r = result) == null) {
1459     CompletionNode p = new CompletionNode
1460     (d = new ThenApply<T,U>(this, fn, dst, e));
1461     while ((r = result) == null) {
1462     if (UNSAFE.compareAndSwapObject
1463     (this, COMPLETIONS, p.next = completions, p))
1464     break;
1465     }
1466     }
1467     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1468     T t; Throwable ex;
1469     if (r instanceof AltResult) {
1470     ex = ((AltResult)r).ex;
1471     t = null;
1472     }
1473     else {
1474     ex = null;
1475     @SuppressWarnings("unchecked") T tr = (T) r;
1476     t = tr;
1477     }
1478     U u = null;
1479     if (ex == null) {
1480     try {
1481     if (e != null)
1482     e.execute(new AsyncApply<T,U>(t, fn, dst));
1483     else
1484     u = fn.apply(t);
1485     } catch (Throwable rex) {
1486     ex = rex;
1487     }
1488     }
1489     if (e == null || ex != null)
1490     dst.internalComplete(u, ex);
1491     }
1492     helpPostComplete();
1493     return dst;
1494 dl 1.28 }
1495    
1496 dl 1.88 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1497     Executor e) {
1498     if (fn == null) throw new NullPointerException();
1499     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1500     ThenAccept<T> d = null;
1501     Object r;
1502     if ((r = result) == null) {
1503     CompletionNode p = new CompletionNode
1504     (d = new ThenAccept<T>(this, fn, dst, e));
1505     while ((r = result) == null) {
1506     if (UNSAFE.compareAndSwapObject
1507     (this, COMPLETIONS, p.next = completions, p))
1508     break;
1509     }
1510     }
1511     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1512     T t; Throwable ex;
1513     if (r instanceof AltResult) {
1514     ex = ((AltResult)r).ex;
1515     t = null;
1516     }
1517     else {
1518     ex = null;
1519     @SuppressWarnings("unchecked") T tr = (T) r;
1520     t = tr;
1521     }
1522     if (ex == null) {
1523     try {
1524     if (e != null)
1525     e.execute(new AsyncAccept<T>(t, fn, dst));
1526     else
1527     fn.accept(t);
1528     } catch (Throwable rex) {
1529     ex = rex;
1530     }
1531     }
1532     if (e == null || ex != null)
1533     dst.internalComplete(null, ex);
1534     }
1535     helpPostComplete();
1536     return dst;
1537 dl 1.28 }
1538    
1539 dl 1.88 private CompletableFuture<Void> doThenRun(Runnable action,
1540     Executor e) {
1541     if (action == null) throw new NullPointerException();
1542     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1543     ThenRun d = null;
1544     Object r;
1545     if ((r = result) == null) {
1546     CompletionNode p = new CompletionNode
1547     (d = new ThenRun(this, action, dst, e));
1548     while ((r = result) == null) {
1549     if (UNSAFE.compareAndSwapObject
1550     (this, COMPLETIONS, p.next = completions, p))
1551     break;
1552     }
1553     }
1554     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1555     Throwable ex;
1556     if (r instanceof AltResult)
1557     ex = ((AltResult)r).ex;
1558     else
1559     ex = null;
1560     if (ex == null) {
1561     try {
1562     if (e != null)
1563     e.execute(new AsyncRun(action, dst));
1564     else
1565     action.run();
1566     } catch (Throwable rex) {
1567     ex = rex;
1568     }
1569     }
1570     if (e == null || ex != null)
1571     dst.internalComplete(null, ex);
1572     }
1573     helpPostComplete();
1574     return dst;
1575     }
1576    
1577     private <U,V> CompletableFuture<V> doThenCombine
1578     (CompletableFuture<? extends U> other,
1579     BiFunction<? super T,? super U,? extends V> fn,
1580     Executor e) {
1581     if (other == null || fn == null) throw new NullPointerException();
1582     CompletableFuture<V> dst = new CompletableFuture<V>();
1583     ThenCombine<T,U,V> d = null;
1584     Object r, s = null;
1585     if ((r = result) == null || (s = other.result) == null) {
1586     d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
1587     CompletionNode q = null, p = new CompletionNode(d);
1588     while ((r == null && (r = result) == null) ||
1589     (s == null && (s = other.result) == null)) {
1590     if (q != null) {
1591     if (s != null ||
1592     UNSAFE.compareAndSwapObject
1593     (other, COMPLETIONS, q.next = other.completions, q))
1594     break;
1595     }
1596     else if (r != null ||
1597     UNSAFE.compareAndSwapObject
1598     (this, COMPLETIONS, p.next = completions, p)) {
1599     if (s != null)
1600     break;
1601     q = new CompletionNode(d);
1602     }
1603     }
1604     }
1605     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1606     T t; U u; Throwable ex;
1607     if (r instanceof AltResult) {
1608     ex = ((AltResult)r).ex;
1609     t = null;
1610     }
1611     else {
1612     ex = null;
1613     @SuppressWarnings("unchecked") T tr = (T) r;
1614     t = tr;
1615     }
1616     if (ex != null)
1617     u = null;
1618     else if (s instanceof AltResult) {
1619     ex = ((AltResult)s).ex;
1620     u = null;
1621     }
1622     else {
1623     @SuppressWarnings("unchecked") U us = (U) s;
1624     u = us;
1625     }
1626     V v = null;
1627     if (ex == null) {
1628     try {
1629     if (e != null)
1630     e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
1631     else
1632     v = fn.apply(t, u);
1633     } catch (Throwable rex) {
1634     ex = rex;
1635     }
1636     }
1637     if (e == null || ex != null)
1638     dst.internalComplete(v, ex);
1639     }
1640     helpPostComplete();
1641     other.helpPostComplete();
1642     return dst;
1643     }
1644    
1645     private <U> CompletableFuture<Void> doThenAcceptBoth
1646     (CompletableFuture<? extends U> other,
1647     BiConsumer<? super T,? super U> fn,
1648     Executor e) {
1649     if (other == null || fn == null) throw new NullPointerException();
1650     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1651     ThenAcceptBoth<T,U> d = null;
1652     Object r, s = null;
1653     if ((r = result) == null || (s = other.result) == null) {
1654     d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1655     CompletionNode q = null, p = new CompletionNode(d);
1656     while ((r == null && (r = result) == null) ||
1657     (s == null && (s = other.result) == null)) {
1658     if (q != null) {
1659     if (s != null ||
1660     UNSAFE.compareAndSwapObject
1661     (other, COMPLETIONS, q.next = other.completions, q))
1662     break;
1663     }
1664     else if (r != null ||
1665     UNSAFE.compareAndSwapObject
1666     (this, COMPLETIONS, p.next = completions, p)) {
1667     if (s != null)
1668     break;
1669     q = new CompletionNode(d);
1670     }
1671     }
1672     }
1673     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1674     T t; U u; Throwable ex;
1675     if (r instanceof AltResult) {
1676     ex = ((AltResult)r).ex;
1677     t = null;
1678     }
1679     else {
1680     ex = null;
1681     @SuppressWarnings("unchecked") T tr = (T) r;
1682     t = tr;
1683     }
1684     if (ex != null)
1685     u = null;
1686     else if (s instanceof AltResult) {
1687     ex = ((AltResult)s).ex;
1688     u = null;
1689     }
1690     else {
1691     @SuppressWarnings("unchecked") U us = (U) s;
1692     u = us;
1693     }
1694     if (ex == null) {
1695     try {
1696     if (e != null)
1697     e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1698     else
1699     fn.accept(t, u);
1700     } catch (Throwable rex) {
1701     ex = rex;
1702     }
1703     }
1704     if (e == null || ex != null)
1705     dst.internalComplete(null, ex);
1706     }
1707     helpPostComplete();
1708     other.helpPostComplete();
1709     return dst;
1710     }
1711    
1712     private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1713     Runnable action,
1714     Executor e) {
1715     if (other == null || action == null) throw new NullPointerException();
1716     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1717     RunAfterBoth d = null;
1718     Object r, s = null;
1719     if ((r = result) == null || (s = other.result) == null) {
1720     d = new RunAfterBoth(this, other, action, dst, e);
1721     CompletionNode q = null, p = new CompletionNode(d);
1722     while ((r == null && (r = result) == null) ||
1723     (s == null && (s = other.result) == null)) {
1724     if (q != null) {
1725     if (s != null ||
1726     UNSAFE.compareAndSwapObject
1727     (other, COMPLETIONS, q.next = other.completions, q))
1728     break;
1729     }
1730     else if (r != null ||
1731     UNSAFE.compareAndSwapObject
1732     (this, COMPLETIONS, p.next = completions, p)) {
1733     if (s != null)
1734     break;
1735     q = new CompletionNode(d);
1736     }
1737     }
1738     }
1739     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1740     Throwable ex;
1741     if (r instanceof AltResult)
1742     ex = ((AltResult)r).ex;
1743     else
1744     ex = null;
1745     if (ex == null && (s instanceof AltResult))
1746     ex = ((AltResult)s).ex;
1747     if (ex == null) {
1748     try {
1749     if (e != null)
1750     e.execute(new AsyncRun(action, dst));
1751     else
1752     action.run();
1753     } catch (Throwable rex) {
1754     ex = rex;
1755     }
1756     }
1757     if (e == null || ex != null)
1758     dst.internalComplete(null, ex);
1759     }
1760     helpPostComplete();
1761     other.helpPostComplete();
1762     return dst;
1763     }
1764    
1765     private <U> CompletableFuture<U> doApplyToEither
1766     (CompletableFuture<? extends T> other,
1767     Function<? super T, U> fn,
1768     Executor e) {
1769     if (other == null || fn == null) throw new NullPointerException();
1770     CompletableFuture<U> dst = new CompletableFuture<U>();
1771     ApplyToEither<T,U> d = null;
1772     Object r;
1773     if ((r = result) == null && (r = other.result) == null) {
1774     d = new ApplyToEither<T,U>(this, other, fn, dst, e);
1775     CompletionNode q = null, p = new CompletionNode(d);
1776     while ((r = result) == null && (r = other.result) == null) {
1777     if (q != null) {
1778     if (UNSAFE.compareAndSwapObject
1779     (other, COMPLETIONS, q.next = other.completions, q))
1780     break;
1781     }
1782     else if (UNSAFE.compareAndSwapObject
1783     (this, COMPLETIONS, p.next = completions, p))
1784     q = new CompletionNode(d);
1785     }
1786     }
1787     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1788     T t; Throwable ex;
1789     if (r instanceof AltResult) {
1790     ex = ((AltResult)r).ex;
1791     t = null;
1792     }
1793     else {
1794     ex = null;
1795     @SuppressWarnings("unchecked") T tr = (T) r;
1796     t = tr;
1797     }
1798     U u = null;
1799     if (ex == null) {
1800     try {
1801     if (e != null)
1802     e.execute(new AsyncApply<T,U>(t, fn, dst));
1803     else
1804     u = fn.apply(t);
1805     } catch (Throwable rex) {
1806     ex = rex;
1807     }
1808     }
1809     if (e == null || ex != null)
1810     dst.internalComplete(u, ex);
1811     }
1812     helpPostComplete();
1813     other.helpPostComplete();
1814     return dst;
1815     }
1816    
1817     private CompletableFuture<Void> doAcceptEither
1818     (CompletableFuture<? extends T> other,
1819     Consumer<? super T> fn,
1820     Executor e) {
1821     if (other == null || fn == null) throw new NullPointerException();
1822     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1823     AcceptEither<T> d = null;
1824     Object r;
1825     if ((r = result) == null && (r = other.result) == null) {
1826     d = new AcceptEither<T>(this, other, fn, dst, e);
1827     CompletionNode q = null, p = new CompletionNode(d);
1828     while ((r = result) == null && (r = other.result) == null) {
1829     if (q != null) {
1830     if (UNSAFE.compareAndSwapObject
1831     (other, COMPLETIONS, q.next = other.completions, q))
1832     break;
1833     }
1834     else if (UNSAFE.compareAndSwapObject
1835     (this, COMPLETIONS, p.next = completions, p))
1836     q = new CompletionNode(d);
1837     }
1838     }
1839     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1840     T t; Throwable ex;
1841     if (r instanceof AltResult) {
1842     ex = ((AltResult)r).ex;
1843     t = null;
1844     }
1845     else {
1846     ex = null;
1847     @SuppressWarnings("unchecked") T tr = (T) r;
1848     t = tr;
1849     }
1850     if (ex == null) {
1851     try {
1852     if (e != null)
1853     e.execute(new AsyncAccept<T>(t, fn, dst));
1854     else
1855     fn.accept(t);
1856     } catch (Throwable rex) {
1857     ex = rex;
1858     }
1859     }
1860     if (e == null || ex != null)
1861     dst.internalComplete(null, ex);
1862     }
1863     helpPostComplete();
1864     other.helpPostComplete();
1865     return dst;
1866     }
1867    
1868     private CompletableFuture<Void> doRunAfterEither
1869     (CompletableFuture<?> other,
1870     Runnable action,
1871     Executor e) {
1872     if (other == null || action == null) throw new NullPointerException();
1873     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1874     RunAfterEither d = null;
1875     Object r;
1876     if ((r = result) == null && (r = other.result) == null) {
1877     d = new RunAfterEither(this, other, action, dst, e);
1878     CompletionNode q = null, p = new CompletionNode(d);
1879     while ((r = result) == null && (r = other.result) == null) {
1880     if (q != null) {
1881     if (UNSAFE.compareAndSwapObject
1882     (other, COMPLETIONS, q.next = other.completions, q))
1883     break;
1884     }
1885     else if (UNSAFE.compareAndSwapObject
1886     (this, COMPLETIONS, p.next = completions, p))
1887     q = new CompletionNode(d);
1888     }
1889     }
1890     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1891     Throwable ex;
1892     if (r instanceof AltResult)
1893     ex = ((AltResult)r).ex;
1894     else
1895     ex = null;
1896     if (ex == null) {
1897     try {
1898     if (e != null)
1899     e.execute(new AsyncRun(action, dst));
1900     else
1901     action.run();
1902     } catch (Throwable rex) {
1903     ex = rex;
1904     }
1905     }
1906     if (e == null || ex != null)
1907     dst.internalComplete(null, ex);
1908     }
1909     helpPostComplete();
1910     other.helpPostComplete();
1911     return dst;
1912     }
1913    
1914     private <U> CompletableFuture<U> doThenCompose
1915     (Function<? super T, ? extends CompletionStage<U>> fn,
1916     Executor e) {
1917     if (fn == null) throw new NullPointerException();
1918     CompletableFuture<U> dst = null;
1919     ThenCompose<T,U> d = null;
1920     Object r;
1921     if ((r = result) == null) {
1922     dst = new CompletableFuture<U>();
1923     CompletionNode p = new CompletionNode
1924     (d = new ThenCompose<T,U>(this, fn, dst, e));
1925     while ((r = result) == null) {
1926     if (UNSAFE.compareAndSwapObject
1927     (this, COMPLETIONS, p.next = completions, p))
1928     break;
1929     }
1930     }
1931     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1932     T t; Throwable ex;
1933     if (r instanceof AltResult) {
1934     ex = ((AltResult)r).ex;
1935     t = null;
1936     }
1937     else {
1938     ex = null;
1939     @SuppressWarnings("unchecked") T tr = (T) r;
1940     t = tr;
1941     }
1942     if (ex == null) {
1943     if (e != null) {
1944     if (dst == null)
1945     dst = new CompletableFuture<U>();
1946     e.execute(new AsyncCompose<T,U>(t, fn, dst));
1947     }
1948     else {
1949     try {
1950     CompletionStage<U> cs = fn.apply(t);
1951     if (cs == null ||
1952     (dst = cs.toCompletableFuture()) == null)
1953     ex = new NullPointerException();
1954     } catch (Throwable rex) {
1955     ex = rex;
1956     }
1957     }
1958     }
1959     if (dst == null)
1960     dst = new CompletableFuture<U>();
1961     if (e == null || ex != null)
1962     dst.internalComplete(null, ex);
1963     }
1964     helpPostComplete();
1965     dst.helpPostComplete();
1966     return dst;
1967     }
1968    
1969     private CompletableFuture<T> doWhenComplete
1970     (BiConsumer<? super T, ? super Throwable> fn,
1971     Executor e) {
1972     if (fn == null) throw new NullPointerException();
1973     CompletableFuture<T> dst = new CompletableFuture<T>();
1974     WhenCompleteCompletion<T> d = null;
1975     Object r;
1976     if ((r = result) == null) {
1977     CompletionNode p =
1978     new CompletionNode(d = new WhenCompleteCompletion<T>
1979     (this, fn, dst, e));
1980     while ((r = result) == null) {
1981     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1982     p.next = completions, p))
1983     break;
1984     }
1985     }
1986     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1987     T t; Throwable ex;
1988     if (r instanceof AltResult) {
1989     ex = ((AltResult)r).ex;
1990     t = null;
1991     }
1992     else {
1993     ex = null;
1994     @SuppressWarnings("unchecked") T tr = (T) r;
1995     t = tr;
1996     }
1997     Throwable dx = null;
1998     try {
1999     if (e != null)
2000 dl 1.91 e.execute(new AsyncWhenComplete<T>(t, ex, fn, dst));
2001 dl 1.88 else
2002     fn.accept(t, ex);
2003     } catch (Throwable rex) {
2004     dx = rex;
2005     }
2006     if (e == null || dx != null)
2007     dst.internalComplete(t, ex != null ? ex : dx);
2008     }
2009     helpPostComplete();
2010     return dst;
2011     }
2012    
2013     private <U> CompletableFuture<U> doHandle
2014     (BiFunction<? super T, Throwable, ? extends U> fn,
2015     Executor e) {
2016     if (fn == null) throw new NullPointerException();
2017     CompletableFuture<U> dst = new CompletableFuture<U>();
2018     HandleCompletion<T,U> d = null;
2019     Object r;
2020     if ((r = result) == null) {
2021     CompletionNode p =
2022     new CompletionNode(d = new HandleCompletion<T,U>
2023     (this, fn, dst, e));
2024     while ((r = result) == null) {
2025     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2026     p.next = completions, p))
2027     break;
2028     }
2029     }
2030     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2031     T t; Throwable ex;
2032     if (r instanceof AltResult) {
2033     ex = ((AltResult)r).ex;
2034     t = null;
2035     }
2036     else {
2037     ex = null;
2038     @SuppressWarnings("unchecked") T tr = (T) r;
2039     t = tr;
2040     }
2041 jsr166 1.90 U u = null;
2042 dl 1.88 Throwable dx = null;
2043     try {
2044     if (e != null)
2045     e.execute(new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2046     else {
2047     u = fn.apply(t, ex);
2048     dx = null;
2049     }
2050     } catch (Throwable rex) {
2051     dx = rex;
2052     u = null;
2053     }
2054     if (e == null || dx != null)
2055     dst.internalComplete(u, dx);
2056     }
2057     helpPostComplete();
2058     return dst;
2059     }
2060    
2061    
2062     // public methods
2063    
2064     /**
2065     * Creates a new incomplete CompletableFuture.
2066     */
2067     public CompletableFuture() {
2068     }
2069    
2070     /**
2071     * Returns a new CompletableFuture that is asynchronously completed
2072     * by a task running in the {@link ForkJoinPool#commonPool()} with
2073     * the value obtained by calling the given Supplier.
2074     *
2075     * @param supplier a function returning the value to be used
2076     * to complete the returned CompletableFuture
2077 jsr166 1.95 * @param <U> the function's return type
2078 dl 1.88 * @return the new CompletableFuture
2079     */
2080     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2081     if (supplier == null) throw new NullPointerException();
2082     CompletableFuture<U> f = new CompletableFuture<U>();
2083     ForkJoinPool.commonPool().
2084     execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
2085     return f;
2086     }
2087    
2088     /**
2089     * Returns a new CompletableFuture that is asynchronously completed
2090     * by a task running in the given executor with the value obtained
2091     * by calling the given Supplier.
2092     *
2093     * @param supplier a function returning the value to be used
2094     * to complete the returned CompletableFuture
2095     * @param executor the executor to use for asynchronous execution
2096 jsr166 1.95 * @param <U> the function's return type
2097 dl 1.88 * @return the new CompletableFuture
2098     */
2099     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2100     Executor executor) {
2101     if (executor == null || supplier == null)
2102     throw new NullPointerException();
2103     CompletableFuture<U> f = new CompletableFuture<U>();
2104     executor.execute(new AsyncSupply<U>(supplier, f));
2105     return f;
2106 dl 1.28 }
2107    
2108     /**
2109 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2110     * by a task running in the {@link ForkJoinPool#commonPool()} after
2111     * it runs the given action.
2112 dl 1.28 *
2113     * @param runnable the action to run before completing the
2114     * returned CompletableFuture
2115 jsr166 1.58 * @return the new CompletableFuture
2116 dl 1.28 */
2117     public static CompletableFuture<Void> runAsync(Runnable runnable) {
2118     if (runnable == null) throw new NullPointerException();
2119     CompletableFuture<Void> f = new CompletableFuture<Void>();
2120     ForkJoinPool.commonPool().
2121     execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
2122     return f;
2123     }
2124    
2125     /**
2126 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2127     * by a task running in the given executor after it runs the given
2128     * action.
2129 dl 1.28 *
2130     * @param runnable the action to run before completing the
2131     * returned CompletableFuture
2132     * @param executor the executor to use for asynchronous execution
2133 jsr166 1.58 * @return the new CompletableFuture
2134 dl 1.28 */
2135     public static CompletableFuture<Void> runAsync(Runnable runnable,
2136     Executor executor) {
2137     if (executor == null || runnable == null)
2138     throw new NullPointerException();
2139     CompletableFuture<Void> f = new CompletableFuture<Void>();
2140     executor.execute(new AsyncRun(runnable, f));
2141     return f;
2142     }
2143    
2144     /**
2145 dl 1.77 * Returns a new CompletableFuture that is already completed with
2146     * the given value.
2147     *
2148     * @param value the value
2149 jsr166 1.95 * @param <U> the type of the value
2150 dl 1.77 * @return the completed CompletableFuture
2151     */
2152     public static <U> CompletableFuture<U> completedFuture(U value) {
2153     CompletableFuture<U> f = new CompletableFuture<U>();
2154 jsr166 1.78 f.result = (value == null) ? NIL : value;
2155 dl 1.77 return f;
2156     }
2157    
2158     /**
2159 dl 1.28 * Returns {@code true} if completed in any fashion: normally,
2160     * exceptionally, or via cancellation.
2161     *
2162     * @return {@code true} if completed
2163     */
2164     public boolean isDone() {
2165     return result != null;
2166     }
2167    
2168     /**
2169 dl 1.49 * Waits if necessary for this future to complete, and then
2170 dl 1.48 * returns its result.
2171 dl 1.28 *
2172 dl 1.48 * @return the result value
2173     * @throws CancellationException if this future was cancelled
2174     * @throws ExecutionException if this future completed exceptionally
2175 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2176     * while waiting
2177     */
2178     public T get() throws InterruptedException, ExecutionException {
2179     Object r; Throwable ex, cause;
2180     if ((r = result) == null && (r = waitingGet(true)) == null)
2181     throw new InterruptedException();
2182 jsr166 1.45 if (!(r instanceof AltResult)) {
2183     @SuppressWarnings("unchecked") T tr = (T) r;
2184     return tr;
2185     }
2186     if ((ex = ((AltResult)r).ex) == null)
2187 dl 1.28 return null;
2188 jsr166 1.45 if (ex instanceof CancellationException)
2189     throw (CancellationException)ex;
2190     if ((ex instanceof CompletionException) &&
2191     (cause = ex.getCause()) != null)
2192     ex = cause;
2193     throw new ExecutionException(ex);
2194 dl 1.28 }
2195    
2196     /**
2197 dl 1.49 * Waits if necessary for at most the given time for this future
2198     * to complete, and then returns its result, if available.
2199 dl 1.28 *
2200     * @param timeout the maximum time to wait
2201     * @param unit the time unit of the timeout argument
2202 dl 1.48 * @return the result value
2203     * @throws CancellationException if this future was cancelled
2204     * @throws ExecutionException if this future completed exceptionally
2205 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2206     * while waiting
2207     * @throws TimeoutException if the wait timed out
2208     */
2209     public T get(long timeout, TimeUnit unit)
2210     throws InterruptedException, ExecutionException, TimeoutException {
2211     Object r; Throwable ex, cause;
2212     long nanos = unit.toNanos(timeout);
2213     if (Thread.interrupted())
2214     throw new InterruptedException();
2215     if ((r = result) == null)
2216     r = timedAwaitDone(nanos);
2217 jsr166 1.45 if (!(r instanceof AltResult)) {
2218     @SuppressWarnings("unchecked") T tr = (T) r;
2219     return tr;
2220     }
2221     if ((ex = ((AltResult)r).ex) == null)
2222 dl 1.28 return null;
2223 jsr166 1.45 if (ex instanceof CancellationException)
2224     throw (CancellationException)ex;
2225     if ((ex instanceof CompletionException) &&
2226     (cause = ex.getCause()) != null)
2227     ex = cause;
2228     throw new ExecutionException(ex);
2229 dl 1.28 }
2230    
2231     /**
2232     * Returns the result value when complete, or throws an
2233     * (unchecked) exception if completed exceptionally. To better
2234     * conform with the use of common functional forms, if a
2235     * computation involved in the completion of this
2236     * CompletableFuture threw an exception, this method throws an
2237     * (unchecked) {@link CompletionException} with the underlying
2238     * exception as its cause.
2239     *
2240     * @return the result value
2241     * @throws CancellationException if the computation was cancelled
2242 jsr166 1.55 * @throws CompletionException if this future completed
2243     * exceptionally or a completion computation threw an exception
2244 dl 1.28 */
2245     public T join() {
2246     Object r; Throwable ex;
2247     if ((r = result) == null)
2248     r = waitingGet(false);
2249 jsr166 1.45 if (!(r instanceof AltResult)) {
2250     @SuppressWarnings("unchecked") T tr = (T) r;
2251     return tr;
2252     }
2253     if ((ex = ((AltResult)r).ex) == null)
2254 dl 1.28 return null;
2255 jsr166 1.45 if (ex instanceof CancellationException)
2256     throw (CancellationException)ex;
2257     if (ex instanceof CompletionException)
2258     throw (CompletionException)ex;
2259     throw new CompletionException(ex);
2260 dl 1.28 }
2261    
2262     /**
2263     * Returns the result value (or throws any encountered exception)
2264     * if completed, else returns the given valueIfAbsent.
2265     *
2266     * @param valueIfAbsent the value to return if not completed
2267     * @return the result value, if completed, else the given valueIfAbsent
2268     * @throws CancellationException if the computation was cancelled
2269 jsr166 1.55 * @throws CompletionException if this future completed
2270     * exceptionally or a completion computation threw an exception
2271 dl 1.28 */
2272     public T getNow(T valueIfAbsent) {
2273     Object r; Throwable ex;
2274     if ((r = result) == null)
2275     return valueIfAbsent;
2276 jsr166 1.45 if (!(r instanceof AltResult)) {
2277     @SuppressWarnings("unchecked") T tr = (T) r;
2278     return tr;
2279     }
2280     if ((ex = ((AltResult)r).ex) == null)
2281 dl 1.28 return null;
2282 jsr166 1.45 if (ex instanceof CancellationException)
2283     throw (CancellationException)ex;
2284     if (ex instanceof CompletionException)
2285     throw (CompletionException)ex;
2286     throw new CompletionException(ex);
2287 dl 1.28 }
2288    
2289     /**
2290     * If not already completed, sets the value returned by {@link
2291     * #get()} and related methods to the given value.
2292     *
2293     * @param value the result value
2294     * @return {@code true} if this invocation caused this CompletableFuture
2295     * to transition to a completed state, else {@code false}
2296     */
2297     public boolean complete(T value) {
2298     boolean triggered = result == null &&
2299     UNSAFE.compareAndSwapObject(this, RESULT, null,
2300     value == null ? NIL : value);
2301     postComplete();
2302     return triggered;
2303     }
2304    
2305     /**
2306     * If not already completed, causes invocations of {@link #get()}
2307     * and related methods to throw the given exception.
2308     *
2309     * @param ex the exception
2310     * @return {@code true} if this invocation caused this CompletableFuture
2311     * to transition to a completed state, else {@code false}
2312     */
2313     public boolean completeExceptionally(Throwable ex) {
2314     if (ex == null) throw new NullPointerException();
2315     boolean triggered = result == null &&
2316     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2317     postComplete();
2318     return triggered;
2319     }
2320    
2321 dl 1.88 // CompletionStage methods
2322    
2323     public <U> CompletableFuture<U> thenApply
2324     (Function<? super T,? extends U> fn) {
2325 dl 1.28 return doThenApply(fn, null);
2326     }
2327    
2328 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2329     (Function<? super T,? extends U> fn) {
2330 dl 1.28 return doThenApply(fn, ForkJoinPool.commonPool());
2331 dl 1.17 }
2332    
2333 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2334     (Function<? super T,? extends U> fn,
2335     Executor executor) {
2336 dl 1.28 if (executor == null) throw new NullPointerException();
2337     return doThenApply(fn, executor);
2338     }
2339 dl 1.1
2340 dl 1.88 public CompletableFuture<Void> thenAccept
2341     (Consumer<? super T> action) {
2342     return doThenAccept(action, null);
2343 dl 1.28 }
2344    
2345 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2346     (Consumer<? super T> action) {
2347     return doThenAccept(action, ForkJoinPool.commonPool());
2348 dl 1.28 }
2349    
2350 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2351     (Consumer<? super T> action,
2352     Executor executor) {
2353 dl 1.28 if (executor == null) throw new NullPointerException();
2354 dl 1.88 return doThenAccept(action, executor);
2355 dl 1.7 }
2356    
2357 dl 1.88 public CompletableFuture<Void> thenRun
2358     (Runnable action) {
2359 dl 1.28 return doThenRun(action, null);
2360     }
2361    
2362 dl 1.88 public CompletableFuture<Void> thenRunAsync
2363     (Runnable action) {
2364 dl 1.28 return doThenRun(action, ForkJoinPool.commonPool());
2365     }
2366    
2367 dl 1.88 public CompletableFuture<Void> thenRunAsync
2368     (Runnable action,
2369     Executor executor) {
2370 dl 1.28 if (executor == null) throw new NullPointerException();
2371     return doThenRun(action, executor);
2372     }
2373    
2374 dl 1.48 public <U,V> CompletableFuture<V> thenCombine
2375 dl 1.88 (CompletionStage<? extends U> other,
2376 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2377 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, null);
2378 dl 1.28 }
2379    
2380 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2381 dl 1.88 (CompletionStage<? extends U> other,
2382 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2383 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn,
2384     ForkJoinPool.commonPool());
2385 dl 1.28 }
2386    
2387 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2388 dl 1.88 (CompletionStage<? extends U> other,
2389 dl 1.48 BiFunction<? super T,? super U,? extends V> fn,
2390     Executor executor) {
2391 dl 1.28 if (executor == null) throw new NullPointerException();
2392 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, executor);
2393 dl 1.1 }
2394    
2395 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBoth
2396 dl 1.88 (CompletionStage<? extends U> other,
2397     BiConsumer<? super T, ? super U> action) {
2398     return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2399 dl 1.28 }
2400    
2401 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2402 dl 1.88 (CompletionStage<? extends U> other,
2403     BiConsumer<? super T, ? super U> action) {
2404     return doThenAcceptBoth(other.toCompletableFuture(), action,
2405     ForkJoinPool.commonPool());
2406 dl 1.28 }
2407    
2408 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2409 dl 1.88 (CompletionStage<? extends U> other,
2410     BiConsumer<? super T, ? super U> action,
2411 dl 1.48 Executor executor) {
2412 dl 1.28 if (executor == null) throw new NullPointerException();
2413 dl 1.88 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2414 dl 1.28 }
2415    
2416 dl 1.88 public CompletableFuture<Void> runAfterBoth
2417     (CompletionStage<?> other,
2418     Runnable action) {
2419     return doRunAfterBoth(other.toCompletableFuture(), action, null);
2420 dl 1.7 }
2421    
2422 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2423     (CompletionStage<?> other,
2424     Runnable action) {
2425     return doRunAfterBoth(other.toCompletableFuture(), action,
2426     ForkJoinPool.commonPool());
2427 dl 1.28 }
2428    
2429 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2430     (CompletionStage<?> other,
2431     Runnable action,
2432     Executor executor) {
2433 dl 1.28 if (executor == null) throw new NullPointerException();
2434 dl 1.88 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2435 dl 1.28 }
2436    
2437 dl 1.1
2438 dl 1.48 public <U> CompletableFuture<U> applyToEither
2439 dl 1.88 (CompletionStage<? extends T> other,
2440 dl 1.48 Function<? super T, U> fn) {
2441 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, null);
2442 dl 1.28 }
2443    
2444 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2445 dl 1.88 (CompletionStage<? extends T> other,
2446 dl 1.48 Function<? super T, U> fn) {
2447 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn,
2448     ForkJoinPool.commonPool());
2449 dl 1.28 }
2450    
2451 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2452 dl 1.88 (CompletionStage<? extends T> other,
2453 dl 1.48 Function<? super T, U> fn,
2454     Executor executor) {
2455 dl 1.28 if (executor == null) throw new NullPointerException();
2456 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2457 dl 1.1 }
2458    
2459 dl 1.48 public CompletableFuture<Void> acceptEither
2460 dl 1.88 (CompletionStage<? extends T> other,
2461     Consumer<? super T> action) {
2462     return doAcceptEither(other.toCompletableFuture(), action, null);
2463 dl 1.28 }
2464    
2465 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2466 dl 1.88 (CompletionStage<? extends T> other,
2467     Consumer<? super T> action) {
2468     return doAcceptEither(other.toCompletableFuture(), action,
2469     ForkJoinPool.commonPool());
2470 dl 1.28 }
2471    
2472 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2473 dl 1.88 (CompletionStage<? extends T> other,
2474     Consumer<? super T> action,
2475 dl 1.48 Executor executor) {
2476 dl 1.28 if (executor == null) throw new NullPointerException();
2477 dl 1.88 return doAcceptEither(other.toCompletableFuture(), action, executor);
2478 dl 1.7 }
2479    
2480 dl 1.88 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2481 dl 1.28 Runnable action) {
2482 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, null);
2483 dl 1.28 }
2484    
2485 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2486 dl 1.88 (CompletionStage<?> other,
2487 dl 1.48 Runnable action) {
2488 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action,
2489     ForkJoinPool.commonPool());
2490 dl 1.28 }
2491    
2492 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2493 dl 1.88 (CompletionStage<?> other,
2494 dl 1.48 Runnable action,
2495     Executor executor) {
2496 dl 1.28 if (executor == null) throw new NullPointerException();
2497 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2498 dl 1.1 }
2499    
2500 dl 1.48 public <U> CompletableFuture<U> thenCompose
2501 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2502 jsr166 1.81 return doThenCompose(fn, null);
2503 dl 1.37 }
2504    
2505 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2506 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2507 jsr166 1.81 return doThenCompose(fn, ForkJoinPool.commonPool());
2508 dl 1.37 }
2509    
2510 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2511 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn,
2512 dl 1.48 Executor executor) {
2513 dl 1.37 if (executor == null) throw new NullPointerException();
2514 jsr166 1.81 return doThenCompose(fn, executor);
2515 dl 1.37 }
2516    
2517 dl 1.88 public CompletableFuture<T> whenComplete
2518     (BiConsumer<? super T, ? super Throwable> action) {
2519     return doWhenComplete(action, null);
2520     }
2521    
2522     public CompletableFuture<T> whenCompleteAsync
2523     (BiConsumer<? super T, ? super Throwable> action) {
2524     return doWhenComplete(action, ForkJoinPool.commonPool());
2525     }
2526    
2527     public CompletableFuture<T> whenCompleteAsync
2528     (BiConsumer<? super T, ? super Throwable> action,
2529     Executor executor) {
2530     if (executor == null) throw new NullPointerException();
2531     return doWhenComplete(action, executor);
2532     }
2533    
2534     public <U> CompletableFuture<U> handle
2535     (BiFunction<? super T, Throwable, ? extends U> fn) {
2536     return doHandle(fn, null);
2537     }
2538    
2539     public <U> CompletableFuture<U> handleAsync
2540     (BiFunction<? super T, Throwable, ? extends U> fn) {
2541     return doHandle(fn, ForkJoinPool.commonPool());
2542     }
2543    
2544     public <U> CompletableFuture<U> handleAsync
2545     (BiFunction<? super T, Throwable, ? extends U> fn,
2546     Executor executor) {
2547     if (executor == null) throw new NullPointerException();
2548     return doHandle(fn, executor);
2549     }
2550    
2551     /**
2552     * Returns this CompletableFuture
2553     *
2554     * @return this CompletableFuture
2555     */
2556     public CompletableFuture<T> toCompletableFuture() {
2557     return this;
2558 dl 1.28 }
2559    
2560 dl 1.88 // not in interface CompletionStage
2561    
2562 dl 1.28 /**
2563 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
2564     * CompletableFuture completes, with the result of the given
2565     * function of the exception triggering this CompletableFuture's
2566     * completion when it completes exceptionally; otherwise, if this
2567     * CompletableFuture completes normally, then the returned
2568     * CompletableFuture also completes normally with the same value.
2569 dl 1.88 * Note: More flexible versions of this functionality are
2570     * available using methods {@code whenComplete} and {@code handle}.
2571 dl 1.28 *
2572     * @param fn the function to use to compute the value of the
2573     * returned CompletableFuture if this CompletableFuture completed
2574     * exceptionally
2575     * @return the new CompletableFuture
2576     */
2577 dl 1.48 public CompletableFuture<T> exceptionally
2578     (Function<Throwable, ? extends T> fn) {
2579 dl 1.28 if (fn == null) throw new NullPointerException();
2580     CompletableFuture<T> dst = new CompletableFuture<T>();
2581     ExceptionCompletion<T> d = null;
2582     Object r;
2583     if ((r = result) == null) {
2584     CompletionNode p =
2585 dl 1.88 new CompletionNode(d = new ExceptionCompletion<T>
2586     (this, fn, dst));
2587 dl 1.28 while ((r = result) == null) {
2588     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2589     p.next = completions, p))
2590     break;
2591     }
2592     }
2593     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2594     T t = null; Throwable ex, dx = null;
2595     if (r instanceof AltResult) {
2596 jsr166 1.87 if ((ex = ((AltResult)r).ex) != null) {
2597 dl 1.28 try {
2598     t = fn.apply(ex);
2599     } catch (Throwable rex) {
2600     dx = rex;
2601     }
2602     }
2603     }
2604     else {
2605     @SuppressWarnings("unchecked") T tr = (T) r;
2606     t = tr;
2607     }
2608     dst.internalComplete(t, dx);
2609     }
2610     helpPostComplete();
2611     return dst;
2612     }
2613    
2614 dl 1.35 /* ------------- Arbitrary-arity constructions -------------- */
2615    
2616     /*
2617     * The basic plan of attack is to recursively form binary
2618     * completion trees of elements. This can be overkill for small
2619     * sets, but scales nicely. The And/All vs Or/Any forms use the
2620     * same idea, but details differ.
2621     */
2622    
2623     /**
2624     * Returns a new CompletableFuture that is completed when all of
2625 jsr166 1.66 * the given CompletableFutures complete. If any of the given
2626 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
2627     * CompletableFuture also does so, with a CompletionException
2628     * holding this exception as its cause. Otherwise, the results,
2629     * if any, of the given CompletableFutures are not reflected in
2630     * the returned CompletableFuture, but may be obtained by
2631     * inspecting them individually. If no CompletableFutures are
2632     * provided, returns a CompletableFuture completed with the value
2633     * {@code null}.
2634 dl 1.35 *
2635     * <p>Among the applications of this method is to await completion
2636     * of a set of independent CompletableFutures before continuing a
2637     * program, as in: {@code CompletableFuture.allOf(c1, c2,
2638     * c3).join();}.
2639     *
2640     * @param cfs the CompletableFutures
2641 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
2642 dl 1.35 * given CompletableFutures complete
2643     * @throws NullPointerException if the array or any of its elements are
2644     * {@code null}
2645     */
2646     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2647     int len = cfs.length; // Directly handle empty and singleton cases
2648     if (len > 1)
2649     return allTree(cfs, 0, len - 1);
2650     else {
2651     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2652     CompletableFuture<?> f;
2653     if (len == 0)
2654     dst.result = NIL;
2655     else if ((f = cfs[0]) == null)
2656     throw new NullPointerException();
2657     else {
2658 dl 1.75 ThenPropagate d = null;
2659 dl 1.35 CompletionNode p = null;
2660     Object r;
2661     while ((r = f.result) == null) {
2662     if (d == null)
2663 dl 1.75 d = new ThenPropagate(f, dst);
2664 dl 1.35 else if (p == null)
2665     p = new CompletionNode(d);
2666     else if (UNSAFE.compareAndSwapObject
2667     (f, COMPLETIONS, p.next = f.completions, p))
2668     break;
2669     }
2670     if (r != null && (d == null || d.compareAndSet(0, 1)))
2671     dst.internalComplete(null, (r instanceof AltResult) ?
2672     ((AltResult)r).ex : null);
2673     f.helpPostComplete();
2674     }
2675     return dst;
2676     }
2677     }
2678    
2679     /**
2680     * Recursively constructs an And'ed tree of CompletableFutures.
2681     * Called only when array known to have at least two elements.
2682     */
2683     private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2684     int lo, int hi) {
2685     CompletableFuture<?> fst, snd;
2686     int mid = (lo + hi) >>> 1;
2687     if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2688     (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2689     throw new NullPointerException();
2690     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2691     AndCompletion d = null;
2692     CompletionNode p = null, q = null;
2693     Object r = null, s = null;
2694     while ((r = fst.result) == null || (s = snd.result) == null) {
2695     if (d == null)
2696     d = new AndCompletion(fst, snd, dst);
2697     else if (p == null)
2698     p = new CompletionNode(d);
2699     else if (q == null) {
2700     if (UNSAFE.compareAndSwapObject
2701     (fst, COMPLETIONS, p.next = fst.completions, p))
2702     q = new CompletionNode(d);
2703     }
2704     else if (UNSAFE.compareAndSwapObject
2705     (snd, COMPLETIONS, q.next = snd.completions, q))
2706     break;
2707     }
2708     if ((r != null || (r = fst.result) != null) &&
2709     (s != null || (s = snd.result) != null) &&
2710     (d == null || d.compareAndSet(0, 1))) {
2711     Throwable ex;
2712     if (r instanceof AltResult)
2713     ex = ((AltResult)r).ex;
2714     else
2715     ex = null;
2716     if (ex == null && (s instanceof AltResult))
2717     ex = ((AltResult)s).ex;
2718     dst.internalComplete(null, ex);
2719     }
2720     fst.helpPostComplete();
2721     snd.helpPostComplete();
2722     return dst;
2723     }
2724    
2725     /**
2726 dl 1.76 * Returns a new CompletableFuture that is completed when any of
2727 jsr166 1.79 * the given CompletableFutures complete, with the same result.
2728     * Otherwise, if it completed exceptionally, the returned
2729 dl 1.77 * CompletableFuture also does so, with a CompletionException
2730     * holding this exception as its cause. If no CompletableFutures
2731     * are provided, returns an incomplete CompletableFuture.
2732 dl 1.35 *
2733     * @param cfs the CompletableFutures
2734 dl 1.77 * @return a new CompletableFuture that is completed with the
2735     * result or exception of any of the given CompletableFutures when
2736     * one completes
2737 dl 1.35 * @throws NullPointerException if the array or any of its elements are
2738     * {@code null}
2739     */
2740 dl 1.77 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2741 dl 1.35 int len = cfs.length; // Same idea as allOf
2742     if (len > 1)
2743     return anyTree(cfs, 0, len - 1);
2744     else {
2745 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2746 dl 1.35 CompletableFuture<?> f;
2747     if (len == 0)
2748 dl 1.48 ; // skip
2749 dl 1.35 else if ((f = cfs[0]) == null)
2750     throw new NullPointerException();
2751     else {
2752 dl 1.77 ThenCopy<Object> d = null;
2753 dl 1.35 CompletionNode p = null;
2754     Object r;
2755     while ((r = f.result) == null) {
2756     if (d == null)
2757 dl 1.77 d = new ThenCopy<Object>(f, dst);
2758 dl 1.35 else if (p == null)
2759     p = new CompletionNode(d);
2760     else if (UNSAFE.compareAndSwapObject
2761     (f, COMPLETIONS, p.next = f.completions, p))
2762     break;
2763     }
2764     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2765     Throwable ex; Object t;
2766 dl 1.77 if (r instanceof AltResult) {
2767 dl 1.35 ex = ((AltResult)r).ex;
2768 dl 1.77 t = null;
2769     }
2770     else {
2771     ex = null;
2772     t = r;
2773     }
2774     dst.internalComplete(t, ex);
2775 dl 1.35 }
2776     f.helpPostComplete();
2777     }
2778     return dst;
2779     }
2780     }
2781    
2782     /**
2783 jsr166 1.44 * Recursively constructs an Or'ed tree of CompletableFutures.
2784 dl 1.35 */
2785 dl 1.77 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
2786 jsr166 1.79 int lo, int hi) {
2787 dl 1.35 CompletableFuture<?> fst, snd;
2788     int mid = (lo + hi) >>> 1;
2789     if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2790     (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2791     throw new NullPointerException();
2792 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2793 dl 1.35 OrCompletion d = null;
2794     CompletionNode p = null, q = null;
2795     Object r;
2796     while ((r = fst.result) == null && (r = snd.result) == null) {
2797     if (d == null)
2798     d = new OrCompletion(fst, snd, dst);
2799     else if (p == null)
2800     p = new CompletionNode(d);
2801     else if (q == null) {
2802     if (UNSAFE.compareAndSwapObject
2803     (fst, COMPLETIONS, p.next = fst.completions, p))
2804     q = new CompletionNode(d);
2805     }
2806     else if (UNSAFE.compareAndSwapObject
2807     (snd, COMPLETIONS, q.next = snd.completions, q))
2808     break;
2809     }
2810     if ((r != null || (r = fst.result) != null ||
2811     (r = snd.result) != null) &&
2812     (d == null || d.compareAndSet(0, 1))) {
2813 dl 1.77 Throwable ex; Object t;
2814 dl 1.35 if (r instanceof AltResult) {
2815     ex = ((AltResult)r).ex;
2816 dl 1.77 t = null;
2817 dl 1.35 }
2818 dl 1.77 else {
2819 dl 1.35 ex = null;
2820 dl 1.77 t = r;
2821     }
2822     dst.internalComplete(t, ex);
2823 dl 1.35 }
2824     fst.helpPostComplete();
2825     snd.helpPostComplete();
2826     return dst;
2827     }
2828    
2829     /* ------------- Control and status methods -------------- */
2830    
2831 dl 1.28 /**
2832 dl 1.37 * If not already completed, completes this CompletableFuture with
2833     * a {@link CancellationException}. Dependent CompletableFutures
2834     * that have not already completed will also complete
2835     * exceptionally, with a {@link CompletionException} caused by
2836     * this {@code CancellationException}.
2837 dl 1.28 *
2838     * @param mayInterruptIfRunning this value has no effect in this
2839     * implementation because interrupts are not used to control
2840     * processing.
2841     *
2842     * @return {@code true} if this task is now cancelled
2843     */
2844     public boolean cancel(boolean mayInterruptIfRunning) {
2845 dl 1.46 boolean cancelled = (result == null) &&
2846     UNSAFE.compareAndSwapObject
2847     (this, RESULT, null, new AltResult(new CancellationException()));
2848     postComplete();
2849 dl 1.48 return cancelled || isCancelled();
2850 dl 1.28 }
2851    
2852     /**
2853     * Returns {@code true} if this CompletableFuture was cancelled
2854     * before it completed normally.
2855     *
2856     * @return {@code true} if this CompletableFuture was cancelled
2857     * before it completed normally
2858     */
2859     public boolean isCancelled() {
2860     Object r;
2861 jsr166 1.43 return ((r = result) instanceof AltResult) &&
2862     (((AltResult)r).ex instanceof CancellationException);
2863 dl 1.28 }
2864    
2865     /**
2866 dl 1.88 * Returns {@code true} if this CompletableFuture completed
2867 dl 1.91 * exceptionally, in any way. Possible causes include
2868     * cancellation, explicit invocation of {@code
2869     * completeExceptionally}, and abrupt termination of a
2870     * CompletionStage action.
2871 dl 1.88 *
2872     * @return {@code true} if this CompletableFuture completed
2873     * exceptionally
2874     */
2875     public boolean isCompletedExceptionally() {
2876 dl 1.91 Object r;
2877     return ((r = result) instanceof AltResult) && r != NIL;
2878 dl 1.88 }
2879    
2880     /**
2881 dl 1.28 * Forcibly sets or resets the value subsequently returned by
2882 jsr166 1.42 * method {@link #get()} and related methods, whether or not
2883     * already completed. This method is designed for use only in
2884     * error recovery actions, and even in such situations may result
2885     * in ongoing dependent completions using established versus
2886 dl 1.30 * overwritten outcomes.
2887 dl 1.28 *
2888     * @param value the completion value
2889     */
2890     public void obtrudeValue(T value) {
2891     result = (value == null) ? NIL : value;
2892     postComplete();
2893     }
2894    
2895 dl 1.30 /**
2896 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
2897     * and related methods to throw the given exception, whether or
2898     * not already completed. This method is designed for use only in
2899 dl 1.30 * recovery actions, and even in such situations may result in
2900     * ongoing dependent completions using established versus
2901     * overwritten outcomes.
2902     *
2903     * @param ex the exception
2904     */
2905     public void obtrudeException(Throwable ex) {
2906     if (ex == null) throw new NullPointerException();
2907     result = new AltResult(ex);
2908     postComplete();
2909     }
2910    
2911 dl 1.35 /**
2912     * Returns the estimated number of CompletableFutures whose
2913     * completions are awaiting completion of this CompletableFuture.
2914     * This method is designed for use in monitoring system state, not
2915     * for synchronization control.
2916     *
2917     * @return the number of dependent CompletableFutures
2918     */
2919     public int getNumberOfDependents() {
2920     int count = 0;
2921     for (CompletionNode p = completions; p != null; p = p.next)
2922     ++count;
2923     return count;
2924     }
2925    
2926     /**
2927     * Returns a string identifying this CompletableFuture, as well as
2928 jsr166 1.40 * its completion state. The state, in brackets, contains the
2929 dl 1.35 * String {@code "Completed Normally"} or the String {@code
2930     * "Completed Exceptionally"}, or the String {@code "Not
2931     * completed"} followed by the number of CompletableFutures
2932     * dependent upon its completion, if any.
2933     *
2934     * @return a string identifying this CompletableFuture, as well as its state
2935     */
2936     public String toString() {
2937     Object r = result;
2938 jsr166 1.40 int count;
2939     return super.toString() +
2940     ((r == null) ?
2941     (((count = getNumberOfDependents()) == 0) ?
2942     "[Not completed]" :
2943     "[Not completed, " + count + " dependents]") :
2944     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2945     "[Completed exceptionally]" :
2946     "[Completed normally]"));
2947 dl 1.35 }
2948    
2949 dl 1.1 // Unsafe mechanics
2950     private static final sun.misc.Unsafe UNSAFE;
2951     private static final long RESULT;
2952     private static final long WAITERS;
2953     private static final long COMPLETIONS;
2954     static {
2955     try {
2956     UNSAFE = sun.misc.Unsafe.getUnsafe();
2957     Class<?> k = CompletableFuture.class;
2958     RESULT = UNSAFE.objectFieldOffset
2959     (k.getDeclaredField("result"));
2960     WAITERS = UNSAFE.objectFieldOffset
2961     (k.getDeclaredField("waiters"));
2962     COMPLETIONS = UNSAFE.objectFieldOffset
2963     (k.getDeclaredField("completions"));
2964     } catch (Exception e) {
2965     throw new Error(e);
2966     }
2967     }
2968     }