ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.97
Committed: Fri Aug 9 13:02:57 2013 UTC (10 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.96: +2 -0 lines
Log Message:
Suppress warnings

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.88 import java.util.concurrent.CompletionException;
23     import java.util.concurrent.CompletionStage;
24 dl 1.1 import java.util.concurrent.atomic.AtomicInteger;
25     import java.util.concurrent.locks.LockSupport;
26    
27     /**
28     * A {@link Future} that may be explicitly completed (setting its
29 dl 1.88 * value and status), and may be used as a {@link CompletionStage},
30     * supporting dependent functions and actions that trigger upon its
31     * completion.
32 dl 1.1 *
33 jsr166 1.50 * <p>When two or more threads attempt to
34     * {@link #complete complete},
35 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
36 jsr166 1.50 * {@link #cancel cancel}
37     * a CompletableFuture, only one of them succeeds.
38 dl 1.19 *
39 dl 1.91 * <p>In addition to these and related methods for directly
40     * manipulating status and results, CompletableFuture implements
41     * interface {@link CompletionStage} with the following policies: <ul>
42 dl 1.35 *
43 dl 1.88 * <li>Actions supplied for dependent completions of
44     * <em>non-async</em> methods may be performed by the thread that
45     * completes the current CompletableFuture, or by any other caller of
46 dl 1.96 * a completion method.</li>
47 jsr166 1.65 *
48 dl 1.88 * <li>All <em>async</em> methods without an explicit Executor
49 dl 1.96 * argument are performed using the {@link ForkJoinPool#commonPool()}
50     * (unless it does not support a parallelism level of at least two, in
51     * which case, a new Thread is used). To simplify monitoring,
52     * debugging, and tracking, all generated asynchronous tasks are
53     * instances of the marker interface {@link
54     * AsynchronousCompletionTask}. </li>
55 dl 1.35 *
56 dl 1.88 * <li>All CompletionStage methods are implemented independently of
57     * other public methods, so the behavior of one method is not impacted
58     * by overrides of others in subclasses. </li> </ul>
59     *
60 dl 1.91 * <p>CompletableFuture also implements {@link Future} with the following
61 dl 1.88 * policies: <ul>
62     *
63     * <li>Since (unlike {@link FutureTask}) this class has no direct
64 jsr166 1.55 * control over the computation that causes it to be completed,
65 dl 1.88 * cancellation is treated as just another form of exceptional
66     * completion. Method {@link #cancel cancel} has the same effect as
67     * {@code completeExceptionally(new CancellationException())}. Method
68     * {@link #isCompletedExceptionally} can be used to determine if a
69     * CompletableFuture completed in any exceptional fashion.</li>
70 jsr166 1.55 *
71 dl 1.88 * <li>In case of exceptional completion with a CompletionException,
72 jsr166 1.55 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
73     * {@link ExecutionException} with the same cause as held in the
74 dl 1.88 * corresponding CompletionException. To simplify usage in most
75     * contexts, this class also defines methods {@link #join()} and
76     * {@link #getNow} that instead throw the CompletionException directly
77     * in these cases.</li> </ul>
78 jsr166 1.80 *
79 dl 1.1 * @author Doug Lea
80     * @since 1.8
81     */
82 dl 1.88 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
83 dl 1.28
84 dl 1.1 /*
85 dl 1.20 * Overview:
86 dl 1.1 *
87 jsr166 1.32 * 1. Non-nullness of field result (set via CAS) indicates done.
88     * An AltResult is used to box null as a result, as well as to
89     * hold exceptions. Using a single field makes completion fast
90 dl 1.20 * and simple to detect and trigger, at the expense of a lot of
91     * encoding and decoding that infiltrates many methods. One minor
92     * simplification relies on the (static) NIL (to box null results)
93     * being the only AltResult with a null exception field, so we
94 dl 1.28 * don't usually need explicit comparisons with NIL. The CF
95     * exception propagation mechanics surrounding decoding rely on
96     * unchecked casts of decoded results really being unchecked,
97     * where user type errors are caught at point of use, as is
98     * currently the case in Java. These are highlighted by using
99     * SuppressWarnings-annotated temporaries.
100 dl 1.1 *
101     * 2. Waiters are held in a Treiber stack similar to the one used
102 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
103 dl 1.28 * internal documentation for algorithmic details.
104 dl 1.1 *
105     * 3. Completions are also kept in a list/stack, and pulled off
106 dl 1.28 * and run when completion is triggered. (We could even use the
107 jsr166 1.24 * same stack as for waiters, but would give up the potential
108 dl 1.20 * parallelism obtained because woken waiters help release/run
109 dl 1.28 * others -- see method postComplete). Because post-processing
110     * may race with direct calls, class Completion opportunistically
111     * extends AtomicInteger so callers can claim the action via
112     * compareAndSet(0, 1). The Completion.run methods are all
113     * written a boringly similar uniform way (that sometimes includes
114 jsr166 1.55 * unnecessary-looking checks, kept to maintain uniformity).
115     * There are enough dimensions upon which they differ that
116     * attempts to factor commonalities while maintaining efficiency
117     * require more lines of code than they would save.
118 dl 1.20 *
119     * 4. The exported then/and/or methods do support a bit of
120 dl 1.28 * factoring (see doThenApply etc). They must cope with the
121     * intrinsic races surrounding addition of a dependent action
122     * versus performing the action directly because the task is
123     * already complete. For example, a CF may not be complete upon
124     * entry, so a dependent completion is added, but by the time it
125     * is added, the target CF is complete, so must be directly
126 dl 1.20 * executed. This is all done while avoiding unnecessary object
127     * construction in safe-bypass cases.
128 dl 1.1 */
129    
130 dl 1.28 // preliminaries
131 dl 1.20
132 dl 1.1 static final class AltResult {
133     final Throwable ex; // null only for NIL
134 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
135 dl 1.1 }
136    
137     static final AltResult NIL = new AltResult(null);
138    
139 dl 1.20 // Fields
140    
141     volatile Object result; // Either the result or boxed AltResult
142 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
143     volatile CompletionNode completions; // list (Treiber stack) of completions
144    
145 dl 1.20 // Basic utilities for triggering and processing completions
146    
147     /**
148     * Removes and signals all waiting threads and runs all completions.
149     */
150     final void postComplete() {
151     WaitNode q; Thread t;
152     while ((q = waiters) != null) {
153     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
154     (t = q.thread) != null) {
155     q.thread = null;
156     LockSupport.unpark(t);
157     }
158     }
159    
160     CompletionNode h; Completion c;
161     while ((h = completions) != null) {
162     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
163     (c = h.completion) != null)
164     c.run();
165     }
166     }
167    
168     /**
169     * Triggers completion with the encoding of the given arguments:
170     * if the exception is non-null, encodes it as a wrapped
171     * CompletionException unless it is one already. Otherwise uses
172     * the given result, boxed as NIL if null.
173     */
174 dl 1.75 final void internalComplete(T v, Throwable ex) {
175 dl 1.20 if (result == null)
176     UNSAFE.compareAndSwapObject
177     (this, RESULT, null,
178     (ex == null) ? (v == null) ? NIL : v :
179     new AltResult((ex instanceof CompletionException) ? ex :
180     new CompletionException(ex)));
181     postComplete(); // help out even if not triggered
182     }
183    
184     /**
185 jsr166 1.25 * If triggered, helps release and/or process completions.
186 dl 1.20 */
187     final void helpPostComplete() {
188     if (result != null)
189     postComplete();
190     }
191    
192 dl 1.28 /* ------------- waiting for completions -------------- */
193 dl 1.20
194 dl 1.35 /** Number of processors, for spin control */
195     static final int NCPU = Runtime.getRuntime().availableProcessors();
196    
197 dl 1.1 /**
198 dl 1.28 * Heuristic spin value for waitingGet() before blocking on
199     * multiprocessors
200 dl 1.1 */
201 dl 1.35 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
202 dl 1.1
203     /**
204 dl 1.28 * Linked nodes to record waiting threads in a Treiber stack. See
205     * other classes such as Phaser and SynchronousQueue for more
206     * detailed explanation. This class implements ManagedBlocker to
207     * avoid starvation when blocking actions pile up in
208     * ForkJoinPools.
209     */
210     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
211     long nanos; // wait time if timed
212     final long deadline; // non-zero if timed
213     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
214     volatile Thread thread;
215     volatile WaitNode next;
216     WaitNode(boolean interruptible, long nanos, long deadline) {
217     this.thread = Thread.currentThread();
218     this.interruptControl = interruptible ? 1 : 0;
219     this.nanos = nanos;
220     this.deadline = deadline;
221     }
222     public boolean isReleasable() {
223     if (thread == null)
224     return true;
225     if (Thread.interrupted()) {
226     int i = interruptControl;
227     interruptControl = -1;
228     if (i > 0)
229     return true;
230     }
231     if (deadline != 0L &&
232     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
233     thread = null;
234     return true;
235     }
236     return false;
237     }
238     public boolean block() {
239     if (isReleasable())
240     return true;
241     else if (deadline == 0L)
242     LockSupport.park(this);
243     else if (nanos > 0L)
244     LockSupport.parkNanos(this, nanos);
245     return isReleasable();
246     }
247 dl 1.1 }
248    
249     /**
250 dl 1.28 * Returns raw result after waiting, or null if interruptible and
251     * interrupted.
252 dl 1.1 */
253 dl 1.28 private Object waitingGet(boolean interruptible) {
254     WaitNode q = null;
255     boolean queued = false;
256 dl 1.35 int spins = SPINS;
257 dl 1.28 for (Object r;;) {
258     if ((r = result) != null) {
259     if (q != null) { // suppress unpark
260     q.thread = null;
261     if (q.interruptControl < 0) {
262     if (interruptible) {
263     removeWaiter(q);
264     return null;
265     }
266     Thread.currentThread().interrupt();
267     }
268     }
269     postComplete(); // help release others
270     return r;
271     }
272     else if (spins > 0) {
273 dl 1.35 int rnd = ThreadLocalRandom.nextSecondarySeed();
274     if (rnd == 0)
275     rnd = ThreadLocalRandom.current().nextInt();
276     if (rnd >= 0)
277 dl 1.28 --spins;
278     }
279     else if (q == null)
280     q = new WaitNode(interruptible, 0L, 0L);
281     else if (!queued)
282     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
283     q.next = waiters, q);
284     else if (interruptible && q.interruptControl < 0) {
285     removeWaiter(q);
286     return null;
287     }
288     else if (q.thread != null && result == null) {
289     try {
290     ForkJoinPool.managedBlock(q);
291 jsr166 1.31 } catch (InterruptedException ex) {
292 dl 1.28 q.interruptControl = -1;
293     }
294     }
295     }
296 dl 1.1 }
297    
298     /**
299 dl 1.28 * Awaits completion or aborts on interrupt or timeout.
300 dl 1.1 *
301 dl 1.28 * @param nanos time to wait
302     * @return raw result
303 dl 1.1 */
304 dl 1.28 private Object timedAwaitDone(long nanos)
305     throws InterruptedException, TimeoutException {
306     WaitNode q = null;
307     boolean queued = false;
308     for (Object r;;) {
309     if ((r = result) != null) {
310     if (q != null) {
311     q.thread = null;
312     if (q.interruptControl < 0) {
313     removeWaiter(q);
314     throw new InterruptedException();
315     }
316     }
317     postComplete();
318     return r;
319     }
320     else if (q == null) {
321     if (nanos <= 0L)
322     throw new TimeoutException();
323     long d = System.nanoTime() + nanos;
324 jsr166 1.31 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
325 dl 1.28 }
326     else if (!queued)
327     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
328     q.next = waiters, q);
329     else if (q.interruptControl < 0) {
330     removeWaiter(q);
331     throw new InterruptedException();
332     }
333     else if (q.nanos <= 0L) {
334     if (result == null) {
335     removeWaiter(q);
336     throw new TimeoutException();
337     }
338     }
339     else if (q.thread != null && result == null) {
340     try {
341     ForkJoinPool.managedBlock(q);
342 jsr166 1.31 } catch (InterruptedException ex) {
343 dl 1.28 q.interruptControl = -1;
344     }
345     }
346     }
347 dl 1.1 }
348    
349     /**
350 dl 1.28 * Tries to unlink a timed-out or interrupted wait node to avoid
351     * accumulating garbage. Internal nodes are simply unspliced
352     * without CAS since it is harmless if they are traversed anyway
353     * by releasers. To avoid effects of unsplicing from already
354     * removed nodes, the list is retraversed in case of an apparent
355     * race. This is slow when there are a lot of nodes, but we don't
356     * expect lists to be long enough to outweigh higher-overhead
357     * schemes.
358 dl 1.1 */
359 dl 1.28 private void removeWaiter(WaitNode node) {
360     if (node != null) {
361     node.thread = null;
362     retry:
363     for (;;) { // restart on removeWaiter race
364     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
365     s = q.next;
366     if (q.thread != null)
367     pred = q;
368     else if (pred != null) {
369     pred.next = s;
370     if (pred.thread == null) // check for race
371     continue retry;
372     }
373     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
374     continue retry;
375     }
376     break;
377     }
378     }
379 dl 1.1 }
380    
381 dl 1.28 /* ------------- Async tasks -------------- */
382    
383 dl 1.1 /**
384 jsr166 1.56 * A marker interface identifying asynchronous tasks produced by
385 dl 1.28 * {@code async} methods. This may be useful for monitoring,
386     * debugging, and tracking asynchronous activities.
387 jsr166 1.57 *
388     * @since 1.8
389 dl 1.1 */
390 dl 1.28 public static interface AsynchronousCompletionTask {
391 dl 1.1 }
392    
393 dl 1.28 /** Base class can act as either FJ or plain Runnable */
394 dl 1.97 @SuppressWarnings("serial")
395 jsr166 1.33 abstract static class Async extends ForkJoinTask<Void>
396 dl 1.28 implements Runnable, AsynchronousCompletionTask {
397     public final Void getRawResult() { return null; }
398     public final void setRawResult(Void v) { }
399     public final void run() { exec(); }
400 jsr166 1.26 }
401    
402 dl 1.96 /**
403     * Starts the given async task using the given executor, unless
404     * the executor is ForkJoinPool.commonPool and it has been
405     * disabled, in which case starts a new thread.
406     */
407     static void execAsync(Executor e, Async r) {
408     if (e == ForkJoinPool.commonPool() &&
409     ForkJoinPool.getCommonPoolParallelism() <= 1)
410     new Thread(r).start();
411     else
412     e.execute(r);
413     }
414    
415 dl 1.28 static final class AsyncRun extends Async {
416     final Runnable fn;
417     final CompletableFuture<Void> dst;
418     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
419     this.fn = fn; this.dst = dst;
420     }
421     public final boolean exec() {
422     CompletableFuture<Void> d; Throwable ex;
423 dl 1.29 if ((d = this.dst) != null && d.result == null) {
424 dl 1.28 try {
425     fn.run();
426     ex = null;
427     } catch (Throwable rex) {
428     ex = rex;
429     }
430     d.internalComplete(null, ex);
431 dl 1.19 }
432 dl 1.28 return true;
433 dl 1.19 }
434 dl 1.28 private static final long serialVersionUID = 5232453952276885070L;
435 dl 1.19 }
436    
437 dl 1.28 static final class AsyncSupply<U> extends Async {
438     final Supplier<U> fn;
439     final CompletableFuture<U> dst;
440     AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
441     this.fn = fn; this.dst = dst;
442     }
443     public final boolean exec() {
444     CompletableFuture<U> d; U u; Throwable ex;
445 dl 1.29 if ((d = this.dst) != null && d.result == null) {
446 dl 1.28 try {
447     u = fn.get();
448     ex = null;
449     } catch (Throwable rex) {
450     ex = rex;
451     u = null;
452     }
453     d.internalComplete(u, ex);
454 dl 1.1 }
455     return true;
456     }
457     private static final long serialVersionUID = 5232453952276885070L;
458     }
459    
460 dl 1.28 static final class AsyncApply<T,U> extends Async {
461 jsr166 1.63 final T arg;
462 dl 1.28 final Function<? super T,? extends U> fn;
463 dl 1.1 final CompletableFuture<U> dst;
464 dl 1.28 AsyncApply(T arg, Function<? super T,? extends U> fn,
465 dl 1.35 CompletableFuture<U> dst) {
466 dl 1.1 this.arg = arg; this.fn = fn; this.dst = dst;
467     }
468     public final boolean exec() {
469 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
470 dl 1.29 if ((d = this.dst) != null && d.result == null) {
471 dl 1.21 try {
472     u = fn.apply(arg);
473     ex = null;
474     } catch (Throwable rex) {
475     ex = rex;
476     u = null;
477     }
478     d.internalComplete(u, ex);
479 dl 1.1 }
480     return true;
481     }
482     private static final long serialVersionUID = 5232453952276885070L;
483     }
484    
485 jsr166 1.81 static final class AsyncCombine<T,U,V> extends Async {
486 dl 1.1 final T arg1;
487     final U arg2;
488 jsr166 1.63 final BiFunction<? super T,? super U,? extends V> fn;
489 dl 1.1 final CompletableFuture<V> dst;
490 jsr166 1.81 AsyncCombine(T arg1, U arg2,
491 dl 1.35 BiFunction<? super T,? super U,? extends V> fn,
492     CompletableFuture<V> dst) {
493 dl 1.1 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
494     }
495     public final boolean exec() {
496 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
497 dl 1.29 if ((d = this.dst) != null && d.result == null) {
498 dl 1.21 try {
499     v = fn.apply(arg1, arg2);
500     ex = null;
501     } catch (Throwable rex) {
502     ex = rex;
503     v = null;
504     }
505     d.internalComplete(v, ex);
506 dl 1.1 }
507     return true;
508     }
509     private static final long serialVersionUID = 5232453952276885070L;
510     }
511    
512 dl 1.28 static final class AsyncAccept<T> extends Async {
513 jsr166 1.63 final T arg;
514 dl 1.34 final Consumer<? super T> fn;
515 dl 1.88 final CompletableFuture<?> dst;
516 dl 1.34 AsyncAccept(T arg, Consumer<? super T> fn,
517 dl 1.88 CompletableFuture<?> dst) {
518 dl 1.7 this.arg = arg; this.fn = fn; this.dst = dst;
519     }
520     public final boolean exec() {
521 dl 1.88 CompletableFuture<?> d; Throwable ex;
522 dl 1.29 if ((d = this.dst) != null && d.result == null) {
523 dl 1.21 try {
524     fn.accept(arg);
525     ex = null;
526     } catch (Throwable rex) {
527     ex = rex;
528     }
529     d.internalComplete(null, ex);
530 dl 1.7 }
531     return true;
532     }
533     private static final long serialVersionUID = 5232453952276885070L;
534     }
535    
536 jsr166 1.81 static final class AsyncAcceptBoth<T,U> extends Async {
537 dl 1.7 final T arg1;
538     final U arg2;
539 jsr166 1.63 final BiConsumer<? super T,? super U> fn;
540 dl 1.88 final CompletableFuture<?> dst;
541 jsr166 1.81 AsyncAcceptBoth(T arg1, U arg2,
542     BiConsumer<? super T,? super U> fn,
543 dl 1.88 CompletableFuture<?> dst) {
544 dl 1.7 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
545     }
546     public final boolean exec() {
547 dl 1.88 CompletableFuture<?> d; Throwable ex;
548 dl 1.29 if ((d = this.dst) != null && d.result == null) {
549 dl 1.21 try {
550     fn.accept(arg1, arg2);
551     ex = null;
552     } catch (Throwable rex) {
553     ex = rex;
554     }
555     d.internalComplete(null, ex);
556 dl 1.7 }
557     return true;
558     }
559     private static final long serialVersionUID = 5232453952276885070L;
560     }
561    
562 dl 1.37 static final class AsyncCompose<T,U> extends Async {
563 jsr166 1.63 final T arg;
564 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
565 dl 1.37 final CompletableFuture<U> dst;
566     AsyncCompose(T arg,
567 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
568 dl 1.37 CompletableFuture<U> dst) {
569     this.arg = arg; this.fn = fn; this.dst = dst;
570     }
571     public final boolean exec() {
572     CompletableFuture<U> d, fr; U u; Throwable ex;
573     if ((d = this.dst) != null && d.result == null) {
574     try {
575 dl 1.88 CompletionStage<U> cs = fn.apply(arg);
576     fr = (cs == null) ? null : cs.toCompletableFuture();
577 jsr166 1.61 ex = (fr == null) ? new NullPointerException() : null;
578 dl 1.37 } catch (Throwable rex) {
579     ex = rex;
580     fr = null;
581     }
582     if (ex != null)
583     u = null;
584     else {
585     Object r = fr.result;
586 dl 1.67 if (r == null)
587     r = fr.waitingGet(false);
588 dl 1.37 if (r instanceof AltResult) {
589     ex = ((AltResult)r).ex;
590     u = null;
591     }
592     else {
593     @SuppressWarnings("unchecked") U ur = (U) r;
594     u = ur;
595     }
596     }
597     d.internalComplete(u, ex);
598     }
599     return true;
600     }
601     private static final long serialVersionUID = 5232453952276885070L;
602     }
603    
604 dl 1.91 static final class AsyncWhenComplete<T> extends Async {
605     final T arg1;
606     final Throwable arg2;
607     final BiConsumer<? super T,? super Throwable> fn;
608     final CompletableFuture<T> dst;
609     AsyncWhenComplete(T arg1, Throwable arg2,
610     BiConsumer<? super T,? super Throwable> fn,
611     CompletableFuture<T> dst) {
612     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
613     }
614     public final boolean exec() {
615 jsr166 1.92 CompletableFuture<T> d;
616 dl 1.91 if ((d = this.dst) != null && d.result == null) {
617     Throwable ex = arg2;
618     try {
619     fn.accept(arg1, ex);
620     } catch (Throwable rex) {
621     if (ex == null)
622     ex = rex;
623     }
624     d.internalComplete(arg1, ex);
625     }
626     return true;
627     }
628     private static final long serialVersionUID = 5232453952276885070L;
629     }
630    
631 dl 1.1 /* ------------- Completions -------------- */
632    
633 dl 1.28 /**
634     * Simple linked list nodes to record completions, used in
635     * basically the same way as WaitNodes. (We separate nodes from
636     * the Completions themselves mainly because for the And and Or
637     * methods, the same Completion object resides in two lists.)
638     */
639     static final class CompletionNode {
640     final Completion completion;
641     volatile CompletionNode next;
642     CompletionNode(Completion completion) { this.completion = completion; }
643     }
644    
645 dl 1.1 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
646 dl 1.97 @SuppressWarnings("serial")
647 jsr166 1.33 abstract static class Completion extends AtomicInteger implements Runnable {
648 dl 1.1 }
649    
650 jsr166 1.81 static final class ThenApply<T,U> extends Completion {
651 jsr166 1.2 final CompletableFuture<? extends T> src;
652     final Function<? super T,? extends U> fn;
653     final CompletableFuture<U> dst;
654 dl 1.1 final Executor executor;
655 jsr166 1.81 ThenApply(CompletableFuture<? extends T> src,
656     Function<? super T,? extends U> fn,
657     CompletableFuture<U> dst,
658     Executor executor) {
659 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
660     this.executor = executor;
661     }
662 jsr166 1.26 public final void run() {
663 dl 1.28 final CompletableFuture<? extends T> a;
664     final Function<? super T,? extends U> fn;
665     final CompletableFuture<U> dst;
666 jsr166 1.2 Object r; T t; Throwable ex;
667     if ((dst = this.dst) != null &&
668 dl 1.1 (fn = this.fn) != null &&
669     (a = this.src) != null &&
670     (r = a.result) != null &&
671     compareAndSet(0, 1)) {
672 jsr166 1.2 if (r instanceof AltResult) {
673 dl 1.19 ex = ((AltResult)r).ex;
674 dl 1.1 t = null;
675     }
676 dl 1.17 else {
677     ex = null;
678 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
679     t = tr;
680 dl 1.1 }
681 dl 1.20 Executor e = executor;
682     U u = null;
683 dl 1.17 if (ex == null) {
684     try {
685 dl 1.20 if (e != null)
686 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
687 dl 1.17 else
688 dl 1.20 u = fn.apply(t);
689 dl 1.17 } catch (Throwable rex) {
690     ex = rex;
691     }
692     }
693 dl 1.20 if (e == null || ex != null)
694     dst.internalComplete(u, ex);
695 dl 1.1 }
696     }
697 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
698 dl 1.1 }
699    
700 jsr166 1.81 static final class ThenAccept<T> extends Completion {
701 dl 1.7 final CompletableFuture<? extends T> src;
702 dl 1.34 final Consumer<? super T> fn;
703 dl 1.88 final CompletableFuture<?> dst;
704 dl 1.7 final Executor executor;
705 jsr166 1.81 ThenAccept(CompletableFuture<? extends T> src,
706     Consumer<? super T> fn,
707 dl 1.88 CompletableFuture<?> dst,
708 jsr166 1.81 Executor executor) {
709 dl 1.7 this.src = src; this.fn = fn; this.dst = dst;
710     this.executor = executor;
711     }
712 jsr166 1.26 public final void run() {
713 dl 1.28 final CompletableFuture<? extends T> a;
714 dl 1.34 final Consumer<? super T> fn;
715 dl 1.88 final CompletableFuture<?> dst;
716 dl 1.7 Object r; T t; Throwable ex;
717     if ((dst = this.dst) != null &&
718     (fn = this.fn) != null &&
719     (a = this.src) != null &&
720     (r = a.result) != null &&
721     compareAndSet(0, 1)) {
722     if (r instanceof AltResult) {
723 dl 1.19 ex = ((AltResult)r).ex;
724 dl 1.7 t = null;
725     }
726 dl 1.17 else {
727     ex = null;
728 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
729     t = tr;
730 dl 1.17 }
731 dl 1.20 Executor e = executor;
732 dl 1.17 if (ex == null) {
733     try {
734 dl 1.20 if (e != null)
735 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
736 dl 1.20 else
737 dl 1.17 fn.accept(t);
738     } catch (Throwable rex) {
739     ex = rex;
740 dl 1.7 }
741     }
742 dl 1.20 if (e == null || ex != null)
743     dst.internalComplete(null, ex);
744 dl 1.7 }
745     }
746 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
747 dl 1.7 }
748    
749 jsr166 1.82 static final class ThenRun extends Completion {
750     final CompletableFuture<?> src;
751 jsr166 1.2 final Runnable fn;
752     final CompletableFuture<Void> dst;
753 dl 1.1 final Executor executor;
754 jsr166 1.82 ThenRun(CompletableFuture<?> src,
755 jsr166 1.81 Runnable fn,
756     CompletableFuture<Void> dst,
757     Executor executor) {
758 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
759     this.executor = executor;
760     }
761 jsr166 1.26 public final void run() {
762 jsr166 1.82 final CompletableFuture<?> a;
763 dl 1.28 final Runnable fn;
764     final CompletableFuture<Void> dst;
765 jsr166 1.2 Object r; Throwable ex;
766     if ((dst = this.dst) != null &&
767 dl 1.1 (fn = this.fn) != null &&
768     (a = this.src) != null &&
769     (r = a.result) != null &&
770     compareAndSet(0, 1)) {
771 dl 1.19 if (r instanceof AltResult)
772     ex = ((AltResult)r).ex;
773 dl 1.17 else
774     ex = null;
775 dl 1.20 Executor e = executor;
776 dl 1.17 if (ex == null) {
777     try {
778 dl 1.20 if (e != null)
779 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
780 dl 1.20 else
781 dl 1.17 fn.run();
782     } catch (Throwable rex) {
783     ex = rex;
784 dl 1.1 }
785     }
786 dl 1.20 if (e == null || ex != null)
787     dst.internalComplete(null, ex);
788 dl 1.1 }
789     }
790 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
791 dl 1.1 }
792    
793 jsr166 1.81 static final class ThenCombine<T,U,V> extends Completion {
794 jsr166 1.2 final CompletableFuture<? extends T> src;
795     final CompletableFuture<? extends U> snd;
796     final BiFunction<? super T,? super U,? extends V> fn;
797     final CompletableFuture<V> dst;
798 dl 1.1 final Executor executor;
799 jsr166 1.81 ThenCombine(CompletableFuture<? extends T> src,
800     CompletableFuture<? extends U> snd,
801     BiFunction<? super T,? super U,? extends V> fn,
802     CompletableFuture<V> dst,
803     Executor executor) {
804 dl 1.1 this.src = src; this.snd = snd;
805     this.fn = fn; this.dst = dst;
806     this.executor = executor;
807     }
808 jsr166 1.26 public final void run() {
809 dl 1.28 final CompletableFuture<? extends T> a;
810     final CompletableFuture<? extends U> b;
811     final BiFunction<? super T,? super U,? extends V> fn;
812     final CompletableFuture<V> dst;
813 dl 1.85 Object r, s; T t; U u; Throwable ex;
814     if ((dst = this.dst) != null &&
815     (fn = this.fn) != null &&
816     (a = this.src) != null &&
817     (r = a.result) != null &&
818     (b = this.snd) != null &&
819     (s = b.result) != null &&
820     compareAndSet(0, 1)) {
821     if (r instanceof AltResult) {
822     ex = ((AltResult)r).ex;
823     t = null;
824     }
825     else {
826     ex = null;
827     @SuppressWarnings("unchecked") T tr = (T) r;
828     t = tr;
829     }
830     if (ex != null)
831     u = null;
832     else if (s instanceof AltResult) {
833     ex = ((AltResult)s).ex;
834     u = null;
835     }
836     else {
837     @SuppressWarnings("unchecked") U us = (U) s;
838     u = us;
839     }
840 dl 1.20 Executor e = executor;
841     V v = null;
842 dl 1.19 if (ex == null) {
843     try {
844 dl 1.20 if (e != null)
845 dl 1.96 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
846 dl 1.19 else
847 dl 1.20 v = fn.apply(t, u);
848 dl 1.19 } catch (Throwable rex) {
849     ex = rex;
850     }
851 dl 1.1 }
852 dl 1.20 if (e == null || ex != null)
853     dst.internalComplete(v, ex);
854 jsr166 1.2 }
855     }
856 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
857 dl 1.1 }
858    
859 jsr166 1.81 static final class ThenAcceptBoth<T,U> extends Completion {
860 dl 1.7 final CompletableFuture<? extends T> src;
861     final CompletableFuture<? extends U> snd;
862 dl 1.34 final BiConsumer<? super T,? super U> fn;
863 dl 1.7 final CompletableFuture<Void> dst;
864     final Executor executor;
865 jsr166 1.81 ThenAcceptBoth(CompletableFuture<? extends T> src,
866     CompletableFuture<? extends U> snd,
867     BiConsumer<? super T,? super U> fn,
868     CompletableFuture<Void> dst,
869     Executor executor) {
870 dl 1.7 this.src = src; this.snd = snd;
871     this.fn = fn; this.dst = dst;
872     this.executor = executor;
873     }
874 jsr166 1.26 public final void run() {
875 dl 1.28 final CompletableFuture<? extends T> a;
876     final CompletableFuture<? extends U> b;
877 dl 1.34 final BiConsumer<? super T,? super U> fn;
878 dl 1.28 final CompletableFuture<Void> dst;
879 dl 1.85 Object r, s; T t; U u; Throwable ex;
880     if ((dst = this.dst) != null &&
881     (fn = this.fn) != null &&
882     (a = this.src) != null &&
883     (r = a.result) != null &&
884     (b = this.snd) != null &&
885     (s = b.result) != null &&
886     compareAndSet(0, 1)) {
887     if (r instanceof AltResult) {
888     ex = ((AltResult)r).ex;
889     t = null;
890     }
891     else {
892     ex = null;
893     @SuppressWarnings("unchecked") T tr = (T) r;
894     t = tr;
895     }
896     if (ex != null)
897     u = null;
898     else if (s instanceof AltResult) {
899     ex = ((AltResult)s).ex;
900     u = null;
901     }
902     else {
903     @SuppressWarnings("unchecked") U us = (U) s;
904     u = us;
905     }
906 dl 1.20 Executor e = executor;
907 dl 1.19 if (ex == null) {
908     try {
909 dl 1.20 if (e != null)
910 dl 1.96 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
911 dl 1.20 else
912 dl 1.19 fn.accept(t, u);
913     } catch (Throwable rex) {
914     ex = rex;
915 dl 1.7 }
916     }
917 dl 1.20 if (e == null || ex != null)
918     dst.internalComplete(null, ex);
919 dl 1.7 }
920     }
921 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
922 dl 1.7 }
923    
924 jsr166 1.82 static final class RunAfterBoth extends Completion {
925     final CompletableFuture<?> src;
926 jsr166 1.2 final CompletableFuture<?> snd;
927     final Runnable fn;
928     final CompletableFuture<Void> dst;
929 dl 1.1 final Executor executor;
930 jsr166 1.82 RunAfterBoth(CompletableFuture<?> src,
931 jsr166 1.81 CompletableFuture<?> snd,
932     Runnable fn,
933     CompletableFuture<Void> dst,
934     Executor executor) {
935 dl 1.1 this.src = src; this.snd = snd;
936     this.fn = fn; this.dst = dst;
937     this.executor = executor;
938     }
939 jsr166 1.26 public final void run() {
940 jsr166 1.82 final CompletableFuture<?> a;
941 dl 1.1 final CompletableFuture<?> b;
942     final Runnable fn;
943     final CompletableFuture<Void> dst;
944 dl 1.85 Object r, s; Throwable ex;
945     if ((dst = this.dst) != null &&
946     (fn = this.fn) != null &&
947     (a = this.src) != null &&
948     (r = a.result) != null &&
949     (b = this.snd) != null &&
950     (s = b.result) != null &&
951     compareAndSet(0, 1)) {
952     if (r instanceof AltResult)
953     ex = ((AltResult)r).ex;
954     else
955     ex = null;
956     if (ex == null && (s instanceof AltResult))
957     ex = ((AltResult)s).ex;
958 dl 1.20 Executor e = executor;
959 dl 1.19 if (ex == null) {
960     try {
961 dl 1.20 if (e != null)
962 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
963 dl 1.20 else
964 dl 1.19 fn.run();
965     } catch (Throwable rex) {
966     ex = rex;
967 dl 1.1 }
968 jsr166 1.2 }
969 dl 1.20 if (e == null || ex != null)
970     dst.internalComplete(null, ex);
971 jsr166 1.2 }
972     }
973 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
974 dl 1.1 }
975    
976 dl 1.35 static final class AndCompletion extends Completion {
977     final CompletableFuture<?> src;
978     final CompletableFuture<?> snd;
979     final CompletableFuture<Void> dst;
980     AndCompletion(CompletableFuture<?> src,
981     CompletableFuture<?> snd,
982     CompletableFuture<Void> dst) {
983     this.src = src; this.snd = snd; this.dst = dst;
984     }
985     public final void run() {
986     final CompletableFuture<?> a;
987     final CompletableFuture<?> b;
988     final CompletableFuture<Void> dst;
989     Object r, s; Throwable ex;
990     if ((dst = this.dst) != null &&
991     (a = this.src) != null &&
992     (r = a.result) != null &&
993     (b = this.snd) != null &&
994     (s = b.result) != null &&
995     compareAndSet(0, 1)) {
996     if (r instanceof AltResult)
997     ex = ((AltResult)r).ex;
998     else
999     ex = null;
1000     if (ex == null && (s instanceof AltResult))
1001     ex = ((AltResult)s).ex;
1002     dst.internalComplete(null, ex);
1003     }
1004     }
1005     private static final long serialVersionUID = 5232453952276885070L;
1006     }
1007    
1008 jsr166 1.81 static final class ApplyToEither<T,U> extends Completion {
1009 jsr166 1.2 final CompletableFuture<? extends T> src;
1010     final CompletableFuture<? extends T> snd;
1011     final Function<? super T,? extends U> fn;
1012     final CompletableFuture<U> dst;
1013 dl 1.1 final Executor executor;
1014 jsr166 1.81 ApplyToEither(CompletableFuture<? extends T> src,
1015     CompletableFuture<? extends T> snd,
1016     Function<? super T,? extends U> fn,
1017     CompletableFuture<U> dst,
1018     Executor executor) {
1019 dl 1.1 this.src = src; this.snd = snd;
1020     this.fn = fn; this.dst = dst;
1021     this.executor = executor;
1022     }
1023 jsr166 1.26 public final void run() {
1024 dl 1.28 final CompletableFuture<? extends T> a;
1025     final CompletableFuture<? extends T> b;
1026     final Function<? super T,? extends U> fn;
1027     final CompletableFuture<U> dst;
1028 jsr166 1.2 Object r; T t; Throwable ex;
1029     if ((dst = this.dst) != null &&
1030 dl 1.1 (fn = this.fn) != null &&
1031     (((a = this.src) != null && (r = a.result) != null) ||
1032     ((b = this.snd) != null && (r = b.result) != null)) &&
1033     compareAndSet(0, 1)) {
1034 jsr166 1.2 if (r instanceof AltResult) {
1035 dl 1.19 ex = ((AltResult)r).ex;
1036 jsr166 1.2 t = null;
1037     }
1038 dl 1.19 else {
1039     ex = null;
1040 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1041     t = tr;
1042 dl 1.1 }
1043 dl 1.20 Executor e = executor;
1044     U u = null;
1045 dl 1.19 if (ex == null) {
1046     try {
1047 dl 1.20 if (e != null)
1048 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1049 dl 1.19 else
1050 dl 1.20 u = fn.apply(t);
1051 dl 1.19 } catch (Throwable rex) {
1052     ex = rex;
1053     }
1054     }
1055 dl 1.20 if (e == null || ex != null)
1056     dst.internalComplete(u, ex);
1057 jsr166 1.2 }
1058     }
1059 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1060 dl 1.1 }
1061    
1062 jsr166 1.81 static final class AcceptEither<T> extends Completion {
1063 dl 1.7 final CompletableFuture<? extends T> src;
1064     final CompletableFuture<? extends T> snd;
1065 dl 1.34 final Consumer<? super T> fn;
1066 dl 1.7 final CompletableFuture<Void> dst;
1067     final Executor executor;
1068 jsr166 1.81 AcceptEither(CompletableFuture<? extends T> src,
1069     CompletableFuture<? extends T> snd,
1070     Consumer<? super T> fn,
1071     CompletableFuture<Void> dst,
1072     Executor executor) {
1073 dl 1.7 this.src = src; this.snd = snd;
1074     this.fn = fn; this.dst = dst;
1075     this.executor = executor;
1076     }
1077 jsr166 1.26 public final void run() {
1078 dl 1.28 final CompletableFuture<? extends T> a;
1079     final CompletableFuture<? extends T> b;
1080 dl 1.34 final Consumer<? super T> fn;
1081 dl 1.28 final CompletableFuture<Void> dst;
1082 dl 1.7 Object r; T t; Throwable ex;
1083     if ((dst = this.dst) != null &&
1084     (fn = this.fn) != null &&
1085     (((a = this.src) != null && (r = a.result) != null) ||
1086     ((b = this.snd) != null && (r = b.result) != null)) &&
1087     compareAndSet(0, 1)) {
1088     if (r instanceof AltResult) {
1089 dl 1.19 ex = ((AltResult)r).ex;
1090 dl 1.7 t = null;
1091     }
1092 dl 1.19 else {
1093     ex = null;
1094 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1095     t = tr;
1096 dl 1.19 }
1097 dl 1.20 Executor e = executor;
1098 dl 1.19 if (ex == null) {
1099     try {
1100 dl 1.20 if (e != null)
1101 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1102 dl 1.20 else
1103 dl 1.19 fn.accept(t);
1104     } catch (Throwable rex) {
1105     ex = rex;
1106 dl 1.7 }
1107     }
1108 dl 1.20 if (e == null || ex != null)
1109     dst.internalComplete(null, ex);
1110 dl 1.7 }
1111     }
1112 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1113 dl 1.7 }
1114    
1115 jsr166 1.82 static final class RunAfterEither extends Completion {
1116     final CompletableFuture<?> src;
1117 jsr166 1.2 final CompletableFuture<?> snd;
1118     final Runnable fn;
1119     final CompletableFuture<Void> dst;
1120 dl 1.1 final Executor executor;
1121 jsr166 1.82 RunAfterEither(CompletableFuture<?> src,
1122 jsr166 1.81 CompletableFuture<?> snd,
1123     Runnable fn,
1124     CompletableFuture<Void> dst,
1125     Executor executor) {
1126 dl 1.1 this.src = src; this.snd = snd;
1127     this.fn = fn; this.dst = dst;
1128     this.executor = executor;
1129     }
1130 jsr166 1.26 public final void run() {
1131 jsr166 1.82 final CompletableFuture<?> a;
1132 dl 1.1 final CompletableFuture<?> b;
1133     final Runnable fn;
1134     final CompletableFuture<Void> dst;
1135 dl 1.28 Object r; Throwable ex;
1136 jsr166 1.2 if ((dst = this.dst) != null &&
1137 dl 1.1 (fn = this.fn) != null &&
1138     (((a = this.src) != null && (r = a.result) != null) ||
1139     ((b = this.snd) != null && (r = b.result) != null)) &&
1140     compareAndSet(0, 1)) {
1141 dl 1.19 if (r instanceof AltResult)
1142     ex = ((AltResult)r).ex;
1143     else
1144     ex = null;
1145 dl 1.20 Executor e = executor;
1146 dl 1.19 if (ex == null) {
1147 dl 1.1 try {
1148 dl 1.20 if (e != null)
1149 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
1150 dl 1.20 else
1151 dl 1.1 fn.run();
1152     } catch (Throwable rex) {
1153 dl 1.19 ex = rex;
1154 dl 1.1 }
1155     }
1156 dl 1.20 if (e == null || ex != null)
1157     dst.internalComplete(null, ex);
1158 jsr166 1.2 }
1159     }
1160 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1161 dl 1.1 }
1162    
1163 dl 1.35 static final class OrCompletion extends Completion {
1164     final CompletableFuture<?> src;
1165     final CompletableFuture<?> snd;
1166 dl 1.77 final CompletableFuture<Object> dst;
1167 dl 1.35 OrCompletion(CompletableFuture<?> src,
1168     CompletableFuture<?> snd,
1169 dl 1.77 CompletableFuture<Object> dst) {
1170 dl 1.35 this.src = src; this.snd = snd; this.dst = dst;
1171     }
1172     public final void run() {
1173     final CompletableFuture<?> a;
1174     final CompletableFuture<?> b;
1175 dl 1.77 final CompletableFuture<Object> dst;
1176     Object r, t; Throwable ex;
1177 dl 1.35 if ((dst = this.dst) != null &&
1178     (((a = this.src) != null && (r = a.result) != null) ||
1179     ((b = this.snd) != null && (r = b.result) != null)) &&
1180     compareAndSet(0, 1)) {
1181 dl 1.77 if (r instanceof AltResult) {
1182 dl 1.35 ex = ((AltResult)r).ex;
1183 dl 1.77 t = null;
1184     }
1185     else {
1186 dl 1.35 ex = null;
1187 dl 1.77 t = r;
1188     }
1189     dst.internalComplete(t, ex);
1190 dl 1.35 }
1191     }
1192     private static final long serialVersionUID = 5232453952276885070L;
1193     }
1194    
1195 dl 1.28 static final class ExceptionCompletion<T> extends Completion {
1196 jsr166 1.2 final CompletableFuture<? extends T> src;
1197 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1198     final CompletableFuture<T> dst;
1199 dl 1.28 ExceptionCompletion(CompletableFuture<? extends T> src,
1200     Function<? super Throwable, ? extends T> fn,
1201     CompletableFuture<T> dst) {
1202 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1203     }
1204 jsr166 1.26 public final void run() {
1205 dl 1.28 final CompletableFuture<? extends T> a;
1206     final Function<? super Throwable, ? extends T> fn;
1207     final CompletableFuture<T> dst;
1208 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1209 jsr166 1.2 if ((dst = this.dst) != null &&
1210 dl 1.1 (fn = this.fn) != null &&
1211     (a = this.src) != null &&
1212     (r = a.result) != null &&
1213     compareAndSet(0, 1)) {
1214 dl 1.20 if ((r instanceof AltResult) &&
1215 jsr166 1.87 (ex = ((AltResult)r).ex) != null) {
1216 dl 1.20 try {
1217     t = fn.apply(ex);
1218     } catch (Throwable rex) {
1219     dx = rex;
1220 dl 1.1 }
1221     }
1222 dl 1.28 else {
1223     @SuppressWarnings("unchecked") T tr = (T) r;
1224     t = tr;
1225     }
1226 dl 1.20 dst.internalComplete(t, dx);
1227 dl 1.1 }
1228     }
1229 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1230 dl 1.1 }
1231    
1232 dl 1.88 static final class WhenCompleteCompletion<T> extends Completion {
1233     final CompletableFuture<? extends T> src;
1234     final BiConsumer<? super T, ? super Throwable> fn;
1235     final CompletableFuture<T> dst;
1236     final Executor executor;
1237     WhenCompleteCompletion(CompletableFuture<? extends T> src,
1238     BiConsumer<? super T, ? super Throwable> fn,
1239     CompletableFuture<T> dst,
1240     Executor executor) {
1241     this.src = src; this.fn = fn; this.dst = dst;
1242     this.executor = executor;
1243     }
1244     public final void run() {
1245     final CompletableFuture<? extends T> a;
1246     final BiConsumer<? super T, ? super Throwable> fn;
1247     final CompletableFuture<T> dst;
1248     Object r; T t; Throwable ex;
1249     if ((dst = this.dst) != null &&
1250     (fn = this.fn) != null &&
1251     (a = this.src) != null &&
1252     (r = a.result) != null &&
1253     compareAndSet(0, 1)) {
1254     if (r instanceof AltResult) {
1255     ex = ((AltResult)r).ex;
1256     t = null;
1257     }
1258     else {
1259     ex = null;
1260     @SuppressWarnings("unchecked") T tr = (T) r;
1261     t = tr;
1262     }
1263     Executor e = executor;
1264     Throwable dx = null;
1265     try {
1266     if (e != null)
1267 dl 1.96 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
1268 dl 1.88 else
1269     fn.accept(t, ex);
1270     } catch (Throwable rex) {
1271     dx = rex;
1272     }
1273     if (e == null || dx != null)
1274     dst.internalComplete(t, ex != null ? ex : dx);
1275     }
1276     }
1277     private static final long serialVersionUID = 5232453952276885070L;
1278     }
1279    
1280 dl 1.75 static final class ThenCopy<T> extends Completion {
1281 dl 1.77 final CompletableFuture<?> src;
1282 dl 1.75 final CompletableFuture<T> dst;
1283 dl 1.77 ThenCopy(CompletableFuture<?> src,
1284 dl 1.75 CompletableFuture<T> dst) {
1285 dl 1.17 this.src = src; this.dst = dst;
1286     }
1287 jsr166 1.26 public final void run() {
1288 dl 1.77 final CompletableFuture<?> a;
1289 dl 1.75 final CompletableFuture<T> dst;
1290     Object r; T t; Throwable ex;
1291 dl 1.17 if ((dst = this.dst) != null &&
1292     (a = this.src) != null &&
1293     (r = a.result) != null &&
1294     compareAndSet(0, 1)) {
1295 dl 1.20 if (r instanceof AltResult) {
1296     ex = ((AltResult)r).ex;
1297     t = null;
1298     }
1299     else {
1300     ex = null;
1301 dl 1.75 @SuppressWarnings("unchecked") T tr = (T) r;
1302     t = tr;
1303 dl 1.20 }
1304     dst.internalComplete(t, ex);
1305 dl 1.17 }
1306     }
1307 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1308 dl 1.17 }
1309    
1310 dl 1.75 // version of ThenCopy for CompletableFuture<Void> dst
1311     static final class ThenPropagate extends Completion {
1312     final CompletableFuture<?> src;
1313     final CompletableFuture<Void> dst;
1314     ThenPropagate(CompletableFuture<?> src,
1315     CompletableFuture<Void> dst) {
1316     this.src = src; this.dst = dst;
1317     }
1318     public final void run() {
1319     final CompletableFuture<?> a;
1320     final CompletableFuture<Void> dst;
1321     Object r; Throwable ex;
1322     if ((dst = this.dst) != null &&
1323     (a = this.src) != null &&
1324     (r = a.result) != null &&
1325     compareAndSet(0, 1)) {
1326     if (r instanceof AltResult)
1327     ex = ((AltResult)r).ex;
1328     else
1329     ex = null;
1330     dst.internalComplete(null, ex);
1331     }
1332     }
1333     private static final long serialVersionUID = 5232453952276885070L;
1334     }
1335    
1336 dl 1.28 static final class HandleCompletion<T,U> extends Completion {
1337 dl 1.17 final CompletableFuture<? extends T> src;
1338     final BiFunction<? super T, Throwable, ? extends U> fn;
1339     final CompletableFuture<U> dst;
1340 dl 1.88 final Executor executor;
1341 dl 1.28 HandleCompletion(CompletableFuture<? extends T> src,
1342     BiFunction<? super T, Throwable, ? extends U> fn,
1343 dl 1.88 CompletableFuture<U> dst,
1344     Executor executor) {
1345 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1346 dl 1.88 this.executor = executor;
1347 dl 1.17 }
1348 jsr166 1.26 public final void run() {
1349 dl 1.28 final CompletableFuture<? extends T> a;
1350     final BiFunction<? super T, Throwable, ? extends U> fn;
1351     final CompletableFuture<U> dst;
1352 dl 1.17 Object r; T t; Throwable ex;
1353     if ((dst = this.dst) != null &&
1354     (fn = this.fn) != null &&
1355     (a = this.src) != null &&
1356     (r = a.result) != null &&
1357     compareAndSet(0, 1)) {
1358     if (r instanceof AltResult) {
1359     ex = ((AltResult)r).ex;
1360     t = null;
1361     }
1362     else {
1363     ex = null;
1364 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1365     t = tr;
1366 dl 1.17 }
1367 dl 1.88 Executor e = executor;
1368     U u = null;
1369     Throwable dx = null;
1370 dl 1.17 try {
1371 dl 1.88 if (e != null)
1372 dl 1.96 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1373 dl 1.88 else
1374     u = fn.apply(t, ex);
1375 dl 1.17 } catch (Throwable rex) {
1376 dl 1.20 dx = rex;
1377 dl 1.17 }
1378 dl 1.88 if (e == null || dx != null)
1379     dst.internalComplete(u, dx);
1380 dl 1.17 }
1381     }
1382 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1383 dl 1.17 }
1384    
1385 jsr166 1.81 static final class ThenCompose<T,U> extends Completion {
1386 dl 1.17 final CompletableFuture<? extends T> src;
1387 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1388 dl 1.17 final CompletableFuture<U> dst;
1389 dl 1.37 final Executor executor;
1390 jsr166 1.81 ThenCompose(CompletableFuture<? extends T> src,
1391 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
1392 jsr166 1.81 CompletableFuture<U> dst,
1393     Executor executor) {
1394 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1395 dl 1.37 this.executor = executor;
1396 dl 1.17 }
1397 jsr166 1.26 public final void run() {
1398 dl 1.28 final CompletableFuture<? extends T> a;
1399 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1400 dl 1.28 final CompletableFuture<U> dst;
1401 dl 1.37 Object r; T t; Throwable ex; Executor e;
1402 dl 1.17 if ((dst = this.dst) != null &&
1403     (fn = this.fn) != null &&
1404     (a = this.src) != null &&
1405     (r = a.result) != null &&
1406     compareAndSet(0, 1)) {
1407     if (r instanceof AltResult) {
1408     ex = ((AltResult)r).ex;
1409     t = null;
1410     }
1411     else {
1412     ex = null;
1413 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1414     t = tr;
1415 dl 1.17 }
1416     CompletableFuture<U> c = null;
1417 dl 1.20 U u = null;
1418     boolean complete = false;
1419 dl 1.17 if (ex == null) {
1420 dl 1.37 if ((e = executor) != null)
1421 dl 1.96 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1422 dl 1.37 else {
1423     try {
1424 dl 1.88 CompletionStage<U> cs = fn.apply(t);
1425     c = (cs == null) ? null : cs.toCompletableFuture();
1426     if (c == null)
1427 dl 1.37 ex = new NullPointerException();
1428     } catch (Throwable rex) {
1429     ex = rex;
1430     }
1431 dl 1.17 }
1432     }
1433 dl 1.37 if (c != null) {
1434 dl 1.75 ThenCopy<U> d = null;
1435 dl 1.18 Object s;
1436     if ((s = c.result) == null) {
1437     CompletionNode p = new CompletionNode
1438 dl 1.75 (d = new ThenCopy<U>(c, dst));
1439 dl 1.18 while ((s = c.result) == null) {
1440     if (UNSAFE.compareAndSwapObject
1441     (c, COMPLETIONS, p.next = c.completions, p))
1442     break;
1443     }
1444 dl 1.17 }
1445 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1446 dl 1.20 complete = true;
1447 dl 1.18 if (s instanceof AltResult) {
1448     ex = ((AltResult)s).ex; // no rewrap
1449     u = null;
1450 dl 1.28 }
1451     else {
1452     @SuppressWarnings("unchecked") U us = (U) s;
1453     u = us;
1454     }
1455     }
1456     }
1457     if (complete || ex != null)
1458     dst.internalComplete(u, ex);
1459     if (c != null)
1460     c.helpPostComplete();
1461     }
1462     }
1463     private static final long serialVersionUID = 5232453952276885070L;
1464     }
1465    
1466 dl 1.88 // Implementations of stage methods with (plain, async, Executor) forms
1467 dl 1.28
1468 dl 1.88 private <U> CompletableFuture<U> doThenApply
1469     (Function<? super T,? extends U> fn,
1470     Executor e) {
1471     if (fn == null) throw new NullPointerException();
1472     CompletableFuture<U> dst = new CompletableFuture<U>();
1473     ThenApply<T,U> d = null;
1474     Object r;
1475     if ((r = result) == null) {
1476     CompletionNode p = new CompletionNode
1477     (d = new ThenApply<T,U>(this, fn, dst, e));
1478     while ((r = result) == null) {
1479     if (UNSAFE.compareAndSwapObject
1480     (this, COMPLETIONS, p.next = completions, p))
1481     break;
1482     }
1483     }
1484     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1485     T t; Throwable ex;
1486     if (r instanceof AltResult) {
1487     ex = ((AltResult)r).ex;
1488     t = null;
1489     }
1490     else {
1491     ex = null;
1492     @SuppressWarnings("unchecked") T tr = (T) r;
1493     t = tr;
1494     }
1495     U u = null;
1496     if (ex == null) {
1497     try {
1498     if (e != null)
1499 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1500 dl 1.88 else
1501     u = fn.apply(t);
1502     } catch (Throwable rex) {
1503     ex = rex;
1504     }
1505     }
1506     if (e == null || ex != null)
1507     dst.internalComplete(u, ex);
1508     }
1509     helpPostComplete();
1510     return dst;
1511 dl 1.28 }
1512    
1513 dl 1.88 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1514     Executor e) {
1515     if (fn == null) throw new NullPointerException();
1516     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1517     ThenAccept<T> d = null;
1518     Object r;
1519     if ((r = result) == null) {
1520     CompletionNode p = new CompletionNode
1521     (d = new ThenAccept<T>(this, fn, dst, e));
1522     while ((r = result) == null) {
1523     if (UNSAFE.compareAndSwapObject
1524     (this, COMPLETIONS, p.next = completions, p))
1525     break;
1526     }
1527     }
1528     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1529     T t; Throwable ex;
1530     if (r instanceof AltResult) {
1531     ex = ((AltResult)r).ex;
1532     t = null;
1533     }
1534     else {
1535     ex = null;
1536     @SuppressWarnings("unchecked") T tr = (T) r;
1537     t = tr;
1538     }
1539     if (ex == null) {
1540     try {
1541     if (e != null)
1542 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1543 dl 1.88 else
1544     fn.accept(t);
1545     } catch (Throwable rex) {
1546     ex = rex;
1547     }
1548     }
1549     if (e == null || ex != null)
1550     dst.internalComplete(null, ex);
1551     }
1552     helpPostComplete();
1553     return dst;
1554 dl 1.28 }
1555    
1556 dl 1.88 private CompletableFuture<Void> doThenRun(Runnable action,
1557     Executor e) {
1558     if (action == null) throw new NullPointerException();
1559     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1560     ThenRun d = null;
1561     Object r;
1562     if ((r = result) == null) {
1563     CompletionNode p = new CompletionNode
1564     (d = new ThenRun(this, action, dst, e));
1565     while ((r = result) == null) {
1566     if (UNSAFE.compareAndSwapObject
1567     (this, COMPLETIONS, p.next = completions, p))
1568     break;
1569     }
1570     }
1571     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1572     Throwable ex;
1573     if (r instanceof AltResult)
1574     ex = ((AltResult)r).ex;
1575     else
1576     ex = null;
1577     if (ex == null) {
1578     try {
1579     if (e != null)
1580 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1581 dl 1.88 else
1582     action.run();
1583     } catch (Throwable rex) {
1584     ex = rex;
1585     }
1586     }
1587     if (e == null || ex != null)
1588     dst.internalComplete(null, ex);
1589     }
1590     helpPostComplete();
1591     return dst;
1592     }
1593    
1594     private <U,V> CompletableFuture<V> doThenCombine
1595     (CompletableFuture<? extends U> other,
1596     BiFunction<? super T,? super U,? extends V> fn,
1597     Executor e) {
1598     if (other == null || fn == null) throw new NullPointerException();
1599     CompletableFuture<V> dst = new CompletableFuture<V>();
1600     ThenCombine<T,U,V> d = null;
1601     Object r, s = null;
1602     if ((r = result) == null || (s = other.result) == null) {
1603     d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
1604     CompletionNode q = null, p = new CompletionNode(d);
1605     while ((r == null && (r = result) == null) ||
1606     (s == null && (s = other.result) == null)) {
1607     if (q != null) {
1608     if (s != null ||
1609     UNSAFE.compareAndSwapObject
1610     (other, COMPLETIONS, q.next = other.completions, q))
1611     break;
1612     }
1613     else if (r != null ||
1614     UNSAFE.compareAndSwapObject
1615     (this, COMPLETIONS, p.next = completions, p)) {
1616     if (s != null)
1617     break;
1618     q = new CompletionNode(d);
1619     }
1620     }
1621     }
1622     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1623     T t; U u; Throwable ex;
1624     if (r instanceof AltResult) {
1625     ex = ((AltResult)r).ex;
1626     t = null;
1627     }
1628     else {
1629     ex = null;
1630     @SuppressWarnings("unchecked") T tr = (T) r;
1631     t = tr;
1632     }
1633     if (ex != null)
1634     u = null;
1635     else if (s instanceof AltResult) {
1636     ex = ((AltResult)s).ex;
1637     u = null;
1638     }
1639     else {
1640     @SuppressWarnings("unchecked") U us = (U) s;
1641     u = us;
1642     }
1643     V v = null;
1644     if (ex == null) {
1645     try {
1646     if (e != null)
1647 dl 1.96 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
1648 dl 1.88 else
1649     v = fn.apply(t, u);
1650     } catch (Throwable rex) {
1651     ex = rex;
1652     }
1653     }
1654     if (e == null || ex != null)
1655     dst.internalComplete(v, ex);
1656     }
1657     helpPostComplete();
1658     other.helpPostComplete();
1659     return dst;
1660     }
1661    
1662     private <U> CompletableFuture<Void> doThenAcceptBoth
1663     (CompletableFuture<? extends U> other,
1664     BiConsumer<? super T,? super U> fn,
1665     Executor e) {
1666     if (other == null || fn == null) throw new NullPointerException();
1667     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1668     ThenAcceptBoth<T,U> d = null;
1669     Object r, s = null;
1670     if ((r = result) == null || (s = other.result) == null) {
1671     d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1672     CompletionNode q = null, p = new CompletionNode(d);
1673     while ((r == null && (r = result) == null) ||
1674     (s == null && (s = other.result) == null)) {
1675     if (q != null) {
1676     if (s != null ||
1677     UNSAFE.compareAndSwapObject
1678     (other, COMPLETIONS, q.next = other.completions, q))
1679     break;
1680     }
1681     else if (r != null ||
1682     UNSAFE.compareAndSwapObject
1683     (this, COMPLETIONS, p.next = completions, p)) {
1684     if (s != null)
1685     break;
1686     q = new CompletionNode(d);
1687     }
1688     }
1689     }
1690     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1691     T t; U u; Throwable ex;
1692     if (r instanceof AltResult) {
1693     ex = ((AltResult)r).ex;
1694     t = null;
1695     }
1696     else {
1697     ex = null;
1698     @SuppressWarnings("unchecked") T tr = (T) r;
1699     t = tr;
1700     }
1701     if (ex != null)
1702     u = null;
1703     else if (s instanceof AltResult) {
1704     ex = ((AltResult)s).ex;
1705     u = null;
1706     }
1707     else {
1708     @SuppressWarnings("unchecked") U us = (U) s;
1709     u = us;
1710     }
1711     if (ex == null) {
1712     try {
1713     if (e != null)
1714 dl 1.96 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1715 dl 1.88 else
1716     fn.accept(t, u);
1717     } catch (Throwable rex) {
1718     ex = rex;
1719     }
1720     }
1721     if (e == null || ex != null)
1722     dst.internalComplete(null, ex);
1723     }
1724     helpPostComplete();
1725     other.helpPostComplete();
1726     return dst;
1727     }
1728    
1729     private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1730     Runnable action,
1731     Executor e) {
1732     if (other == null || action == null) throw new NullPointerException();
1733     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1734     RunAfterBoth d = null;
1735     Object r, s = null;
1736     if ((r = result) == null || (s = other.result) == null) {
1737     d = new RunAfterBoth(this, other, action, dst, e);
1738     CompletionNode q = null, p = new CompletionNode(d);
1739     while ((r == null && (r = result) == null) ||
1740     (s == null && (s = other.result) == null)) {
1741     if (q != null) {
1742     if (s != null ||
1743     UNSAFE.compareAndSwapObject
1744     (other, COMPLETIONS, q.next = other.completions, q))
1745     break;
1746     }
1747     else if (r != null ||
1748     UNSAFE.compareAndSwapObject
1749     (this, COMPLETIONS, p.next = completions, p)) {
1750     if (s != null)
1751     break;
1752     q = new CompletionNode(d);
1753     }
1754     }
1755     }
1756     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1757     Throwable ex;
1758     if (r instanceof AltResult)
1759     ex = ((AltResult)r).ex;
1760     else
1761     ex = null;
1762     if (ex == null && (s instanceof AltResult))
1763     ex = ((AltResult)s).ex;
1764     if (ex == null) {
1765     try {
1766     if (e != null)
1767 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1768 dl 1.88 else
1769     action.run();
1770     } catch (Throwable rex) {
1771     ex = rex;
1772     }
1773     }
1774     if (e == null || ex != null)
1775     dst.internalComplete(null, ex);
1776     }
1777     helpPostComplete();
1778     other.helpPostComplete();
1779     return dst;
1780     }
1781    
1782     private <U> CompletableFuture<U> doApplyToEither
1783     (CompletableFuture<? extends T> other,
1784     Function<? super T, U> fn,
1785     Executor e) {
1786     if (other == null || fn == null) throw new NullPointerException();
1787     CompletableFuture<U> dst = new CompletableFuture<U>();
1788     ApplyToEither<T,U> d = null;
1789     Object r;
1790     if ((r = result) == null && (r = other.result) == null) {
1791     d = new ApplyToEither<T,U>(this, other, fn, dst, e);
1792     CompletionNode q = null, p = new CompletionNode(d);
1793     while ((r = result) == null && (r = other.result) == null) {
1794     if (q != null) {
1795     if (UNSAFE.compareAndSwapObject
1796     (other, COMPLETIONS, q.next = other.completions, q))
1797     break;
1798     }
1799     else if (UNSAFE.compareAndSwapObject
1800     (this, COMPLETIONS, p.next = completions, p))
1801     q = new CompletionNode(d);
1802     }
1803     }
1804     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1805     T t; Throwable ex;
1806     if (r instanceof AltResult) {
1807     ex = ((AltResult)r).ex;
1808     t = null;
1809     }
1810     else {
1811     ex = null;
1812     @SuppressWarnings("unchecked") T tr = (T) r;
1813     t = tr;
1814     }
1815     U u = null;
1816     if (ex == null) {
1817     try {
1818     if (e != null)
1819 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1820 dl 1.88 else
1821     u = fn.apply(t);
1822     } catch (Throwable rex) {
1823     ex = rex;
1824     }
1825     }
1826     if (e == null || ex != null)
1827     dst.internalComplete(u, ex);
1828     }
1829     helpPostComplete();
1830     other.helpPostComplete();
1831     return dst;
1832     }
1833    
1834     private CompletableFuture<Void> doAcceptEither
1835     (CompletableFuture<? extends T> other,
1836     Consumer<? super T> fn,
1837     Executor e) {
1838     if (other == null || fn == null) throw new NullPointerException();
1839     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1840     AcceptEither<T> d = null;
1841     Object r;
1842     if ((r = result) == null && (r = other.result) == null) {
1843     d = new AcceptEither<T>(this, other, fn, dst, e);
1844     CompletionNode q = null, p = new CompletionNode(d);
1845     while ((r = result) == null && (r = other.result) == null) {
1846     if (q != null) {
1847     if (UNSAFE.compareAndSwapObject
1848     (other, COMPLETIONS, q.next = other.completions, q))
1849     break;
1850     }
1851     else if (UNSAFE.compareAndSwapObject
1852     (this, COMPLETIONS, p.next = completions, p))
1853     q = new CompletionNode(d);
1854     }
1855     }
1856     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1857     T t; Throwable ex;
1858     if (r instanceof AltResult) {
1859     ex = ((AltResult)r).ex;
1860     t = null;
1861     }
1862     else {
1863     ex = null;
1864     @SuppressWarnings("unchecked") T tr = (T) r;
1865     t = tr;
1866     }
1867     if (ex == null) {
1868     try {
1869     if (e != null)
1870 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1871 dl 1.88 else
1872     fn.accept(t);
1873     } catch (Throwable rex) {
1874     ex = rex;
1875     }
1876     }
1877     if (e == null || ex != null)
1878     dst.internalComplete(null, ex);
1879     }
1880     helpPostComplete();
1881     other.helpPostComplete();
1882     return dst;
1883     }
1884    
1885     private CompletableFuture<Void> doRunAfterEither
1886     (CompletableFuture<?> other,
1887     Runnable action,
1888     Executor e) {
1889     if (other == null || action == null) throw new NullPointerException();
1890     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1891     RunAfterEither d = null;
1892     Object r;
1893     if ((r = result) == null && (r = other.result) == null) {
1894     d = new RunAfterEither(this, other, action, dst, e);
1895     CompletionNode q = null, p = new CompletionNode(d);
1896     while ((r = result) == null && (r = other.result) == null) {
1897     if (q != null) {
1898     if (UNSAFE.compareAndSwapObject
1899     (other, COMPLETIONS, q.next = other.completions, q))
1900     break;
1901     }
1902     else if (UNSAFE.compareAndSwapObject
1903     (this, COMPLETIONS, p.next = completions, p))
1904     q = new CompletionNode(d);
1905     }
1906     }
1907     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1908     Throwable ex;
1909     if (r instanceof AltResult)
1910     ex = ((AltResult)r).ex;
1911     else
1912     ex = null;
1913     if (ex == null) {
1914     try {
1915     if (e != null)
1916 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1917 dl 1.88 else
1918     action.run();
1919     } catch (Throwable rex) {
1920     ex = rex;
1921     }
1922     }
1923     if (e == null || ex != null)
1924     dst.internalComplete(null, ex);
1925     }
1926     helpPostComplete();
1927     other.helpPostComplete();
1928     return dst;
1929     }
1930    
1931     private <U> CompletableFuture<U> doThenCompose
1932     (Function<? super T, ? extends CompletionStage<U>> fn,
1933     Executor e) {
1934     if (fn == null) throw new NullPointerException();
1935     CompletableFuture<U> dst = null;
1936     ThenCompose<T,U> d = null;
1937     Object r;
1938     if ((r = result) == null) {
1939     dst = new CompletableFuture<U>();
1940     CompletionNode p = new CompletionNode
1941     (d = new ThenCompose<T,U>(this, fn, dst, e));
1942     while ((r = result) == null) {
1943     if (UNSAFE.compareAndSwapObject
1944     (this, COMPLETIONS, p.next = completions, p))
1945     break;
1946     }
1947     }
1948     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1949     T t; Throwable ex;
1950     if (r instanceof AltResult) {
1951     ex = ((AltResult)r).ex;
1952     t = null;
1953     }
1954     else {
1955     ex = null;
1956     @SuppressWarnings("unchecked") T tr = (T) r;
1957     t = tr;
1958     }
1959     if (ex == null) {
1960     if (e != null) {
1961     if (dst == null)
1962     dst = new CompletableFuture<U>();
1963 dl 1.96 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1964 dl 1.88 }
1965     else {
1966     try {
1967     CompletionStage<U> cs = fn.apply(t);
1968     if (cs == null ||
1969     (dst = cs.toCompletableFuture()) == null)
1970     ex = new NullPointerException();
1971     } catch (Throwable rex) {
1972     ex = rex;
1973     }
1974     }
1975     }
1976     if (dst == null)
1977     dst = new CompletableFuture<U>();
1978     if (e == null || ex != null)
1979     dst.internalComplete(null, ex);
1980     }
1981     helpPostComplete();
1982     dst.helpPostComplete();
1983     return dst;
1984     }
1985    
1986     private CompletableFuture<T> doWhenComplete
1987     (BiConsumer<? super T, ? super Throwable> fn,
1988     Executor e) {
1989     if (fn == null) throw new NullPointerException();
1990     CompletableFuture<T> dst = new CompletableFuture<T>();
1991     WhenCompleteCompletion<T> d = null;
1992     Object r;
1993     if ((r = result) == null) {
1994     CompletionNode p =
1995     new CompletionNode(d = new WhenCompleteCompletion<T>
1996     (this, fn, dst, e));
1997     while ((r = result) == null) {
1998     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1999     p.next = completions, p))
2000     break;
2001     }
2002     }
2003     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2004     T t; Throwable ex;
2005     if (r instanceof AltResult) {
2006     ex = ((AltResult)r).ex;
2007     t = null;
2008     }
2009     else {
2010     ex = null;
2011     @SuppressWarnings("unchecked") T tr = (T) r;
2012     t = tr;
2013     }
2014     Throwable dx = null;
2015     try {
2016     if (e != null)
2017 dl 1.96 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
2018 dl 1.88 else
2019     fn.accept(t, ex);
2020     } catch (Throwable rex) {
2021     dx = rex;
2022     }
2023     if (e == null || dx != null)
2024     dst.internalComplete(t, ex != null ? ex : dx);
2025     }
2026     helpPostComplete();
2027     return dst;
2028     }
2029    
2030     private <U> CompletableFuture<U> doHandle
2031     (BiFunction<? super T, Throwable, ? extends U> fn,
2032     Executor e) {
2033     if (fn == null) throw new NullPointerException();
2034     CompletableFuture<U> dst = new CompletableFuture<U>();
2035     HandleCompletion<T,U> d = null;
2036     Object r;
2037     if ((r = result) == null) {
2038     CompletionNode p =
2039     new CompletionNode(d = new HandleCompletion<T,U>
2040     (this, fn, dst, e));
2041     while ((r = result) == null) {
2042     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2043     p.next = completions, p))
2044     break;
2045     }
2046     }
2047     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2048     T t; Throwable ex;
2049     if (r instanceof AltResult) {
2050     ex = ((AltResult)r).ex;
2051     t = null;
2052     }
2053     else {
2054     ex = null;
2055     @SuppressWarnings("unchecked") T tr = (T) r;
2056     t = tr;
2057     }
2058 jsr166 1.90 U u = null;
2059 dl 1.88 Throwable dx = null;
2060     try {
2061     if (e != null)
2062 dl 1.96 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2063 dl 1.88 else {
2064     u = fn.apply(t, ex);
2065     dx = null;
2066     }
2067     } catch (Throwable rex) {
2068     dx = rex;
2069     u = null;
2070     }
2071     if (e == null || dx != null)
2072     dst.internalComplete(u, dx);
2073     }
2074     helpPostComplete();
2075     return dst;
2076     }
2077    
2078    
2079     // public methods
2080    
2081     /**
2082     * Creates a new incomplete CompletableFuture.
2083     */
2084     public CompletableFuture() {
2085     }
2086    
2087     /**
2088     * Returns a new CompletableFuture that is asynchronously completed
2089     * by a task running in the {@link ForkJoinPool#commonPool()} with
2090     * the value obtained by calling the given Supplier.
2091     *
2092     * @param supplier a function returning the value to be used
2093     * to complete the returned CompletableFuture
2094 jsr166 1.95 * @param <U> the function's return type
2095 dl 1.88 * @return the new CompletableFuture
2096     */
2097     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2098     if (supplier == null) throw new NullPointerException();
2099     CompletableFuture<U> f = new CompletableFuture<U>();
2100 dl 1.96 execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
2101 dl 1.88 return f;
2102     }
2103    
2104     /**
2105     * Returns a new CompletableFuture that is asynchronously completed
2106     * by a task running in the given executor with the value obtained
2107     * by calling the given Supplier.
2108     *
2109     * @param supplier a function returning the value to be used
2110     * to complete the returned CompletableFuture
2111     * @param executor the executor to use for asynchronous execution
2112 jsr166 1.95 * @param <U> the function's return type
2113 dl 1.88 * @return the new CompletableFuture
2114     */
2115     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2116     Executor executor) {
2117     if (executor == null || supplier == null)
2118     throw new NullPointerException();
2119     CompletableFuture<U> f = new CompletableFuture<U>();
2120 dl 1.96 execAsync(executor, new AsyncSupply<U>(supplier, f));
2121 dl 1.88 return f;
2122 dl 1.28 }
2123    
2124     /**
2125 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2126     * by a task running in the {@link ForkJoinPool#commonPool()} after
2127     * it runs the given action.
2128 dl 1.28 *
2129     * @param runnable the action to run before completing the
2130     * returned CompletableFuture
2131 jsr166 1.58 * @return the new CompletableFuture
2132 dl 1.28 */
2133     public static CompletableFuture<Void> runAsync(Runnable runnable) {
2134     if (runnable == null) throw new NullPointerException();
2135     CompletableFuture<Void> f = new CompletableFuture<Void>();
2136 dl 1.96 execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f));
2137 dl 1.28 return f;
2138     }
2139    
2140     /**
2141 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2142     * by a task running in the given executor after it runs the given
2143     * action.
2144 dl 1.28 *
2145     * @param runnable the action to run before completing the
2146     * returned CompletableFuture
2147     * @param executor the executor to use for asynchronous execution
2148 jsr166 1.58 * @return the new CompletableFuture
2149 dl 1.28 */
2150     public static CompletableFuture<Void> runAsync(Runnable runnable,
2151     Executor executor) {
2152     if (executor == null || runnable == null)
2153     throw new NullPointerException();
2154     CompletableFuture<Void> f = new CompletableFuture<Void>();
2155 dl 1.96 execAsync(executor, new AsyncRun(runnable, f));
2156 dl 1.28 return f;
2157     }
2158    
2159     /**
2160 dl 1.77 * Returns a new CompletableFuture that is already completed with
2161     * the given value.
2162     *
2163     * @param value the value
2164 jsr166 1.95 * @param <U> the type of the value
2165 dl 1.77 * @return the completed CompletableFuture
2166     */
2167     public static <U> CompletableFuture<U> completedFuture(U value) {
2168     CompletableFuture<U> f = new CompletableFuture<U>();
2169 jsr166 1.78 f.result = (value == null) ? NIL : value;
2170 dl 1.77 return f;
2171     }
2172    
2173     /**
2174 dl 1.28 * Returns {@code true} if completed in any fashion: normally,
2175     * exceptionally, or via cancellation.
2176     *
2177     * @return {@code true} if completed
2178     */
2179     public boolean isDone() {
2180     return result != null;
2181     }
2182    
2183     /**
2184 dl 1.49 * Waits if necessary for this future to complete, and then
2185 dl 1.48 * returns its result.
2186 dl 1.28 *
2187 dl 1.48 * @return the result value
2188     * @throws CancellationException if this future was cancelled
2189     * @throws ExecutionException if this future completed exceptionally
2190 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2191     * while waiting
2192     */
2193     public T get() throws InterruptedException, ExecutionException {
2194     Object r; Throwable ex, cause;
2195     if ((r = result) == null && (r = waitingGet(true)) == null)
2196     throw new InterruptedException();
2197 jsr166 1.45 if (!(r instanceof AltResult)) {
2198     @SuppressWarnings("unchecked") T tr = (T) r;
2199     return tr;
2200     }
2201     if ((ex = ((AltResult)r).ex) == null)
2202 dl 1.28 return null;
2203 jsr166 1.45 if (ex instanceof CancellationException)
2204     throw (CancellationException)ex;
2205     if ((ex instanceof CompletionException) &&
2206     (cause = ex.getCause()) != null)
2207     ex = cause;
2208     throw new ExecutionException(ex);
2209 dl 1.28 }
2210    
2211     /**
2212 dl 1.49 * Waits if necessary for at most the given time for this future
2213     * to complete, and then returns its result, if available.
2214 dl 1.28 *
2215     * @param timeout the maximum time to wait
2216     * @param unit the time unit of the timeout argument
2217 dl 1.48 * @return the result value
2218     * @throws CancellationException if this future was cancelled
2219     * @throws ExecutionException if this future completed exceptionally
2220 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2221     * while waiting
2222     * @throws TimeoutException if the wait timed out
2223     */
2224     public T get(long timeout, TimeUnit unit)
2225     throws InterruptedException, ExecutionException, TimeoutException {
2226     Object r; Throwable ex, cause;
2227     long nanos = unit.toNanos(timeout);
2228     if (Thread.interrupted())
2229     throw new InterruptedException();
2230     if ((r = result) == null)
2231     r = timedAwaitDone(nanos);
2232 jsr166 1.45 if (!(r instanceof AltResult)) {
2233     @SuppressWarnings("unchecked") T tr = (T) r;
2234     return tr;
2235     }
2236     if ((ex = ((AltResult)r).ex) == null)
2237 dl 1.28 return null;
2238 jsr166 1.45 if (ex instanceof CancellationException)
2239     throw (CancellationException)ex;
2240     if ((ex instanceof CompletionException) &&
2241     (cause = ex.getCause()) != null)
2242     ex = cause;
2243     throw new ExecutionException(ex);
2244 dl 1.28 }
2245    
2246     /**
2247     * Returns the result value when complete, or throws an
2248     * (unchecked) exception if completed exceptionally. To better
2249     * conform with the use of common functional forms, if a
2250     * computation involved in the completion of this
2251     * CompletableFuture threw an exception, this method throws an
2252     * (unchecked) {@link CompletionException} with the underlying
2253     * exception as its cause.
2254     *
2255     * @return the result value
2256     * @throws CancellationException if the computation was cancelled
2257 jsr166 1.55 * @throws CompletionException if this future completed
2258     * exceptionally or a completion computation threw an exception
2259 dl 1.28 */
2260     public T join() {
2261     Object r; Throwable ex;
2262     if ((r = result) == null)
2263     r = waitingGet(false);
2264 jsr166 1.45 if (!(r instanceof AltResult)) {
2265     @SuppressWarnings("unchecked") T tr = (T) r;
2266     return tr;
2267     }
2268     if ((ex = ((AltResult)r).ex) == null)
2269 dl 1.28 return null;
2270 jsr166 1.45 if (ex instanceof CancellationException)
2271     throw (CancellationException)ex;
2272     if (ex instanceof CompletionException)
2273     throw (CompletionException)ex;
2274     throw new CompletionException(ex);
2275 dl 1.28 }
2276    
2277     /**
2278     * Returns the result value (or throws any encountered exception)
2279     * if completed, else returns the given valueIfAbsent.
2280     *
2281     * @param valueIfAbsent the value to return if not completed
2282     * @return the result value, if completed, else the given valueIfAbsent
2283     * @throws CancellationException if the computation was cancelled
2284 jsr166 1.55 * @throws CompletionException if this future completed
2285     * exceptionally or a completion computation threw an exception
2286 dl 1.28 */
2287     public T getNow(T valueIfAbsent) {
2288     Object r; Throwable ex;
2289     if ((r = result) == null)
2290     return valueIfAbsent;
2291 jsr166 1.45 if (!(r instanceof AltResult)) {
2292     @SuppressWarnings("unchecked") T tr = (T) r;
2293     return tr;
2294     }
2295     if ((ex = ((AltResult)r).ex) == null)
2296 dl 1.28 return null;
2297 jsr166 1.45 if (ex instanceof CancellationException)
2298     throw (CancellationException)ex;
2299     if (ex instanceof CompletionException)
2300     throw (CompletionException)ex;
2301     throw new CompletionException(ex);
2302 dl 1.28 }
2303    
2304     /**
2305     * If not already completed, sets the value returned by {@link
2306     * #get()} and related methods to the given value.
2307     *
2308     * @param value the result value
2309     * @return {@code true} if this invocation caused this CompletableFuture
2310     * to transition to a completed state, else {@code false}
2311     */
2312     public boolean complete(T value) {
2313     boolean triggered = result == null &&
2314     UNSAFE.compareAndSwapObject(this, RESULT, null,
2315     value == null ? NIL : value);
2316     postComplete();
2317     return triggered;
2318     }
2319    
2320     /**
2321     * If not already completed, causes invocations of {@link #get()}
2322     * and related methods to throw the given exception.
2323     *
2324     * @param ex the exception
2325     * @return {@code true} if this invocation caused this CompletableFuture
2326     * to transition to a completed state, else {@code false}
2327     */
2328     public boolean completeExceptionally(Throwable ex) {
2329     if (ex == null) throw new NullPointerException();
2330     boolean triggered = result == null &&
2331     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2332     postComplete();
2333     return triggered;
2334     }
2335    
2336 dl 1.88 // CompletionStage methods
2337    
2338     public <U> CompletableFuture<U> thenApply
2339     (Function<? super T,? extends U> fn) {
2340 dl 1.28 return doThenApply(fn, null);
2341     }
2342    
2343 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2344     (Function<? super T,? extends U> fn) {
2345 dl 1.28 return doThenApply(fn, ForkJoinPool.commonPool());
2346 dl 1.17 }
2347    
2348 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2349     (Function<? super T,? extends U> fn,
2350     Executor executor) {
2351 dl 1.28 if (executor == null) throw new NullPointerException();
2352     return doThenApply(fn, executor);
2353     }
2354 dl 1.1
2355 dl 1.88 public CompletableFuture<Void> thenAccept
2356     (Consumer<? super T> action) {
2357     return doThenAccept(action, null);
2358 dl 1.28 }
2359    
2360 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2361     (Consumer<? super T> action) {
2362     return doThenAccept(action, ForkJoinPool.commonPool());
2363 dl 1.28 }
2364    
2365 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2366     (Consumer<? super T> action,
2367     Executor executor) {
2368 dl 1.28 if (executor == null) throw new NullPointerException();
2369 dl 1.88 return doThenAccept(action, executor);
2370 dl 1.7 }
2371    
2372 dl 1.88 public CompletableFuture<Void> thenRun
2373     (Runnable action) {
2374 dl 1.28 return doThenRun(action, null);
2375     }
2376    
2377 dl 1.88 public CompletableFuture<Void> thenRunAsync
2378     (Runnable action) {
2379 dl 1.28 return doThenRun(action, ForkJoinPool.commonPool());
2380     }
2381    
2382 dl 1.88 public CompletableFuture<Void> thenRunAsync
2383     (Runnable action,
2384     Executor executor) {
2385 dl 1.28 if (executor == null) throw new NullPointerException();
2386     return doThenRun(action, executor);
2387     }
2388    
2389 dl 1.48 public <U,V> CompletableFuture<V> thenCombine
2390 dl 1.88 (CompletionStage<? extends U> other,
2391 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2392 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, null);
2393 dl 1.28 }
2394    
2395 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2396 dl 1.88 (CompletionStage<? extends U> other,
2397 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2398 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn,
2399     ForkJoinPool.commonPool());
2400 dl 1.28 }
2401    
2402 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2403 dl 1.88 (CompletionStage<? extends U> other,
2404 dl 1.48 BiFunction<? super T,? super U,? extends V> fn,
2405     Executor executor) {
2406 dl 1.28 if (executor == null) throw new NullPointerException();
2407 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, executor);
2408 dl 1.1 }
2409    
2410 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBoth
2411 dl 1.88 (CompletionStage<? extends U> other,
2412     BiConsumer<? super T, ? super U> action) {
2413     return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2414 dl 1.28 }
2415    
2416 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2417 dl 1.88 (CompletionStage<? extends U> other,
2418     BiConsumer<? super T, ? super U> action) {
2419     return doThenAcceptBoth(other.toCompletableFuture(), action,
2420     ForkJoinPool.commonPool());
2421 dl 1.28 }
2422    
2423 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2424 dl 1.88 (CompletionStage<? extends U> other,
2425     BiConsumer<? super T, ? super U> action,
2426 dl 1.48 Executor executor) {
2427 dl 1.28 if (executor == null) throw new NullPointerException();
2428 dl 1.88 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2429 dl 1.28 }
2430    
2431 dl 1.88 public CompletableFuture<Void> runAfterBoth
2432     (CompletionStage<?> other,
2433     Runnable action) {
2434     return doRunAfterBoth(other.toCompletableFuture(), action, null);
2435 dl 1.7 }
2436    
2437 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2438     (CompletionStage<?> other,
2439     Runnable action) {
2440     return doRunAfterBoth(other.toCompletableFuture(), action,
2441     ForkJoinPool.commonPool());
2442 dl 1.28 }
2443    
2444 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2445     (CompletionStage<?> other,
2446     Runnable action,
2447     Executor executor) {
2448 dl 1.28 if (executor == null) throw new NullPointerException();
2449 dl 1.88 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2450 dl 1.28 }
2451    
2452 dl 1.1
2453 dl 1.48 public <U> CompletableFuture<U> applyToEither
2454 dl 1.88 (CompletionStage<? extends T> other,
2455 dl 1.48 Function<? super T, U> fn) {
2456 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, null);
2457 dl 1.28 }
2458    
2459 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2460 dl 1.88 (CompletionStage<? extends T> other,
2461 dl 1.48 Function<? super T, U> fn) {
2462 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn,
2463     ForkJoinPool.commonPool());
2464 dl 1.28 }
2465    
2466 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2467 dl 1.88 (CompletionStage<? extends T> other,
2468 dl 1.48 Function<? super T, U> fn,
2469     Executor executor) {
2470 dl 1.28 if (executor == null) throw new NullPointerException();
2471 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2472 dl 1.1 }
2473    
2474 dl 1.48 public CompletableFuture<Void> acceptEither
2475 dl 1.88 (CompletionStage<? extends T> other,
2476     Consumer<? super T> action) {
2477     return doAcceptEither(other.toCompletableFuture(), action, null);
2478 dl 1.28 }
2479    
2480 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2481 dl 1.88 (CompletionStage<? extends T> other,
2482     Consumer<? super T> action) {
2483     return doAcceptEither(other.toCompletableFuture(), action,
2484     ForkJoinPool.commonPool());
2485 dl 1.28 }
2486    
2487 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2488 dl 1.88 (CompletionStage<? extends T> other,
2489     Consumer<? super T> action,
2490 dl 1.48 Executor executor) {
2491 dl 1.28 if (executor == null) throw new NullPointerException();
2492 dl 1.88 return doAcceptEither(other.toCompletableFuture(), action, executor);
2493 dl 1.7 }
2494    
2495 dl 1.88 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2496 dl 1.28 Runnable action) {
2497 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, null);
2498 dl 1.28 }
2499    
2500 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2501 dl 1.88 (CompletionStage<?> other,
2502 dl 1.48 Runnable action) {
2503 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action,
2504     ForkJoinPool.commonPool());
2505 dl 1.28 }
2506    
2507 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2508 dl 1.88 (CompletionStage<?> other,
2509 dl 1.48 Runnable action,
2510     Executor executor) {
2511 dl 1.28 if (executor == null) throw new NullPointerException();
2512 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2513 dl 1.1 }
2514    
2515 dl 1.48 public <U> CompletableFuture<U> thenCompose
2516 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2517 jsr166 1.81 return doThenCompose(fn, null);
2518 dl 1.37 }
2519    
2520 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2521 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2522 jsr166 1.81 return doThenCompose(fn, ForkJoinPool.commonPool());
2523 dl 1.37 }
2524    
2525 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2526 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn,
2527 dl 1.48 Executor executor) {
2528 dl 1.37 if (executor == null) throw new NullPointerException();
2529 jsr166 1.81 return doThenCompose(fn, executor);
2530 dl 1.37 }
2531    
2532 dl 1.88 public CompletableFuture<T> whenComplete
2533     (BiConsumer<? super T, ? super Throwable> action) {
2534     return doWhenComplete(action, null);
2535     }
2536    
2537     public CompletableFuture<T> whenCompleteAsync
2538     (BiConsumer<? super T, ? super Throwable> action) {
2539     return doWhenComplete(action, ForkJoinPool.commonPool());
2540     }
2541    
2542     public CompletableFuture<T> whenCompleteAsync
2543     (BiConsumer<? super T, ? super Throwable> action,
2544     Executor executor) {
2545     if (executor == null) throw new NullPointerException();
2546     return doWhenComplete(action, executor);
2547     }
2548    
2549     public <U> CompletableFuture<U> handle
2550     (BiFunction<? super T, Throwable, ? extends U> fn) {
2551     return doHandle(fn, null);
2552     }
2553    
2554     public <U> CompletableFuture<U> handleAsync
2555     (BiFunction<? super T, Throwable, ? extends U> fn) {
2556     return doHandle(fn, ForkJoinPool.commonPool());
2557     }
2558    
2559     public <U> CompletableFuture<U> handleAsync
2560     (BiFunction<? super T, Throwable, ? extends U> fn,
2561     Executor executor) {
2562     if (executor == null) throw new NullPointerException();
2563     return doHandle(fn, executor);
2564     }
2565    
2566     /**
2567     * Returns this CompletableFuture
2568     *
2569     * @return this CompletableFuture
2570     */
2571     public CompletableFuture<T> toCompletableFuture() {
2572     return this;
2573 dl 1.28 }
2574    
2575 dl 1.88 // not in interface CompletionStage
2576    
2577 dl 1.28 /**
2578 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
2579     * CompletableFuture completes, with the result of the given
2580     * function of the exception triggering this CompletableFuture's
2581     * completion when it completes exceptionally; otherwise, if this
2582     * CompletableFuture completes normally, then the returned
2583     * CompletableFuture also completes normally with the same value.
2584 dl 1.88 * Note: More flexible versions of this functionality are
2585     * available using methods {@code whenComplete} and {@code handle}.
2586 dl 1.28 *
2587     * @param fn the function to use to compute the value of the
2588     * returned CompletableFuture if this CompletableFuture completed
2589     * exceptionally
2590     * @return the new CompletableFuture
2591     */
2592 dl 1.48 public CompletableFuture<T> exceptionally
2593     (Function<Throwable, ? extends T> fn) {
2594 dl 1.28 if (fn == null) throw new NullPointerException();
2595     CompletableFuture<T> dst = new CompletableFuture<T>();
2596     ExceptionCompletion<T> d = null;
2597     Object r;
2598     if ((r = result) == null) {
2599     CompletionNode p =
2600 dl 1.88 new CompletionNode(d = new ExceptionCompletion<T>
2601     (this, fn, dst));
2602 dl 1.28 while ((r = result) == null) {
2603     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2604     p.next = completions, p))
2605     break;
2606     }
2607     }
2608     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2609     T t = null; Throwable ex, dx = null;
2610     if (r instanceof AltResult) {
2611 jsr166 1.87 if ((ex = ((AltResult)r).ex) != null) {
2612 dl 1.28 try {
2613     t = fn.apply(ex);
2614     } catch (Throwable rex) {
2615     dx = rex;
2616     }
2617     }
2618     }
2619     else {
2620     @SuppressWarnings("unchecked") T tr = (T) r;
2621     t = tr;
2622     }
2623     dst.internalComplete(t, dx);
2624     }
2625     helpPostComplete();
2626     return dst;
2627     }
2628    
2629 dl 1.35 /* ------------- Arbitrary-arity constructions -------------- */
2630    
2631     /*
2632     * The basic plan of attack is to recursively form binary
2633     * completion trees of elements. This can be overkill for small
2634     * sets, but scales nicely. The And/All vs Or/Any forms use the
2635     * same idea, but details differ.
2636     */
2637    
2638     /**
2639     * Returns a new CompletableFuture that is completed when all of
2640 jsr166 1.66 * the given CompletableFutures complete. If any of the given
2641 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
2642     * CompletableFuture also does so, with a CompletionException
2643     * holding this exception as its cause. Otherwise, the results,
2644     * if any, of the given CompletableFutures are not reflected in
2645     * the returned CompletableFuture, but may be obtained by
2646     * inspecting them individually. If no CompletableFutures are
2647     * provided, returns a CompletableFuture completed with the value
2648     * {@code null}.
2649 dl 1.35 *
2650     * <p>Among the applications of this method is to await completion
2651     * of a set of independent CompletableFutures before continuing a
2652     * program, as in: {@code CompletableFuture.allOf(c1, c2,
2653     * c3).join();}.
2654     *
2655     * @param cfs the CompletableFutures
2656 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
2657 dl 1.35 * given CompletableFutures complete
2658     * @throws NullPointerException if the array or any of its elements are
2659     * {@code null}
2660     */
2661     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2662     int len = cfs.length; // Directly handle empty and singleton cases
2663     if (len > 1)
2664     return allTree(cfs, 0, len - 1);
2665     else {
2666     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2667     CompletableFuture<?> f;
2668     if (len == 0)
2669     dst.result = NIL;
2670     else if ((f = cfs[0]) == null)
2671     throw new NullPointerException();
2672     else {
2673 dl 1.75 ThenPropagate d = null;
2674 dl 1.35 CompletionNode p = null;
2675     Object r;
2676     while ((r = f.result) == null) {
2677     if (d == null)
2678 dl 1.75 d = new ThenPropagate(f, dst);
2679 dl 1.35 else if (p == null)
2680     p = new CompletionNode(d);
2681     else if (UNSAFE.compareAndSwapObject
2682     (f, COMPLETIONS, p.next = f.completions, p))
2683     break;
2684     }
2685     if (r != null && (d == null || d.compareAndSet(0, 1)))
2686     dst.internalComplete(null, (r instanceof AltResult) ?
2687     ((AltResult)r).ex : null);
2688     f.helpPostComplete();
2689     }
2690     return dst;
2691     }
2692     }
2693    
2694     /**
2695     * Recursively constructs an And'ed tree of CompletableFutures.
2696     * Called only when array known to have at least two elements.
2697     */
2698     private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2699     int lo, int hi) {
2700     CompletableFuture<?> fst, snd;
2701     int mid = (lo + hi) >>> 1;
2702     if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2703     (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2704     throw new NullPointerException();
2705     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2706     AndCompletion d = null;
2707     CompletionNode p = null, q = null;
2708     Object r = null, s = null;
2709     while ((r = fst.result) == null || (s = snd.result) == null) {
2710     if (d == null)
2711     d = new AndCompletion(fst, snd, dst);
2712     else if (p == null)
2713     p = new CompletionNode(d);
2714     else if (q == null) {
2715     if (UNSAFE.compareAndSwapObject
2716     (fst, COMPLETIONS, p.next = fst.completions, p))
2717     q = new CompletionNode(d);
2718     }
2719     else if (UNSAFE.compareAndSwapObject
2720     (snd, COMPLETIONS, q.next = snd.completions, q))
2721     break;
2722     }
2723     if ((r != null || (r = fst.result) != null) &&
2724     (s != null || (s = snd.result) != null) &&
2725     (d == null || d.compareAndSet(0, 1))) {
2726     Throwable ex;
2727     if (r instanceof AltResult)
2728     ex = ((AltResult)r).ex;
2729     else
2730     ex = null;
2731     if (ex == null && (s instanceof AltResult))
2732     ex = ((AltResult)s).ex;
2733     dst.internalComplete(null, ex);
2734     }
2735     fst.helpPostComplete();
2736     snd.helpPostComplete();
2737     return dst;
2738     }
2739    
2740     /**
2741 dl 1.76 * Returns a new CompletableFuture that is completed when any of
2742 jsr166 1.79 * the given CompletableFutures complete, with the same result.
2743     * Otherwise, if it completed exceptionally, the returned
2744 dl 1.77 * CompletableFuture also does so, with a CompletionException
2745     * holding this exception as its cause. If no CompletableFutures
2746     * are provided, returns an incomplete CompletableFuture.
2747 dl 1.35 *
2748     * @param cfs the CompletableFutures
2749 dl 1.77 * @return a new CompletableFuture that is completed with the
2750     * result or exception of any of the given CompletableFutures when
2751     * one completes
2752 dl 1.35 * @throws NullPointerException if the array or any of its elements are
2753     * {@code null}
2754     */
2755 dl 1.77 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2756 dl 1.35 int len = cfs.length; // Same idea as allOf
2757     if (len > 1)
2758     return anyTree(cfs, 0, len - 1);
2759     else {
2760 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2761 dl 1.35 CompletableFuture<?> f;
2762     if (len == 0)
2763 dl 1.48 ; // skip
2764 dl 1.35 else if ((f = cfs[0]) == null)
2765     throw new NullPointerException();
2766     else {
2767 dl 1.77 ThenCopy<Object> d = null;
2768 dl 1.35 CompletionNode p = null;
2769     Object r;
2770     while ((r = f.result) == null) {
2771     if (d == null)
2772 dl 1.77 d = new ThenCopy<Object>(f, dst);
2773 dl 1.35 else if (p == null)
2774     p = new CompletionNode(d);
2775     else if (UNSAFE.compareAndSwapObject
2776     (f, COMPLETIONS, p.next = f.completions, p))
2777     break;
2778     }
2779     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2780     Throwable ex; Object t;
2781 dl 1.77 if (r instanceof AltResult) {
2782 dl 1.35 ex = ((AltResult)r).ex;
2783 dl 1.77 t = null;
2784     }
2785     else {
2786     ex = null;
2787     t = r;
2788     }
2789     dst.internalComplete(t, ex);
2790 dl 1.35 }
2791     f.helpPostComplete();
2792     }
2793     return dst;
2794     }
2795     }
2796    
2797     /**
2798 jsr166 1.44 * Recursively constructs an Or'ed tree of CompletableFutures.
2799 dl 1.35 */
2800 dl 1.77 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
2801 jsr166 1.79 int lo, int hi) {
2802 dl 1.35 CompletableFuture<?> fst, snd;
2803     int mid = (lo + hi) >>> 1;
2804     if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2805     (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2806     throw new NullPointerException();
2807 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2808 dl 1.35 OrCompletion d = null;
2809     CompletionNode p = null, q = null;
2810     Object r;
2811     while ((r = fst.result) == null && (r = snd.result) == null) {
2812     if (d == null)
2813     d = new OrCompletion(fst, snd, dst);
2814     else if (p == null)
2815     p = new CompletionNode(d);
2816     else if (q == null) {
2817     if (UNSAFE.compareAndSwapObject
2818     (fst, COMPLETIONS, p.next = fst.completions, p))
2819     q = new CompletionNode(d);
2820     }
2821     else if (UNSAFE.compareAndSwapObject
2822     (snd, COMPLETIONS, q.next = snd.completions, q))
2823     break;
2824     }
2825     if ((r != null || (r = fst.result) != null ||
2826     (r = snd.result) != null) &&
2827     (d == null || d.compareAndSet(0, 1))) {
2828 dl 1.77 Throwable ex; Object t;
2829 dl 1.35 if (r instanceof AltResult) {
2830     ex = ((AltResult)r).ex;
2831 dl 1.77 t = null;
2832 dl 1.35 }
2833 dl 1.77 else {
2834 dl 1.35 ex = null;
2835 dl 1.77 t = r;
2836     }
2837     dst.internalComplete(t, ex);
2838 dl 1.35 }
2839     fst.helpPostComplete();
2840     snd.helpPostComplete();
2841     return dst;
2842     }
2843    
2844     /* ------------- Control and status methods -------------- */
2845    
2846 dl 1.28 /**
2847 dl 1.37 * If not already completed, completes this CompletableFuture with
2848     * a {@link CancellationException}. Dependent CompletableFutures
2849     * that have not already completed will also complete
2850     * exceptionally, with a {@link CompletionException} caused by
2851     * this {@code CancellationException}.
2852 dl 1.28 *
2853     * @param mayInterruptIfRunning this value has no effect in this
2854     * implementation because interrupts are not used to control
2855     * processing.
2856     *
2857     * @return {@code true} if this task is now cancelled
2858     */
2859     public boolean cancel(boolean mayInterruptIfRunning) {
2860 dl 1.46 boolean cancelled = (result == null) &&
2861     UNSAFE.compareAndSwapObject
2862     (this, RESULT, null, new AltResult(new CancellationException()));
2863     postComplete();
2864 dl 1.48 return cancelled || isCancelled();
2865 dl 1.28 }
2866    
2867     /**
2868     * Returns {@code true} if this CompletableFuture was cancelled
2869     * before it completed normally.
2870     *
2871     * @return {@code true} if this CompletableFuture was cancelled
2872     * before it completed normally
2873     */
2874     public boolean isCancelled() {
2875     Object r;
2876 jsr166 1.43 return ((r = result) instanceof AltResult) &&
2877     (((AltResult)r).ex instanceof CancellationException);
2878 dl 1.28 }
2879    
2880     /**
2881 dl 1.88 * Returns {@code true} if this CompletableFuture completed
2882 dl 1.91 * exceptionally, in any way. Possible causes include
2883     * cancellation, explicit invocation of {@code
2884     * completeExceptionally}, and abrupt termination of a
2885     * CompletionStage action.
2886 dl 1.88 *
2887     * @return {@code true} if this CompletableFuture completed
2888     * exceptionally
2889     */
2890     public boolean isCompletedExceptionally() {
2891 dl 1.91 Object r;
2892     return ((r = result) instanceof AltResult) && r != NIL;
2893 dl 1.88 }
2894    
2895     /**
2896 dl 1.28 * Forcibly sets or resets the value subsequently returned by
2897 jsr166 1.42 * method {@link #get()} and related methods, whether or not
2898     * already completed. This method is designed for use only in
2899     * error recovery actions, and even in such situations may result
2900     * in ongoing dependent completions using established versus
2901 dl 1.30 * overwritten outcomes.
2902 dl 1.28 *
2903     * @param value the completion value
2904     */
2905     public void obtrudeValue(T value) {
2906     result = (value == null) ? NIL : value;
2907     postComplete();
2908     }
2909    
2910 dl 1.30 /**
2911 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
2912     * and related methods to throw the given exception, whether or
2913     * not already completed. This method is designed for use only in
2914 dl 1.30 * recovery actions, and even in such situations may result in
2915     * ongoing dependent completions using established versus
2916     * overwritten outcomes.
2917     *
2918     * @param ex the exception
2919     */
2920     public void obtrudeException(Throwable ex) {
2921     if (ex == null) throw new NullPointerException();
2922     result = new AltResult(ex);
2923     postComplete();
2924     }
2925    
2926 dl 1.35 /**
2927     * Returns the estimated number of CompletableFutures whose
2928     * completions are awaiting completion of this CompletableFuture.
2929     * This method is designed for use in monitoring system state, not
2930     * for synchronization control.
2931     *
2932     * @return the number of dependent CompletableFutures
2933     */
2934     public int getNumberOfDependents() {
2935     int count = 0;
2936     for (CompletionNode p = completions; p != null; p = p.next)
2937     ++count;
2938     return count;
2939     }
2940    
2941     /**
2942     * Returns a string identifying this CompletableFuture, as well as
2943 jsr166 1.40 * its completion state. The state, in brackets, contains the
2944 dl 1.35 * String {@code "Completed Normally"} or the String {@code
2945     * "Completed Exceptionally"}, or the String {@code "Not
2946     * completed"} followed by the number of CompletableFutures
2947     * dependent upon its completion, if any.
2948     *
2949     * @return a string identifying this CompletableFuture, as well as its state
2950     */
2951     public String toString() {
2952     Object r = result;
2953 jsr166 1.40 int count;
2954     return super.toString() +
2955     ((r == null) ?
2956     (((count = getNumberOfDependents()) == 0) ?
2957     "[Not completed]" :
2958     "[Not completed, " + count + " dependents]") :
2959     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2960     "[Completed exceptionally]" :
2961     "[Completed normally]"));
2962 dl 1.35 }
2963    
2964 dl 1.1 // Unsafe mechanics
2965     private static final sun.misc.Unsafe UNSAFE;
2966     private static final long RESULT;
2967     private static final long WAITERS;
2968     private static final long COMPLETIONS;
2969     static {
2970     try {
2971     UNSAFE = sun.misc.Unsafe.getUnsafe();
2972     Class<?> k = CompletableFuture.class;
2973     RESULT = UNSAFE.objectFieldOffset
2974     (k.getDeclaredField("result"));
2975     WAITERS = UNSAFE.objectFieldOffset
2976     (k.getDeclaredField("waiters"));
2977     COMPLETIONS = UNSAFE.objectFieldOffset
2978     (k.getDeclaredField("completions"));
2979     } catch (Exception e) {
2980     throw new Error(e);
2981     }
2982     }
2983     }