ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.99
Committed: Sun Apr 6 20:58:46 2014 UTC (10 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.98: +117 -57 lines
Log Message:
avoid StackOverflowError for long linear chains

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.88 import java.util.concurrent.CompletionException;
23     import java.util.concurrent.CompletionStage;
24 dl 1.1 import java.util.concurrent.atomic.AtomicInteger;
25     import java.util.concurrent.locks.LockSupport;
26    
27     /**
28     * A {@link Future} that may be explicitly completed (setting its
29 dl 1.88 * value and status), and may be used as a {@link CompletionStage},
30     * supporting dependent functions and actions that trigger upon its
31     * completion.
32 dl 1.1 *
33 jsr166 1.50 * <p>When two or more threads attempt to
34     * {@link #complete complete},
35 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
36 jsr166 1.50 * {@link #cancel cancel}
37     * a CompletableFuture, only one of them succeeds.
38 dl 1.19 *
39 dl 1.91 * <p>In addition to these and related methods for directly
40     * manipulating status and results, CompletableFuture implements
41     * interface {@link CompletionStage} with the following policies: <ul>
42 dl 1.35 *
43 dl 1.88 * <li>Actions supplied for dependent completions of
44     * <em>non-async</em> methods may be performed by the thread that
45     * completes the current CompletableFuture, or by any other caller of
46 dl 1.96 * a completion method.</li>
47 jsr166 1.65 *
48 dl 1.88 * <li>All <em>async</em> methods without an explicit Executor
49 dl 1.96 * argument are performed using the {@link ForkJoinPool#commonPool()}
50     * (unless it does not support a parallelism level of at least two, in
51     * which case, a new Thread is used). To simplify monitoring,
52     * debugging, and tracking, all generated asynchronous tasks are
53     * instances of the marker interface {@link
54     * AsynchronousCompletionTask}. </li>
55 dl 1.35 *
56 dl 1.88 * <li>All CompletionStage methods are implemented independently of
57     * other public methods, so the behavior of one method is not impacted
58     * by overrides of others in subclasses. </li> </ul>
59     *
60 dl 1.91 * <p>CompletableFuture also implements {@link Future} with the following
61 dl 1.88 * policies: <ul>
62     *
63     * <li>Since (unlike {@link FutureTask}) this class has no direct
64 jsr166 1.55 * control over the computation that causes it to be completed,
65 dl 1.88 * cancellation is treated as just another form of exceptional
66     * completion. Method {@link #cancel cancel} has the same effect as
67     * {@code completeExceptionally(new CancellationException())}. Method
68     * {@link #isCompletedExceptionally} can be used to determine if a
69     * CompletableFuture completed in any exceptional fashion.</li>
70 jsr166 1.55 *
71 dl 1.88 * <li>In case of exceptional completion with a CompletionException,
72 jsr166 1.55 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
73     * {@link ExecutionException} with the same cause as held in the
74 dl 1.88 * corresponding CompletionException. To simplify usage in most
75     * contexts, this class also defines methods {@link #join()} and
76     * {@link #getNow} that instead throw the CompletionException directly
77     * in these cases.</li> </ul>
78 jsr166 1.80 *
79 dl 1.1 * @author Doug Lea
80     * @since 1.8
81     */
82 dl 1.88 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
83 dl 1.28
84 dl 1.1 /*
85 dl 1.20 * Overview:
86 dl 1.1 *
87 jsr166 1.32 * 1. Non-nullness of field result (set via CAS) indicates done.
88     * An AltResult is used to box null as a result, as well as to
89     * hold exceptions. Using a single field makes completion fast
90 dl 1.20 * and simple to detect and trigger, at the expense of a lot of
91     * encoding and decoding that infiltrates many methods. One minor
92     * simplification relies on the (static) NIL (to box null results)
93     * being the only AltResult with a null exception field, so we
94 dl 1.28 * don't usually need explicit comparisons with NIL. The CF
95     * exception propagation mechanics surrounding decoding rely on
96     * unchecked casts of decoded results really being unchecked,
97     * where user type errors are caught at point of use, as is
98     * currently the case in Java. These are highlighted by using
99     * SuppressWarnings-annotated temporaries.
100 dl 1.1 *
101     * 2. Waiters are held in a Treiber stack similar to the one used
102 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
103 dl 1.28 * internal documentation for algorithmic details.
104 dl 1.1 *
105     * 3. Completions are also kept in a list/stack, and pulled off
106 dl 1.28 * and run when completion is triggered. (We could even use the
107 jsr166 1.24 * same stack as for waiters, but would give up the potential
108 dl 1.20 * parallelism obtained because woken waiters help release/run
109 dl 1.28 * others -- see method postComplete). Because post-processing
110     * may race with direct calls, class Completion opportunistically
111     * extends AtomicInteger so callers can claim the action via
112 dl 1.99 * compareAndSet(0, 1). The Completion.trigger methods are all
113 dl 1.28 * written in a boringly similar uniform way (that sometimes includes
114 jsr166 1.55 * unnecessary-looking checks, kept to maintain uniformity).
115     * There are enough dimensions upon which they differ that
116     * attempts to factor commonalities while maintaining efficiency
117     * require more lines of code than they would save.
118 dl 1.20 *
119     * 4. The exported then/and/or methods do support a bit of
120 dl 1.28 * factoring (see doThenApply etc). They must cope with the
121     * intrinsic races surrounding addition of a dependent action
122     * versus performing the action directly because the task is
123     * already complete. For example, a CF may not be complete upon
124     * entry, so a dependent completion is added, but by the time it
125     * is added, the target CF is complete, so must be directly
126 dl 1.20 * executed. This is all done while avoiding unnecessary object
127     * construction in safe-bypass cases.
128 dl 1.1 */
129    
130 dl 1.28 // preliminaries
131 dl 1.20
132 dl 1.1 static final class AltResult {
133     final Throwable ex; // null only for NIL
134 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
135 dl 1.1 }
136    
137     static final AltResult NIL = new AltResult(null);
138    
139 dl 1.20 // Fields
140    
141     volatile Object result; // null until done; then the result value or a boxed AltResult (NIL or exception)
142 dl 1.1 volatile WaitNode waiters; // head of the Treiber stack of threads blocked on get()
143     volatile CompletionNode completions; // head of the list (Treiber stack) of dependent completions
144    
145 dl 1.20 // Basic utilities for triggering and processing completions
146    
147     /**
148 dl 1.99 * Triggers completion with the encoding of the given arguments:
149     * if the exception is non-null, encodes it as a wrapped
150     * CompletionException unless it is one already. Otherwise uses
151     * the given result, boxed as NIL if null.
152     */
153     final void setInternalResult(T v, Throwable ex) {
154     if (result == null)
155     UNSAFE.compareAndSwapObject
156     (this, RESULT, null,
157     (ex == null) ? (v == null) ? NIL : v :
158     new AltResult((ex instanceof CompletionException) ? ex :
159     new CompletionException(ex)));
160     }
161    
162     /**
163     * Removes and signals all waiting threads
164 dl 1.20 */
165 dl 1.99 final void removeAndSignalWaiters() {
166 dl 1.20 WaitNode q; Thread t;
167     while ((q = waiters) != null) {
168     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
169     (t = q.thread) != null) {
170     q.thread = null;
171     LockSupport.unpark(t);
172     }
173     }
174 dl 1.99 }
175 dl 1.20
176 dl 1.99 /**
177     * Triggers all enabled completions reachable from b. Loopifies
178     * the final recursive call for each stage to avoid potential
179     * StackOverflowErrors in cases of long linear chains.
180     *
181     * @param b if non-null, a completed CompletableFuture
182     */
183     static final void removeAndTriggerCompletions(CompletableFuture<?> b) {
184     CompletionNode h; Completion c; CompletableFuture<?> f;
185     while (b != null && (h = b.completions) != null) {
186     if (UNSAFE.compareAndSwapObject(b, COMPLETIONS, h, h.next) &&
187     (c = h.completion) != null &&
188     (f = c.trigger()) != null &&
189     f.result != null) {
190     f.removeAndSignalWaiters();
191     if (f.completions != null) {
192     if (b.completions == null)
193     b = f; // tail-recurse
194     else
195     removeAndTriggerCompletions(f);
196     }
197     }
198 dl 1.20 }
199     }
200    
201     /**
202 dl 1.99 * Sets result, signals waiters, and triggers dependents
203 dl 1.20 */
204 dl 1.75 final void internalComplete(T v, Throwable ex) {
205 dl 1.99 setInternalResult(v, ex);
206     removeAndSignalWaiters();
207     removeAndTriggerCompletions(this);
208     }
209    
210    
211     /**
212     * Signals waiters and triggers dependents. Call only if known to
213     * be completed.
214     */
215     final void postComplete() {
216     removeAndSignalWaiters();
217     removeAndTriggerCompletions(this);
218 dl 1.20 }
219    
220     /**
221 dl 1.99 * If completed, helps signal waiters and trigger dependents
222 dl 1.20 */
223     final void helpPostComplete() {
224 dl 1.99 if (result != null) {
225     removeAndSignalWaiters();
226     removeAndTriggerCompletions(this);
227     }
228 dl 1.20 }
229    
230 dl 1.28 /* ------------- waiting for completions -------------- */
231 dl 1.20
232 dl 1.35 /** Number of processors, for spin control */
233     static final int NCPU = Runtime.getRuntime().availableProcessors();
234    
235 dl 1.1 /**
236 dl 1.28 * Heuristic spin value for waitingGet() before blocking on
237     * multiprocessors
238 dl 1.1 */
239 dl 1.35 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
240 dl 1.1
241     /**
242 dl 1.28 * Linked nodes to record waiting threads in a Treiber stack. See
243     * other classes such as Phaser and SynchronousQueue for more
244     * detailed explanation. This class implements ManagedBlocker to
245     * avoid starvation when blocking actions pile up in
246     * ForkJoinPools.
247     */
248     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
249     long nanos; // wait time if timed
250     final long deadline; // non-zero if timed
251     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
252     volatile Thread thread;
253     volatile WaitNode next;
254     WaitNode(boolean interruptible, long nanos, long deadline) {
255     this.thread = Thread.currentThread();
256     this.interruptControl = interruptible ? 1 : 0;
257     this.nanos = nanos;
258     this.deadline = deadline;
259     }
260     public boolean isReleasable() {
261     if (thread == null)
262     return true;
263     if (Thread.interrupted()) {
264     int i = interruptControl;
265     interruptControl = -1;
266     if (i > 0)
267     return true;
268     }
269     if (deadline != 0L &&
270     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
271     thread = null;
272     return true;
273     }
274     return false;
275     }
276     public boolean block() {
277     if (isReleasable())
278     return true;
279     else if (deadline == 0L)
280     LockSupport.park(this);
281     else if (nanos > 0L)
282     LockSupport.parkNanos(this, nanos);
283     return isReleasable();
284     }
285 dl 1.1 }
286    
287     /**
288 dl 1.28 * Returns raw result after waiting, or null if interruptible and
289     * interrupted.
    *
    * <p>Each pass of the loop takes exactly one step: return once a
    * result is present, burn one spin, allocate the wait node, push it
    * onto the waiters stack, give up after an interrupt, or block via
    * ForkJoinPool.managedBlock.
    *
    * @param interruptible whether an interrupt aborts the wait
    * @return the non-null raw result (possibly an AltResult), or null
    * if interruptible and an interrupt was recorded
290 dl 1.1 */
291 dl 1.28 private Object waitingGet(boolean interruptible) {
292     WaitNode q = null; // created only after the spin budget is used up
293     boolean queued = false; // true once q has been pushed onto waiters
294 dl 1.35 int spins = SPINS;
295 dl 1.28 for (Object r;;) {
296     if ((r = result) != null) { // completed
297     if (q != null) { // suppress unpark
298     q.thread = null;
299     if (q.interruptControl < 0) { // an interrupt was recorded while waiting
300     if (interruptible) {
301     removeWaiter(q);
302     return null;
303     }
304     Thread.currentThread().interrupt(); // uninterruptible wait: re-assert the interrupt status
305     }
306     }
307     postComplete(); // help release others
308     return r;
309     }
310     else if (spins > 0) {
311 dl 1.35 int rnd = ThreadLocalRandom.nextSecondarySeed();
312     if (rnd == 0)
313     rnd = ThreadLocalRandom.current().nextInt();
314     if (rnd >= 0) // count down only on non-negative random values
315 dl 1.28 --spins;
316     }
317     else if (q == null)
318     q = new WaitNode(interruptible, 0L, 0L); // untimed node (zero deadline)
319     else if (!queued)
320     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
321     q.next = waiters, q); // push; a failed CAS is retried on the next pass
322     else if (interruptible && q.interruptControl < 0) {
323     removeWaiter(q);
324     return null;
325     }
326     else if (q.thread != null && result == null) { // recheck before blocking
327     try {
328     ForkJoinPool.managedBlock(q);
329 jsr166 1.31 } catch (InterruptedException ex) {
330 dl 1.28 q.interruptControl = -1; // record the interrupt; acted upon in a later pass
331     }
332     }
333     }
334 dl 1.1 }
335    
336     /**
337 dl 1.28 * Awaits completion or aborts on interrupt or timeout.
    *
    * <p>Each pass of the loop takes exactly one step: return once a
    * result is present, create the timed wait node, push it onto the
    * waiters stack, abort on a recorded interrupt, abort on an elapsed
    * timeout, or block via ForkJoinPool.managedBlock.
338 dl 1.1 *
339 dl 1.28 * @param nanos time to wait
340     * @return raw result
    * @throws InterruptedException if an interrupt was recorded on the
    * wait node, even when the result has become available meanwhile
    * @throws TimeoutException if nanos is not positive and no result
    * is present on entry, or the wait time runs out before completion
341 dl 1.1 */
342 dl 1.28 private Object timedAwaitDone(long nanos)
343     throws InterruptedException, TimeoutException {
344     WaitNode q = null; // created on the first pass without a result
345     boolean queued = false; // true once q has been pushed onto waiters
346     for (Object r;;) {
347     if ((r = result) != null) { // completed
348     if (q != null) {
349     q.thread = null; // suppress unpark
350     if (q.interruptControl < 0) {
351     removeWaiter(q);
352     throw new InterruptedException();
353     }
354     }
355     postComplete(); // help signal waiters and trigger dependents
356     return r;
357     }
358     else if (q == null) {
359     if (nanos <= 0L)
360     throw new TimeoutException();
361     long d = System.nanoTime() + nanos;
362 jsr166 1.31 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
363 dl 1.28 }
364     else if (!queued)
365     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
366     q.next = waiters, q); // push; a failed CAS is retried on the next pass
367     else if (q.interruptControl < 0) {
368     removeWaiter(q);
369     throw new InterruptedException();
370     }
371     else if (q.nanos <= 0L) { // wait time used up
372     if (result == null) { // time out only if still incomplete
373     removeWaiter(q);
374     throw new TimeoutException();
375     }
376     }
377     else if (q.thread != null && result == null) { // recheck before blocking
378     try {
379     ForkJoinPool.managedBlock(q);
380 jsr166 1.31 } catch (InterruptedException ex) {
381 dl 1.28 q.interruptControl = -1; // record the interrupt; thrown from a later pass
382     }
383     }
384     }
385 dl 1.1 }
386    
387     /**
388 dl 1.28 * Tries to unlink a timed-out or interrupted wait node to avoid
389     * accumulating garbage. Internal nodes are simply unspliced
390     * without CAS since it is harmless if they are traversed anyway
391     * by releasers. To avoid effects of unsplicing from already
392     * removed nodes, the list is retraversed in case of an apparent
393     * race. This is slow when there are a lot of nodes, but we don't
394     * expect lists to be long enough to outweigh higher-overhead
395     * schemes.
    *
    * <p>The sweep unlinks every node whose thread field is null, not
    * only the given one.
    *
    * @param node the node to cancel; ignored if null
396 dl 1.1 */
397 dl 1.28 private void removeWaiter(WaitNode node) {
398     if (node != null) {
399     node.thread = null; // mark as cancelled so the sweep below drops it
400     retry:
401     for (;;) { // restart on removeWaiter race
402     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
403     s = q.next;
404     if (q.thread != null)
405     pred = q; // live node: becomes the predecessor candidate
406     else if (pred != null) {
407     pred.next = s; // unsplice interior node without CAS
408     if (pred.thread == null) // check for race
409     continue retry;
410     }
411     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s)) // dead node at the head: needs CAS
412     continue retry;
413     }
414     break;
415     }
416     }
417 dl 1.1 }
418    
419 dl 1.28 /* ------------- Async tasks -------------- */
420    
421 dl 1.1 /**
422 jsr166 1.56 * A marker interface identifying asynchronous tasks produced by
423 dl 1.28 * {@code async} methods. This may be useful for monitoring,
424     * debugging, and tracking asynchronous activities.
425 jsr166 1.57 *
426     * @since 1.8
427 dl 1.1 */
428 dl 1.28 public static interface AsynchronousCompletionTask {
429 dl 1.1 }
430    
431 dl 1.28 /** Base class can act as either FJ or plain Runnable */
432 dl 1.97 @SuppressWarnings("serial")
433 jsr166 1.33 abstract static class Async extends ForkJoinTask<Void>
434 dl 1.28 implements Runnable, AsynchronousCompletionTask {
435     public final Void getRawResult() { return null; }
436     public final void setRawResult(Void v) { }
437     public final void run() { exec(); }
438 jsr166 1.26 }
439    
440 dl 1.96 /**
441     * Starts the given async task using the given executor, unless
442     * the executor is ForkJoinPool.commonPool and it has been
443     * disabled, in which case starts a new thread.
444     */
445     static void execAsync(Executor e, Async r) {
446     if (e == ForkJoinPool.commonPool() &&
447     ForkJoinPool.getCommonPoolParallelism() <= 1)
448     new Thread(r).start();
449     else
450     e.execute(r);
451     }
452    
453 dl 1.28 static final class AsyncRun extends Async {
454     final Runnable fn;
455     final CompletableFuture<Void> dst;
456     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
457     this.fn = fn; this.dst = dst;
458     }
459     public final boolean exec() {
460     CompletableFuture<Void> d; Throwable ex;
461 dl 1.29 if ((d = this.dst) != null && d.result == null) {
462 dl 1.28 try {
463     fn.run();
464     ex = null;
465     } catch (Throwable rex) {
466     ex = rex;
467     }
468     d.internalComplete(null, ex);
469 dl 1.19 }
470 dl 1.28 return true;
471 dl 1.19 }
472 dl 1.28 private static final long serialVersionUID = 5232453952276885070L;
473 dl 1.19 }
474    
475 dl 1.28 static final class AsyncSupply<U> extends Async {
476     final Supplier<U> fn;
477     final CompletableFuture<U> dst;
478     AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
479     this.fn = fn; this.dst = dst;
480     }
481     public final boolean exec() {
482     CompletableFuture<U> d; U u; Throwable ex;
483 dl 1.29 if ((d = this.dst) != null && d.result == null) {
484 dl 1.28 try {
485     u = fn.get();
486     ex = null;
487     } catch (Throwable rex) {
488     ex = rex;
489     u = null;
490     }
491     d.internalComplete(u, ex);
492 dl 1.1 }
493     return true;
494     }
495     private static final long serialVersionUID = 5232453952276885070L;
496     }
497    
498 dl 1.28 static final class AsyncApply<T,U> extends Async {
499 jsr166 1.63 final T arg;
500 dl 1.28 final Function<? super T,? extends U> fn;
501 dl 1.1 final CompletableFuture<U> dst;
502 dl 1.28 AsyncApply(T arg, Function<? super T,? extends U> fn,
503 dl 1.35 CompletableFuture<U> dst) {
504 dl 1.1 this.arg = arg; this.fn = fn; this.dst = dst;
505     }
506     public final boolean exec() {
507 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
508 dl 1.29 if ((d = this.dst) != null && d.result == null) {
509 dl 1.21 try {
510     u = fn.apply(arg);
511     ex = null;
512     } catch (Throwable rex) {
513     ex = rex;
514     u = null;
515     }
516     d.internalComplete(u, ex);
517 dl 1.1 }
518     return true;
519     }
520     private static final long serialVersionUID = 5232453952276885070L;
521     }
522    
523 jsr166 1.81 static final class AsyncCombine<T,U,V> extends Async {
524 dl 1.1 final T arg1;
525     final U arg2;
526 jsr166 1.63 final BiFunction<? super T,? super U,? extends V> fn;
527 dl 1.1 final CompletableFuture<V> dst;
528 jsr166 1.81 AsyncCombine(T arg1, U arg2,
529 dl 1.35 BiFunction<? super T,? super U,? extends V> fn,
530     CompletableFuture<V> dst) {
531 dl 1.1 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
532     }
533     public final boolean exec() {
534 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
535 dl 1.29 if ((d = this.dst) != null && d.result == null) {
536 dl 1.21 try {
537     v = fn.apply(arg1, arg2);
538     ex = null;
539     } catch (Throwable rex) {
540     ex = rex;
541     v = null;
542     }
543     d.internalComplete(v, ex);
544 dl 1.1 }
545     return true;
546     }
547     private static final long serialVersionUID = 5232453952276885070L;
548     }
549    
550 dl 1.28 static final class AsyncAccept<T> extends Async {
551 jsr166 1.63 final T arg;
552 dl 1.34 final Consumer<? super T> fn;
553 dl 1.88 final CompletableFuture<?> dst;
554 dl 1.34 AsyncAccept(T arg, Consumer<? super T> fn,
555 dl 1.88 CompletableFuture<?> dst) {
556 dl 1.7 this.arg = arg; this.fn = fn; this.dst = dst;
557     }
558     public final boolean exec() {
559 dl 1.88 CompletableFuture<?> d; Throwable ex;
560 dl 1.29 if ((d = this.dst) != null && d.result == null) {
561 dl 1.21 try {
562     fn.accept(arg);
563     ex = null;
564     } catch (Throwable rex) {
565     ex = rex;
566     }
567     d.internalComplete(null, ex);
568 dl 1.7 }
569     return true;
570     }
571     private static final long serialVersionUID = 5232453952276885070L;
572     }
573    
574 jsr166 1.81 static final class AsyncAcceptBoth<T,U> extends Async {
575 dl 1.7 final T arg1;
576     final U arg2;
577 jsr166 1.63 final BiConsumer<? super T,? super U> fn;
578 dl 1.88 final CompletableFuture<?> dst;
579 jsr166 1.81 AsyncAcceptBoth(T arg1, U arg2,
580     BiConsumer<? super T,? super U> fn,
581 dl 1.88 CompletableFuture<?> dst) {
582 dl 1.7 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
583     }
584     public final boolean exec() {
585 dl 1.88 CompletableFuture<?> d; Throwable ex;
586 dl 1.29 if ((d = this.dst) != null && d.result == null) {
587 dl 1.21 try {
588     fn.accept(arg1, arg2);
589     ex = null;
590     } catch (Throwable rex) {
591     ex = rex;
592     }
593     d.internalComplete(null, ex);
594 dl 1.7 }
595     return true;
596     }
597     private static final long serialVersionUID = 5232453952276885070L;
598     }
599    
600 dl 1.37 static final class AsyncCompose<T,U> extends Async {
601 jsr166 1.63 final T arg;
602 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
603 dl 1.37 final CompletableFuture<U> dst;
604     AsyncCompose(T arg,
605 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
606 dl 1.37 CompletableFuture<U> dst) {
607     this.arg = arg; this.fn = fn; this.dst = dst;
608     }
609     public final boolean exec() {
610     CompletableFuture<U> d, fr; U u; Throwable ex;
611     if ((d = this.dst) != null && d.result == null) {
612     try {
613 dl 1.88 CompletionStage<U> cs = fn.apply(arg);
614     fr = (cs == null) ? null : cs.toCompletableFuture();
615 jsr166 1.61 ex = (fr == null) ? new NullPointerException() : null;
616 dl 1.37 } catch (Throwable rex) {
617     ex = rex;
618     fr = null;
619     }
620     if (ex != null)
621     u = null;
622     else {
623     Object r = fr.result;
624 dl 1.67 if (r == null)
625     r = fr.waitingGet(false);
626 dl 1.37 if (r instanceof AltResult) {
627     ex = ((AltResult)r).ex;
628     u = null;
629     }
630     else {
631     @SuppressWarnings("unchecked") U ur = (U) r;
632     u = ur;
633     }
634     }
635     d.internalComplete(u, ex);
636     }
637     return true;
638     }
639     private static final long serialVersionUID = 5232453952276885070L;
640     }
641    
642 dl 1.91 static final class AsyncWhenComplete<T> extends Async {
643     final T arg1;
644     final Throwable arg2;
645     final BiConsumer<? super T,? super Throwable> fn;
646     final CompletableFuture<T> dst;
647     AsyncWhenComplete(T arg1, Throwable arg2,
648     BiConsumer<? super T,? super Throwable> fn,
649     CompletableFuture<T> dst) {
650     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
651     }
652     public final boolean exec() {
653 jsr166 1.92 CompletableFuture<T> d;
654 dl 1.91 if ((d = this.dst) != null && d.result == null) {
655     Throwable ex = arg2;
656     try {
657     fn.accept(arg1, ex);
658     } catch (Throwable rex) {
659     if (ex == null)
660     ex = rex;
661     }
662     d.internalComplete(arg1, ex);
663     }
664     return true;
665     }
666     private static final long serialVersionUID = 5232453952276885070L;
667     }
668    
669 dl 1.1 /* ------------- Completions -------------- */
670    
671 dl 1.28 /**
672     * Simple linked list nodes to record completions, used in
673     * basically the same way as WaitNodes. (We separate nodes from
674     * the Completions themselves mainly because for the And and Or
675     * methods, the same Completion object resides in two lists.)
676     */
677     static final class CompletionNode {
678     final Completion completion;
679     volatile CompletionNode next;
680     CompletionNode(Completion completion) { this.completion = completion; }
681     }
682    
683 dl 1.1 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
684 dl 1.97 @SuppressWarnings("serial")
685 dl 1.99 abstract static class Completion extends AtomicInteger {
686     /**
687     * Complete a dependent Completablefuture if enabled
688     * @return the dependent Completablefuture
689     */
690     public abstract CompletableFuture<?> trigger();
691 dl 1.1 }
692    
693 jsr166 1.81 static final class ThenApply<T,U> extends Completion {
694 jsr166 1.2 final CompletableFuture<? extends T> src;
695     final Function<? super T,? extends U> fn;
696     final CompletableFuture<U> dst;
697 dl 1.1 final Executor executor;
698 jsr166 1.81 ThenApply(CompletableFuture<? extends T> src,
699     Function<? super T,? extends U> fn,
700     CompletableFuture<U> dst,
701     Executor executor) {
702 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
703     this.executor = executor;
704     }
705 dl 1.99 public final CompletableFuture<?> trigger() {
706 dl 1.28 final CompletableFuture<? extends T> a;
707     final Function<? super T,? extends U> fn;
708     final CompletableFuture<U> dst;
709 jsr166 1.2 Object r; T t; Throwable ex;
710     if ((dst = this.dst) != null &&
711 dl 1.1 (fn = this.fn) != null &&
712     (a = this.src) != null &&
713     (r = a.result) != null &&
714     compareAndSet(0, 1)) {
715 jsr166 1.2 if (r instanceof AltResult) {
716 dl 1.19 ex = ((AltResult)r).ex;
717 dl 1.1 t = null;
718     }
719 dl 1.17 else {
720     ex = null;
721 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
722     t = tr;
723 dl 1.1 }
724 dl 1.20 Executor e = executor;
725     U u = null;
726 dl 1.17 if (ex == null) {
727     try {
728 dl 1.20 if (e != null)
729 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
730 dl 1.17 else
731 dl 1.20 u = fn.apply(t);
732 dl 1.17 } catch (Throwable rex) {
733     ex = rex;
734     }
735     }
736 dl 1.20 if (e == null || ex != null)
737 dl 1.99 dst.setInternalResult(u, ex);
738 dl 1.1 }
739 dl 1.99 return dst;
740 dl 1.1 }
741 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
742 dl 1.1 }
743    
744 jsr166 1.81 static final class ThenAccept<T> extends Completion {
745 dl 1.7 final CompletableFuture<? extends T> src;
746 dl 1.34 final Consumer<? super T> fn;
747 dl 1.88 final CompletableFuture<?> dst;
748 dl 1.7 final Executor executor;
749 jsr166 1.81 ThenAccept(CompletableFuture<? extends T> src,
750     Consumer<? super T> fn,
751 dl 1.88 CompletableFuture<?> dst,
752 jsr166 1.81 Executor executor) {
753 dl 1.7 this.src = src; this.fn = fn; this.dst = dst;
754     this.executor = executor;
755     }
756 dl 1.99 public final CompletableFuture<?> trigger() {
757 dl 1.28 final CompletableFuture<? extends T> a;
758 dl 1.34 final Consumer<? super T> fn;
759 dl 1.88 final CompletableFuture<?> dst;
760 dl 1.7 Object r; T t; Throwable ex;
761     if ((dst = this.dst) != null &&
762     (fn = this.fn) != null &&
763     (a = this.src) != null &&
764     (r = a.result) != null &&
765     compareAndSet(0, 1)) {
766     if (r instanceof AltResult) {
767 dl 1.19 ex = ((AltResult)r).ex;
768 dl 1.7 t = null;
769     }
770 dl 1.17 else {
771     ex = null;
772 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
773     t = tr;
774 dl 1.17 }
775 dl 1.20 Executor e = executor;
776 dl 1.17 if (ex == null) {
777     try {
778 dl 1.20 if (e != null)
779 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
780 dl 1.20 else
781 dl 1.17 fn.accept(t);
782     } catch (Throwable rex) {
783     ex = rex;
784 dl 1.7 }
785     }
786 dl 1.20 if (e == null || ex != null)
787 dl 1.99 dst.setInternalResult(null, ex);
788 dl 1.7 }
789 dl 1.99 return dst;
790 dl 1.7 }
791 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
792 dl 1.7 }
793    
794 jsr166 1.82 static final class ThenRun extends Completion {
795     final CompletableFuture<?> src;
796 jsr166 1.2 final Runnable fn;
797     final CompletableFuture<Void> dst;
798 dl 1.1 final Executor executor;
799 jsr166 1.82 ThenRun(CompletableFuture<?> src,
800 jsr166 1.81 Runnable fn,
801     CompletableFuture<Void> dst,
802     Executor executor) {
803 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
804     this.executor = executor;
805     }
806 dl 1.99 public final CompletableFuture<?> trigger() {
807 jsr166 1.82 final CompletableFuture<?> a;
808 dl 1.28 final Runnable fn;
809     final CompletableFuture<Void> dst;
810 jsr166 1.2 Object r; Throwable ex;
811     if ((dst = this.dst) != null &&
812 dl 1.1 (fn = this.fn) != null &&
813     (a = this.src) != null &&
814     (r = a.result) != null &&
815     compareAndSet(0, 1)) {
816 dl 1.19 if (r instanceof AltResult)
817     ex = ((AltResult)r).ex;
818 dl 1.17 else
819     ex = null;
820 dl 1.20 Executor e = executor;
821 dl 1.17 if (ex == null) {
822     try {
823 dl 1.20 if (e != null)
824 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
825 dl 1.20 else
826 dl 1.17 fn.run();
827     } catch (Throwable rex) {
828     ex = rex;
829 dl 1.1 }
830     }
831 dl 1.20 if (e == null || ex != null)
832 dl 1.99 dst.setInternalResult(null, ex);
833 dl 1.1 }
834 dl 1.99 return dst;
835 dl 1.1 }
836 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
837 dl 1.1 }
838    
839 jsr166 1.81 static final class ThenCombine<T,U,V> extends Completion {
840 jsr166 1.2 final CompletableFuture<? extends T> src;
841     final CompletableFuture<? extends U> snd;
842     final BiFunction<? super T,? super U,? extends V> fn;
843     final CompletableFuture<V> dst;
844 dl 1.1 final Executor executor;
845 jsr166 1.81 ThenCombine(CompletableFuture<? extends T> src,
846     CompletableFuture<? extends U> snd,
847     BiFunction<? super T,? super U,? extends V> fn,
848     CompletableFuture<V> dst,
849     Executor executor) {
850 dl 1.1 this.src = src; this.snd = snd;
851     this.fn = fn; this.dst = dst;
852     this.executor = executor;
853     }
854 dl 1.99 public final CompletableFuture<?> trigger() {
855 dl 1.28 final CompletableFuture<? extends T> a;
856     final CompletableFuture<? extends U> b;
857     final BiFunction<? super T,? super U,? extends V> fn;
858     final CompletableFuture<V> dst;
859 dl 1.85 Object r, s; T t; U u; Throwable ex;
860     if ((dst = this.dst) != null &&
861     (fn = this.fn) != null &&
862     (a = this.src) != null &&
863     (r = a.result) != null &&
864     (b = this.snd) != null &&
865     (s = b.result) != null &&
866     compareAndSet(0, 1)) {
867     if (r instanceof AltResult) {
868     ex = ((AltResult)r).ex;
869     t = null;
870     }
871     else {
872     ex = null;
873     @SuppressWarnings("unchecked") T tr = (T) r;
874     t = tr;
875     }
876     if (ex != null)
877     u = null;
878     else if (s instanceof AltResult) {
879     ex = ((AltResult)s).ex;
880     u = null;
881     }
882     else {
883     @SuppressWarnings("unchecked") U us = (U) s;
884     u = us;
885     }
886 dl 1.20 Executor e = executor;
887     V v = null;
888 dl 1.19 if (ex == null) {
889     try {
890 dl 1.20 if (e != null)
891 dl 1.96 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
892 dl 1.19 else
893 dl 1.20 v = fn.apply(t, u);
894 dl 1.19 } catch (Throwable rex) {
895     ex = rex;
896     }
897 dl 1.1 }
898 dl 1.20 if (e == null || ex != null)
899 dl 1.99 dst.setInternalResult(v, ex);
900 jsr166 1.2 }
901 dl 1.99 return dst;
902 jsr166 1.2 }
903 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
904 dl 1.1 }
905    
906 jsr166 1.81 static final class ThenAcceptBoth<T,U> extends Completion {
907 dl 1.7 final CompletableFuture<? extends T> src;
908     final CompletableFuture<? extends U> snd;
909 dl 1.34 final BiConsumer<? super T,? super U> fn;
910 dl 1.7 final CompletableFuture<Void> dst;
911     final Executor executor;
912 jsr166 1.81 ThenAcceptBoth(CompletableFuture<? extends T> src,
913     CompletableFuture<? extends U> snd,
914     BiConsumer<? super T,? super U> fn,
915     CompletableFuture<Void> dst,
916     Executor executor) {
917 dl 1.7 this.src = src; this.snd = snd;
918     this.fn = fn; this.dst = dst;
919     this.executor = executor;
920     }
921 dl 1.99 public final CompletableFuture<?> trigger() {
922 dl 1.28 final CompletableFuture<? extends T> a;
923     final CompletableFuture<? extends U> b;
924 dl 1.34 final BiConsumer<? super T,? super U> fn;
925 dl 1.28 final CompletableFuture<Void> dst;
926 dl 1.85 Object r, s; T t; U u; Throwable ex;
927     if ((dst = this.dst) != null &&
928     (fn = this.fn) != null &&
929     (a = this.src) != null &&
930     (r = a.result) != null &&
931     (b = this.snd) != null &&
932     (s = b.result) != null &&
933     compareAndSet(0, 1)) {
934     if (r instanceof AltResult) {
935     ex = ((AltResult)r).ex;
936     t = null;
937     }
938     else {
939     ex = null;
940     @SuppressWarnings("unchecked") T tr = (T) r;
941     t = tr;
942     }
943     if (ex != null)
944     u = null;
945     else if (s instanceof AltResult) {
946     ex = ((AltResult)s).ex;
947     u = null;
948     }
949     else {
950     @SuppressWarnings("unchecked") U us = (U) s;
951     u = us;
952     }
953 dl 1.20 Executor e = executor;
954 dl 1.19 if (ex == null) {
955     try {
956 dl 1.20 if (e != null)
957 dl 1.96 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
958 dl 1.20 else
959 dl 1.19 fn.accept(t, u);
960     } catch (Throwable rex) {
961     ex = rex;
962 dl 1.7 }
963     }
964 dl 1.20 if (e == null || ex != null)
965 dl 1.99 dst.setInternalResult(null, ex);
966 dl 1.7 }
967 dl 1.99 return dst;
968 dl 1.7 }
969 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
970 dl 1.7 }
971    
972 jsr166 1.82 static final class RunAfterBoth extends Completion {
973     final CompletableFuture<?> src;
974 jsr166 1.2 final CompletableFuture<?> snd;
975     final Runnable fn;
976     final CompletableFuture<Void> dst;
977 dl 1.1 final Executor executor;
978 jsr166 1.82 RunAfterBoth(CompletableFuture<?> src,
979 jsr166 1.81 CompletableFuture<?> snd,
980     Runnable fn,
981     CompletableFuture<Void> dst,
982     Executor executor) {
983 dl 1.1 this.src = src; this.snd = snd;
984     this.fn = fn; this.dst = dst;
985     this.executor = executor;
986     }
987 dl 1.99 public final CompletableFuture<?> trigger() {
988 jsr166 1.82 final CompletableFuture<?> a;
989 dl 1.1 final CompletableFuture<?> b;
990     final Runnable fn;
991     final CompletableFuture<Void> dst;
992 dl 1.85 Object r, s; Throwable ex;
993     if ((dst = this.dst) != null &&
994     (fn = this.fn) != null &&
995     (a = this.src) != null &&
996     (r = a.result) != null &&
997     (b = this.snd) != null &&
998     (s = b.result) != null &&
999     compareAndSet(0, 1)) {
1000     if (r instanceof AltResult)
1001     ex = ((AltResult)r).ex;
1002     else
1003     ex = null;
1004     if (ex == null && (s instanceof AltResult))
1005     ex = ((AltResult)s).ex;
1006 dl 1.20 Executor e = executor;
1007 dl 1.19 if (ex == null) {
1008     try {
1009 dl 1.20 if (e != null)
1010 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
1011 dl 1.20 else
1012 dl 1.19 fn.run();
1013     } catch (Throwable rex) {
1014     ex = rex;
1015 dl 1.1 }
1016 jsr166 1.2 }
1017 dl 1.20 if (e == null || ex != null)
1018 dl 1.99 dst.setInternalResult(null, ex);
1019 jsr166 1.2 }
1020 dl 1.99 return dst;
1021 jsr166 1.2 }
1022 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1023 dl 1.1 }
1024    
1025 dl 1.35 static final class AndCompletion extends Completion {
1026     final CompletableFuture<?> src;
1027     final CompletableFuture<?> snd;
1028     final CompletableFuture<Void> dst;
1029     AndCompletion(CompletableFuture<?> src,
1030     CompletableFuture<?> snd,
1031     CompletableFuture<Void> dst) {
1032     this.src = src; this.snd = snd; this.dst = dst;
1033     }
1034 dl 1.99 public final CompletableFuture<?> trigger() {
1035 dl 1.35 final CompletableFuture<?> a;
1036     final CompletableFuture<?> b;
1037     final CompletableFuture<Void> dst;
1038     Object r, s; Throwable ex;
1039     if ((dst = this.dst) != null &&
1040     (a = this.src) != null &&
1041     (r = a.result) != null &&
1042     (b = this.snd) != null &&
1043     (s = b.result) != null &&
1044     compareAndSet(0, 1)) {
1045     if (r instanceof AltResult)
1046     ex = ((AltResult)r).ex;
1047     else
1048     ex = null;
1049     if (ex == null && (s instanceof AltResult))
1050     ex = ((AltResult)s).ex;
1051 dl 1.99 dst.setInternalResult(null, ex);
1052 dl 1.35 }
1053 dl 1.99 return dst;
1054 dl 1.35 }
1055     private static final long serialVersionUID = 5232453952276885070L;
1056     }
1057    
1058 jsr166 1.81 static final class ApplyToEither<T,U> extends Completion {
1059 jsr166 1.2 final CompletableFuture<? extends T> src;
1060     final CompletableFuture<? extends T> snd;
1061     final Function<? super T,? extends U> fn;
1062     final CompletableFuture<U> dst;
1063 dl 1.1 final Executor executor;
1064 jsr166 1.81 ApplyToEither(CompletableFuture<? extends T> src,
1065     CompletableFuture<? extends T> snd,
1066     Function<? super T,? extends U> fn,
1067     CompletableFuture<U> dst,
1068     Executor executor) {
1069 dl 1.1 this.src = src; this.snd = snd;
1070     this.fn = fn; this.dst = dst;
1071     this.executor = executor;
1072     }
1073 dl 1.99 public final CompletableFuture<?> trigger() {
1074 dl 1.28 final CompletableFuture<? extends T> a;
1075     final CompletableFuture<? extends T> b;
1076     final Function<? super T,? extends U> fn;
1077     final CompletableFuture<U> dst;
1078 jsr166 1.2 Object r; T t; Throwable ex;
1079     if ((dst = this.dst) != null &&
1080 dl 1.1 (fn = this.fn) != null &&
1081     (((a = this.src) != null && (r = a.result) != null) ||
1082     ((b = this.snd) != null && (r = b.result) != null)) &&
1083     compareAndSet(0, 1)) {
1084 jsr166 1.2 if (r instanceof AltResult) {
1085 dl 1.19 ex = ((AltResult)r).ex;
1086 jsr166 1.2 t = null;
1087     }
1088 dl 1.19 else {
1089     ex = null;
1090 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1091     t = tr;
1092 dl 1.1 }
1093 dl 1.20 Executor e = executor;
1094     U u = null;
1095 dl 1.19 if (ex == null) {
1096     try {
1097 dl 1.20 if (e != null)
1098 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1099 dl 1.19 else
1100 dl 1.20 u = fn.apply(t);
1101 dl 1.19 } catch (Throwable rex) {
1102     ex = rex;
1103     }
1104     }
1105 dl 1.20 if (e == null || ex != null)
1106 dl 1.99 dst.setInternalResult(u, ex);
1107 jsr166 1.2 }
1108 dl 1.99 return dst;
1109 jsr166 1.2 }
1110 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1111 dl 1.1 }
1112    
1113 jsr166 1.81 static final class AcceptEither<T> extends Completion {
1114 dl 1.7 final CompletableFuture<? extends T> src;
1115     final CompletableFuture<? extends T> snd;
1116 dl 1.34 final Consumer<? super T> fn;
1117 dl 1.7 final CompletableFuture<Void> dst;
1118     final Executor executor;
1119 jsr166 1.81 AcceptEither(CompletableFuture<? extends T> src,
1120     CompletableFuture<? extends T> snd,
1121     Consumer<? super T> fn,
1122     CompletableFuture<Void> dst,
1123     Executor executor) {
1124 dl 1.7 this.src = src; this.snd = snd;
1125     this.fn = fn; this.dst = dst;
1126     this.executor = executor;
1127     }
1128 dl 1.99 public final CompletableFuture<?> trigger() {
1129 dl 1.28 final CompletableFuture<? extends T> a;
1130     final CompletableFuture<? extends T> b;
1131 dl 1.34 final Consumer<? super T> fn;
1132 dl 1.28 final CompletableFuture<Void> dst;
1133 dl 1.7 Object r; T t; Throwable ex;
1134     if ((dst = this.dst) != null &&
1135     (fn = this.fn) != null &&
1136     (((a = this.src) != null && (r = a.result) != null) ||
1137     ((b = this.snd) != null && (r = b.result) != null)) &&
1138     compareAndSet(0, 1)) {
1139     if (r instanceof AltResult) {
1140 dl 1.19 ex = ((AltResult)r).ex;
1141 dl 1.7 t = null;
1142     }
1143 dl 1.19 else {
1144     ex = null;
1145 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1146     t = tr;
1147 dl 1.19 }
1148 dl 1.20 Executor e = executor;
1149 dl 1.19 if (ex == null) {
1150     try {
1151 dl 1.20 if (e != null)
1152 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1153 dl 1.20 else
1154 dl 1.19 fn.accept(t);
1155     } catch (Throwable rex) {
1156     ex = rex;
1157 dl 1.7 }
1158     }
1159 dl 1.20 if (e == null || ex != null)
1160 dl 1.99 dst.setInternalResult(null, ex);
1161 dl 1.7 }
1162 dl 1.99 return dst;
1163 dl 1.7 }
1164 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1165 dl 1.7 }
1166    
1167 jsr166 1.82 static final class RunAfterEither extends Completion {
1168     final CompletableFuture<?> src;
1169 jsr166 1.2 final CompletableFuture<?> snd;
1170     final Runnable fn;
1171     final CompletableFuture<Void> dst;
1172 dl 1.1 final Executor executor;
1173 jsr166 1.82 RunAfterEither(CompletableFuture<?> src,
1174 jsr166 1.81 CompletableFuture<?> snd,
1175     Runnable fn,
1176     CompletableFuture<Void> dst,
1177     Executor executor) {
1178 dl 1.1 this.src = src; this.snd = snd;
1179     this.fn = fn; this.dst = dst;
1180     this.executor = executor;
1181     }
1182 dl 1.99 public final CompletableFuture<?> trigger() {
1183 jsr166 1.82 final CompletableFuture<?> a;
1184 dl 1.1 final CompletableFuture<?> b;
1185     final Runnable fn;
1186     final CompletableFuture<Void> dst;
1187 dl 1.28 Object r; Throwable ex;
1188 jsr166 1.2 if ((dst = this.dst) != null &&
1189 dl 1.1 (fn = this.fn) != null &&
1190     (((a = this.src) != null && (r = a.result) != null) ||
1191     ((b = this.snd) != null && (r = b.result) != null)) &&
1192     compareAndSet(0, 1)) {
1193 dl 1.19 if (r instanceof AltResult)
1194     ex = ((AltResult)r).ex;
1195     else
1196     ex = null;
1197 dl 1.20 Executor e = executor;
1198 dl 1.19 if (ex == null) {
1199 dl 1.1 try {
1200 dl 1.20 if (e != null)
1201 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
1202 dl 1.20 else
1203 dl 1.1 fn.run();
1204     } catch (Throwable rex) {
1205 dl 1.19 ex = rex;
1206 dl 1.1 }
1207     }
1208 dl 1.20 if (e == null || ex != null)
1209 dl 1.99 dst.setInternalResult(null, ex);
1210 jsr166 1.2 }
1211 dl 1.99 return dst;
1212 jsr166 1.2 }
1213 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1214 dl 1.1 }
1215    
1216 dl 1.35 static final class OrCompletion extends Completion {
1217     final CompletableFuture<?> src;
1218     final CompletableFuture<?> snd;
1219 dl 1.77 final CompletableFuture<Object> dst;
1220 dl 1.35 OrCompletion(CompletableFuture<?> src,
1221     CompletableFuture<?> snd,
1222 dl 1.77 CompletableFuture<Object> dst) {
1223 dl 1.35 this.src = src; this.snd = snd; this.dst = dst;
1224     }
1225 dl 1.99 public final CompletableFuture<?> trigger() {
1226 dl 1.35 final CompletableFuture<?> a;
1227     final CompletableFuture<?> b;
1228 dl 1.77 final CompletableFuture<Object> dst;
1229     Object r, t; Throwable ex;
1230 dl 1.35 if ((dst = this.dst) != null &&
1231     (((a = this.src) != null && (r = a.result) != null) ||
1232     ((b = this.snd) != null && (r = b.result) != null)) &&
1233     compareAndSet(0, 1)) {
1234 dl 1.77 if (r instanceof AltResult) {
1235 dl 1.35 ex = ((AltResult)r).ex;
1236 dl 1.77 t = null;
1237     }
1238     else {
1239 dl 1.35 ex = null;
1240 dl 1.77 t = r;
1241     }
1242 dl 1.99 dst.setInternalResult(t, ex);
1243 dl 1.35 }
1244 dl 1.99 return dst;
1245 dl 1.35 }
1246     private static final long serialVersionUID = 5232453952276885070L;
1247     }
1248    
1249 dl 1.28 static final class ExceptionCompletion<T> extends Completion {
1250 jsr166 1.2 final CompletableFuture<? extends T> src;
1251 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1252     final CompletableFuture<T> dst;
1253 dl 1.28 ExceptionCompletion(CompletableFuture<? extends T> src,
1254     Function<? super Throwable, ? extends T> fn,
1255     CompletableFuture<T> dst) {
1256 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1257     }
1258 dl 1.99 public final CompletableFuture<?> trigger() {
1259 dl 1.28 final CompletableFuture<? extends T> a;
1260     final Function<? super Throwable, ? extends T> fn;
1261     final CompletableFuture<T> dst;
1262 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1263 jsr166 1.2 if ((dst = this.dst) != null &&
1264 dl 1.1 (fn = this.fn) != null &&
1265     (a = this.src) != null &&
1266     (r = a.result) != null &&
1267     compareAndSet(0, 1)) {
1268 dl 1.20 if ((r instanceof AltResult) &&
1269 jsr166 1.87 (ex = ((AltResult)r).ex) != null) {
1270 dl 1.20 try {
1271     t = fn.apply(ex);
1272     } catch (Throwable rex) {
1273     dx = rex;
1274 dl 1.1 }
1275     }
1276 dl 1.28 else {
1277     @SuppressWarnings("unchecked") T tr = (T) r;
1278     t = tr;
1279     }
1280 dl 1.99 dst.setInternalResult(t, dx);
1281 dl 1.1 }
1282 dl 1.99 return dst;
1283 dl 1.1 }
1284 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1285 dl 1.1 }
1286    
1287 dl 1.88 static final class WhenCompleteCompletion<T> extends Completion {
1288     final CompletableFuture<? extends T> src;
1289     final BiConsumer<? super T, ? super Throwable> fn;
1290     final CompletableFuture<T> dst;
1291     final Executor executor;
1292     WhenCompleteCompletion(CompletableFuture<? extends T> src,
1293     BiConsumer<? super T, ? super Throwable> fn,
1294     CompletableFuture<T> dst,
1295     Executor executor) {
1296     this.src = src; this.fn = fn; this.dst = dst;
1297     this.executor = executor;
1298     }
1299 dl 1.99 public final CompletableFuture<?> trigger() {
1300 dl 1.88 final CompletableFuture<? extends T> a;
1301     final BiConsumer<? super T, ? super Throwable> fn;
1302     final CompletableFuture<T> dst;
1303     Object r; T t; Throwable ex;
1304     if ((dst = this.dst) != null &&
1305     (fn = this.fn) != null &&
1306     (a = this.src) != null &&
1307     (r = a.result) != null &&
1308     compareAndSet(0, 1)) {
1309     if (r instanceof AltResult) {
1310     ex = ((AltResult)r).ex;
1311     t = null;
1312     }
1313     else {
1314     ex = null;
1315     @SuppressWarnings("unchecked") T tr = (T) r;
1316     t = tr;
1317     }
1318     Executor e = executor;
1319     Throwable dx = null;
1320     try {
1321     if (e != null)
1322 dl 1.96 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
1323 dl 1.88 else
1324     fn.accept(t, ex);
1325     } catch (Throwable rex) {
1326     dx = rex;
1327     }
1328     if (e == null || dx != null)
1329 dl 1.99 dst.setInternalResult(t, ex != null ? ex : dx);
1330 dl 1.88 }
1331 dl 1.99 return dst;
1332 dl 1.88 }
1333     private static final long serialVersionUID = 5232453952276885070L;
1334     }
1335    
1336 dl 1.75 static final class ThenCopy<T> extends Completion {
1337 dl 1.77 final CompletableFuture<?> src;
1338 dl 1.75 final CompletableFuture<T> dst;
1339 dl 1.77 ThenCopy(CompletableFuture<?> src,
1340 dl 1.75 CompletableFuture<T> dst) {
1341 dl 1.17 this.src = src; this.dst = dst;
1342     }
1343 dl 1.99 public final CompletableFuture<?> trigger() {
1344 dl 1.77 final CompletableFuture<?> a;
1345 dl 1.75 final CompletableFuture<T> dst;
1346     Object r; T t; Throwable ex;
1347 dl 1.17 if ((dst = this.dst) != null &&
1348     (a = this.src) != null &&
1349     (r = a.result) != null &&
1350     compareAndSet(0, 1)) {
1351 dl 1.20 if (r instanceof AltResult) {
1352     ex = ((AltResult)r).ex;
1353     t = null;
1354     }
1355     else {
1356     ex = null;
1357 dl 1.75 @SuppressWarnings("unchecked") T tr = (T) r;
1358     t = tr;
1359 dl 1.20 }
1360 dl 1.99 dst.setInternalResult(t, ex);
1361 dl 1.17 }
1362 dl 1.99 return dst;
1363 dl 1.17 }
1364 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1365 dl 1.17 }
1366    
1367 dl 1.75 // version of ThenCopy for CompletableFuture<Void> dst
1368     static final class ThenPropagate extends Completion {
1369     final CompletableFuture<?> src;
1370     final CompletableFuture<Void> dst;
1371     ThenPropagate(CompletableFuture<?> src,
1372     CompletableFuture<Void> dst) {
1373     this.src = src; this.dst = dst;
1374     }
1375 dl 1.99 public final CompletableFuture<?> trigger() {
1376 dl 1.75 final CompletableFuture<?> a;
1377     final CompletableFuture<Void> dst;
1378     Object r; Throwable ex;
1379     if ((dst = this.dst) != null &&
1380     (a = this.src) != null &&
1381     (r = a.result) != null &&
1382     compareAndSet(0, 1)) {
1383     if (r instanceof AltResult)
1384     ex = ((AltResult)r).ex;
1385     else
1386     ex = null;
1387 dl 1.99 dst.setInternalResult(null, ex);
1388 dl 1.75 }
1389 dl 1.99 return dst;
1390 dl 1.75 }
1391     private static final long serialVersionUID = 5232453952276885070L;
1392     }
1393    
1394 dl 1.28 static final class HandleCompletion<T,U> extends Completion {
1395 dl 1.17 final CompletableFuture<? extends T> src;
1396     final BiFunction<? super T, Throwable, ? extends U> fn;
1397     final CompletableFuture<U> dst;
1398 dl 1.88 final Executor executor;
1399 dl 1.28 HandleCompletion(CompletableFuture<? extends T> src,
1400     BiFunction<? super T, Throwable, ? extends U> fn,
1401 dl 1.88 CompletableFuture<U> dst,
1402     Executor executor) {
1403 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1404 dl 1.88 this.executor = executor;
1405 dl 1.17 }
1406 dl 1.99 public final CompletableFuture<?> trigger() {
1407 dl 1.28 final CompletableFuture<? extends T> a;
1408     final BiFunction<? super T, Throwable, ? extends U> fn;
1409     final CompletableFuture<U> dst;
1410 dl 1.17 Object r; T t; Throwable ex;
1411     if ((dst = this.dst) != null &&
1412     (fn = this.fn) != null &&
1413     (a = this.src) != null &&
1414     (r = a.result) != null &&
1415     compareAndSet(0, 1)) {
1416     if (r instanceof AltResult) {
1417     ex = ((AltResult)r).ex;
1418     t = null;
1419     }
1420     else {
1421     ex = null;
1422 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1423     t = tr;
1424 dl 1.17 }
1425 dl 1.88 Executor e = executor;
1426     U u = null;
1427     Throwable dx = null;
1428 dl 1.17 try {
1429 dl 1.88 if (e != null)
1430 dl 1.96 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1431 dl 1.88 else
1432     u = fn.apply(t, ex);
1433 dl 1.17 } catch (Throwable rex) {
1434 dl 1.20 dx = rex;
1435 dl 1.17 }
1436 dl 1.88 if (e == null || dx != null)
1437 dl 1.99 dst.setInternalResult(u, dx);
1438 dl 1.17 }
1439 dl 1.99 return dst;
1440 dl 1.17 }
1441 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1442 dl 1.17 }
1443    
1444 jsr166 1.81 static final class ThenCompose<T,U> extends Completion {
1445 dl 1.17 final CompletableFuture<? extends T> src;
1446 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1447 dl 1.17 final CompletableFuture<U> dst;
1448 dl 1.37 final Executor executor;
1449 jsr166 1.81 ThenCompose(CompletableFuture<? extends T> src,
1450 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
1451 jsr166 1.81 CompletableFuture<U> dst,
1452     Executor executor) {
1453 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1454 dl 1.37 this.executor = executor;
1455 dl 1.17 }
1456 dl 1.99 public final CompletableFuture<?> trigger() {
1457 dl 1.28 final CompletableFuture<? extends T> a;
1458 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1459 dl 1.28 final CompletableFuture<U> dst;
1460 dl 1.37 Object r; T t; Throwable ex; Executor e;
1461 dl 1.17 if ((dst = this.dst) != null &&
1462     (fn = this.fn) != null &&
1463     (a = this.src) != null &&
1464     (r = a.result) != null &&
1465     compareAndSet(0, 1)) {
1466     if (r instanceof AltResult) {
1467     ex = ((AltResult)r).ex;
1468     t = null;
1469     }
1470     else {
1471     ex = null;
1472 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1473     t = tr;
1474 dl 1.17 }
1475     CompletableFuture<U> c = null;
1476 dl 1.20 U u = null;
1477     boolean complete = false;
1478 dl 1.17 if (ex == null) {
1479 dl 1.37 if ((e = executor) != null)
1480 dl 1.96 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1481 dl 1.37 else {
1482     try {
1483 dl 1.88 CompletionStage<U> cs = fn.apply(t);
1484     c = (cs == null) ? null : cs.toCompletableFuture();
1485     if (c == null)
1486 dl 1.37 ex = new NullPointerException();
1487     } catch (Throwable rex) {
1488     ex = rex;
1489     }
1490 dl 1.17 }
1491     }
1492 dl 1.37 if (c != null) {
1493 dl 1.75 ThenCopy<U> d = null;
1494 dl 1.18 Object s;
1495     if ((s = c.result) == null) {
1496     CompletionNode p = new CompletionNode
1497 dl 1.75 (d = new ThenCopy<U>(c, dst));
1498 dl 1.18 while ((s = c.result) == null) {
1499     if (UNSAFE.compareAndSwapObject
1500     (c, COMPLETIONS, p.next = c.completions, p))
1501     break;
1502     }
1503 dl 1.17 }
1504 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1505 dl 1.20 complete = true;
1506 dl 1.18 if (s instanceof AltResult) {
1507     ex = ((AltResult)s).ex; // no rewrap
1508     u = null;
1509 dl 1.28 }
1510     else {
1511     @SuppressWarnings("unchecked") U us = (U) s;
1512     u = us;
1513     }
1514     }
1515     }
1516     if (complete || ex != null)
1517 dl 1.99 dst.setInternalResult(u, ex);
1518 dl 1.28 if (c != null)
1519     c.helpPostComplete();
1520     }
1521 dl 1.99 return dst;
1522 dl 1.28 }
1523     private static final long serialVersionUID = 5232453952276885070L;
1524     }
1525    
1526 dl 1.88 // Implementations of stage methods with (plain, async, Executor) forms
1527 dl 1.28
1528 dl 1.88 private <U> CompletableFuture<U> doThenApply
1529     (Function<? super T,? extends U> fn,
1530     Executor e) {
1531     if (fn == null) throw new NullPointerException();
1532     CompletableFuture<U> dst = new CompletableFuture<U>();
1533     ThenApply<T,U> d = null;
1534     Object r;
1535     if ((r = result) == null) {
1536     CompletionNode p = new CompletionNode
1537     (d = new ThenApply<T,U>(this, fn, dst, e));
1538     while ((r = result) == null) {
1539     if (UNSAFE.compareAndSwapObject
1540     (this, COMPLETIONS, p.next = completions, p))
1541     break;
1542     }
1543     }
1544     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1545     T t; Throwable ex;
1546     if (r instanceof AltResult) {
1547     ex = ((AltResult)r).ex;
1548     t = null;
1549     }
1550     else {
1551     ex = null;
1552     @SuppressWarnings("unchecked") T tr = (T) r;
1553     t = tr;
1554     }
1555     U u = null;
1556     if (ex == null) {
1557     try {
1558     if (e != null)
1559 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1560 dl 1.88 else
1561     u = fn.apply(t);
1562     } catch (Throwable rex) {
1563     ex = rex;
1564     }
1565     }
1566     if (e == null || ex != null)
1567     dst.internalComplete(u, ex);
1568     }
1569     helpPostComplete();
1570     return dst;
1571 dl 1.28 }
1572    
1573 dl 1.88 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1574     Executor e) {
1575     if (fn == null) throw new NullPointerException();
1576     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1577     ThenAccept<T> d = null;
1578     Object r;
1579     if ((r = result) == null) {
1580     CompletionNode p = new CompletionNode
1581     (d = new ThenAccept<T>(this, fn, dst, e));
1582     while ((r = result) == null) {
1583     if (UNSAFE.compareAndSwapObject
1584     (this, COMPLETIONS, p.next = completions, p))
1585     break;
1586     }
1587     }
1588     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1589     T t; Throwable ex;
1590     if (r instanceof AltResult) {
1591     ex = ((AltResult)r).ex;
1592     t = null;
1593     }
1594     else {
1595     ex = null;
1596     @SuppressWarnings("unchecked") T tr = (T) r;
1597     t = tr;
1598     }
1599     if (ex == null) {
1600     try {
1601     if (e != null)
1602 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1603 dl 1.88 else
1604     fn.accept(t);
1605     } catch (Throwable rex) {
1606     ex = rex;
1607     }
1608     }
1609     if (e == null || ex != null)
1610     dst.internalComplete(null, ex);
1611     }
1612     helpPostComplete();
1613     return dst;
1614 dl 1.28 }
1615    
1616 dl 1.88 private CompletableFuture<Void> doThenRun(Runnable action,
1617     Executor e) {
1618     if (action == null) throw new NullPointerException();
1619     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1620     ThenRun d = null;
1621     Object r;
1622     if ((r = result) == null) {
1623     CompletionNode p = new CompletionNode
1624     (d = new ThenRun(this, action, dst, e));
1625     while ((r = result) == null) {
1626     if (UNSAFE.compareAndSwapObject
1627     (this, COMPLETIONS, p.next = completions, p))
1628     break;
1629     }
1630     }
1631     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1632     Throwable ex;
1633     if (r instanceof AltResult)
1634     ex = ((AltResult)r).ex;
1635     else
1636     ex = null;
1637     if (ex == null) {
1638     try {
1639     if (e != null)
1640 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1641 dl 1.88 else
1642     action.run();
1643     } catch (Throwable rex) {
1644     ex = rex;
1645     }
1646     }
1647     if (e == null || ex != null)
1648     dst.internalComplete(null, ex);
1649     }
1650     helpPostComplete();
1651     return dst;
1652     }
1653    
1654     private <U,V> CompletableFuture<V> doThenCombine
1655     (CompletableFuture<? extends U> other,
1656     BiFunction<? super T,? super U,? extends V> fn,
1657     Executor e) {
1658     if (other == null || fn == null) throw new NullPointerException();
1659     CompletableFuture<V> dst = new CompletableFuture<V>();
1660     ThenCombine<T,U,V> d = null;
1661     Object r, s = null;
1662     if ((r = result) == null || (s = other.result) == null) {
1663     d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
1664     CompletionNode q = null, p = new CompletionNode(d);
1665     while ((r == null && (r = result) == null) ||
1666     (s == null && (s = other.result) == null)) {
1667     if (q != null) {
1668     if (s != null ||
1669     UNSAFE.compareAndSwapObject
1670     (other, COMPLETIONS, q.next = other.completions, q))
1671     break;
1672     }
1673     else if (r != null ||
1674     UNSAFE.compareAndSwapObject
1675     (this, COMPLETIONS, p.next = completions, p)) {
1676     if (s != null)
1677     break;
1678     q = new CompletionNode(d);
1679     }
1680     }
1681     }
1682     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1683     T t; U u; Throwable ex;
1684     if (r instanceof AltResult) {
1685     ex = ((AltResult)r).ex;
1686     t = null;
1687     }
1688     else {
1689     ex = null;
1690     @SuppressWarnings("unchecked") T tr = (T) r;
1691     t = tr;
1692     }
1693     if (ex != null)
1694     u = null;
1695     else if (s instanceof AltResult) {
1696     ex = ((AltResult)s).ex;
1697     u = null;
1698     }
1699     else {
1700     @SuppressWarnings("unchecked") U us = (U) s;
1701     u = us;
1702     }
1703     V v = null;
1704     if (ex == null) {
1705     try {
1706     if (e != null)
1707 dl 1.96 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
1708 dl 1.88 else
1709     v = fn.apply(t, u);
1710     } catch (Throwable rex) {
1711     ex = rex;
1712     }
1713     }
1714     if (e == null || ex != null)
1715     dst.internalComplete(v, ex);
1716     }
1717     helpPostComplete();
1718     other.helpPostComplete();
1719     return dst;
1720     }
1721    
1722     private <U> CompletableFuture<Void> doThenAcceptBoth
1723     (CompletableFuture<? extends U> other,
1724     BiConsumer<? super T,? super U> fn,
1725     Executor e) {
1726     if (other == null || fn == null) throw new NullPointerException();
1727     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1728     ThenAcceptBoth<T,U> d = null;
1729     Object r, s = null;
1730     if ((r = result) == null || (s = other.result) == null) {
1731     d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1732     CompletionNode q = null, p = new CompletionNode(d);
1733     while ((r == null && (r = result) == null) ||
1734     (s == null && (s = other.result) == null)) {
1735     if (q != null) {
1736     if (s != null ||
1737     UNSAFE.compareAndSwapObject
1738     (other, COMPLETIONS, q.next = other.completions, q))
1739     break;
1740     }
1741     else if (r != null ||
1742     UNSAFE.compareAndSwapObject
1743     (this, COMPLETIONS, p.next = completions, p)) {
1744     if (s != null)
1745     break;
1746     q = new CompletionNode(d);
1747     }
1748     }
1749     }
1750     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1751     T t; U u; Throwable ex;
1752     if (r instanceof AltResult) {
1753     ex = ((AltResult)r).ex;
1754     t = null;
1755     }
1756     else {
1757     ex = null;
1758     @SuppressWarnings("unchecked") T tr = (T) r;
1759     t = tr;
1760     }
1761     if (ex != null)
1762     u = null;
1763     else if (s instanceof AltResult) {
1764     ex = ((AltResult)s).ex;
1765     u = null;
1766     }
1767     else {
1768     @SuppressWarnings("unchecked") U us = (U) s;
1769     u = us;
1770     }
1771     if (ex == null) {
1772     try {
1773     if (e != null)
1774 dl 1.96 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1775 dl 1.88 else
1776     fn.accept(t, u);
1777     } catch (Throwable rex) {
1778     ex = rex;
1779     }
1780     }
1781     if (e == null || ex != null)
1782     dst.internalComplete(null, ex);
1783     }
1784     helpPostComplete();
1785     other.helpPostComplete();
1786     return dst;
1787     }
1788    
/**
 * Shared implementation behind the runAfterBoth variants: returns a
 * new future {@code dst} tied to the completion of both this future
 * and {@code other}.
 *
 * @param other the second future whose result is awaited
 * @param action the action to run once both results are present and
 *        neither carries an exception
 * @param e executor handed to {@code execAsync}; when null the action
 *        is invoked directly via {@code action.run()} in this method
 * @return the dependent future {@code dst}
 * @throws NullPointerException if {@code other} or {@code action} is null
 */
1789     private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1790     Runnable action,
1791     Executor e) {
1792     if (other == null || action == null) throw new NullPointerException();
1793     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1794     RunAfterBoth d = null;
1795     Object r, s = null;
// Fast path: if both results are already set, no RunAfterBoth is
// created (d stays null). Note the short-circuit: when r is null,
// other.result is not read here and s stays null.
1796     if ((r = result) == null || (s = other.result) == null) {
1797     d = new RunAfterBoth(this, other, action, dst, e);
// p is the node linked into this.completions; q (created later) wraps
// the same d and is linked into other.completions.
1798     CompletionNode q = null, p = new CompletionNode(d);
// Loop while at least one result is still missing. Each result is
// re-read only while its local copy is still null.
1799     while ((r == null && (r = result) == null) ||
1800     (s == null && (s = other.result) == null)) {
1801     if (q != null) {
// Second phase: stop once s is known or q has been CASed in as the
// new head of other.completions.
1802     if (s != null ||
1803     UNSAFE.compareAndSwapObject
1804     (other, COMPLETIONS, q.next = other.completions, q))
1805     break;
1806     }
// First phase: skipped when r is already known; otherwise CAS p in as
// the new head of this.completions, retrying on failure.
1807     else if (r != null ||
1808     UNSAFE.compareAndSwapObject
1809     (this, COMPLETIONS, p.next = completions, p)) {
1810     if (s != null)
1811     break;
1812     q = new CompletionNode(d);
1813     }
1814     }
1815     }
// Proceed only when both results are in hand and either nothing was
// created (d == null) or this thread wins the 0 -> 1 CAS on d.
1816     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1817     Throwable ex;
// An exception held by r takes precedence; s is consulted only when r
// carries none.
1818     if (r instanceof AltResult)
1819     ex = ((AltResult)r).ex;
1820     else
1821     ex = null;
1822     if (ex == null && (s instanceof AltResult))
1823     ex = ((AltResult)s).ex;
1824     if (ex == null) {
1825     try {
1826     if (e != null)
1827 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1828 dl 1.88 else
1829     action.run();
1830     } catch (Throwable rex) {
// Anything thrown by execAsync or by the action becomes dst's outcome.
1831     ex = rex;
1832     }
1833     }
// With an executor and no exception, dst is not completed here (the
// AsyncRun was given dst); otherwise complete it now.
1834     if (e == null || ex != null)
1835     dst.internalComplete(null, ex);
1836     }
1837     helpPostComplete();
1838     other.helpPostComplete();
1839     return dst;
1840     }
1841    
1842     private <U> CompletableFuture<U> doApplyToEither
1843     (CompletableFuture<? extends T> other,
1844     Function<? super T, U> fn,
1845     Executor e) {
1846     if (other == null || fn == null) throw new NullPointerException();
1847     CompletableFuture<U> dst = new CompletableFuture<U>();
1848     ApplyToEither<T,U> d = null;
1849     Object r;
1850     if ((r = result) == null && (r = other.result) == null) {
1851     d = new ApplyToEither<T,U>(this, other, fn, dst, e);
1852     CompletionNode q = null, p = new CompletionNode(d);
1853     while ((r = result) == null && (r = other.result) == null) {
1854     if (q != null) {
1855     if (UNSAFE.compareAndSwapObject
1856     (other, COMPLETIONS, q.next = other.completions, q))
1857     break;
1858     }
1859     else if (UNSAFE.compareAndSwapObject
1860     (this, COMPLETIONS, p.next = completions, p))
1861     q = new CompletionNode(d);
1862     }
1863     }
1864     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1865     T t; Throwable ex;
1866     if (r instanceof AltResult) {
1867     ex = ((AltResult)r).ex;
1868     t = null;
1869     }
1870     else {
1871     ex = null;
1872     @SuppressWarnings("unchecked") T tr = (T) r;
1873     t = tr;
1874     }
1875     U u = null;
1876     if (ex == null) {
1877     try {
1878     if (e != null)
1879 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1880 dl 1.88 else
1881     u = fn.apply(t);
1882     } catch (Throwable rex) {
1883     ex = rex;
1884     }
1885     }
1886     if (e == null || ex != null)
1887     dst.internalComplete(u, ex);
1888     }
1889     helpPostComplete();
1890     other.helpPostComplete();
1891     return dst;
1892     }
1893    
1894     private CompletableFuture<Void> doAcceptEither
1895     (CompletableFuture<? extends T> other,
1896     Consumer<? super T> fn,
1897     Executor e) {
1898     if (other == null || fn == null) throw new NullPointerException();
1899     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1900     AcceptEither<T> d = null;
1901     Object r;
1902     if ((r = result) == null && (r = other.result) == null) {
1903     d = new AcceptEither<T>(this, other, fn, dst, e);
1904     CompletionNode q = null, p = new CompletionNode(d);
1905     while ((r = result) == null && (r = other.result) == null) {
1906     if (q != null) {
1907     if (UNSAFE.compareAndSwapObject
1908     (other, COMPLETIONS, q.next = other.completions, q))
1909     break;
1910     }
1911     else if (UNSAFE.compareAndSwapObject
1912     (this, COMPLETIONS, p.next = completions, p))
1913     q = new CompletionNode(d);
1914     }
1915     }
1916     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1917     T t; Throwable ex;
1918     if (r instanceof AltResult) {
1919     ex = ((AltResult)r).ex;
1920     t = null;
1921     }
1922     else {
1923     ex = null;
1924     @SuppressWarnings("unchecked") T tr = (T) r;
1925     t = tr;
1926     }
1927     if (ex == null) {
1928     try {
1929     if (e != null)
1930 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1931 dl 1.88 else
1932     fn.accept(t);
1933     } catch (Throwable rex) {
1934     ex = rex;
1935     }
1936     }
1937     if (e == null || ex != null)
1938     dst.internalComplete(null, ex);
1939     }
1940     helpPostComplete();
1941     other.helpPostComplete();
1942     return dst;
1943     }
1944    
1945     private CompletableFuture<Void> doRunAfterEither
1946     (CompletableFuture<?> other,
1947     Runnable action,
1948     Executor e) {
1949     if (other == null || action == null) throw new NullPointerException();
1950     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1951     RunAfterEither d = null;
1952     Object r;
1953     if ((r = result) == null && (r = other.result) == null) {
1954     d = new RunAfterEither(this, other, action, dst, e);
1955     CompletionNode q = null, p = new CompletionNode(d);
1956     while ((r = result) == null && (r = other.result) == null) {
1957     if (q != null) {
1958     if (UNSAFE.compareAndSwapObject
1959     (other, COMPLETIONS, q.next = other.completions, q))
1960     break;
1961     }
1962     else if (UNSAFE.compareAndSwapObject
1963     (this, COMPLETIONS, p.next = completions, p))
1964     q = new CompletionNode(d);
1965     }
1966     }
1967     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1968     Throwable ex;
1969     if (r instanceof AltResult)
1970     ex = ((AltResult)r).ex;
1971     else
1972     ex = null;
1973     if (ex == null) {
1974     try {
1975     if (e != null)
1976 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1977 dl 1.88 else
1978     action.run();
1979     } catch (Throwable rex) {
1980     ex = rex;
1981     }
1982     }
1983     if (e == null || ex != null)
1984     dst.internalComplete(null, ex);
1985     }
1986     helpPostComplete();
1987     other.helpPostComplete();
1988     return dst;
1989     }
1990    
1991     private <U> CompletableFuture<U> doThenCompose
1992     (Function<? super T, ? extends CompletionStage<U>> fn,
1993     Executor e) {
1994     if (fn == null) throw new NullPointerException();
1995     CompletableFuture<U> dst = null;
1996     ThenCompose<T,U> d = null;
1997     Object r;
1998     if ((r = result) == null) {
1999     dst = new CompletableFuture<U>();
2000     CompletionNode p = new CompletionNode
2001     (d = new ThenCompose<T,U>(this, fn, dst, e));
2002     while ((r = result) == null) {
2003     if (UNSAFE.compareAndSwapObject
2004     (this, COMPLETIONS, p.next = completions, p))
2005     break;
2006     }
2007     }
2008     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2009     T t; Throwable ex;
2010     if (r instanceof AltResult) {
2011     ex = ((AltResult)r).ex;
2012     t = null;
2013     }
2014     else {
2015     ex = null;
2016     @SuppressWarnings("unchecked") T tr = (T) r;
2017     t = tr;
2018     }
2019     if (ex == null) {
2020     if (e != null) {
2021     if (dst == null)
2022     dst = new CompletableFuture<U>();
2023 dl 1.96 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
2024 dl 1.88 }
2025     else {
2026     try {
2027     CompletionStage<U> cs = fn.apply(t);
2028     if (cs == null ||
2029     (dst = cs.toCompletableFuture()) == null)
2030     ex = new NullPointerException();
2031     } catch (Throwable rex) {
2032     ex = rex;
2033     }
2034     }
2035     }
2036     if (dst == null)
2037     dst = new CompletableFuture<U>();
2038 dl 1.98 if (ex != null)
2039 dl 1.88 dst.internalComplete(null, ex);
2040     }
2041     helpPostComplete();
2042     dst.helpPostComplete();
2043     return dst;
2044     }
2045    
2046     private CompletableFuture<T> doWhenComplete
2047     (BiConsumer<? super T, ? super Throwable> fn,
2048     Executor e) {
2049     if (fn == null) throw new NullPointerException();
2050     CompletableFuture<T> dst = new CompletableFuture<T>();
2051     WhenCompleteCompletion<T> d = null;
2052     Object r;
2053     if ((r = result) == null) {
2054     CompletionNode p =
2055     new CompletionNode(d = new WhenCompleteCompletion<T>
2056     (this, fn, dst, e));
2057     while ((r = result) == null) {
2058     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2059     p.next = completions, p))
2060     break;
2061     }
2062     }
2063     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2064     T t; Throwable ex;
2065     if (r instanceof AltResult) {
2066     ex = ((AltResult)r).ex;
2067     t = null;
2068     }
2069     else {
2070     ex = null;
2071     @SuppressWarnings("unchecked") T tr = (T) r;
2072     t = tr;
2073     }
2074     Throwable dx = null;
2075     try {
2076     if (e != null)
2077 dl 1.96 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
2078 dl 1.88 else
2079     fn.accept(t, ex);
2080     } catch (Throwable rex) {
2081     dx = rex;
2082     }
2083     if (e == null || dx != null)
2084     dst.internalComplete(t, ex != null ? ex : dx);
2085     }
2086     helpPostComplete();
2087     return dst;
2088     }
2089    
2090     private <U> CompletableFuture<U> doHandle
2091     (BiFunction<? super T, Throwable, ? extends U> fn,
2092     Executor e) {
2093     if (fn == null) throw new NullPointerException();
2094     CompletableFuture<U> dst = new CompletableFuture<U>();
2095     HandleCompletion<T,U> d = null;
2096     Object r;
2097     if ((r = result) == null) {
2098     CompletionNode p =
2099     new CompletionNode(d = new HandleCompletion<T,U>
2100     (this, fn, dst, e));
2101     while ((r = result) == null) {
2102     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2103     p.next = completions, p))
2104     break;
2105     }
2106     }
2107     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2108     T t; Throwable ex;
2109     if (r instanceof AltResult) {
2110     ex = ((AltResult)r).ex;
2111     t = null;
2112     }
2113     else {
2114     ex = null;
2115     @SuppressWarnings("unchecked") T tr = (T) r;
2116     t = tr;
2117     }
2118 jsr166 1.90 U u = null;
2119 dl 1.88 Throwable dx = null;
2120     try {
2121     if (e != null)
2122 dl 1.96 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2123 dl 1.88 else {
2124     u = fn.apply(t, ex);
2125     dx = null;
2126     }
2127     } catch (Throwable rex) {
2128     dx = rex;
2129     u = null;
2130     }
2131     if (e == null || dx != null)
2132     dst.internalComplete(u, dx);
2133     }
2134     helpPostComplete();
2135     return dst;
2136     }
2137    
2138    
2139     // public methods
2140    
2141     /**
2142     * Creates a new incomplete CompletableFuture.
2143     */
2144     public CompletableFuture() {
2145     }
2146    
2147     /**
2148     * Returns a new CompletableFuture that is asynchronously completed
2149     * by a task running in the {@link ForkJoinPool#commonPool()} with
2150     * the value obtained by calling the given Supplier.
2151     *
2152     * @param supplier a function returning the value to be used
2153     * to complete the returned CompletableFuture
2154 jsr166 1.95 * @param <U> the function's return type
2155 dl 1.88 * @return the new CompletableFuture
2156     */
2157     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2158     if (supplier == null) throw new NullPointerException();
2159     CompletableFuture<U> f = new CompletableFuture<U>();
2160 dl 1.96 execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
2161 dl 1.88 return f;
2162     }
2163    
2164     /**
2165     * Returns a new CompletableFuture that is asynchronously completed
2166     * by a task running in the given executor with the value obtained
2167     * by calling the given Supplier.
2168     *
2169     * @param supplier a function returning the value to be used
2170     * to complete the returned CompletableFuture
2171     * @param executor the executor to use for asynchronous execution
2172 jsr166 1.95 * @param <U> the function's return type
2173 dl 1.88 * @return the new CompletableFuture
2174     */
2175     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2176     Executor executor) {
2177     if (executor == null || supplier == null)
2178     throw new NullPointerException();
2179     CompletableFuture<U> f = new CompletableFuture<U>();
2180 dl 1.96 execAsync(executor, new AsyncSupply<U>(supplier, f));
2181 dl 1.88 return f;
2182 dl 1.28 }
2183    
2184     /**
2185 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2186     * by a task running in the {@link ForkJoinPool#commonPool()} after
2187     * it runs the given action.
2188 dl 1.28 *
2189     * @param runnable the action to run before completing the
2190     * returned CompletableFuture
2191 jsr166 1.58 * @return the new CompletableFuture
2192 dl 1.28 */
2193     public static CompletableFuture<Void> runAsync(Runnable runnable) {
2194     if (runnable == null) throw new NullPointerException();
2195     CompletableFuture<Void> f = new CompletableFuture<Void>();
2196 dl 1.96 execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f));
2197 dl 1.28 return f;
2198     }
2199    
2200     /**
2201 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2202     * by a task running in the given executor after it runs the given
2203     * action.
2204 dl 1.28 *
2205     * @param runnable the action to run before completing the
2206     * returned CompletableFuture
2207     * @param executor the executor to use for asynchronous execution
2208 jsr166 1.58 * @return the new CompletableFuture
2209 dl 1.28 */
2210     public static CompletableFuture<Void> runAsync(Runnable runnable,
2211     Executor executor) {
2212     if (executor == null || runnable == null)
2213     throw new NullPointerException();
2214     CompletableFuture<Void> f = new CompletableFuture<Void>();
2215 dl 1.96 execAsync(executor, new AsyncRun(runnable, f));
2216 dl 1.28 return f;
2217     }
2218    
2219     /**
2220 dl 1.77 * Returns a new CompletableFuture that is already completed with
2221     * the given value.
2222     *
2223     * @param value the value
2224 jsr166 1.95 * @param <U> the type of the value
2225 dl 1.77 * @return the completed CompletableFuture
2226     */
2227     public static <U> CompletableFuture<U> completedFuture(U value) {
2228     CompletableFuture<U> f = new CompletableFuture<U>();
2229 jsr166 1.78 f.result = (value == null) ? NIL : value;
2230 dl 1.77 return f;
2231     }
2232    
2233     /**
2234 dl 1.28 * Returns {@code true} if completed in any fashion: normally,
2235     * exceptionally, or via cancellation.
2236     *
2237     * @return {@code true} if completed
2238     */
2239     public boolean isDone() {
2240     return result != null;
2241     }
2242    
2243     /**
2244 dl 1.49 * Waits if necessary for this future to complete, and then
2245 dl 1.48 * returns its result.
2246 dl 1.28 *
2247 dl 1.48 * @return the result value
2248     * @throws CancellationException if this future was cancelled
2249     * @throws ExecutionException if this future completed exceptionally
2250 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2251     * while waiting
2252     */
2253     public T get() throws InterruptedException, ExecutionException {
2254     Object r; Throwable ex, cause;
2255     if ((r = result) == null && (r = waitingGet(true)) == null)
2256     throw new InterruptedException();
2257 jsr166 1.45 if (!(r instanceof AltResult)) {
2258     @SuppressWarnings("unchecked") T tr = (T) r;
2259     return tr;
2260     }
2261     if ((ex = ((AltResult)r).ex) == null)
2262 dl 1.28 return null;
2263 jsr166 1.45 if (ex instanceof CancellationException)
2264     throw (CancellationException)ex;
2265     if ((ex instanceof CompletionException) &&
2266     (cause = ex.getCause()) != null)
2267     ex = cause;
2268     throw new ExecutionException(ex);
2269 dl 1.28 }
2270    
2271     /**
2272 dl 1.49 * Waits if necessary for at most the given time for this future
2273     * to complete, and then returns its result, if available.
2274 dl 1.28 *
2275     * @param timeout the maximum time to wait
2276     * @param unit the time unit of the timeout argument
2277 dl 1.48 * @return the result value
2278     * @throws CancellationException if this future was cancelled
2279     * @throws ExecutionException if this future completed exceptionally
2280 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2281     * while waiting
2282     * @throws TimeoutException if the wait timed out
2283     */
2284     public T get(long timeout, TimeUnit unit)
2285     throws InterruptedException, ExecutionException, TimeoutException {
2286     Object r; Throwable ex, cause;
2287     long nanos = unit.toNanos(timeout);
2288     if (Thread.interrupted())
2289     throw new InterruptedException();
2290     if ((r = result) == null)
2291     r = timedAwaitDone(nanos);
2292 jsr166 1.45 if (!(r instanceof AltResult)) {
2293     @SuppressWarnings("unchecked") T tr = (T) r;
2294     return tr;
2295     }
2296     if ((ex = ((AltResult)r).ex) == null)
2297 dl 1.28 return null;
2298 jsr166 1.45 if (ex instanceof CancellationException)
2299     throw (CancellationException)ex;
2300     if ((ex instanceof CompletionException) &&
2301     (cause = ex.getCause()) != null)
2302     ex = cause;
2303     throw new ExecutionException(ex);
2304 dl 1.28 }
2305    
2306     /**
2307     * Returns the result value when complete, or throws an
2308     * (unchecked) exception if completed exceptionally. To better
2309     * conform with the use of common functional forms, if a
2310     * computation involved in the completion of this
2311     * CompletableFuture threw an exception, this method throws an
2312     * (unchecked) {@link CompletionException} with the underlying
2313     * exception as its cause.
2314     *
2315     * @return the result value
2316     * @throws CancellationException if the computation was cancelled
2317 jsr166 1.55 * @throws CompletionException if this future completed
2318     * exceptionally or a completion computation threw an exception
2319 dl 1.28 */
2320     public T join() {
2321     Object r; Throwable ex;
2322     if ((r = result) == null)
2323     r = waitingGet(false);
2324 jsr166 1.45 if (!(r instanceof AltResult)) {
2325     @SuppressWarnings("unchecked") T tr = (T) r;
2326     return tr;
2327     }
2328     if ((ex = ((AltResult)r).ex) == null)
2329 dl 1.28 return null;
2330 jsr166 1.45 if (ex instanceof CancellationException)
2331     throw (CancellationException)ex;
2332     if (ex instanceof CompletionException)
2333     throw (CompletionException)ex;
2334     throw new CompletionException(ex);
2335 dl 1.28 }
2336    
2337     /**
2338     * Returns the result value (or throws any encountered exception)
2339     * if completed, else returns the given valueIfAbsent.
2340     *
2341     * @param valueIfAbsent the value to return if not completed
2342     * @return the result value, if completed, else the given valueIfAbsent
2343     * @throws CancellationException if the computation was cancelled
2344 jsr166 1.55 * @throws CompletionException if this future completed
2345     * exceptionally or a completion computation threw an exception
2346 dl 1.28 */
2347     public T getNow(T valueIfAbsent) {
2348     Object r; Throwable ex;
2349     if ((r = result) == null)
2350     return valueIfAbsent;
2351 jsr166 1.45 if (!(r instanceof AltResult)) {
2352     @SuppressWarnings("unchecked") T tr = (T) r;
2353     return tr;
2354     }
2355     if ((ex = ((AltResult)r).ex) == null)
2356 dl 1.28 return null;
2357 jsr166 1.45 if (ex instanceof CancellationException)
2358     throw (CancellationException)ex;
2359     if (ex instanceof CompletionException)
2360     throw (CompletionException)ex;
2361     throw new CompletionException(ex);
2362 dl 1.28 }
2363    
2364     /**
2365     * If not already completed, sets the value returned by {@link
2366     * #get()} and related methods to the given value.
2367     *
2368     * @param value the result value
2369     * @return {@code true} if this invocation caused this CompletableFuture
2370     * to transition to a completed state, else {@code false}
2371     */
2372     public boolean complete(T value) {
2373     boolean triggered = result == null &&
2374     UNSAFE.compareAndSwapObject(this, RESULT, null,
2375     value == null ? NIL : value);
2376     postComplete();
2377     return triggered;
2378     }
2379    
2380     /**
2381     * If not already completed, causes invocations of {@link #get()}
2382     * and related methods to throw the given exception.
2383     *
2384     * @param ex the exception
2385     * @return {@code true} if this invocation caused this CompletableFuture
2386     * to transition to a completed state, else {@code false}
2387     */
2388     public boolean completeExceptionally(Throwable ex) {
2389     if (ex == null) throw new NullPointerException();
2390     boolean triggered = result == null &&
2391     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2392     postComplete();
2393     return triggered;
2394     }
2395    
2396 dl 1.88 // CompletionStage methods
2397    
2398     public <U> CompletableFuture<U> thenApply
2399     (Function<? super T,? extends U> fn) {
2400 dl 1.28 return doThenApply(fn, null);
2401     }
2402    
2403 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2404     (Function<? super T,? extends U> fn) {
2405 dl 1.28 return doThenApply(fn, ForkJoinPool.commonPool());
2406 dl 1.17 }
2407    
2408 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2409     (Function<? super T,? extends U> fn,
2410     Executor executor) {
2411 dl 1.28 if (executor == null) throw new NullPointerException();
2412     return doThenApply(fn, executor);
2413     }
2414 dl 1.1
2415 dl 1.88 public CompletableFuture<Void> thenAccept
2416     (Consumer<? super T> action) {
2417     return doThenAccept(action, null);
2418 dl 1.28 }
2419    
2420 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2421     (Consumer<? super T> action) {
2422     return doThenAccept(action, ForkJoinPool.commonPool());
2423 dl 1.28 }
2424    
2425 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2426     (Consumer<? super T> action,
2427     Executor executor) {
2428 dl 1.28 if (executor == null) throw new NullPointerException();
2429 dl 1.88 return doThenAccept(action, executor);
2430 dl 1.7 }
2431    
2432 dl 1.88 public CompletableFuture<Void> thenRun
2433     (Runnable action) {
2434 dl 1.28 return doThenRun(action, null);
2435     }
2436    
2437 dl 1.88 public CompletableFuture<Void> thenRunAsync
2438     (Runnable action) {
2439 dl 1.28 return doThenRun(action, ForkJoinPool.commonPool());
2440     }
2441    
2442 dl 1.88 public CompletableFuture<Void> thenRunAsync
2443     (Runnable action,
2444     Executor executor) {
2445 dl 1.28 if (executor == null) throw new NullPointerException();
2446     return doThenRun(action, executor);
2447     }
2448    
2449 dl 1.48 public <U,V> CompletableFuture<V> thenCombine
2450 dl 1.88 (CompletionStage<? extends U> other,
2451 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2452 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, null);
2453 dl 1.28 }
2454    
2455 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2456 dl 1.88 (CompletionStage<? extends U> other,
2457 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2458 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn,
2459     ForkJoinPool.commonPool());
2460 dl 1.28 }
2461    
2462 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2463 dl 1.88 (CompletionStage<? extends U> other,
2464 dl 1.48 BiFunction<? super T,? super U,? extends V> fn,
2465     Executor executor) {
2466 dl 1.28 if (executor == null) throw new NullPointerException();
2467 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, executor);
2468 dl 1.1 }
2469    
2470 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBoth
2471 dl 1.88 (CompletionStage<? extends U> other,
2472     BiConsumer<? super T, ? super U> action) {
2473     return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2474 dl 1.28 }
2475    
2476 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2477 dl 1.88 (CompletionStage<? extends U> other,
2478     BiConsumer<? super T, ? super U> action) {
2479     return doThenAcceptBoth(other.toCompletableFuture(), action,
2480     ForkJoinPool.commonPool());
2481 dl 1.28 }
2482    
2483 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2484 dl 1.88 (CompletionStage<? extends U> other,
2485     BiConsumer<? super T, ? super U> action,
2486 dl 1.48 Executor executor) {
2487 dl 1.28 if (executor == null) throw new NullPointerException();
2488 dl 1.88 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2489 dl 1.28 }
2490    
2491 dl 1.88 public CompletableFuture<Void> runAfterBoth
2492     (CompletionStage<?> other,
2493     Runnable action) {
2494     return doRunAfterBoth(other.toCompletableFuture(), action, null);
2495 dl 1.7 }
2496    
2497 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2498     (CompletionStage<?> other,
2499     Runnable action) {
2500     return doRunAfterBoth(other.toCompletableFuture(), action,
2501     ForkJoinPool.commonPool());
2502 dl 1.28 }
2503    
2504 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2505     (CompletionStage<?> other,
2506     Runnable action,
2507     Executor executor) {
2508 dl 1.28 if (executor == null) throw new NullPointerException();
2509 dl 1.88 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2510 dl 1.28 }
2511    
2512 dl 1.1
2513 dl 1.48 public <U> CompletableFuture<U> applyToEither
2514 dl 1.88 (CompletionStage<? extends T> other,
2515 dl 1.48 Function<? super T, U> fn) {
2516 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, null);
2517 dl 1.28 }
2518    
2519 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2520 dl 1.88 (CompletionStage<? extends T> other,
2521 dl 1.48 Function<? super T, U> fn) {
2522 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn,
2523     ForkJoinPool.commonPool());
2524 dl 1.28 }
2525    
2526 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2527 dl 1.88 (CompletionStage<? extends T> other,
2528 dl 1.48 Function<? super T, U> fn,
2529     Executor executor) {
2530 dl 1.28 if (executor == null) throw new NullPointerException();
2531 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2532 dl 1.1 }
2533    
2534 dl 1.48 public CompletableFuture<Void> acceptEither
2535 dl 1.88 (CompletionStage<? extends T> other,
2536     Consumer<? super T> action) {
2537     return doAcceptEither(other.toCompletableFuture(), action, null);
2538 dl 1.28 }
2539    
2540 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2541 dl 1.88 (CompletionStage<? extends T> other,
2542     Consumer<? super T> action) {
2543     return doAcceptEither(other.toCompletableFuture(), action,
2544     ForkJoinPool.commonPool());
2545 dl 1.28 }
2546    
2547 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2548 dl 1.88 (CompletionStage<? extends T> other,
2549     Consumer<? super T> action,
2550 dl 1.48 Executor executor) {
2551 dl 1.28 if (executor == null) throw new NullPointerException();
2552 dl 1.88 return doAcceptEither(other.toCompletableFuture(), action, executor);
2553 dl 1.7 }
2554    
2555 dl 1.88 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2556 dl 1.28 Runnable action) {
2557 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, null);
2558 dl 1.28 }
2559    
2560 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2561 dl 1.88 (CompletionStage<?> other,
2562 dl 1.48 Runnable action) {
2563 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action,
2564     ForkJoinPool.commonPool());
2565 dl 1.28 }
2566    
2567 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2568 dl 1.88 (CompletionStage<?> other,
2569 dl 1.48 Runnable action,
2570     Executor executor) {
2571 dl 1.28 if (executor == null) throw new NullPointerException();
2572 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2573 dl 1.1 }
2574    
2575 dl 1.48 public <U> CompletableFuture<U> thenCompose
2576 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2577 jsr166 1.81 return doThenCompose(fn, null);
2578 dl 1.37 }
2579    
2580 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2581 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2582 jsr166 1.81 return doThenCompose(fn, ForkJoinPool.commonPool());
2583 dl 1.37 }
2584    
2585 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2586 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn,
2587 dl 1.48 Executor executor) {
2588 dl 1.37 if (executor == null) throw new NullPointerException();
2589 jsr166 1.81 return doThenCompose(fn, executor);
2590 dl 1.37 }
2591    
2592 dl 1.88 public CompletableFuture<T> whenComplete
2593     (BiConsumer<? super T, ? super Throwable> action) {
2594     return doWhenComplete(action, null);
2595     }
2596    
2597     public CompletableFuture<T> whenCompleteAsync
2598     (BiConsumer<? super T, ? super Throwable> action) {
2599     return doWhenComplete(action, ForkJoinPool.commonPool());
2600     }
2601    
2602     public CompletableFuture<T> whenCompleteAsync
2603     (BiConsumer<? super T, ? super Throwable> action,
2604     Executor executor) {
2605     if (executor == null) throw new NullPointerException();
2606     return doWhenComplete(action, executor);
2607     }
2608    
2609     public <U> CompletableFuture<U> handle
2610     (BiFunction<? super T, Throwable, ? extends U> fn) {
2611     return doHandle(fn, null);
2612     }
2613    
2614     public <U> CompletableFuture<U> handleAsync
2615     (BiFunction<? super T, Throwable, ? extends U> fn) {
2616     return doHandle(fn, ForkJoinPool.commonPool());
2617     }
2618    
2619     public <U> CompletableFuture<U> handleAsync
2620     (BiFunction<? super T, Throwable, ? extends U> fn,
2621     Executor executor) {
2622     if (executor == null) throw new NullPointerException();
2623     return doHandle(fn, executor);
2624     }
2625    
2626     /**
2627     * Returns this CompletableFuture
2628     *
2629     * @return this CompletableFuture
2630     */
2631     public CompletableFuture<T> toCompletableFuture() {
2632     return this;
2633 dl 1.28 }
2634    
2635 dl 1.88 // not in interface CompletionStage
2636    
2637 dl 1.28 /**
2638 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
2639     * CompletableFuture completes, with the result of the given
2640     * function of the exception triggering this CompletableFuture's
2641     * completion when it completes exceptionally; otherwise, if this
2642     * CompletableFuture completes normally, then the returned
2643     * CompletableFuture also completes normally with the same value.
2644 dl 1.88 * Note: More flexible versions of this functionality are
2645     * available using methods {@code whenComplete} and {@code handle}.
2646 dl 1.28 *
2647     * @param fn the function to use to compute the value of the
2648     * returned CompletableFuture if this CompletableFuture completed
2649     * exceptionally
2650     * @return the new CompletableFuture
         * @throws NullPointerException if {@code fn} is {@code null}
2651     */
2652 dl 1.48 public CompletableFuture<T> exceptionally
2653     (Function<Throwable, ? extends T> fn) {
2654 dl 1.28 if (fn == null) throw new NullPointerException();
2655     CompletableFuture<T> dst = new CompletableFuture<T>();
2656     ExceptionCompletion<T> d = null;
2657     Object r;
         // No result yet: wrap an ExceptionCompletion for (this, fn, dst) in
         // a CompletionNode and try to CAS that node onto the head of this
         // future's completions list. result is re-read before every
         // attempt, so the loop ends either with the node installed (r is
         // still null) or with a result observed (r is non-null).
2658     if ((r = result) == null) {
2659     CompletionNode p =
2660 dl 1.88 new CompletionNode(d = new ExceptionCompletion<T>
2661     (this, fn, dst));
2662 dl 1.28 while ((r = result) == null) {
2663     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2664     p.next = completions, p))
2665     break;
2666     }
2667     }
         // A result is visible to this thread. Proceed if no completion
         // object was ever created (d == null) or if this thread wins d's
         // 0 -> 1 compareAndSet.
         // NOTE(review): that CAS presumably keeps dst from being completed
         // twice should d also be run from elsewhere -- confirm against
         // ExceptionCompletion, which is declared outside this section.
2668     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2669     T t = null; Throwable ex, dx = null;
         // AltResult with a non-null ex: the value comes from fn, and
         // anything fn throws is captured in dx. An AltResult whose ex is
         // null leaves both t and dx null.
2670     if (r instanceof AltResult) {
2671 jsr166 1.87 if ((ex = ((AltResult)r).ex) != null) {
2672 dl 1.28 try {
2673     t = fn.apply(ex);
2674     } catch (Throwable rex) {
2675     dx = rex;
2676     }
2677     }
2678     }
         // Any other result object is passed through unchanged as the value.
2679     else {
2680     @SuppressWarnings("unchecked") T tr = (T) r;
2681     t = tr;
2682     }
2683     dst.internalComplete(t, dx);
2684     }
         // Invoked on every path, whether or not dst was completed above.
2685     helpPostComplete();
2686     return dst;
2687     }
2688    
2689 dl 1.35 /* ------------- Arbitrary-arity constructions -------------- */
2690    
2691     /*
2692     * The basic plan of attack is to recursively form binary
2693     * completion trees of elements. This can be overkill for small
2694     * sets, but scales nicely. The And/All vs Or/Any forms use the
2695     * same idea, but details differ.
2696     */
2697    
2698     /**
2699     * Returns a new CompletableFuture that is completed when all of
2700 jsr166 1.66 * the given CompletableFutures complete. If any of the given
2701 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
2702     * CompletableFuture also does so, with a CompletionException
2703     * holding this exception as its cause. Otherwise, the results,
2704     * if any, of the given CompletableFutures are not reflected in
2705     * the returned CompletableFuture, but may be obtained by
2706     * inspecting them individually. If no CompletableFutures are
2707     * provided, returns a CompletableFuture completed with the value
2708     * {@code null}.
2709 dl 1.35 *
2710     * <p>Among the applications of this method is to await completion
2711     * of a set of independent CompletableFutures before continuing a
2712     * program, as in: {@code CompletableFuture.allOf(c1, c2,
2713     * c3).join();}.
2714     *
2715     * @param cfs the CompletableFutures
2716 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
2717 dl 1.35 * given CompletableFutures complete
2718     * @throws NullPointerException if the array or any of its elements are
2719     * {@code null}
2720     */
2721     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2722     int len = cfs.length; // Directly handle empty and singleton cases
         // Two or more inputs are delegated to the recursive tree builder.
2723     if (len > 1)
2724     return allTree(cfs, 0, len - 1);
2725     else {
2726     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2727     CompletableFuture<?> f;
         // Empty input: dst's result field is set directly to NIL.
2728     if (len == 0)
2729     dst.result = NIL;
2730     else if ((f = cfs[0]) == null)
2731     throw new NullPointerException();
2732     else {
2733 dl 1.75 ThenPropagate d = null;
2734 dl 1.35 CompletionNode p = null;
2735     Object r;
         // While f has no result, each pass performs one step: create the
         // ThenPropagate, then its CompletionNode, then try to CAS the node
         // onto the head of f.completions. f.result is re-read before
         // every step, so nothing is allocated if f is already complete.
2736     while ((r = f.result) == null) {
2737     if (d == null)
2738 dl 1.75 d = new ThenPropagate(f, dst);
2739 dl 1.35 else if (p == null)
2740     p = new CompletionNode(d);
2741     else if (UNSAFE.compareAndSwapObject
2742     (f, COMPLETIONS, p.next = f.completions, p))
2743     break;
2744     }
         // f's result was observed: complete dst here when no completion
         // object exists or this thread wins d's 0 -> 1 compareAndSet. The
         // value is always null; only an AltResult's ex is carried over.
2745     if (r != null && (d == null || d.compareAndSet(0, 1)))
2746     dst.internalComplete(null, (r instanceof AltResult) ?
2747     ((AltResult)r).ex : null);
2748     f.helpPostComplete();
2749     }
2750     return dst;
2751     }
2752     }
2753    
2754     /**
2755     * Recursively constructs an And'ed tree of CompletableFutures.
2756     * Called only when array known to have at least two elements.
         * The range {@code lo..hi} (inclusive) is split at its midpoint; a
         * half consisting of a single index uses that array element
         * directly, any larger half is built by a recursive call.
         *
         * @param cfs the source array
         * @param lo lowest index (inclusive) of the range to combine
         * @param hi highest index (inclusive) of the range to combine
         * @return the CompletableFuture for this range
         * @throws NullPointerException if an element used here is {@code null}
2757     */
2758     private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2759     int lo, int hi) {
2760     CompletableFuture<?> fst, snd;
2761     int mid = (lo + hi) >>> 1;
2762     if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2763     (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2764     throw new NullPointerException();
2765     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2766     AndCompletion d = null;
2767     CompletionNode p = null, q = null;
2768     Object r = null, s = null;
         // While at least one side has no result, each pass performs one
         // step: create the shared AndCompletion, create node p, CAS p onto
         // fst.completions (then create node q), CAS q onto
         // snd.completions and stop. Because of || short-circuiting,
         // snd.result is not read while fst.result is still null.
2769     while ((r = fst.result) == null || (s = snd.result) == null) {
2770     if (d == null)
2771     d = new AndCompletion(fst, snd, dst);
2772     else if (p == null)
2773     p = new CompletionNode(d);
2774     else if (q == null) {
2775     if (UNSAFE.compareAndSwapObject
2776     (fst, COMPLETIONS, p.next = fst.completions, p))
2777     q = new CompletionNode(d);
2778     }
2779     else if (UNSAFE.compareAndSwapObject
2780     (snd, COMPLETIONS, q.next = snd.completions, q))
2781     break;
2782     }
         // Re-read whichever result is still null in the locals; if both
         // sides are now complete, complete dst here provided no completion
         // object exists or this thread wins d's 0 -> 1 compareAndSet.
2783     if ((r != null || (r = fst.result) != null) &&
2784     (s != null || (s = snd.result) != null) &&
2785     (d == null || d.compareAndSet(0, 1))) {
2786     Throwable ex;
         // The exception from fst takes precedence; snd's is used only
         // when fst contributed none.
2787     if (r instanceof AltResult)
2788     ex = ((AltResult)r).ex;
2789     else
2790     ex = null;
2791     if (ex == null && (s instanceof AltResult))
2792     ex = ((AltResult)s).ex;
2793     dst.internalComplete(null, ex);
2794     }
2795     fst.helpPostComplete();
2796     snd.helpPostComplete();
2797     return dst;
2798     }
2799    
2800     /**
2801 dl 1.76 * Returns a new CompletableFuture that is completed when any of
2802 jsr166 1.79 * the given CompletableFutures complete, with the same result.
2803     * Otherwise, if it completed exceptionally, the returned
2804 dl 1.77 * CompletableFuture also does so, with a CompletionException
2805     * holding this exception as its cause. If no CompletableFutures
2806     * are provided, returns an incomplete CompletableFuture.
2807 dl 1.35 *
2808     * @param cfs the CompletableFutures
2809 dl 1.77 * @return a new CompletableFuture that is completed with the
2810     * result or exception of any of the given CompletableFutures when
2811     * one completes
2812 dl 1.35 * @throws NullPointerException if the array or any of its elements are
2813     * {@code null}
2814     */
2815 dl 1.77 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2816 dl 1.35 int len = cfs.length; // Same idea as allOf
         // Two or more inputs are delegated to the recursive tree builder.
2817     if (len > 1)
2818     return anyTree(cfs, 0, len - 1);
2819     else {
2820 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2821 dl 1.35 CompletableFuture<?> f;
         // Empty input: dst is returned without ever being completed here.
2822     if (len == 0)
2823 dl 1.48 ; // skip
2824 dl 1.35 else if ((f = cfs[0]) == null)
2825     throw new NullPointerException();
2826     else {
2827 dl 1.77 ThenCopy<Object> d = null;
2828 dl 1.35 CompletionNode p = null;
2829     Object r;
         // While f has no result, each pass performs one step: create the
         // ThenCopy, then its CompletionNode, then try to CAS the node onto
         // the head of f.completions. f.result is re-read before every
         // step, so nothing is allocated if f is already complete.
2830     while ((r = f.result) == null) {
2831     if (d == null)
2832 dl 1.77 d = new ThenCopy<Object>(f, dst);
2833 dl 1.35 else if (p == null)
2834     p = new CompletionNode(d);
2835     else if (UNSAFE.compareAndSwapObject
2836     (f, COMPLETIONS, p.next = f.completions, p))
2837     break;
2838     }
         // f's result was observed: complete dst here when no completion
         // object exists or this thread wins d's 0 -> 1 compareAndSet.
2839     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2840     Throwable ex; Object t;
         // An AltResult contributes only its ex (value null); any other
         // result object is forwarded as the value with no exception.
2841 dl 1.77 if (r instanceof AltResult) {
2842 dl 1.35 ex = ((AltResult)r).ex;
2843 dl 1.77 t = null;
2844     }
2845     else {
2846     ex = null;
2847     t = r;
2848     }
2849     dst.internalComplete(t, ex);
2850 dl 1.35 }
2851     f.helpPostComplete();
2852     }
2853     return dst;
2854     }
2855     }
2856    
2857     /**
2858 jsr166 1.44 * Recursively constructs an Or'ed tree of CompletableFutures.
         * Within this section it is invoked by {@code anyOf} only when the
         * array has more than one element, and by itself for sub-ranges.
         * The range {@code lo..hi} (inclusive) is split at its midpoint; a
         * half consisting of a single index uses that array element
         * directly, any larger half is built by a recursive call.
         *
         * @param cfs the source array
         * @param lo lowest index (inclusive) of the range to combine
         * @param hi highest index (inclusive) of the range to combine
         * @return the CompletableFuture for this range
         * @throws NullPointerException if an element used here is {@code null}
2859 dl 1.35 */
2860 dl 1.77 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
2861 jsr166 1.79 int lo, int hi) {
2862 dl 1.35 CompletableFuture<?> fst, snd;
2863     int mid = (lo + hi) >>> 1;
2864     if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2865     (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2866     throw new NullPointerException();
2867 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2868 dl 1.35 OrCompletion d = null;
2869     CompletionNode p = null, q = null;
2870     Object r;
         // While neither side has a result, each pass performs one step:
         // create the shared OrCompletion, create node p, CAS p onto
         // fst.completions (then create node q), CAS q onto
         // snd.completions and stop. fst.result is consulted before
         // snd.result, so r holds fst's result when both are available.
2871     while ((r = fst.result) == null && (r = snd.result) == null) {
2872     if (d == null)
2873     d = new OrCompletion(fst, snd, dst);
2874     else if (p == null)
2875     p = new CompletionNode(d);
2876     else if (q == null) {
2877     if (UNSAFE.compareAndSwapObject
2878     (fst, COMPLETIONS, p.next = fst.completions, p))
2879     q = new CompletionNode(d);
2880     }
2881     else if (UNSAFE.compareAndSwapObject
2882     (snd, COMPLETIONS, q.next = snd.completions, q))
2883     break;
2884     }
         // After the loop r may still be null (both nodes were installed),
         // so both results are re-read once; if either side is complete,
         // complete dst here provided no completion object exists or this
         // thread wins d's 0 -> 1 compareAndSet.
2885     if ((r != null || (r = fst.result) != null ||
2886     (r = snd.result) != null) &&
2887     (d == null || d.compareAndSet(0, 1))) {
2888 dl 1.77 Throwable ex; Object t;
         // An AltResult contributes only its ex (value null); any other
         // result object is forwarded as the value with no exception.
2889 dl 1.35 if (r instanceof AltResult) {
2890     ex = ((AltResult)r).ex;
2891 dl 1.77 t = null;
2892 dl 1.35 }
2893 dl 1.77 else {
2894 dl 1.35 ex = null;
2895 dl 1.77 t = r;
2896     }
2897     dst.internalComplete(t, ex);
2898 dl 1.35 }
2899     fst.helpPostComplete();
2900     snd.helpPostComplete();
2901     return dst;
2902     }
2903    
2904     /* ------------- Control and status methods -------------- */
2905    
2906 dl 1.28 /**
2907 dl 1.37 * If not already completed, completes this CompletableFuture with
2908     * a {@link CancellationException}. Dependent CompletableFutures
2909     * that have not already completed will also complete
2910     * exceptionally, with a {@link CompletionException} caused by
2911     * this {@code CancellationException}.
2912 dl 1.28 *
2913     * @param mayInterruptIfRunning this value has no effect in this
2914     * implementation because interrupts are not used to control
2915     * processing.
2916     *
2917     * @return {@code true} if this task is now cancelled
2918     */
2919     public boolean cancel(boolean mayInterruptIfRunning) {
2920 dl 1.46 boolean cancelled = (result == null) &&
2921     UNSAFE.compareAndSwapObject
2922     (this, RESULT, null, new AltResult(new CancellationException()));
2923     postComplete();
2924 dl 1.48 return cancelled || isCancelled();
2925 dl 1.28 }
2926    
2927     /**
2928     * Returns {@code true} if this CompletableFuture was cancelled
2929     * before it completed normally.
2930     *
2931     * @return {@code true} if this CompletableFuture was cancelled
2932     * before it completed normally
2933     */
2934     public boolean isCancelled() {
2935     Object r;
2936 jsr166 1.43 return ((r = result) instanceof AltResult) &&
2937     (((AltResult)r).ex instanceof CancellationException);
2938 dl 1.28 }
2939    
2940     /**
2941 dl 1.88 * Returns {@code true} if this CompletableFuture completed
2942 dl 1.91 * exceptionally, in any way. Possible causes include
2943     * cancellation, explicit invocation of {@code
2944     * completeExceptionally}, and abrupt termination of a
2945     * CompletionStage action.
2946 dl 1.88 *
2947     * @return {@code true} if this CompletableFuture completed
2948     * exceptionally
2949     */
2950     public boolean isCompletedExceptionally() {
2951 dl 1.91 Object r;
2952     return ((r = result) instanceof AltResult) && r != NIL;
2953 dl 1.88 }
2954    
2955     /**
2956 dl 1.28 * Forcibly sets or resets the value subsequently returned by
2957 jsr166 1.42 * method {@link #get()} and related methods, whether or not
2958     * already completed. This method is designed for use only in
2959     * error recovery actions, and even in such situations may result
2960     * in ongoing dependent completions using established versus
2961 dl 1.30 * overwritten outcomes.
2962 dl 1.28 *
2963     * @param value the completion value
2964     */
2965     public void obtrudeValue(T value) {
2966     result = (value == null) ? NIL : value;
2967     postComplete();
2968     }
2969    
2970 dl 1.30 /**
2971 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
2972     * and related methods to throw the given exception, whether or
2973     * not already completed. This method is designed for use only in
2974 dl 1.30 * recovery actions, and even in such situations may result in
2975     * ongoing dependent completions using established versus
2976     * overwritten outcomes.
2977     *
2978     * @param ex the exception
2979     */
2980     public void obtrudeException(Throwable ex) {
2981     if (ex == null) throw new NullPointerException();
2982     result = new AltResult(ex);
2983     postComplete();
2984     }
2985    
2986 dl 1.35 /**
2987     * Returns the estimated number of CompletableFutures whose
2988     * completions are awaiting completion of this CompletableFuture.
2989     * This method is designed for use in monitoring system state, not
2990     * for synchronization control.
2991     *
2992     * @return the number of dependent CompletableFutures
2993     */
2994     public int getNumberOfDependents() {
2995     int count = 0;
2996     for (CompletionNode p = completions; p != null; p = p.next)
2997     ++count;
2998     return count;
2999     }
3000    
3001     /**
3002     * Returns a string identifying this CompletableFuture, as well as
3003 jsr166 1.40 * its completion state. The state, in brackets, contains the
3004 dl 1.35 * String {@code "Completed Normally"} or the String {@code
3005     * "Completed Exceptionally"}, or the String {@code "Not
3006     * completed"} followed by the number of CompletableFutures
3007     * dependent upon its completion, if any.
3008     *
3009     * @return a string identifying this CompletableFuture, as well as its state
3010     */
3011     public String toString() {
3012     Object r = result;
3013 jsr166 1.40 int count;
3014     return super.toString() +
3015     ((r == null) ?
3016     (((count = getNumberOfDependents()) == 0) ?
3017     "[Not completed]" :
3018     "[Not completed, " + count + " dependents]") :
3019     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3020     "[Completed exceptionally]" :
3021     "[Completed normally]"));
3022 dl 1.35 }
3023    
// Unsafe mechanics: the Unsafe instance plus the offsets of the
// "result", "waiters" and "completions" fields of this class.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        final sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
        UNSAFE = unsafe;
        final Class<?> self = CompletableFuture.class;
        final java.lang.reflect.Field resultField =
            self.getDeclaredField("result");
        RESULT = unsafe.objectFieldOffset(resultField);
        final java.lang.reflect.Field waitersField =
            self.getDeclaredField("waiters");
        WAITERS = unsafe.objectFieldOffset(waitersField);
        final java.lang.reflect.Field completionsField =
            self.getDeclaredField("completions");
        COMPLETIONS = unsafe.objectFieldOffset(completionsField);
    } catch (Exception e) {
        // Any failure while resolving the fields aborts class
        // initialization.
        throw new Error(e);
    }
}
3043     }