ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.84
Committed: Fri Apr 5 14:21:16 2013 UTC (11 years, 2 months ago) by dl
Branch: MAIN
Changes since 1.83: +71 -68 lines
Log Message:
Symmetric exception checks

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22     import java.util.concurrent.atomic.AtomicInteger;
23     import java.util.concurrent.locks.LockSupport;
24    
25     /**
26     * A {@link Future} that may be explicitly completed (setting its
27     * value and status), and may include dependent functions and actions
28 dl 1.35 * that trigger upon its completion.
29 dl 1.1 *
30 jsr166 1.50 * <p>When two or more threads attempt to
31     * {@link #complete complete},
32 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
33 jsr166 1.50 * {@link #cancel cancel}
34     * a CompletableFuture, only one of them succeeds.
35 dl 1.19 *
36 jsr166 1.65 * <p>Methods are available for adding dependents based on
37     * user-provided Functions, Consumers, or Runnables. The appropriate
38     * form to use depends on whether actions require arguments and/or
39     * produce results. Completion of a dependent action will trigger the
40     * completion of another CompletableFuture. Actions may also be
41     * triggered after either or both the current and another
42 dl 1.35 * CompletableFuture complete. Multiple CompletableFutures may also
43 jsr166 1.50 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
44     * {@link #allOf(CompletableFuture...)}.
45 dl 1.35 *
46 jsr166 1.65 * <p>CompletableFutures themselves do not execute asynchronously.
47     * However, actions supplied for dependent completions of another
48     * CompletableFuture may do so, depending on whether they are provided
49     * via one of the <em>async</em> methods (that is, methods with names
50     * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
51     * methods provide a way to commence asynchronous processing of an
52     * action using either a given {@link Executor} or by default the
53     * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
54     * debugging, and tracking, all generated asynchronous tasks are
55     * instances of the marker interface {@link AsynchronousCompletionTask}.
56     *
57     * <p>Actions supplied for dependent completions of <em>non-async</em>
58     * methods may be performed by the thread that completes the current
59     * CompletableFuture, or by any other caller of these methods. There
60     * are no guarantees about the order of processing completions unless
61     * constrained by these methods.
62 dl 1.35 *
63 jsr166 1.55 * <p>Since (unlike {@link FutureTask}) this class has no direct
64     * control over the computation that causes it to be completed,
65     * cancellation is treated as just another form of exceptional completion.
66     * Method {@link #cancel cancel} has the same effect as
67     * {@code completeExceptionally(new CancellationException())}.
68     *
69 dl 1.48 * <p>Upon exceptional completion (including cancellation), or when a
70 jsr166 1.55 * completion entails an additional computation which terminates
71     * abruptly with an (unchecked) exception or error, then all of their
72     * dependent completions (and their dependents in turn) generally act
73     * as {@code completeExceptionally} with a {@link CompletionException}
74     * holding that exception as its cause. However, the {@link
75     * #exceptionally exceptionally} and {@link #handle handle}
76     * completions <em>are</em> able to handle exceptional completions of
77     * the CompletableFutures they depend on.
78     *
79     * <p>In case of exceptional completion with a CompletionException,
80     * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
81     * {@link ExecutionException} with the same cause as held in the
82     * corresponding CompletionException. However, in these cases,
83     * methods {@link #join()} and {@link #getNow} throw the
84     * CompletionException, which simplifies usage.
85 dl 1.1 *
86 jsr166 1.80 * <p>Arguments used to pass a completion result (that is, for parameters
87     * of type {@code T}) may be null, but passing a null value for any other
88     * parameter will result in a {@link NullPointerException} being thrown.
89     *
90 dl 1.1 * @author Doug Lea
91     * @since 1.8
92     */
93     public class CompletableFuture<T> implements Future<T> {
94 dl 1.28
95 dl 1.1 /*
96 dl 1.20 * Overview:
97 dl 1.1 *
98 jsr166 1.32 * 1. Non-nullness of field result (set via CAS) indicates done.
99     * An AltResult is used to box null as a result, as well as to
100     * hold exceptions. Using a single field makes completion fast
101 dl 1.20 * and simple to detect and trigger, at the expense of a lot of
102     * encoding and decoding that infiltrates many methods. One minor
103     * simplification relies on the (static) NIL (to box null results)
104     * being the only AltResult with a null exception field, so we
105 dl 1.28 * don't usually need explicit comparisons with NIL. The CF
106     * exception propagation mechanics surrounding decoding rely on
107     * unchecked casts of decoded results really being unchecked,
108     * where user type errors are caught at point of use, as is
109     * currently the case in Java. These are highlighted by using
110     * SuppressWarnings-annotated temporaries.
111 dl 1.1 *
112     * 2. Waiters are held in a Treiber stack similar to the one used
113 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
114 dl 1.28 * internal documentation for algorithmic details.
115 dl 1.1 *
116     * 3. Completions are also kept in a list/stack, and pulled off
117 dl 1.28 * and run when completion is triggered. (We could even use the
118 jsr166 1.24 * same stack as for waiters, but would give up the potential
119 dl 1.20 * parallelism obtained because woken waiters help release/run
120 dl 1.28 * others -- see method postComplete). Because post-processing
121     * may race with direct calls, class Completion opportunistically
122     * extends AtomicInteger so callers can claim the action via
123     * compareAndSet(0, 1). The Completion.run methods are all
124     * written a boringly similar uniform way (that sometimes includes
125 jsr166 1.55 * unnecessary-looking checks, kept to maintain uniformity).
126     * There are enough dimensions upon which they differ that
127     * attempts to factor commonalities while maintaining efficiency
128     * require more lines of code than they would save.
129 dl 1.20 *
130     * 4. The exported then/and/or methods do support a bit of
131 dl 1.28 * factoring (see doThenApply etc). They must cope with the
132     * intrinsic races surrounding addition of a dependent action
133     * versus performing the action directly because the task is
134     * already complete. For example, a CF may not be complete upon
135     * entry, so a dependent completion is added, but by the time it
136     * is added, the target CF is complete, so must be directly
137 dl 1.20 * executed. This is all done while avoiding unnecessary object
138     * construction in safe-bypass cases.
139 dl 1.1 */
140    
141 dl 1.28 // preliminaries
142 dl 1.20
143 dl 1.1 static final class AltResult {
144     final Throwable ex; // null only for NIL
145 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
146 dl 1.1 }
147    
/** Shared AltResult whose exception field is null; used to box a null result. */
148     static final AltResult NIL = new AltResult(null);
149    
150 dl 1.20 // Fields
151    
152     volatile Object result; // Either the result or boxed AltResult; non-null indicates done
153 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
154     volatile CompletionNode completions; // list (Treiber stack) of completions
155    
156 dl 1.20 // Basic utilities for triggering and processing completions
157    
158     /**
159     * Removes and signals all waiting threads and runs all completions.
        *
        * <p>Both stacks are drained by repeatedly CASing the head to its
        * successor, so several threads may run this method concurrently
        * and each node is popped by exactly one of them.
160     */
161     final void postComplete() {
162     WaitNode q; Thread t;
        // Drain the waiters stack. A popped node that still records a
        // thread has the field cleared before the thread is unparked.
163     while ((q = waiters) != null) {
164     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
165     (t = q.thread) != null) {
166     q.thread = null;
167     LockSupport.unpark(t);
168     }
169     }
170    
171     CompletionNode h; Completion c;
        // Drain the completions stack, running each popped completion in
        // the calling thread.
172     while ((h = completions) != null) {
173     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
174     (c = h.completion) != null)
175     c.run();
176     }
177     }
178    
179     /**
180     * Triggers completion with the encoding of the given arguments:
181     * if the exception is non-null, encodes it as a wrapped
182     * CompletionException unless it is one already. Otherwise uses
183     * the given result, boxed as NIL if null.
184     */
185 dl 1.75 final void internalComplete(T v, Throwable ex) {
186 dl 1.20 if (result == null)
187     UNSAFE.compareAndSwapObject
188     (this, RESULT, null,
189     (ex == null) ? (v == null) ? NIL : v :
190     new AltResult((ex instanceof CompletionException) ? ex :
191     new CompletionException(ex)));
192     postComplete(); // help out even if not triggered
193     }
194    
195     /**
196 jsr166 1.25 * If triggered, helps release and/or process completions.
197 dl 1.20 */
198     final void helpPostComplete() {
199     if (result != null)
200     postComplete();
201     }
202    
203 dl 1.28 /* ------------- waiting for completions -------------- */
204 dl 1.20
205 dl 1.35 /** Number of processors, for spin control; sampled once when this class is initialized */
206     static final int NCPU = Runtime.getRuntime().availableProcessors();
207    
208 dl 1.1 /**
209 dl 1.28 * Heuristic spin value for waitingGet() before blocking on
210     * multiprocessors: 256 (1 << 8) when more than one processor is
        * reported, otherwise 0 so that no spinning is attempted.
211 dl 1.1 */
212 dl 1.35 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
213 dl 1.1
214     /**
215 dl 1.28 * Linked nodes to record waiting threads in a Treiber stack. See
216     * other classes such as Phaser and SynchronousQueue for more
217     * detailed explanation. This class implements ManagedBlocker to
218     * avoid starvation when blocking actions pile up in
219     * ForkJoinPools.
        *
        * <p>A null {@code thread} field marks the node as released: it is
        * cleared by a signaller, on timeout, or when the waiter gives up.
220     */
221     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
222     long nanos; // wait time if timed
223     final long deadline; // non-zero if timed
224     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
225     volatile Thread thread;
226     volatile WaitNode next;
        // Records the constructing thread as the waiter; interruptControl
        // starts at 1 for interruptible waits and 0 otherwise.
227     WaitNode(boolean interruptible, long nanos, long deadline) {
228     this.thread = Thread.currentThread();
229     this.interruptControl = interruptible ? 1 : 0;
230     this.nanos = nanos;
231     this.deadline = deadline;
232     }
        /**
         * Reports whether blocking is unnecessary: the node has been
         * released, an interruptible wait was interrupted, or a timed
         * wait has run out of time.
         */
233     public boolean isReleasable() {
234     if (thread == null)
235     return true;
        // Thread.interrupted() clears the interrupt status; the event is
        // remembered as interruptControl == -1. Only a wait that was
        // interruptible (previous value > 0) is released by it.
236     if (Thread.interrupted()) {
237     int i = interruptControl;
238     interruptControl = -1;
239     if (i > 0)
240     return true;
241     }
        // Timed wait: refresh the remaining time from the deadline and
        // release the node (clearing thread) once none is left.
242     if (deadline != 0L &&
243     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
244     thread = null;
245     return true;
246     }
247     return false;
248     }
        /**
         * Parks the current thread unless already releasable: untimed when
         * no deadline is set, otherwise for the remaining nanos. Returns
         * the releasable status observed after parking.
         */
249     public boolean block() {
250     if (isReleasable())
251     return true;
252     else if (deadline == 0L)
253     LockSupport.park(this);
254     else if (nanos > 0L)
255     LockSupport.parkNanos(this, nanos);
256     return isReleasable();
257     }
258 dl 1.1 }
259    
260     /**
261 dl 1.28 * Returns raw result after waiting, or null if interruptible and
262     * interrupted.
        *
        * <p>Each loop iteration takes exactly one step: return if done,
        * spin, create a wait node, push it on the waiters stack, abort on
        * interrupt, or block.
        *
        * @param interruptible whether an interrupt aborts the wait
        * @return the raw (possibly AltResult-encoded) result, or null if
        * the wait was interruptible and an interrupt was recorded
263 dl 1.1 */
264 dl 1.28 private Object waitingGet(boolean interruptible) {
265     WaitNode q = null;
266     boolean queued = false;
267 dl 1.35 int spins = SPINS;
268 dl 1.28 for (Object r;;) {
269     if ((r = result) != null) {
270     if (q != null) { // suppress unpark
271     q.thread = null;
        // An interrupt recorded while waiting either aborts (returning
        // null even though a result is now present) or, for
        // non-interruptible waits, is re-asserted on the thread.
272     if (q.interruptControl < 0) {
273     if (interruptible) {
274     removeWaiter(q);
275     return null;
276     }
277     Thread.currentThread().interrupt();
278     }
279     }
280     postComplete(); // help release others
281     return r;
282     }
        // Spin phase: the counter is decremented only on iterations where
        // the random value is non-negative.
283     else if (spins > 0) {
284 dl 1.35 int rnd = ThreadLocalRandom.nextSecondarySeed();
285     if (rnd == 0)
286     rnd = ThreadLocalRandom.current().nextInt();
287     if (rnd >= 0)
288 dl 1.28 --spins;
289     }
        // Untimed node: nanos and deadline are both zero.
290     else if (q == null)
291     q = new WaitNode(interruptible, 0L, 0L);
        // Push the node onto the waiters stack; retried on the next
        // iteration if the CAS fails.
292     else if (!queued)
293     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
294     q.next = waiters, q);
295     else if (interruptible && q.interruptControl < 0) {
296     removeWaiter(q);
297     return null;
298     }
        // Block only while the node is still live and no result has been
        // published.
299     else if (q.thread != null && result == null) {
300     try {
301     ForkJoinPool.managedBlock(q);
302 jsr166 1.31 } catch (InterruptedException ex) {
        // Record the interrupt; it is acted on in a later iteration.
303 dl 1.28 q.interruptControl = -1;
304     }
305     }
306     }
307 dl 1.1 }
308    
309     /**
310 dl 1.28 * Awaits completion or aborts on interrupt or timeout.
311 dl 1.1 *
312 dl 1.28 * @param nanos time to wait
313     * @return raw result
        * @throws InterruptedException if an interrupt was recorded on the
        * wait node, including when a result is already present
        * @throws TimeoutException if {@code nanos} is not positive on
        * entry, or the wait time runs out with no result
314 dl 1.1 */
315 dl 1.28 private Object timedAwaitDone(long nanos)
316     throws InterruptedException, TimeoutException {
317     WaitNode q = null;
318     boolean queued = false;
319     for (Object r;;) {
320     if ((r = result) != null) {
321     if (q != null) {
        // Suppress any later unpark, then honor a recorded interrupt in
        // preference to returning the result.
322     q.thread = null;
323     if (q.interruptControl < 0) {
324     removeWaiter(q);
325     throw new InterruptedException();
326     }
327     }
328     postComplete();
329     return r;
330     }
331     else if (q == null) {
332     if (nanos <= 0L)
333     throw new TimeoutException();
334     long d = System.nanoTime() + nanos;
        // WaitNode treats a zero deadline as "untimed", so a computed
        // deadline of exactly 0 is nudged to 1.
335 jsr166 1.31 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
336 dl 1.28 }
        // Push the node onto the waiters stack; retried on CAS failure.
337     else if (!queued)
338     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
339     q.next = waiters, q);
340     else if (q.interruptControl < 0) {
341     removeWaiter(q);
342     throw new InterruptedException();
343     }
        // Time is up: re-check result once more before giving up, so a
        // completion that raced with the timeout is returned on the next
        // iteration.
344     else if (q.nanos <= 0L) {
345     if (result == null) {
346     removeWaiter(q);
347     throw new TimeoutException();
348     }
349     }
350     else if (q.thread != null && result == null) {
351     try {
352     ForkJoinPool.managedBlock(q);
353 jsr166 1.31 } catch (InterruptedException ex) {
        // Record the interrupt; it is thrown from a later iteration.
354 dl 1.28 q.interruptControl = -1;
355     }
356     }
357     }
358 dl 1.1 }
359    
360     /**
361 dl 1.28 * Tries to unlink a timed-out or interrupted wait node to avoid
362     * accumulating garbage. Internal nodes are simply unspliced
363     * without CAS since it is harmless if they are traversed anyway
364     * by releasers. To avoid effects of unsplicing from already
365     * removed nodes, the list is retraversed in case of an apparent
366     * race. This is slow when there are a lot of nodes, but we don't
367     * expect lists to be long enough to outweigh higher-overhead
368     * schemes.
        *
        * <p>The traversal unlinks every node whose thread field is null,
        * not just the given one.
        *
        * @param node the node to cancel; ignored if null
369 dl 1.1 */
370 dl 1.28 private void removeWaiter(WaitNode node) {
371     if (node != null) {
        // Mark the node as cancelled so the sweep below unlinks it.
372     node.thread = null;
373     retry:
374     for (;;) { // restart on removeWaiter race
        // pred is the most recent live node seen; s is q's successor.
375     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
376     s = q.next;
377     if (q.thread != null)
378     pred = q;
        // Dead interior node: splice it out through its live predecessor.
379     else if (pred != null) {
380     pred.next = s;
381     if (pred.thread == null) // check for race
382     continue retry;
383     }
        // Dead head node: it must be removed with a CAS on the stack head.
384     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
385     continue retry;
386     }
387     break;
388     }
389     }
390 dl 1.1 }
391    
392 dl 1.28 /* ------------- Async tasks -------------- */
393    
/**
 * Marker interface implemented by the asynchronous tasks that the
 * {@code async} methods create. It declares no methods; its purpose
 * is to let monitoring, debugging, and tracking code recognize such
 * tasks.
 *
 * @since 1.8
 */
public interface AsynchronousCompletionTask {
}
403    
404 dl 1.28 /** Base class can act as either FJ or plain Runnable */
405 jsr166 1.33 abstract static class Async extends ForkJoinTask<Void>
406 dl 1.28 implements Runnable, AsynchronousCompletionTask {
407     public final Void getRawResult() { return null; }
408     public final void setRawResult(Void v) { }
409     public final void run() { exec(); }
410 jsr166 1.26 }
411    
412 dl 1.28 static final class AsyncRun extends Async {
413     final Runnable fn;
414     final CompletableFuture<Void> dst;
415     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
416     this.fn = fn; this.dst = dst;
417     }
418     public final boolean exec() {
419     CompletableFuture<Void> d; Throwable ex;
420 dl 1.29 if ((d = this.dst) != null && d.result == null) {
421 dl 1.28 try {
422     fn.run();
423     ex = null;
424     } catch (Throwable rex) {
425     ex = rex;
426     }
427     d.internalComplete(null, ex);
428 dl 1.19 }
429 dl 1.28 return true;
430 dl 1.19 }
431 dl 1.28 private static final long serialVersionUID = 5232453952276885070L;
432 dl 1.19 }
433    
434 dl 1.28 static final class AsyncSupply<U> extends Async {
435     final Supplier<U> fn;
436     final CompletableFuture<U> dst;
437     AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
438     this.fn = fn; this.dst = dst;
439     }
440     public final boolean exec() {
441     CompletableFuture<U> d; U u; Throwable ex;
442 dl 1.29 if ((d = this.dst) != null && d.result == null) {
443 dl 1.28 try {
444     u = fn.get();
445     ex = null;
446     } catch (Throwable rex) {
447     ex = rex;
448     u = null;
449     }
450     d.internalComplete(u, ex);
451 dl 1.1 }
452     return true;
453     }
454     private static final long serialVersionUID = 5232453952276885070L;
455     }
456    
457 dl 1.28 static final class AsyncApply<T,U> extends Async {
458 jsr166 1.63 final T arg;
459 dl 1.28 final Function<? super T,? extends U> fn;
460 dl 1.1 final CompletableFuture<U> dst;
461 dl 1.28 AsyncApply(T arg, Function<? super T,? extends U> fn,
462 dl 1.35 CompletableFuture<U> dst) {
463 dl 1.1 this.arg = arg; this.fn = fn; this.dst = dst;
464     }
465     public final boolean exec() {
466 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
467 dl 1.29 if ((d = this.dst) != null && d.result == null) {
468 dl 1.21 try {
469     u = fn.apply(arg);
470     ex = null;
471     } catch (Throwable rex) {
472     ex = rex;
473     u = null;
474     }
475     d.internalComplete(u, ex);
476 dl 1.1 }
477     return true;
478     }
479     private static final long serialVersionUID = 5232453952276885070L;
480     }
481    
482 jsr166 1.81 static final class AsyncCombine<T,U,V> extends Async {
483 dl 1.1 final T arg1;
484     final U arg2;
485 jsr166 1.63 final BiFunction<? super T,? super U,? extends V> fn;
486 dl 1.1 final CompletableFuture<V> dst;
487 jsr166 1.81 AsyncCombine(T arg1, U arg2,
488 dl 1.35 BiFunction<? super T,? super U,? extends V> fn,
489     CompletableFuture<V> dst) {
490 dl 1.1 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
491     }
492     public final boolean exec() {
493 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
494 dl 1.29 if ((d = this.dst) != null && d.result == null) {
495 dl 1.21 try {
496     v = fn.apply(arg1, arg2);
497     ex = null;
498     } catch (Throwable rex) {
499     ex = rex;
500     v = null;
501     }
502     d.internalComplete(v, ex);
503 dl 1.1 }
504     return true;
505     }
506     private static final long serialVersionUID = 5232453952276885070L;
507     }
508    
509 dl 1.28 static final class AsyncAccept<T> extends Async {
510 jsr166 1.63 final T arg;
511 dl 1.34 final Consumer<? super T> fn;
512 dl 1.7 final CompletableFuture<Void> dst;
513 dl 1.34 AsyncAccept(T arg, Consumer<? super T> fn,
514 dl 1.35 CompletableFuture<Void> dst) {
515 dl 1.7 this.arg = arg; this.fn = fn; this.dst = dst;
516     }
517     public final boolean exec() {
518 dl 1.21 CompletableFuture<Void> d; Throwable ex;
519 dl 1.29 if ((d = this.dst) != null && d.result == null) {
520 dl 1.21 try {
521     fn.accept(arg);
522     ex = null;
523     } catch (Throwable rex) {
524     ex = rex;
525     }
526     d.internalComplete(null, ex);
527 dl 1.7 }
528     return true;
529     }
530     private static final long serialVersionUID = 5232453952276885070L;
531     }
532    
533 jsr166 1.81 static final class AsyncAcceptBoth<T,U> extends Async {
534 dl 1.7 final T arg1;
535     final U arg2;
536 jsr166 1.63 final BiConsumer<? super T,? super U> fn;
537 dl 1.7 final CompletableFuture<Void> dst;
538 jsr166 1.81 AsyncAcceptBoth(T arg1, U arg2,
539     BiConsumer<? super T,? super U> fn,
540     CompletableFuture<Void> dst) {
541 dl 1.7 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
542     }
543     public final boolean exec() {
544 dl 1.21 CompletableFuture<Void> d; Throwable ex;
545 dl 1.29 if ((d = this.dst) != null && d.result == null) {
546 dl 1.21 try {
547     fn.accept(arg1, arg2);
548     ex = null;
549     } catch (Throwable rex) {
550     ex = rex;
551     }
552     d.internalComplete(null, ex);
553 dl 1.7 }
554     return true;
555     }
556     private static final long serialVersionUID = 5232453952276885070L;
557     }
558    
559 dl 1.37 static final class AsyncCompose<T,U> extends Async {
560 jsr166 1.63 final T arg;
561 dl 1.37 final Function<? super T, CompletableFuture<U>> fn;
562     final CompletableFuture<U> dst;
563     AsyncCompose(T arg,
564     Function<? super T, CompletableFuture<U>> fn,
565     CompletableFuture<U> dst) {
566     this.arg = arg; this.fn = fn; this.dst = dst;
567     }
568     public final boolean exec() {
569     CompletableFuture<U> d, fr; U u; Throwable ex;
570     if ((d = this.dst) != null && d.result == null) {
571     try {
572     fr = fn.apply(arg);
573 jsr166 1.61 ex = (fr == null) ? new NullPointerException() : null;
574 dl 1.37 } catch (Throwable rex) {
575     ex = rex;
576     fr = null;
577     }
578     if (ex != null)
579     u = null;
580     else {
581     Object r = fr.result;
582 dl 1.67 if (r == null)
583     r = fr.waitingGet(false);
584 dl 1.37 if (r instanceof AltResult) {
585     ex = ((AltResult)r).ex;
586     u = null;
587     }
588     else {
589     @SuppressWarnings("unchecked") U ur = (U) r;
590     u = ur;
591     }
592     }
593     d.internalComplete(u, ex);
594     }
595     return true;
596     }
597     private static final long serialVersionUID = 5232453952276885070L;
598     }
599    
600 dl 1.1 /* ------------- Completions -------------- */
601    
602 dl 1.28 /**
603     * Simple linked list nodes to record completions, used in
604     * basically the same way as WaitNodes. (We separate nodes from
605     * the Completions themselves mainly because for the And and Or
606     * methods, the same Completion object resides in two lists.)
607     */
608     static final class CompletionNode {
609     final Completion completion;
610     volatile CompletionNode next;
611     CompletionNode(Completion completion) { this.completion = completion; }
612     }
613    
614 dl 1.1 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
615 jsr166 1.33 abstract static class Completion extends AtomicInteger implements Runnable {
616 dl 1.1 }
617    
618 jsr166 1.81 static final class ThenApply<T,U> extends Completion {
619 jsr166 1.2 final CompletableFuture<? extends T> src;
620     final Function<? super T,? extends U> fn;
621     final CompletableFuture<U> dst;
622 dl 1.1 final Executor executor;
623 jsr166 1.81 ThenApply(CompletableFuture<? extends T> src,
624     Function<? super T,? extends U> fn,
625     CompletableFuture<U> dst,
626     Executor executor) {
627 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
628     this.executor = executor;
629     }
630 jsr166 1.26 public final void run() {
631 dl 1.28 final CompletableFuture<? extends T> a;
632     final Function<? super T,? extends U> fn;
633     final CompletableFuture<U> dst;
634 jsr166 1.2 Object r; T t; Throwable ex;
635     if ((dst = this.dst) != null &&
636 dl 1.1 (fn = this.fn) != null &&
637     (a = this.src) != null &&
638     (r = a.result) != null &&
639     compareAndSet(0, 1)) {
640 jsr166 1.2 if (r instanceof AltResult) {
641 dl 1.19 ex = ((AltResult)r).ex;
642 dl 1.1 t = null;
643     }
644 dl 1.17 else {
645     ex = null;
646 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
647     t = tr;
648 dl 1.1 }
649 dl 1.20 Executor e = executor;
650     U u = null;
651 dl 1.17 if (ex == null) {
652     try {
653 dl 1.20 if (e != null)
654 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
655 dl 1.17 else
656 dl 1.20 u = fn.apply(t);
657 dl 1.17 } catch (Throwable rex) {
658     ex = rex;
659     }
660     }
661 dl 1.20 if (e == null || ex != null)
662     dst.internalComplete(u, ex);
663 dl 1.1 }
664     }
665 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
666 dl 1.1 }
667    
668 jsr166 1.81 static final class ThenAccept<T> extends Completion {
669 dl 1.7 final CompletableFuture<? extends T> src;
670 dl 1.34 final Consumer<? super T> fn;
671 dl 1.7 final CompletableFuture<Void> dst;
672     final Executor executor;
673 jsr166 1.81 ThenAccept(CompletableFuture<? extends T> src,
674     Consumer<? super T> fn,
675     CompletableFuture<Void> dst,
676     Executor executor) {
677 dl 1.7 this.src = src; this.fn = fn; this.dst = dst;
678     this.executor = executor;
679     }
680 jsr166 1.26 public final void run() {
681 dl 1.28 final CompletableFuture<? extends T> a;
682 dl 1.34 final Consumer<? super T> fn;
683 dl 1.28 final CompletableFuture<Void> dst;
684 dl 1.7 Object r; T t; Throwable ex;
685     if ((dst = this.dst) != null &&
686     (fn = this.fn) != null &&
687     (a = this.src) != null &&
688     (r = a.result) != null &&
689     compareAndSet(0, 1)) {
690     if (r instanceof AltResult) {
691 dl 1.19 ex = ((AltResult)r).ex;
692 dl 1.7 t = null;
693     }
694 dl 1.17 else {
695     ex = null;
696 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
697     t = tr;
698 dl 1.17 }
699 dl 1.20 Executor e = executor;
700 dl 1.17 if (ex == null) {
701     try {
702 dl 1.20 if (e != null)
703 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
704 dl 1.20 else
705 dl 1.17 fn.accept(t);
706     } catch (Throwable rex) {
707     ex = rex;
708 dl 1.7 }
709     }
710 dl 1.20 if (e == null || ex != null)
711     dst.internalComplete(null, ex);
712 dl 1.7 }
713     }
714 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
715 dl 1.7 }
716    
717 jsr166 1.82 static final class ThenRun extends Completion {
718     final CompletableFuture<?> src;
719 jsr166 1.2 final Runnable fn;
720     final CompletableFuture<Void> dst;
721 dl 1.1 final Executor executor;
722 jsr166 1.82 ThenRun(CompletableFuture<?> src,
723 jsr166 1.81 Runnable fn,
724     CompletableFuture<Void> dst,
725     Executor executor) {
726 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
727     this.executor = executor;
728     }
729 jsr166 1.26 public final void run() {
730 jsr166 1.82 final CompletableFuture<?> a;
731 dl 1.28 final Runnable fn;
732     final CompletableFuture<Void> dst;
733 jsr166 1.2 Object r; Throwable ex;
734     if ((dst = this.dst) != null &&
735 dl 1.1 (fn = this.fn) != null &&
736     (a = this.src) != null &&
737     (r = a.result) != null &&
738     compareAndSet(0, 1)) {
739 dl 1.19 if (r instanceof AltResult)
740     ex = ((AltResult)r).ex;
741 dl 1.17 else
742     ex = null;
743 dl 1.20 Executor e = executor;
744 dl 1.17 if (ex == null) {
745     try {
746 dl 1.20 if (e != null)
747 dl 1.28 e.execute(new AsyncRun(fn, dst));
748 dl 1.20 else
749 dl 1.17 fn.run();
750     } catch (Throwable rex) {
751     ex = rex;
752 dl 1.1 }
753     }
754 dl 1.20 if (e == null || ex != null)
755     dst.internalComplete(null, ex);
756 dl 1.1 }
757     }
758 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
759 dl 1.1 }
760    
/**
 * Completion registered on two source futures. When both have
 * completed normally it applies a BiFunction to their results
 * (directly or via the executor) and completes the destination.
 * If either source holds an exception, the destination is completed
 * with it without waiting for the other source; the first source is
 * checked before the second.
 */
761 jsr166 1.81 static final class ThenCombine<T,U,V> extends Completion {
762 jsr166 1.2 final CompletableFuture<? extends T> src;
763     final CompletableFuture<? extends U> snd;
764     final BiFunction<? super T,? super U,? extends V> fn;
765     final CompletableFuture<V> dst;
766 dl 1.1 final Executor executor;
767 jsr166 1.81 ThenCombine(CompletableFuture<? extends T> src,
768     CompletableFuture<? extends U> snd,
769     BiFunction<? super T,? super U,? extends V> fn,
770     CompletableFuture<V> dst,
771     Executor executor) {
772 dl 1.1 this.src = src; this.snd = snd;
773     this.fn = fn; this.dst = dst;
774     this.executor = executor;
775     }
776 dl 1.84
777 jsr166 1.26 public final void run() {
778 dl 1.28 final CompletableFuture<? extends T> a;
779     final CompletableFuture<? extends U> b;
780     final BiFunction<? super T,? super U,? extends V> fn;
781     final CompletableFuture<V> dst;
        // Nothing to do if any collaborator is missing.
782 dl 1.84 if ((dst = this.dst) == null ||
783     (fn = this.fn) == null ||
784     (a = this.src) == null ||
785     (b = this.snd) == null)
786     return;
        // Snapshot both results once; either may still be null (not done).
787     final Object r = a.result, s = b.result;
788     final T t; final U u; Throwable ex;
        // Decode the first result: an AltResult yields its exception
        // (null for NIL) and a null value.
789     if (r instanceof AltResult) {
790     ex = ((AltResult)r).ex;
791     t = null;
792     } else {
793     ex = null;
794     @SuppressWarnings("unchecked") T tr = (T) r;
795     t = tr;
796     }
        // Decode the second result only if the first produced no exception.
797     if (ex != null)
798     u = null;
799     else if (s instanceof AltResult) {
800     ex = ((AltResult)s).ex;
801     u = null;
802     } else {
803     ex = null;
804     @SuppressWarnings("unchecked") U us = (U) s;
805     u = us;
806     }
807    
        // Fire when an exception is known or both sources are done, and
        // only if this call wins the claim on the action.
808     if ((ex != null || (r != null && s != null))
809     && compareAndSet(0, 1)) {
810 dl 1.20 Executor e = executor;
811     V v = null;
812 dl 1.19 if (ex == null) {
813     try {
814 dl 1.20 if (e != null)
815 jsr166 1.81 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
816 dl 1.19 else
817 dl 1.20 v = fn.apply(t, u);
818 dl 1.19 } catch (Throwable rex) {
819     ex = rex;
820     }
821 dl 1.1 }
        // With an executor, the async task completes dst itself unless
        // submission threw.
822 dl 1.20 if (e == null || ex != null)
823     dst.internalComplete(v, ex);
824 jsr166 1.2 }
825     }
826 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
827 dl 1.1 }
828    
/**
 * Completion registered on two source futures. When both have
 * completed normally it passes their results to a BiConsumer
 * (directly or via the executor) and completes the destination.
 * If either source holds an exception, the destination is completed
 * with it without waiting for the other source; the first source is
 * checked before the second.
 */
829 jsr166 1.81 static final class ThenAcceptBoth<T,U> extends Completion {
830 dl 1.7 final CompletableFuture<? extends T> src;
831     final CompletableFuture<? extends U> snd;
832 dl 1.34 final BiConsumer<? super T,? super U> fn;
833 dl 1.7 final CompletableFuture<Void> dst;
834     final Executor executor;
835 jsr166 1.81 ThenAcceptBoth(CompletableFuture<? extends T> src,
836     CompletableFuture<? extends U> snd,
837     BiConsumer<? super T,? super U> fn,
838     CompletableFuture<Void> dst,
839     Executor executor) {
840 dl 1.7 this.src = src; this.snd = snd;
841     this.fn = fn; this.dst = dst;
842     this.executor = executor;
843     }
844 jsr166 1.26 public final void run() {
845 dl 1.28 final CompletableFuture<? extends T> a;
846     final CompletableFuture<? extends U> b;
847 dl 1.34 final BiConsumer<? super T,? super U> fn;
848 dl 1.28 final CompletableFuture<Void> dst;
        // Nothing to do if any collaborator is missing.
849 dl 1.84 if ((dst = this.dst) == null ||
850     (fn = this.fn) == null ||
851     (a = this.src) == null ||
852     (b = this.snd) == null)
853     return;
        // Snapshot both results once; either may still be null (not done).
854     final Object r = a.result, s = b.result;
855     final T t; final U u; Throwable ex;
        // Decode the first result: an AltResult yields its exception
        // (null for NIL) and a null value.
856     if (r instanceof AltResult) {
857     ex = ((AltResult)r).ex;
858     t = null;
859     } else {
860     ex = null;
861     @SuppressWarnings("unchecked") T tr = (T) r;
862     t = tr;
863     }
        // Decode the second result only if the first produced no exception.
864     if (ex != null)
865     u = null;
866     else if (s instanceof AltResult) {
867     ex = ((AltResult)s).ex;
868     u = null;
869     } else {
870     ex = null;
871     @SuppressWarnings("unchecked") U us = (U) s;
872     u = us;
873     }
        // Fire when an exception is known or both sources are done, and
        // only if this call wins the claim on the action.
874     if ((ex != null || (r != null && s != null))
875     && compareAndSet(0, 1)) {
876 dl 1.20 Executor e = executor;
877 dl 1.19 if (ex == null) {
878     try {
879 dl 1.20 if (e != null)
880 jsr166 1.81 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
881 dl 1.20 else
882 dl 1.19 fn.accept(t, u);
883     } catch (Throwable rex) {
884     ex = rex;
885 dl 1.7 }
886     }
        // With an executor, the async task completes dst itself unless
        // submission threw.
887 dl 1.20 if (e == null || ex != null)
888     dst.internalComplete(null, ex);
889 dl 1.7 }
890     }
891 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
892 dl 1.7 }
893    
894 jsr166 1.82 static final class RunAfterBoth extends Completion {
895     final CompletableFuture<?> src;
896 jsr166 1.2 final CompletableFuture<?> snd;
897     final Runnable fn;
898     final CompletableFuture<Void> dst;
899 dl 1.1 final Executor executor;
900 jsr166 1.82 RunAfterBoth(CompletableFuture<?> src,
901 jsr166 1.81 CompletableFuture<?> snd,
902     Runnable fn,
903     CompletableFuture<Void> dst,
904     Executor executor) {
905 dl 1.1 this.src = src; this.snd = snd;
906     this.fn = fn; this.dst = dst;
907     this.executor = executor;
908     }
909 jsr166 1.26 public final void run() {
910 jsr166 1.82 final CompletableFuture<?> a;
911 dl 1.1 final CompletableFuture<?> b;
912     final Runnable fn;
913     final CompletableFuture<Void> dst;
914 dl 1.84 if ((dst = this.dst) == null ||
915     (fn = this.fn) == null ||
916     (a = this.src) == null ||
917     (b = this.snd) == null)
918     return;
919     final Object r = a.result, s = b.result;
920     Throwable ex;
921     if (r instanceof AltResult)
922     ex = ((AltResult)r).ex;
923     else
924     ex = null;
925     if (ex == null && (s instanceof AltResult))
926     ex = ((AltResult)s).ex;
927     if ((ex != null || (r != null && s != null))
928     && compareAndSet(0, 1)) {
929 dl 1.20 Executor e = executor;
930 dl 1.19 if (ex == null) {
931     try {
932 dl 1.20 if (e != null)
933 dl 1.28 e.execute(new AsyncRun(fn, dst));
934 dl 1.20 else
935 dl 1.19 fn.run();
936     } catch (Throwable rex) {
937     ex = rex;
938 dl 1.1 }
939 jsr166 1.2 }
940 dl 1.20 if (e == null || ex != null)
941     dst.internalComplete(null, ex);
942 jsr166 1.2 }
943     }
944 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
945 dl 1.1 }
946    
947 dl 1.35 static final class AndCompletion extends Completion {
948     final CompletableFuture<?> src;
949     final CompletableFuture<?> snd;
950     final CompletableFuture<Void> dst;
951     AndCompletion(CompletableFuture<?> src,
952     CompletableFuture<?> snd,
953     CompletableFuture<Void> dst) {
954     this.src = src; this.snd = snd; this.dst = dst;
955     }
956     public final void run() {
957     final CompletableFuture<?> a;
958     final CompletableFuture<?> b;
959     final CompletableFuture<Void> dst;
960     Object r, s; Throwable ex;
961     if ((dst = this.dst) != null &&
962     (a = this.src) != null &&
963     (r = a.result) != null &&
964     (b = this.snd) != null &&
965     (s = b.result) != null &&
966     compareAndSet(0, 1)) {
967     if (r instanceof AltResult)
968     ex = ((AltResult)r).ex;
969     else
970     ex = null;
971     if (ex == null && (s instanceof AltResult))
972     ex = ((AltResult)s).ex;
973     dst.internalComplete(null, ex);
974     }
975     }
976     private static final long serialVersionUID = 5232453952276885070L;
977     }
978    
979 jsr166 1.81 static final class ApplyToEither<T,U> extends Completion {
980 jsr166 1.2 final CompletableFuture<? extends T> src;
981     final CompletableFuture<? extends T> snd;
982     final Function<? super T,? extends U> fn;
983     final CompletableFuture<U> dst;
984 dl 1.1 final Executor executor;
985 jsr166 1.81 ApplyToEither(CompletableFuture<? extends T> src,
986     CompletableFuture<? extends T> snd,
987     Function<? super T,? extends U> fn,
988     CompletableFuture<U> dst,
989     Executor executor) {
990 dl 1.1 this.src = src; this.snd = snd;
991     this.fn = fn; this.dst = dst;
992     this.executor = executor;
993     }
994 jsr166 1.26 public final void run() {
995 dl 1.28 final CompletableFuture<? extends T> a;
996     final CompletableFuture<? extends T> b;
997     final Function<? super T,? extends U> fn;
998     final CompletableFuture<U> dst;
999 jsr166 1.2 Object r; T t; Throwable ex;
1000     if ((dst = this.dst) != null &&
1001 dl 1.1 (fn = this.fn) != null &&
1002     (((a = this.src) != null && (r = a.result) != null) ||
1003     ((b = this.snd) != null && (r = b.result) != null)) &&
1004     compareAndSet(0, 1)) {
1005 jsr166 1.2 if (r instanceof AltResult) {
1006 dl 1.19 ex = ((AltResult)r).ex;
1007 jsr166 1.2 t = null;
1008     }
1009 dl 1.19 else {
1010     ex = null;
1011 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1012     t = tr;
1013 dl 1.1 }
1014 dl 1.20 Executor e = executor;
1015     U u = null;
1016 dl 1.19 if (ex == null) {
1017     try {
1018 dl 1.20 if (e != null)
1019 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
1020 dl 1.19 else
1021 dl 1.20 u = fn.apply(t);
1022 dl 1.19 } catch (Throwable rex) {
1023     ex = rex;
1024     }
1025     }
1026 dl 1.20 if (e == null || ex != null)
1027     dst.internalComplete(u, ex);
1028 jsr166 1.2 }
1029     }
1030 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1031 dl 1.1 }
1032    
1033 jsr166 1.81 static final class AcceptEither<T> extends Completion {
1034 dl 1.7 final CompletableFuture<? extends T> src;
1035     final CompletableFuture<? extends T> snd;
1036 dl 1.34 final Consumer<? super T> fn;
1037 dl 1.7 final CompletableFuture<Void> dst;
1038     final Executor executor;
1039 jsr166 1.81 AcceptEither(CompletableFuture<? extends T> src,
1040     CompletableFuture<? extends T> snd,
1041     Consumer<? super T> fn,
1042     CompletableFuture<Void> dst,
1043     Executor executor) {
1044 dl 1.7 this.src = src; this.snd = snd;
1045     this.fn = fn; this.dst = dst;
1046     this.executor = executor;
1047     }
1048 jsr166 1.26 public final void run() {
1049 dl 1.28 final CompletableFuture<? extends T> a;
1050     final CompletableFuture<? extends T> b;
1051 dl 1.34 final Consumer<? super T> fn;
1052 dl 1.28 final CompletableFuture<Void> dst;
1053 dl 1.7 Object r; T t; Throwable ex;
1054     if ((dst = this.dst) != null &&
1055     (fn = this.fn) != null &&
1056     (((a = this.src) != null && (r = a.result) != null) ||
1057     ((b = this.snd) != null && (r = b.result) != null)) &&
1058     compareAndSet(0, 1)) {
1059     if (r instanceof AltResult) {
1060 dl 1.19 ex = ((AltResult)r).ex;
1061 dl 1.7 t = null;
1062     }
1063 dl 1.19 else {
1064     ex = null;
1065 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1066     t = tr;
1067 dl 1.19 }
1068 dl 1.20 Executor e = executor;
1069 dl 1.19 if (ex == null) {
1070     try {
1071 dl 1.20 if (e != null)
1072 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
1073 dl 1.20 else
1074 dl 1.19 fn.accept(t);
1075     } catch (Throwable rex) {
1076     ex = rex;
1077 dl 1.7 }
1078     }
1079 dl 1.20 if (e == null || ex != null)
1080     dst.internalComplete(null, ex);
1081 dl 1.7 }
1082     }
1083 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1084 dl 1.7 }
1085    
1086 jsr166 1.82 static final class RunAfterEither extends Completion {
1087     final CompletableFuture<?> src;
1088 jsr166 1.2 final CompletableFuture<?> snd;
1089     final Runnable fn;
1090     final CompletableFuture<Void> dst;
1091 dl 1.1 final Executor executor;
1092 jsr166 1.82 RunAfterEither(CompletableFuture<?> src,
1093 jsr166 1.81 CompletableFuture<?> snd,
1094     Runnable fn,
1095     CompletableFuture<Void> dst,
1096     Executor executor) {
1097 dl 1.1 this.src = src; this.snd = snd;
1098     this.fn = fn; this.dst = dst;
1099     this.executor = executor;
1100     }
1101 jsr166 1.26 public final void run() {
1102 jsr166 1.82 final CompletableFuture<?> a;
1103 dl 1.1 final CompletableFuture<?> b;
1104     final Runnable fn;
1105     final CompletableFuture<Void> dst;
1106 dl 1.28 Object r; Throwable ex;
1107 jsr166 1.2 if ((dst = this.dst) != null &&
1108 dl 1.1 (fn = this.fn) != null &&
1109     (((a = this.src) != null && (r = a.result) != null) ||
1110     ((b = this.snd) != null && (r = b.result) != null)) &&
1111     compareAndSet(0, 1)) {
1112 dl 1.19 if (r instanceof AltResult)
1113     ex = ((AltResult)r).ex;
1114     else
1115     ex = null;
1116 dl 1.20 Executor e = executor;
1117 dl 1.19 if (ex == null) {
1118 dl 1.1 try {
1119 dl 1.20 if (e != null)
1120 dl 1.28 e.execute(new AsyncRun(fn, dst));
1121 dl 1.20 else
1122 dl 1.1 fn.run();
1123     } catch (Throwable rex) {
1124 dl 1.19 ex = rex;
1125 dl 1.1 }
1126     }
1127 dl 1.20 if (e == null || ex != null)
1128     dst.internalComplete(null, ex);
1129 jsr166 1.2 }
1130     }
1131 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1132 dl 1.1 }
1133    
1134 dl 1.35 static final class OrCompletion extends Completion {
1135     final CompletableFuture<?> src;
1136     final CompletableFuture<?> snd;
1137 dl 1.77 final CompletableFuture<Object> dst;
1138 dl 1.35 OrCompletion(CompletableFuture<?> src,
1139     CompletableFuture<?> snd,
1140 dl 1.77 CompletableFuture<Object> dst) {
1141 dl 1.35 this.src = src; this.snd = snd; this.dst = dst;
1142     }
1143     public final void run() {
1144     final CompletableFuture<?> a;
1145     final CompletableFuture<?> b;
1146 dl 1.77 final CompletableFuture<Object> dst;
1147     Object r, t; Throwable ex;
1148 dl 1.35 if ((dst = this.dst) != null &&
1149     (((a = this.src) != null && (r = a.result) != null) ||
1150     ((b = this.snd) != null && (r = b.result) != null)) &&
1151     compareAndSet(0, 1)) {
1152 dl 1.77 if (r instanceof AltResult) {
1153 dl 1.35 ex = ((AltResult)r).ex;
1154 dl 1.77 t = null;
1155     }
1156     else {
1157 dl 1.35 ex = null;
1158 dl 1.77 t = r;
1159     }
1160     dst.internalComplete(t, ex);
1161 dl 1.35 }
1162     }
1163     private static final long serialVersionUID = 5232453952276885070L;
1164     }
1165    
1166 dl 1.28 static final class ExceptionCompletion<T> extends Completion {
1167 jsr166 1.2 final CompletableFuture<? extends T> src;
1168 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1169     final CompletableFuture<T> dst;
1170 dl 1.28 ExceptionCompletion(CompletableFuture<? extends T> src,
1171     Function<? super Throwable, ? extends T> fn,
1172     CompletableFuture<T> dst) {
1173 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1174     }
1175 jsr166 1.26 public final void run() {
1176 dl 1.28 final CompletableFuture<? extends T> a;
1177     final Function<? super Throwable, ? extends T> fn;
1178     final CompletableFuture<T> dst;
1179 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1180 jsr166 1.2 if ((dst = this.dst) != null &&
1181 dl 1.1 (fn = this.fn) != null &&
1182     (a = this.src) != null &&
1183     (r = a.result) != null &&
1184     compareAndSet(0, 1)) {
1185 dl 1.20 if ((r instanceof AltResult) &&
1186     (ex = ((AltResult)r).ex) != null) {
1187     try {
1188     t = fn.apply(ex);
1189     } catch (Throwable rex) {
1190     dx = rex;
1191 dl 1.1 }
1192     }
1193 dl 1.28 else {
1194     @SuppressWarnings("unchecked") T tr = (T) r;
1195     t = tr;
1196     }
1197 dl 1.20 dst.internalComplete(t, dx);
1198 dl 1.1 }
1199     }
1200 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1201 dl 1.1 }
1202    
1203 dl 1.75 static final class ThenCopy<T> extends Completion {
1204 dl 1.77 final CompletableFuture<?> src;
1205 dl 1.75 final CompletableFuture<T> dst;
1206 dl 1.77 ThenCopy(CompletableFuture<?> src,
1207 dl 1.75 CompletableFuture<T> dst) {
1208 dl 1.17 this.src = src; this.dst = dst;
1209     }
1210 jsr166 1.26 public final void run() {
1211 dl 1.77 final CompletableFuture<?> a;
1212 dl 1.75 final CompletableFuture<T> dst;
1213     Object r; T t; Throwable ex;
1214 dl 1.17 if ((dst = this.dst) != null &&
1215     (a = this.src) != null &&
1216     (r = a.result) != null &&
1217     compareAndSet(0, 1)) {
1218 dl 1.20 if (r instanceof AltResult) {
1219     ex = ((AltResult)r).ex;
1220     t = null;
1221     }
1222     else {
1223     ex = null;
1224 dl 1.75 @SuppressWarnings("unchecked") T tr = (T) r;
1225     t = tr;
1226 dl 1.20 }
1227     dst.internalComplete(t, ex);
1228 dl 1.17 }
1229     }
1230 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1231 dl 1.17 }
1232    
1233 dl 1.75 // version of ThenCopy for CompletableFuture<Void> dst
1234     static final class ThenPropagate extends Completion {
1235     final CompletableFuture<?> src;
1236     final CompletableFuture<Void> dst;
1237     ThenPropagate(CompletableFuture<?> src,
1238     CompletableFuture<Void> dst) {
1239     this.src = src; this.dst = dst;
1240     }
1241     public final void run() {
1242     final CompletableFuture<?> a;
1243     final CompletableFuture<Void> dst;
1244     Object r; Throwable ex;
1245     if ((dst = this.dst) != null &&
1246     (a = this.src) != null &&
1247     (r = a.result) != null &&
1248     compareAndSet(0, 1)) {
1249     if (r instanceof AltResult)
1250     ex = ((AltResult)r).ex;
1251     else
1252     ex = null;
1253     dst.internalComplete(null, ex);
1254     }
1255     }
1256     private static final long serialVersionUID = 5232453952276885070L;
1257     }
1258    
1259 dl 1.28 static final class HandleCompletion<T,U> extends Completion {
1260 dl 1.17 final CompletableFuture<? extends T> src;
1261     final BiFunction<? super T, Throwable, ? extends U> fn;
1262     final CompletableFuture<U> dst;
1263 dl 1.28 HandleCompletion(CompletableFuture<? extends T> src,
1264     BiFunction<? super T, Throwable, ? extends U> fn,
1265 jsr166 1.64 CompletableFuture<U> dst) {
1266 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1267     }
1268 jsr166 1.26 public final void run() {
1269 dl 1.28 final CompletableFuture<? extends T> a;
1270     final BiFunction<? super T, Throwable, ? extends U> fn;
1271     final CompletableFuture<U> dst;
1272 dl 1.17 Object r; T t; Throwable ex;
1273     if ((dst = this.dst) != null &&
1274     (fn = this.fn) != null &&
1275     (a = this.src) != null &&
1276     (r = a.result) != null &&
1277     compareAndSet(0, 1)) {
1278     if (r instanceof AltResult) {
1279     ex = ((AltResult)r).ex;
1280     t = null;
1281     }
1282     else {
1283     ex = null;
1284 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1285     t = tr;
1286 dl 1.17 }
1287 dl 1.20 U u = null; Throwable dx = null;
1288 dl 1.17 try {
1289 dl 1.20 u = fn.apply(t, ex);
1290 dl 1.17 } catch (Throwable rex) {
1291 dl 1.20 dx = rex;
1292 dl 1.17 }
1293 dl 1.20 dst.internalComplete(u, dx);
1294 dl 1.17 }
1295     }
1296 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1297 dl 1.17 }
1298    
1299 jsr166 1.81 static final class ThenCompose<T,U> extends Completion {
1300 dl 1.17 final CompletableFuture<? extends T> src;
1301     final Function<? super T, CompletableFuture<U>> fn;
1302     final CompletableFuture<U> dst;
1303 dl 1.37 final Executor executor;
1304 jsr166 1.81 ThenCompose(CompletableFuture<? extends T> src,
1305     Function<? super T, CompletableFuture<U>> fn,
1306     CompletableFuture<U> dst,
1307     Executor executor) {
1308 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1309 dl 1.37 this.executor = executor;
1310 dl 1.17 }
1311 jsr166 1.26 public final void run() {
1312 dl 1.28 final CompletableFuture<? extends T> a;
1313     final Function<? super T, CompletableFuture<U>> fn;
1314     final CompletableFuture<U> dst;
1315 dl 1.37 Object r; T t; Throwable ex; Executor e;
1316 dl 1.17 if ((dst = this.dst) != null &&
1317     (fn = this.fn) != null &&
1318     (a = this.src) != null &&
1319     (r = a.result) != null &&
1320     compareAndSet(0, 1)) {
1321     if (r instanceof AltResult) {
1322     ex = ((AltResult)r).ex;
1323     t = null;
1324     }
1325     else {
1326     ex = null;
1327 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1328     t = tr;
1329 dl 1.17 }
1330     CompletableFuture<U> c = null;
1331 dl 1.20 U u = null;
1332     boolean complete = false;
1333 dl 1.17 if (ex == null) {
1334 dl 1.37 if ((e = executor) != null)
1335     e.execute(new AsyncCompose<T,U>(t, fn, dst));
1336     else {
1337     try {
1338     if ((c = fn.apply(t)) == null)
1339     ex = new NullPointerException();
1340     } catch (Throwable rex) {
1341     ex = rex;
1342     }
1343 dl 1.17 }
1344     }
1345 dl 1.37 if (c != null) {
1346 dl 1.75 ThenCopy<U> d = null;
1347 dl 1.18 Object s;
1348     if ((s = c.result) == null) {
1349     CompletionNode p = new CompletionNode
1350 dl 1.75 (d = new ThenCopy<U>(c, dst));
1351 dl 1.18 while ((s = c.result) == null) {
1352     if (UNSAFE.compareAndSwapObject
1353     (c, COMPLETIONS, p.next = c.completions, p))
1354     break;
1355     }
1356 dl 1.17 }
1357 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1358 dl 1.20 complete = true;
1359 dl 1.18 if (s instanceof AltResult) {
1360     ex = ((AltResult)s).ex; // no rewrap
1361     u = null;
1362 dl 1.28 }
1363     else {
1364     @SuppressWarnings("unchecked") U us = (U) s;
1365     u = us;
1366     }
1367     }
1368     }
1369     if (complete || ex != null)
1370     dst.internalComplete(u, ex);
1371     if (c != null)
1372     c.helpPostComplete();
1373     }
1374     }
1375     private static final long serialVersionUID = 5232453952276885070L;
1376     }
1377    
1378     // public methods
1379    
1380     /**
1381     * Creates a new incomplete CompletableFuture.
1382     */
1383     public CompletableFuture() {
1384     }
1385    
1386     /**
1387 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1388     * by a task running in the {@link ForkJoinPool#commonPool()} with
1389     * the value obtained by calling the given Supplier.
1390 dl 1.28 *
1391     * @param supplier a function returning the value to be used
1392     * to complete the returned CompletableFuture
1393 jsr166 1.58 * @return the new CompletableFuture
1394 dl 1.28 */
1395     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1396     if (supplier == null) throw new NullPointerException();
1397     CompletableFuture<U> f = new CompletableFuture<U>();
1398     ForkJoinPool.commonPool().
1399     execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1400     return f;
1401     }
1402    
1403     /**
1404 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1405     * by a task running in the given executor with the value obtained
1406     * by calling the given Supplier.
1407 dl 1.28 *
1408     * @param supplier a function returning the value to be used
1409     * to complete the returned CompletableFuture
1410     * @param executor the executor to use for asynchronous execution
1411 jsr166 1.58 * @return the new CompletableFuture
1412 dl 1.28 */
1413     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1414     Executor executor) {
1415     if (executor == null || supplier == null)
1416     throw new NullPointerException();
1417     CompletableFuture<U> f = new CompletableFuture<U>();
1418     executor.execute(new AsyncSupply<U>(supplier, f));
1419     return f;
1420     }
1421    
1422     /**
1423 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1424     * by a task running in the {@link ForkJoinPool#commonPool()} after
1425     * it runs the given action.
1426 dl 1.28 *
1427     * @param runnable the action to run before completing the
1428     * returned CompletableFuture
1429 jsr166 1.58 * @return the new CompletableFuture
1430 dl 1.28 */
1431     public static CompletableFuture<Void> runAsync(Runnable runnable) {
1432     if (runnable == null) throw new NullPointerException();
1433     CompletableFuture<Void> f = new CompletableFuture<Void>();
1434     ForkJoinPool.commonPool().
1435     execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1436     return f;
1437     }
1438    
1439     /**
1440 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1441     * by a task running in the given executor after it runs the given
1442     * action.
1443 dl 1.28 *
1444     * @param runnable the action to run before completing the
1445     * returned CompletableFuture
1446     * @param executor the executor to use for asynchronous execution
1447 jsr166 1.58 * @return the new CompletableFuture
1448 dl 1.28 */
1449     public static CompletableFuture<Void> runAsync(Runnable runnable,
1450     Executor executor) {
1451     if (executor == null || runnable == null)
1452     throw new NullPointerException();
1453     CompletableFuture<Void> f = new CompletableFuture<Void>();
1454     executor.execute(new AsyncRun(runnable, f));
1455     return f;
1456     }
1457    
1458     /**
1459 dl 1.77 * Returns a new CompletableFuture that is already completed with
1460     * the given value.
1461     *
1462     * @param value the value
1463     * @return the completed CompletableFuture
1464     */
1465     public static <U> CompletableFuture<U> completedFuture(U value) {
1466     CompletableFuture<U> f = new CompletableFuture<U>();
1467 jsr166 1.78 f.result = (value == null) ? NIL : value;
1468 dl 1.77 return f;
1469     }
1470    
1471     /**
1472 dl 1.28 * Returns {@code true} if completed in any fashion: normally,
1473     * exceptionally, or via cancellation.
1474     *
1475     * @return {@code true} if completed
1476     */
1477     public boolean isDone() {
1478     return result != null;
1479     }
1480    
1481     /**
1482 dl 1.49 * Waits if necessary for this future to complete, and then
1483 dl 1.48 * returns its result.
1484 dl 1.28 *
1485 dl 1.48 * @return the result value
1486     * @throws CancellationException if this future was cancelled
1487     * @throws ExecutionException if this future completed exceptionally
1488 dl 1.28 * @throws InterruptedException if the current thread was interrupted
1489     * while waiting
1490     */
1491     public T get() throws InterruptedException, ExecutionException {
1492     Object r; Throwable ex, cause;
1493     if ((r = result) == null && (r = waitingGet(true)) == null)
1494     throw new InterruptedException();
1495 jsr166 1.45 if (!(r instanceof AltResult)) {
1496     @SuppressWarnings("unchecked") T tr = (T) r;
1497     return tr;
1498     }
1499     if ((ex = ((AltResult)r).ex) == null)
1500 dl 1.28 return null;
1501 jsr166 1.45 if (ex instanceof CancellationException)
1502     throw (CancellationException)ex;
1503     if ((ex instanceof CompletionException) &&
1504     (cause = ex.getCause()) != null)
1505     ex = cause;
1506     throw new ExecutionException(ex);
1507 dl 1.28 }
1508    
1509     /**
1510 dl 1.49 * Waits if necessary for at most the given time for this future
1511     * to complete, and then returns its result, if available.
1512 dl 1.28 *
1513     * @param timeout the maximum time to wait
1514     * @param unit the time unit of the timeout argument
1515 dl 1.48 * @return the result value
1516     * @throws CancellationException if this future was cancelled
1517     * @throws ExecutionException if this future completed exceptionally
1518 dl 1.28 * @throws InterruptedException if the current thread was interrupted
1519     * while waiting
1520     * @throws TimeoutException if the wait timed out
1521     */
1522     public T get(long timeout, TimeUnit unit)
1523     throws InterruptedException, ExecutionException, TimeoutException {
1524     Object r; Throwable ex, cause;
1525     long nanos = unit.toNanos(timeout);
1526     if (Thread.interrupted())
1527     throw new InterruptedException();
1528     if ((r = result) == null)
1529     r = timedAwaitDone(nanos);
1530 jsr166 1.45 if (!(r instanceof AltResult)) {
1531     @SuppressWarnings("unchecked") T tr = (T) r;
1532     return tr;
1533     }
1534     if ((ex = ((AltResult)r).ex) == null)
1535 dl 1.28 return null;
1536 jsr166 1.45 if (ex instanceof CancellationException)
1537     throw (CancellationException)ex;
1538     if ((ex instanceof CompletionException) &&
1539     (cause = ex.getCause()) != null)
1540     ex = cause;
1541     throw new ExecutionException(ex);
1542 dl 1.28 }
1543    
1544     /**
1545     * Returns the result value when complete, or throws an
1546     * (unchecked) exception if completed exceptionally. To better
1547     * conform with the use of common functional forms, if a
1548     * computation involved in the completion of this
1549     * CompletableFuture threw an exception, this method throws an
1550     * (unchecked) {@link CompletionException} with the underlying
1551     * exception as its cause.
1552     *
1553     * @return the result value
1554     * @throws CancellationException if the computation was cancelled
1555 jsr166 1.55 * @throws CompletionException if this future completed
1556     * exceptionally or a completion computation threw an exception
1557 dl 1.28 */
1558     public T join() {
1559     Object r; Throwable ex;
1560     if ((r = result) == null)
1561     r = waitingGet(false);
1562 jsr166 1.45 if (!(r instanceof AltResult)) {
1563     @SuppressWarnings("unchecked") T tr = (T) r;
1564     return tr;
1565     }
1566     if ((ex = ((AltResult)r).ex) == null)
1567 dl 1.28 return null;
1568 jsr166 1.45 if (ex instanceof CancellationException)
1569     throw (CancellationException)ex;
1570     if (ex instanceof CompletionException)
1571     throw (CompletionException)ex;
1572     throw new CompletionException(ex);
1573 dl 1.28 }
1574    
1575     /**
1576     * Returns the result value (or throws any encountered exception)
1577     * if completed, else returns the given valueIfAbsent.
1578     *
1579     * @param valueIfAbsent the value to return if not completed
1580     * @return the result value, if completed, else the given valueIfAbsent
1581     * @throws CancellationException if the computation was cancelled
1582 jsr166 1.55 * @throws CompletionException if this future completed
1583     * exceptionally or a completion computation threw an exception
1584 dl 1.28 */
1585     public T getNow(T valueIfAbsent) {
1586     Object r; Throwable ex;
1587     if ((r = result) == null)
1588     return valueIfAbsent;
1589 jsr166 1.45 if (!(r instanceof AltResult)) {
1590     @SuppressWarnings("unchecked") T tr = (T) r;
1591     return tr;
1592     }
1593     if ((ex = ((AltResult)r).ex) == null)
1594 dl 1.28 return null;
1595 jsr166 1.45 if (ex instanceof CancellationException)
1596     throw (CancellationException)ex;
1597     if (ex instanceof CompletionException)
1598     throw (CompletionException)ex;
1599     throw new CompletionException(ex);
1600 dl 1.28 }
1601    
1602     /**
1603     * If not already completed, sets the value returned by {@link
1604     * #get()} and related methods to the given value.
1605     *
1606     * @param value the result value
1607     * @return {@code true} if this invocation caused this CompletableFuture
1608     * to transition to a completed state, else {@code false}
1609     */
1610     public boolean complete(T value) {
1611     boolean triggered = result == null &&
1612     UNSAFE.compareAndSwapObject(this, RESULT, null,
1613     value == null ? NIL : value);
1614     postComplete();
1615     return triggered;
1616     }
1617    
1618     /**
1619     * If not already completed, causes invocations of {@link #get()}
1620     * and related methods to throw the given exception.
1621     *
1622     * @param ex the exception
1623     * @return {@code true} if this invocation caused this CompletableFuture
1624     * to transition to a completed state, else {@code false}
1625     */
1626     public boolean completeExceptionally(Throwable ex) {
1627     if (ex == null) throw new NullPointerException();
1628     boolean triggered = result == null &&
1629     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1630     postComplete();
1631     return triggered;
1632     }
1633    
1634     /**
1635 jsr166 1.66 * Returns a new CompletableFuture that is completed
1636     * when this CompletableFuture completes, with the result of the
1637     * given function of this CompletableFuture's result.
1638     *
1639 dl 1.71 * <p>If this CompletableFuture completes exceptionally, or the
1640 jsr166 1.73 * supplied function throws an exception, then the returned
1641     * CompletableFuture completes exceptionally with a
1642 dl 1.71 * CompletionException holding the exception as its cause.
1643 dl 1.28 *
1644     * @param fn the function to use to compute the value of
1645     * the returned CompletableFuture
1646     * @return the new CompletableFuture
1647     */
1648     public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1649     return doThenApply(fn, null);
1650     }
1651    
1652     /**
1653 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1654     * when this CompletableFuture completes, with the result of the
1655 dl 1.71 * given function of this CompletableFuture's result from a
1656 jsr166 1.66 * task running in the {@link ForkJoinPool#commonPool()}.
1657     *
1658 dl 1.71 * <p>If this CompletableFuture completes exceptionally, or the
1659 jsr166 1.73 * supplied function throws an exception, then the returned
1660     * CompletableFuture completes exceptionally with a
1661 dl 1.71 * CompletionException holding the exception as its cause.
1662 dl 1.28 *
1663     * @param fn the function to use to compute the value of
1664     * the returned CompletableFuture
1665     * @return the new CompletableFuture
1666     */
1667 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
1668     (Function<? super T,? extends U> fn) {
1669 dl 1.28 return doThenApply(fn, ForkJoinPool.commonPool());
1670 dl 1.17 }
1671    
1672 dl 1.28 /**
1673 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1674     * when this CompletableFuture completes, with the result of the
1675 dl 1.71 * given function of this CompletableFuture's result from a
1676 jsr166 1.66 * task running in the given executor.
1677     *
1678 dl 1.71 * <p>If this CompletableFuture completes exceptionally, or the
1679 jsr166 1.73 * supplied function throws an exception, then the returned
1680     * CompletableFuture completes exceptionally with a
1681 dl 1.71 * CompletionException holding the exception as its cause.
1682 dl 1.28 *
1683     * @param fn the function to use to compute the value of
1684     * the returned CompletableFuture
1685     * @param executor the executor to use for asynchronous execution
1686     * @return the new CompletableFuture
1687     */
1688 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
1689     (Function<? super T,? extends U> fn,
1690     Executor executor) {
1691 dl 1.28 if (executor == null) throw new NullPointerException();
1692     return doThenApply(fn, executor);
1693     }
1694 dl 1.1
1695 dl 1.48 private <U> CompletableFuture<U> doThenApply
1696     (Function<? super T,? extends U> fn,
1697     Executor e) {
1698 dl 1.1 if (fn == null) throw new NullPointerException();
1699     CompletableFuture<U> dst = new CompletableFuture<U>();
1700 jsr166 1.81 ThenApply<T,U> d = null;
1701 jsr166 1.2 Object r;
1702     if ((r = result) == null) {
1703 dl 1.1 CompletionNode p = new CompletionNode
1704 jsr166 1.81 (d = new ThenApply<T,U>(this, fn, dst, e));
1705 dl 1.1 while ((r = result) == null) {
1706     if (UNSAFE.compareAndSwapObject
1707     (this, COMPLETIONS, p.next = completions, p))
1708     break;
1709     }
1710     }
1711 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1712 dl 1.19 T t; Throwable ex;
1713 jsr166 1.2 if (r instanceof AltResult) {
1714 dl 1.19 ex = ((AltResult)r).ex;
1715 jsr166 1.2 t = null;
1716 dl 1.1 }
1717 dl 1.19 else {
1718     ex = null;
1719 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1720     t = tr;
1721 dl 1.19 }
1722 dl 1.20 U u = null;
1723 jsr166 1.2 if (ex == null) {
1724 dl 1.1 try {
1725 dl 1.20 if (e != null)
1726 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
1727 dl 1.1 else
1728 dl 1.20 u = fn.apply(t);
1729 dl 1.1 } catch (Throwable rex) {
1730 dl 1.19 ex = rex;
1731 dl 1.1 }
1732     }
1733 dl 1.20 if (e == null || ex != null)
1734     dst.internalComplete(u, ex);
1735 dl 1.17 }
1736 dl 1.20 helpPostComplete();
1737 dl 1.1 return dst;
1738     }
1739    
1740 dl 1.28 /**
1741 jsr166 1.66 * Returns a new CompletableFuture that is completed
1742     * when this CompletableFuture completes, after performing the given
1743     * action with this CompletableFuture's result.
1744     *
1745 dl 1.71 * <p>If this CompletableFuture completes exceptionally, or the
1746 jsr166 1.73 * supplied action throws an exception, then the returned
1747     * CompletableFuture completes exceptionally with a
1748 dl 1.71 * CompletionException holding the exception as its cause.
1749 dl 1.28 *
1750     * @param block the action to perform before completing the
1751     * returned CompletableFuture
1752     * @return the new CompletableFuture
1753     */
1754 dl 1.34 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1755 dl 1.28 return doThenAccept(block, null);
1756     }
1757    
1758     /**
1759 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1760     * when this CompletableFuture completes, after performing the given
1761     * action with this CompletableFuture's result from a task running
1762     * in the {@link ForkJoinPool#commonPool()}.
1763     *
1764 dl 1.71 * <p>If this CompletableFuture completes exceptionally, or the
1765 jsr166 1.73 * supplied action throws an exception, then the returned
1766     * CompletableFuture completes exceptionally with a
1767 dl 1.71 * CompletionException holding the exception as its cause.
1768 dl 1.28 *
1769     * @param block the action to perform before completing the
1770     * returned CompletableFuture
1771     * @return the new CompletableFuture
1772     */
1773 dl 1.34 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1774 dl 1.28 return doThenAccept(block, ForkJoinPool.commonPool());
1775     }
1776    
1777     /**
1778 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1779     * when this CompletableFuture completes, after performing the given
1780     * action with this CompletableFuture's result from a task running
1781     * in the given executor.
1782     *
1783 dl 1.71 * <p>If this CompletableFuture completes exceptionally, or the
1784 jsr166 1.73 * supplied action throws an exception, then the returned
1785     * CompletableFuture completes exceptionally with a
1786 dl 1.71 * CompletionException holding the exception as its cause.
1787 dl 1.28 *
1788     * @param block the action to perform before completing the
1789     * returned CompletableFuture
1790     * @param executor the executor to use for asynchronous execution
1791     * @return the new CompletableFuture
1792     */
1793 dl 1.34 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1794 dl 1.28 Executor executor) {
1795     if (executor == null) throw new NullPointerException();
1796     return doThenAccept(block, executor);
1797     }
1798    
1799 dl 1.34 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1800 dl 1.28 Executor e) {
1801 dl 1.7 if (fn == null) throw new NullPointerException();
1802     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1803 jsr166 1.81 ThenAccept<T> d = null;
1804 dl 1.7 Object r;
1805     if ((r = result) == null) {
1806     CompletionNode p = new CompletionNode
1807 jsr166 1.81 (d = new ThenAccept<T>(this, fn, dst, e));
1808 dl 1.7 while ((r = result) == null) {
1809     if (UNSAFE.compareAndSwapObject
1810     (this, COMPLETIONS, p.next = completions, p))
1811     break;
1812     }
1813     }
1814     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1815 dl 1.19 T t; Throwable ex;
1816 dl 1.7 if (r instanceof AltResult) {
1817 dl 1.19 ex = ((AltResult)r).ex;
1818 dl 1.7 t = null;
1819     }
1820 dl 1.19 else {
1821     ex = null;
1822 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1823     t = tr;
1824 dl 1.19 }
1825 dl 1.7 if (ex == null) {
1826     try {
1827 dl 1.20 if (e != null)
1828 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
1829 dl 1.20 else
1830 dl 1.7 fn.accept(t);
1831     } catch (Throwable rex) {
1832 dl 1.19 ex = rex;
1833 dl 1.7 }
1834     }
1835 dl 1.20 if (e == null || ex != null)
1836     dst.internalComplete(null, ex);
1837 dl 1.17 }
1838 dl 1.20 helpPostComplete();
1839 dl 1.7 return dst;
1840     }
1841    
1842 dl 1.28 /**
1843 jsr166 1.66 * Returns a new CompletableFuture that is completed
1844     * when this CompletableFuture completes, after performing the given
1845     * action.
1846     *
1847 dl 1.71 * <p>If this CompletableFuture completes exceptionally, or the
1848 jsr166 1.73 * supplied action throws an exception, then the returned
1849     * CompletableFuture completes exceptionally with a
1850 dl 1.71 * CompletionException holding the exception as its cause.
1851 dl 1.28 *
1852     * @param action the action to perform before completing the
1853     * returned CompletableFuture
1854     * @return the new CompletableFuture
1855     */
1856     public CompletableFuture<Void> thenRun(Runnable action) {
1857     return doThenRun(action, null);
1858     }
1859    
1860     /**
1861 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1862     * when this CompletableFuture completes, after performing the given
1863     * action from a task running in the {@link ForkJoinPool#commonPool()}.
1864     *
1865 dl 1.71 * <p>If this CompletableFuture completes exceptionally, or the
1866 jsr166 1.73 * supplied action throws an exception, then the returned
1867     * CompletableFuture completes exceptionally with a
1868 dl 1.71 * CompletionException holding the exception as its cause.
1869 dl 1.28 *
1870     * @param action the action to perform before completing the
1871     * returned CompletableFuture
1872     * @return the new CompletableFuture
1873     */
1874     public CompletableFuture<Void> thenRunAsync(Runnable action) {
1875     return doThenRun(action, ForkJoinPool.commonPool());
1876     }
1877    
1878     /**
1879 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1880     * when this CompletableFuture completes, after performing the given
1881     * action from a task running in the given executor.
1882     *
1883 dl 1.71 * <p>If this CompletableFuture completes exceptionally, or the
1884 jsr166 1.73 * supplied action throws an exception, then the returned
1885     * CompletableFuture completes exceptionally with a
1886 dl 1.71 * CompletionException holding the exception as its cause.
1887 dl 1.28 *
1888     * @param action the action to perform before completing the
1889     * returned CompletableFuture
1890     * @param executor the executor to use for asynchronous execution
1891     * @return the new CompletableFuture
1892     */
1893     public CompletableFuture<Void> thenRunAsync(Runnable action,
1894     Executor executor) {
1895     if (executor == null) throw new NullPointerException();
1896     return doThenRun(action, executor);
1897     }
1898    
1899     private CompletableFuture<Void> doThenRun(Runnable action,
1900     Executor e) {
1901 dl 1.1 if (action == null) throw new NullPointerException();
1902     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1903 jsr166 1.82 ThenRun d = null;
1904 jsr166 1.2 Object r;
1905     if ((r = result) == null) {
1906 dl 1.1 CompletionNode p = new CompletionNode
1907 jsr166 1.82 (d = new ThenRun(this, action, dst, e));
1908 dl 1.1 while ((r = result) == null) {
1909     if (UNSAFE.compareAndSwapObject
1910     (this, COMPLETIONS, p.next = completions, p))
1911     break;
1912     }
1913     }
1914 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1915 dl 1.19 Throwable ex;
1916     if (r instanceof AltResult)
1917 dl 1.28 ex = ((AltResult)r).ex;
1918     else
1919     ex = null;
1920     if (ex == null) {
1921     try {
1922     if (e != null)
1923     e.execute(new AsyncRun(action, dst));
1924     else
1925     action.run();
1926     } catch (Throwable rex) {
1927     ex = rex;
1928     }
1929     }
1930     if (e == null || ex != null)
1931     dst.internalComplete(null, ex);
1932     }
1933     helpPostComplete();
1934     return dst;
1935     }
1936    
1937     /**
1938 jsr166 1.66 * Returns a new CompletableFuture that is completed
1939     * when both this and the other given CompletableFuture complete,
1940     * with the result of the given function of the results of the two
1941     * CompletableFutures.
1942     *
1943 dl 1.71 * <p>If this and/or the other CompletableFuture complete
1944 jsr166 1.73 * exceptionally, or the supplied function throws an exception,
1945     * then the returned CompletableFuture completes exceptionally
1946     * with a CompletionException holding the exception as its cause.
1947 dl 1.28 *
1948     * @param other the other CompletableFuture
1949     * @param fn the function to use to compute the value of
1950     * the returned CompletableFuture
1951     * @return the new CompletableFuture
1952     */
1953 dl 1.48 public <U,V> CompletableFuture<V> thenCombine
1954     (CompletableFuture<? extends U> other,
1955     BiFunction<? super T,? super U,? extends V> fn) {
1956 jsr166 1.81 return doThenCombine(other, fn, null);
1957 dl 1.28 }
1958    
1959     /**
1960 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1961     * when both this and the other given CompletableFuture complete,
1962     * with the result of the given function of the results of the two
1963 dl 1.71 * CompletableFutures from a task running in the
1964 jsr166 1.66 * {@link ForkJoinPool#commonPool()}.
1965     *
1966 dl 1.71 * <p>If this and/or the other CompletableFuture complete
1967 jsr166 1.73 * exceptionally, or the supplied function throws an exception,
1968     * then the returned CompletableFuture completes exceptionally
1969     * with a CompletionException holding the exception as its cause.
1970 dl 1.28 *
1971     * @param other the other CompletableFuture
1972     * @param fn the function to use to compute the value of
1973     * the returned CompletableFuture
1974     * @return the new CompletableFuture
1975     */
1976 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
1977     (CompletableFuture<? extends U> other,
1978     BiFunction<? super T,? super U,? extends V> fn) {
1979 jsr166 1.81 return doThenCombine(other, fn, ForkJoinPool.commonPool());
1980 dl 1.28 }
1981    
1982     /**
1983 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1984     * when both this and the other given CompletableFuture complete,
1985     * with the result of the given function of the results of the two
1986 dl 1.71 * CompletableFutures from a task running in the given executor.
1987 jsr166 1.66 *
1988 dl 1.71 * <p>If this and/or the other CompletableFuture complete
1989 jsr166 1.73 * exceptionally, or the supplied function throws an exception,
1990     * then the returned CompletableFuture completes exceptionally
1991     * with a CompletionException holding the exception as its cause.
1992 dl 1.28 *
1993     * @param other the other CompletableFuture
1994     * @param fn the function to use to compute the value of
1995     * the returned CompletableFuture
1996     * @param executor the executor to use for asynchronous execution
1997     * @return the new CompletableFuture
1998     */
1999 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2000     (CompletableFuture<? extends U> other,
2001     BiFunction<? super T,? super U,? extends V> fn,
2002     Executor executor) {
2003 dl 1.28 if (executor == null) throw new NullPointerException();
2004 jsr166 1.81 return doThenCombine(other, fn, executor);
2005 dl 1.1 }
2006    
2007 jsr166 1.81 private <U,V> CompletableFuture<V> doThenCombine
2008 dl 1.48 (CompletableFuture<? extends U> other,
2009     BiFunction<? super T,? super U,? extends V> fn,
2010     Executor e) {
2011 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2012 jsr166 1.2 CompletableFuture<V> dst = new CompletableFuture<V>();
2013 jsr166 1.81 ThenCombine<T,U,V> d = null;
2014 jsr166 1.2 Object r, s = null;
2015     if ((r = result) == null || (s = other.result) == null) {
2016 jsr166 1.81 d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
2017 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2018     while ((r == null && (r = result) == null) ||
2019     (s == null && (s = other.result) == null)) {
2020     if (q != null) {
2021     if (s != null ||
2022     UNSAFE.compareAndSwapObject
2023     (other, COMPLETIONS, q.next = other.completions, q))
2024     break;
2025     }
2026     else if (r != null ||
2027     UNSAFE.compareAndSwapObject
2028     (this, COMPLETIONS, p.next = completions, p)) {
2029     if (s != null)
2030     break;
2031     q = new CompletionNode(d);
2032     }
2033     }
2034     }
2035 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2036 dl 1.19 T t; U u; Throwable ex;
2037 jsr166 1.2 if (r instanceof AltResult) {
2038 dl 1.19 ex = ((AltResult)r).ex;
2039 jsr166 1.2 t = null;
2040 dl 1.1 }
2041 dl 1.19 else {
2042     ex = null;
2043 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2044     t = tr;
2045 dl 1.19 }
2046 dl 1.1 if (ex != null)
2047     u = null;
2048     else if (s instanceof AltResult) {
2049 dl 1.19 ex = ((AltResult)s).ex;
2050 dl 1.1 u = null;
2051     }
2052 dl 1.28 else {
2053     @SuppressWarnings("unchecked") U us = (U) s;
2054     u = us;
2055     }
2056 dl 1.20 V v = null;
2057 jsr166 1.2 if (ex == null) {
2058 dl 1.1 try {
2059 dl 1.20 if (e != null)
2060 jsr166 1.81 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
2061 dl 1.1 else
2062 dl 1.20 v = fn.apply(t, u);
2063 dl 1.1 } catch (Throwable rex) {
2064 dl 1.19 ex = rex;
2065 dl 1.1 }
2066     }
2067 dl 1.20 if (e == null || ex != null)
2068     dst.internalComplete(v, ex);
2069 jsr166 1.2 }
2070 dl 1.20 helpPostComplete();
2071     other.helpPostComplete();
2072 jsr166 1.2 return dst;
2073 dl 1.1 }
2074    
2075 dl 1.28 /**
2076 jsr166 1.66 * Returns a new CompletableFuture that is completed
2077     * when both this and the other given CompletableFuture complete,
2078     * after performing the given action with the results of the two
2079     * CompletableFutures.
2080     *
2081 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2082 jsr166 1.73 * exceptionally, or the supplied action throws an exception,
2083     * then the returned CompletableFuture completes exceptionally
2084     * with a CompletionException holding the exception as its cause.
2085 dl 1.28 *
2086     * @param other the other CompletableFuture
2087     * @param block the action to perform before completing the
2088     * returned CompletableFuture
2089     * @return the new CompletableFuture
2090     */
2091 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBoth
2092     (CompletableFuture<? extends U> other,
2093     BiConsumer<? super T, ? super U> block) {
2094 jsr166 1.81 return doThenAcceptBoth(other, block, null);
2095 dl 1.28 }
2096    
2097     /**
2098 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2099     * when both this and the other given CompletableFuture complete,
2100     * after performing the given action with the results of the two
2101     * CompletableFutures from a task running in the {@link
2102     * ForkJoinPool#commonPool()}.
2103     *
2104 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2105 jsr166 1.73 * exceptionally, or the supplied action throws an exception,
2106     * then the returned CompletableFuture completes exceptionally
2107     * with a CompletionException holding the exception as its cause.
2108 dl 1.28 *
2109     * @param other the other CompletableFuture
2110     * @param block the action to perform before completing the
2111     * returned CompletableFuture
2112     * @return the new CompletableFuture
2113     */
2114 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2115     (CompletableFuture<? extends U> other,
2116     BiConsumer<? super T, ? super U> block) {
2117 jsr166 1.81 return doThenAcceptBoth(other, block, ForkJoinPool.commonPool());
2118 dl 1.28 }
2119    
2120     /**
2121 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2122     * when both this and the other given CompletableFuture complete,
2123     * after performing the given action with the results of the two
2124     * CompletableFutures from a task running in the given executor.
2125     *
2126 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2127 jsr166 1.73 * exceptionally, or the supplied action throws an exception,
2128     * then the returned CompletableFuture completes exceptionally
2129     * with a CompletionException holding the exception as its cause.
2130 dl 1.28 *
2131     * @param other the other CompletableFuture
2132     * @param block the action to perform before completing the
2133     * returned CompletableFuture
2134     * @param executor the executor to use for asynchronous execution
2135     * @return the new CompletableFuture
2136     */
2137 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2138     (CompletableFuture<? extends U> other,
2139     BiConsumer<? super T, ? super U> block,
2140     Executor executor) {
2141 dl 1.28 if (executor == null) throw new NullPointerException();
2142 jsr166 1.81 return doThenAcceptBoth(other, block, executor);
2143 dl 1.28 }
2144    
2145 jsr166 1.81 private <U> CompletableFuture<Void> doThenAcceptBoth
2146 dl 1.48 (CompletableFuture<? extends U> other,
2147     BiConsumer<? super T,? super U> fn,
2148     Executor e) {
2149 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2150     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2151 jsr166 1.81 ThenAcceptBoth<T,U> d = null;
2152 dl 1.7 Object r, s = null;
2153     if ((r = result) == null || (s = other.result) == null) {
2154 jsr166 1.81 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
2155 dl 1.7 CompletionNode q = null, p = new CompletionNode(d);
2156     while ((r == null && (r = result) == null) ||
2157     (s == null && (s = other.result) == null)) {
2158     if (q != null) {
2159     if (s != null ||
2160     UNSAFE.compareAndSwapObject
2161     (other, COMPLETIONS, q.next = other.completions, q))
2162     break;
2163     }
2164     else if (r != null ||
2165     UNSAFE.compareAndSwapObject
2166     (this, COMPLETIONS, p.next = completions, p)) {
2167     if (s != null)
2168     break;
2169     q = new CompletionNode(d);
2170     }
2171     }
2172     }
2173     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2174 dl 1.19 T t; U u; Throwable ex;
2175 dl 1.7 if (r instanceof AltResult) {
2176 dl 1.19 ex = ((AltResult)r).ex;
2177 dl 1.7 t = null;
2178     }
2179 dl 1.19 else {
2180     ex = null;
2181 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2182     t = tr;
2183 dl 1.19 }
2184 dl 1.7 if (ex != null)
2185     u = null;
2186     else if (s instanceof AltResult) {
2187 dl 1.19 ex = ((AltResult)s).ex;
2188 dl 1.7 u = null;
2189     }
2190 dl 1.28 else {
2191     @SuppressWarnings("unchecked") U us = (U) s;
2192     u = us;
2193     }
2194 dl 1.7 if (ex == null) {
2195     try {
2196 dl 1.20 if (e != null)
2197 jsr166 1.81 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
2198 dl 1.20 else
2199 dl 1.7 fn.accept(t, u);
2200     } catch (Throwable rex) {
2201 dl 1.19 ex = rex;
2202 dl 1.7 }
2203     }
2204 dl 1.20 if (e == null || ex != null)
2205     dst.internalComplete(null, ex);
2206 dl 1.7 }
2207 dl 1.20 helpPostComplete();
2208     other.helpPostComplete();
2209 dl 1.7 return dst;
2210     }
2211    
2212 dl 1.28 /**
2213 jsr166 1.66 * Returns a new CompletableFuture that is completed
2214     * when both this and the other given CompletableFuture complete,
2215     * after performing the given action.
2216     *
2217 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2218 jsr166 1.73 * exceptionally, or the supplied action throws an exception,
2219     * then the returned CompletableFuture completes exceptionally
2220     * with a CompletionException holding the exception as its cause.
2221 dl 1.28 *
2222     * @param other the other CompletableFuture
2223     * @param action the action to perform before completing the
2224     * returned CompletableFuture
2225     * @return the new CompletableFuture
2226     */
2227     public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2228     Runnable action) {
2229 jsr166 1.81 return doRunAfterBoth(other, action, null);
2230 dl 1.28 }
2231    
2232     /**
2233 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2234     * when both this and the other given CompletableFuture complete,
2235     * after performing the given action from a task running in the
2236     * {@link ForkJoinPool#commonPool()}.
2237     *
2238 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2239 jsr166 1.73 * exceptionally, or the supplied action throws an exception,
2240     * then the returned CompletableFuture completes exceptionally
2241     * with a CompletionException holding the exception as its cause.
2242 dl 1.28 *
2243     * @param other the other CompletableFuture
2244     * @param action the action to perform before completing the
2245     * returned CompletableFuture
2246     * @return the new CompletableFuture
2247     */
2248     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2249     Runnable action) {
2250 jsr166 1.81 return doRunAfterBoth(other, action, ForkJoinPool.commonPool());
2251 dl 1.28 }
2252    
2253     /**
2254 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2255     * when both this and the other given CompletableFuture complete,
2256     * after performing the given action from a task running in the
2257     * given executor.
2258     *
2259 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2260 jsr166 1.73 * exceptionally, or the supplied action throws an exception,
2261     * then the returned CompletableFuture completes exceptionally
2262     * with a CompletionException holding the exception as its cause.
2263 dl 1.28 *
2264     * @param other the other CompletableFuture
2265     * @param action the action to perform before completing the
2266     * returned CompletableFuture
2267     * @param executor the executor to use for asynchronous execution
2268     * @return the new CompletableFuture
2269     */
2270     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2271     Runnable action,
2272     Executor executor) {
2273     if (executor == null) throw new NullPointerException();
2274 jsr166 1.81 return doRunAfterBoth(other, action, executor);
2275 dl 1.28 }
2276    
2277 jsr166 1.81 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
2278     Runnable action,
2279     Executor e) {
2280 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2281 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2282 jsr166 1.82 RunAfterBoth d = null;
2283 jsr166 1.2 Object r, s = null;
2284     if ((r = result) == null || (s = other.result) == null) {
2285 jsr166 1.82 d = new RunAfterBoth(this, other, action, dst, e);
2286 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2287     while ((r == null && (r = result) == null) ||
2288     (s == null && (s = other.result) == null)) {
2289     if (q != null) {
2290     if (s != null ||
2291     UNSAFE.compareAndSwapObject
2292     (other, COMPLETIONS, q.next = other.completions, q))
2293     break;
2294     }
2295     else if (r != null ||
2296     UNSAFE.compareAndSwapObject
2297     (this, COMPLETIONS, p.next = completions, p)) {
2298     if (s != null)
2299     break;
2300     q = new CompletionNode(d);
2301     }
2302     }
2303     }
2304 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2305 dl 1.19 Throwable ex;
2306     if (r instanceof AltResult)
2307     ex = ((AltResult)r).ex;
2308     else
2309     ex = null;
2310     if (ex == null && (s instanceof AltResult))
2311     ex = ((AltResult)s).ex;
2312     if (ex == null) {
2313 dl 1.1 try {
2314 dl 1.20 if (e != null)
2315 dl 1.28 e.execute(new AsyncRun(action, dst));
2316 dl 1.20 else
2317 dl 1.1 action.run();
2318     } catch (Throwable rex) {
2319 dl 1.19 ex = rex;
2320 dl 1.1 }
2321     }
2322 dl 1.20 if (e == null || ex != null)
2323     dst.internalComplete(null, ex);
2324 jsr166 1.2 }
2325 dl 1.20 helpPostComplete();
2326     other.helpPostComplete();
2327 jsr166 1.2 return dst;
2328 dl 1.1 }
2329    
2330 dl 1.28 /**
2331 jsr166 1.66 * Returns a new CompletableFuture that is completed
2332     * when either this or the other given CompletableFuture completes,
2333     * with the result of the given function of either this or the other
2334     * CompletableFuture's result.
2335     *
2336 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2337 jsr166 1.74 * exceptionally, then the returned CompletableFuture may also do so,
2338     * with a CompletionException holding one of these exceptions as its
2339     * cause. No guarantees are made about which result or exception is
2340     * used in the returned CompletableFuture. If the supplied function
2341     * throws an exception, then the returned CompletableFuture completes
2342     * exceptionally with a CompletionException holding the exception as
2343     * its cause.
2344 dl 1.28 *
2345     * @param other the other CompletableFuture
2346     * @param fn the function to use to compute the value of
2347     * the returned CompletableFuture
2348     * @return the new CompletableFuture
2349     */
2350 dl 1.48 public <U> CompletableFuture<U> applyToEither
2351     (CompletableFuture<? extends T> other,
2352     Function<? super T, U> fn) {
2353 jsr166 1.81 return doApplyToEither(other, fn, null);
2354 dl 1.28 }
2355    
2356     /**
2357 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2358     * when either this or the other given CompletableFuture completes,
2359     * with the result of the given function of either this or the other
2360 dl 1.71 * CompletableFuture's result from a task running in the
2361 jsr166 1.66 * {@link ForkJoinPool#commonPool()}.
2362     *
2363 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2364 jsr166 1.74 * exceptionally, then the returned CompletableFuture may also do so,
2365     * with a CompletionException holding one of these exceptions as its
2366     * cause. No guarantees are made about which result or exception is
2367     * used in the returned CompletableFuture. If the supplied function
2368     * throws an exception, then the returned CompletableFuture completes
2369     * exceptionally with a CompletionException holding the exception as
2370     * its cause.
2371 dl 1.28 *
2372     * @param other the other CompletableFuture
2373     * @param fn the function to use to compute the value of
2374     * the returned CompletableFuture
2375     * @return the new CompletableFuture
2376     */
2377 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2378     (CompletableFuture<? extends T> other,
2379     Function<? super T, U> fn) {
2380 jsr166 1.81 return doApplyToEither(other, fn, ForkJoinPool.commonPool());
2381 dl 1.28 }
2382    
2383     /**
2384 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2385     * when either this or the other given CompletableFuture completes,
2386     * with the result of the given function of either this or the other
2387 dl 1.71 * CompletableFuture's result from a task running in the
2388 jsr166 1.66 * given executor.
2389     *
2390 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2391 jsr166 1.74 * exceptionally, then the returned CompletableFuture may also do so,
2392     * with a CompletionException holding one of these exceptions as its
2393     * cause. No guarantees are made about which result or exception is
2394     * used in the returned CompletableFuture. If the supplied function
2395     * throws an exception, then the returned CompletableFuture completes
2396     * exceptionally with a CompletionException holding the exception as
2397     * its cause.
2398 dl 1.28 *
2399     * @param other the other CompletableFuture
2400     * @param fn the function to use to compute the value of
2401     * the returned CompletableFuture
2402     * @param executor the executor to use for asynchronous execution
2403     * @return the new CompletableFuture
2404     */
2405 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2406     (CompletableFuture<? extends T> other,
2407     Function<? super T, U> fn,
2408     Executor executor) {
2409 dl 1.28 if (executor == null) throw new NullPointerException();
2410 jsr166 1.81 return doApplyToEither(other, fn, executor);
2411 dl 1.28 }
2412    
2413 jsr166 1.81 private <U> CompletableFuture<U> doApplyToEither
2414 dl 1.48 (CompletableFuture<? extends T> other,
2415     Function<? super T, U> fn,
2416     Executor e) {
2417 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2418 jsr166 1.2 CompletableFuture<U> dst = new CompletableFuture<U>();
2419 jsr166 1.81 ApplyToEither<T,U> d = null;
2420 jsr166 1.2 Object r;
2421     if ((r = result) == null && (r = other.result) == null) {
2422 jsr166 1.81 d = new ApplyToEither<T,U>(this, other, fn, dst, e);
2423 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2424     while ((r = result) == null && (r = other.result) == null) {
2425     if (q != null) {
2426     if (UNSAFE.compareAndSwapObject
2427     (other, COMPLETIONS, q.next = other.completions, q))
2428     break;
2429     }
2430     else if (UNSAFE.compareAndSwapObject
2431     (this, COMPLETIONS, p.next = completions, p))
2432     q = new CompletionNode(d);
2433     }
2434 jsr166 1.2 }
2435     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2436 dl 1.19 T t; Throwable ex;
2437 jsr166 1.2 if (r instanceof AltResult) {
2438 dl 1.19 ex = ((AltResult)r).ex;
2439 jsr166 1.2 t = null;
2440 dl 1.1 }
2441 dl 1.19 else {
2442     ex = null;
2443 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2444     t = tr;
2445 dl 1.19 }
2446 dl 1.20 U u = null;
2447 jsr166 1.2 if (ex == null) {
2448 dl 1.1 try {
2449 dl 1.20 if (e != null)
2450 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
2451 dl 1.1 else
2452 dl 1.20 u = fn.apply(t);
2453 dl 1.1 } catch (Throwable rex) {
2454 dl 1.19 ex = rex;
2455 dl 1.1 }
2456     }
2457 dl 1.20 if (e == null || ex != null)
2458     dst.internalComplete(u, ex);
2459 jsr166 1.2 }
2460 dl 1.20 helpPostComplete();
2461     other.helpPostComplete();
2462 jsr166 1.2 return dst;
2463 dl 1.1 }
2464    
2465 dl 1.28 /**
2466 jsr166 1.66 * Returns a new CompletableFuture that is completed
2467     * when either this or the other given CompletableFuture completes,
2468     * after performing the given action with the result of either this
2469     * or the other CompletableFuture's result.
2470     *
2471 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2472 jsr166 1.74 * exceptionally, then the returned CompletableFuture may also do so,
2473     * with a CompletionException holding one of these exceptions as its
2474     * cause. No guarantees are made about which result or exception is
2475     * used in the returned CompletableFuture. If the supplied action
2476     * throws an exception, then the returned CompletableFuture completes
2477     * exceptionally with a CompletionException holding the exception as
2478     * its cause.
2479 dl 1.28 *
2480     * @param other the other CompletableFuture
2481     * @param block the action to perform before completing the
2482     * returned CompletableFuture
2483     * @return the new CompletableFuture
2484     */
2485 dl 1.48 public CompletableFuture<Void> acceptEither
2486     (CompletableFuture<? extends T> other,
2487     Consumer<? super T> block) {
2488 jsr166 1.81 return doAcceptEither(other, block, null);
2489 dl 1.28 }
2490    
2491     /**
2492 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2493     * when either this or the other given CompletableFuture completes,
2494     * after performing the given action with the result of either this
2495     * or the other CompletableFuture's result from a task running in
2496     * the {@link ForkJoinPool#commonPool()}.
2497     *
2498 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2499 jsr166 1.74 * exceptionally, then the returned CompletableFuture may also do so,
2500     * with a CompletionException holding one of these exceptions as its
2501     * cause. No guarantees are made about which result or exception is
2502     * used in the returned CompletableFuture. If the supplied action
2503     * throws an exception, then the returned CompletableFuture completes
2504     * exceptionally with a CompletionException holding the exception as
2505     * its cause.
2506 dl 1.28 *
2507     * @param other the other CompletableFuture
2508     * @param block the action to perform before completing the
2509     * returned CompletableFuture
2510     * @return the new CompletableFuture
2511     */
2512 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2513     (CompletableFuture<? extends T> other,
2514     Consumer<? super T> block) {
2515 jsr166 1.81 return doAcceptEither(other, block, ForkJoinPool.commonPool());
2516 dl 1.28 }
2517    
2518     /**
2519 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2520     * when either this or the other given CompletableFuture completes,
2521     * after performing the given action with the result of either this
2522     * or the other CompletableFuture's result from a task running in
2523     * the given executor.
2524     *
2525 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2526 jsr166 1.74 * exceptionally, then the returned CompletableFuture may also do so,
2527     * with a CompletionException holding one of these exceptions as its
2528     * cause. No guarantees are made about which result or exception is
2529     * used in the returned CompletableFuture. If the supplied action
2530     * throws an exception, then the returned CompletableFuture completes
2531     * exceptionally with a CompletionException holding the exception as
2532     * its cause.
2533 dl 1.28 *
2534     * @param other the other CompletableFuture
2535     * @param block the action to perform before completing the
2536     * returned CompletableFuture
2537     * @param executor the executor to use for asynchronous execution
2538     * @return the new CompletableFuture
2539     */
2540 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2541     (CompletableFuture<? extends T> other,
2542     Consumer<? super T> block,
2543     Executor executor) {
2544 dl 1.28 if (executor == null) throw new NullPointerException();
2545 jsr166 1.81 return doAcceptEither(other, block, executor);
2546 dl 1.28 }
2547    
2548 jsr166 1.81 private CompletableFuture<Void> doAcceptEither
2549 dl 1.48 (CompletableFuture<? extends T> other,
2550     Consumer<? super T> fn,
2551     Executor e) {
2552 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2553     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2554 jsr166 1.81 AcceptEither<T> d = null;
2555 dl 1.7 Object r;
2556     if ((r = result) == null && (r = other.result) == null) {
2557 jsr166 1.81 d = new AcceptEither<T>(this, other, fn, dst, e);
2558 dl 1.7 CompletionNode q = null, p = new CompletionNode(d);
2559     while ((r = result) == null && (r = other.result) == null) {
2560     if (q != null) {
2561     if (UNSAFE.compareAndSwapObject
2562     (other, COMPLETIONS, q.next = other.completions, q))
2563     break;
2564     }
2565     else if (UNSAFE.compareAndSwapObject
2566     (this, COMPLETIONS, p.next = completions, p))
2567     q = new CompletionNode(d);
2568     }
2569     }
2570     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2571 dl 1.19 T t; Throwable ex;
2572 dl 1.7 if (r instanceof AltResult) {
2573 dl 1.19 ex = ((AltResult)r).ex;
2574 dl 1.7 t = null;
2575     }
2576 dl 1.19 else {
2577     ex = null;
2578 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2579     t = tr;
2580 dl 1.19 }
2581 dl 1.7 if (ex == null) {
2582     try {
2583 dl 1.20 if (e != null)
2584 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
2585 dl 1.20 else
2586 dl 1.7 fn.accept(t);
2587     } catch (Throwable rex) {
2588 dl 1.19 ex = rex;
2589 dl 1.7 }
2590     }
2591 dl 1.20 if (e == null || ex != null)
2592     dst.internalComplete(null, ex);
2593 dl 1.7 }
2594 dl 1.20 helpPostComplete();
2595     other.helpPostComplete();
2596 dl 1.7 return dst;
2597     }
2598    
2599 dl 1.28 /**
2600 jsr166 1.60 * Returns a new CompletableFuture that is completed
2601 jsr166 1.66 * when either this or the other given CompletableFuture completes,
2602     * after performing the given action.
2603     *
2604 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2605 jsr166 1.74 * exceptionally, then the returned CompletableFuture may also do so,
2606     * with a CompletionException holding one of these exceptions as its
2607     * cause. No guarantees are made about which result or exception is
2608     * used in the returned CompletableFuture. If the supplied action
2609     * throws an exception, then the returned CompletableFuture completes
2610     * exceptionally with a CompletionException holding the exception as
2611     * its cause.
2612 dl 1.28 *
2613     * @param other the other CompletableFuture
2614     * @param action the action to perform before completing the
2615     * returned CompletableFuture
2616     * @return the new CompletableFuture
2617     */
2618     public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2619     Runnable action) {
2620 jsr166 1.81 return doRunAfterEither(other, action, null);
2621 dl 1.28 }
2622    
2623     /**
2624 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2625     * when either this or the other given CompletableFuture completes,
2626     * after performing the given action from a task running in the
2627     * {@link ForkJoinPool#commonPool()}.
2628     *
2629 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2630 jsr166 1.74 * exceptionally, then the returned CompletableFuture may also do so,
2631     * with a CompletionException holding one of these exceptions as its
2632     * cause. No guarantees are made about which result or exception is
2633     * used in the returned CompletableFuture. If the supplied action
2634     * throws an exception, then the returned CompletableFuture completes
2635     * exceptionally with a CompletionException holding the exception as
2636     * its cause.
2637 dl 1.28 *
2638     * @param other the other CompletableFuture
2639     * @param action the action to perform before completing the
2640     * returned CompletableFuture
2641     * @return the new CompletableFuture
2642     */
2643 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2644     (CompletableFuture<?> other,
2645     Runnable action) {
2646 jsr166 1.81 return doRunAfterEither(other, action, ForkJoinPool.commonPool());
2647 dl 1.28 }
2648    
2649     /**
2650 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2651     * when either this or the other given CompletableFuture completes,
2652     * after performing the given action from a task running in the
2653     * given executor.
2654     *
2655 dl 1.71 * <p>If this and/or the other CompletableFuture complete
2656 jsr166 1.74 * exceptionally, then the returned CompletableFuture may also do so,
2657     * with a CompletionException holding one of these exceptions as its
2658     * cause. No guarantees are made about which result or exception is
2659     * used in the returned CompletableFuture. If the supplied action
2660     * throws an exception, then the returned CompletableFuture completes
2661     * exceptionally with a CompletionException holding the exception as
2662     * its cause.
2663 dl 1.28 *
2664     * @param other the other CompletableFuture
2665     * @param action the action to perform before completing the
2666     * returned CompletableFuture
2667     * @param executor the executor to use for asynchronous execution
2668     * @return the new CompletableFuture
2669     */
2670 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2671     (CompletableFuture<?> other,
2672     Runnable action,
2673     Executor executor) {
2674 dl 1.28 if (executor == null) throw new NullPointerException();
2675 jsr166 1.81 return doRunAfterEither(other, action, executor);
2676 dl 1.28 }
2677    
2678 jsr166 1.81 private CompletableFuture<Void> doRunAfterEither
2679     (CompletableFuture<?> other,
2680     Runnable action,
2681     Executor e) {
2682 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2683 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2684 jsr166 1.82 RunAfterEither d = null;
2685 jsr166 1.2 Object r;
2686     if ((r = result) == null && (r = other.result) == null) {
2687 jsr166 1.82 d = new RunAfterEither(this, other, action, dst, e);
2688 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2689     while ((r = result) == null && (r = other.result) == null) {
2690     if (q != null) {
2691     if (UNSAFE.compareAndSwapObject
2692     (other, COMPLETIONS, q.next = other.completions, q))
2693     break;
2694     }
2695     else if (UNSAFE.compareAndSwapObject
2696     (this, COMPLETIONS, p.next = completions, p))
2697     q = new CompletionNode(d);
2698     }
2699 jsr166 1.2 }
2700     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2701 dl 1.19 Throwable ex;
2702     if (r instanceof AltResult)
2703     ex = ((AltResult)r).ex;
2704     else
2705     ex = null;
2706     if (ex == null) {
2707 dl 1.1 try {
2708 dl 1.20 if (e != null)
2709 dl 1.28 e.execute(new AsyncRun(action, dst));
2710 dl 1.20 else
2711 dl 1.1 action.run();
2712     } catch (Throwable rex) {
2713 dl 1.19 ex = rex;
2714 dl 1.1 }
2715     }
2716 dl 1.20 if (e == null || ex != null)
2717     dst.internalComplete(null, ex);
2718 jsr166 1.2 }
2719 dl 1.20 helpPostComplete();
2720     other.helpPostComplete();
2721 jsr166 1.2 return dst;
2722 dl 1.1 }
2723    
2724 dl 1.28 /**
2725 dl 1.68 * Returns a CompletableFuture that upon completion, has the same
2726     * value as produced by the given function of the result of this
2727     * CompletableFuture.
2728 jsr166 1.66 *
2729 dl 1.68 * <p>If this CompletableFuture completes exceptionally, then the
2730     * returned CompletableFuture also does so, with a
2731 dl 1.28 * CompletionException holding this exception as its cause.
2732 dl 1.68 * Similarly, if the computed CompletableFuture completes
2733     * exceptionally, then so does the returned CompletableFuture.
2734 dl 1.28 *
2735 jsr166 1.39 * @param fn the function returning a new CompletableFuture
2736 dl 1.68 * @return the CompletableFuture
2737 dl 1.28 */
2738 dl 1.48 public <U> CompletableFuture<U> thenCompose
2739     (Function<? super T, CompletableFuture<U>> fn) {
2740 jsr166 1.81 return doThenCompose(fn, null);
2741 dl 1.37 }
2742    
2743     /**
2744 dl 1.68 * Returns a CompletableFuture that upon completion, has the same
2745     * value as that produced asynchronously using the {@link
2746     * ForkJoinPool#commonPool()} by the given function of the result
2747     * of this CompletableFuture.
2748 jsr166 1.66 *
2749 dl 1.68 * <p>If this CompletableFuture completes exceptionally, then the
2750     * returned CompletableFuture also does so, with a
2751 dl 1.37 * CompletionException holding this exception as its cause.
2752 dl 1.68 * Similarly, if the computed CompletableFuture completes
2753     * exceptionally, then so does the returned CompletableFuture.
2754 dl 1.37 *
2755 jsr166 1.39 * @param fn the function returning a new CompletableFuture
2756 dl 1.68 * @return the CompletableFuture
2757 dl 1.37 */
2758 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2759     (Function<? super T, CompletableFuture<U>> fn) {
2760 jsr166 1.81 return doThenCompose(fn, ForkJoinPool.commonPool());
2761 dl 1.37 }
2762    
2763     /**
2764 dl 1.68 * Returns a CompletableFuture that upon completion, has the same
2765     * value as that produced asynchronously using the given executor
2766     * by the given function of this CompletableFuture.
2767 jsr166 1.66 *
2768 dl 1.68 * <p>If this CompletableFuture completes exceptionally, then the
2769     * returned CompletableFuture also does so, with a
2770 dl 1.37 * CompletionException holding this exception as its cause.
2771 dl 1.68 * Similarly, if the computed CompletableFuture completes
2772     * exceptionally, then so does the returned CompletableFuture.
2773 dl 1.37 *
2774 jsr166 1.39 * @param fn the function returning a new CompletableFuture
2775 dl 1.37 * @param executor the executor to use for asynchronous execution
2776 jsr166 1.70 * @return the CompletableFuture
2777 dl 1.37 */
2778 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2779     (Function<? super T, CompletableFuture<U>> fn,
2780     Executor executor) {
2781 dl 1.37 if (executor == null) throw new NullPointerException();
2782 jsr166 1.81 return doThenCompose(fn, executor);
2783 dl 1.37 }
2784    
2785 jsr166 1.81 private <U> CompletableFuture<U> doThenCompose
2786 dl 1.48 (Function<? super T, CompletableFuture<U>> fn,
2787     Executor e) {
2788 dl 1.28 if (fn == null) throw new NullPointerException();
2789     CompletableFuture<U> dst = null;
2790 jsr166 1.81 ThenCompose<T,U> d = null;
2791 dl 1.28 Object r;
2792     if ((r = result) == null) {
2793     dst = new CompletableFuture<U>();
2794     CompletionNode p = new CompletionNode
2795 jsr166 1.81 (d = new ThenCompose<T,U>(this, fn, dst, e));
2796 dl 1.28 while ((r = result) == null) {
2797     if (UNSAFE.compareAndSwapObject
2798     (this, COMPLETIONS, p.next = completions, p))
2799     break;
2800     }
2801     }
2802     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2803     T t; Throwable ex;
2804     if (r instanceof AltResult) {
2805     ex = ((AltResult)r).ex;
2806     t = null;
2807     }
2808     else {
2809     ex = null;
2810     @SuppressWarnings("unchecked") T tr = (T) r;
2811     t = tr;
2812     }
2813     if (ex == null) {
2814 dl 1.37 if (e != null) {
2815     if (dst == null)
2816     dst = new CompletableFuture<U>();
2817     e.execute(new AsyncCompose<T,U>(t, fn, dst));
2818     }
2819     else {
2820     try {
2821 jsr166 1.61 if ((dst = fn.apply(t)) == null)
2822     ex = new NullPointerException();
2823 dl 1.37 } catch (Throwable rex) {
2824     ex = rex;
2825     }
2826 dl 1.28 }
2827     }
2828 dl 1.83 if (dst == null)
2829     dst = new CompletableFuture<U>();
2830 dl 1.37 if (e == null && ex != null)
2831 dl 1.28 dst.internalComplete(null, ex);
2832     }
2833     helpPostComplete();
2834     dst.helpPostComplete();
2835     return dst;
2836     }
2837    
2838     /**
2839 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
2840     * CompletableFuture completes, with the result of the given
2841     * function of the exception triggering this CompletableFuture's
2842     * completion when it completes exceptionally; otherwise, if this
2843     * CompletableFuture completes normally, then the returned
2844     * CompletableFuture also completes normally with the same value.
2845 dl 1.28 *
2846     * @param fn the function to use to compute the value of the
2847     * returned CompletableFuture if this CompletableFuture completed
2848     * exceptionally
2849     * @return the new CompletableFuture
2850     */
2851 dl 1.48 public CompletableFuture<T> exceptionally
2852     (Function<Throwable, ? extends T> fn) {
2853 dl 1.28 if (fn == null) throw new NullPointerException();
2854     CompletableFuture<T> dst = new CompletableFuture<T>();
2855     ExceptionCompletion<T> d = null;
2856     Object r;
2857     if ((r = result) == null) {
2858     CompletionNode p =
2859     new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2860     while ((r = result) == null) {
2861     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2862     p.next = completions, p))
2863     break;
2864     }
2865     }
2866     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2867     T t = null; Throwable ex, dx = null;
2868     if (r instanceof AltResult) {
2869     if ((ex = ((AltResult)r).ex) != null) {
2870     try {
2871     t = fn.apply(ex);
2872     } catch (Throwable rex) {
2873     dx = rex;
2874     }
2875     }
2876     }
2877     else {
2878     @SuppressWarnings("unchecked") T tr = (T) r;
2879     t = tr;
2880     }
2881     dst.internalComplete(t, dx);
2882     }
2883     helpPostComplete();
2884     return dst;
2885     }
2886    
2887     /**
2888 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
2889     * CompletableFuture completes, with the result of the given
2890     * function of the result and exception of this CompletableFuture's
2891     * completion. The given function is invoked with the result (or
2892     * {@code null} if none) and the exception (or {@code null} if none)
2893     * of this CompletableFuture when complete.
2894 dl 1.28 *
2895     * @param fn the function to use to compute the value of the
2896     * returned CompletableFuture
2897     * @return the new CompletableFuture
2898     */
2899 dl 1.48 public <U> CompletableFuture<U> handle
2900     (BiFunction<? super T, Throwable, ? extends U> fn) {
2901 dl 1.28 if (fn == null) throw new NullPointerException();
2902     CompletableFuture<U> dst = new CompletableFuture<U>();
2903     HandleCompletion<T,U> d = null;
2904     Object r;
2905     if ((r = result) == null) {
2906     CompletionNode p =
2907     new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2908     while ((r = result) == null) {
2909     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2910     p.next = completions, p))
2911     break;
2912     }
2913     }
2914     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2915     T t; Throwable ex;
2916     if (r instanceof AltResult) {
2917     ex = ((AltResult)r).ex;
2918     t = null;
2919     }
2920     else {
2921     ex = null;
2922     @SuppressWarnings("unchecked") T tr = (T) r;
2923     t = tr;
2924     }
2925     U u; Throwable dx;
2926     try {
2927     u = fn.apply(t, ex);
2928     dx = null;
2929     } catch (Throwable rex) {
2930     dx = rex;
2931     u = null;
2932     }
2933     dst.internalComplete(u, dx);
2934     }
2935     helpPostComplete();
2936     return dst;
2937     }
2938    
2939 dl 1.35
2940     /* ------------- Arbitrary-arity constructions -------------- */
2941    
2942     /*
2943     * The basic plan of attack is to recursively form binary
2944     * completion trees of elements. This can be overkill for small
2945     * sets, but scales nicely. The And/All vs Or/Any forms use the
2946     * same idea, but details differ.
2947     */
2948    
2949     /**
2950     * Returns a new CompletableFuture that is completed when all of
2951 jsr166 1.66 * the given CompletableFutures complete. If any of the given
2952 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
2953     * CompletableFuture also does so, with a CompletionException
2954     * holding this exception as its cause. Otherwise, the results,
2955     * if any, of the given CompletableFutures are not reflected in
2956     * the returned CompletableFuture, but may be obtained by
2957     * inspecting them individually. If no CompletableFutures are
2958     * provided, returns a CompletableFuture completed with the value
2959     * {@code null}.
2960 dl 1.35 *
2961     * <p>Among the applications of this method is to await completion
2962     * of a set of independent CompletableFutures before continuing a
2963     * program, as in: {@code CompletableFuture.allOf(c1, c2,
2964     * c3).join();}.
2965     *
2966     * @param cfs the CompletableFutures
2967 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
2968 dl 1.35 * given CompletableFutures complete
2969     * @throws NullPointerException if the array or any of its elements are
2970     * {@code null}
2971     */
2972     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2973     int len = cfs.length; // Directly handle empty and singleton cases
2974     if (len > 1)
2975     return allTree(cfs, 0, len - 1);
2976     else {
2977     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2978     CompletableFuture<?> f;
2979     if (len == 0)
2980     dst.result = NIL;
2981     else if ((f = cfs[0]) == null)
2982     throw new NullPointerException();
2983     else {
2984 dl 1.75 ThenPropagate d = null;
2985 dl 1.35 CompletionNode p = null;
2986     Object r;
2987     while ((r = f.result) == null) {
2988     if (d == null)
2989 dl 1.75 d = new ThenPropagate(f, dst);
2990 dl 1.35 else if (p == null)
2991     p = new CompletionNode(d);
2992     else if (UNSAFE.compareAndSwapObject
2993     (f, COMPLETIONS, p.next = f.completions, p))
2994     break;
2995     }
2996     if (r != null && (d == null || d.compareAndSet(0, 1)))
2997     dst.internalComplete(null, (r instanceof AltResult) ?
2998     ((AltResult)r).ex : null);
2999     f.helpPostComplete();
3000     }
3001     return dst;
3002     }
3003     }
3004    
3005     /**
3006     * Recursively constructs an And'ed tree of CompletableFutures.
3007     * Called only when array known to have at least two elements.
3008     */
3009     private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
3010     int lo, int hi) {
3011     CompletableFuture<?> fst, snd;
3012     int mid = (lo + hi) >>> 1;
3013     if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
3014     (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
3015     throw new NullPointerException();
3016     CompletableFuture<Void> dst = new CompletableFuture<Void>();
3017     AndCompletion d = null;
3018     CompletionNode p = null, q = null;
3019     Object r = null, s = null;
3020     while ((r = fst.result) == null || (s = snd.result) == null) {
3021     if (d == null)
3022     d = new AndCompletion(fst, snd, dst);
3023     else if (p == null)
3024     p = new CompletionNode(d);
3025     else if (q == null) {
3026     if (UNSAFE.compareAndSwapObject
3027     (fst, COMPLETIONS, p.next = fst.completions, p))
3028     q = new CompletionNode(d);
3029     }
3030     else if (UNSAFE.compareAndSwapObject
3031     (snd, COMPLETIONS, q.next = snd.completions, q))
3032     break;
3033     }
3034     if ((r != null || (r = fst.result) != null) &&
3035     (s != null || (s = snd.result) != null) &&
3036     (d == null || d.compareAndSet(0, 1))) {
3037     Throwable ex;
3038     if (r instanceof AltResult)
3039     ex = ((AltResult)r).ex;
3040     else
3041     ex = null;
3042     if (ex == null && (s instanceof AltResult))
3043     ex = ((AltResult)s).ex;
3044     dst.internalComplete(null, ex);
3045     }
3046     fst.helpPostComplete();
3047     snd.helpPostComplete();
3048     return dst;
3049     }
3050    
3051     /**
3052 dl 1.76 * Returns a new CompletableFuture that is completed when any of
3053 jsr166 1.79 * the given CompletableFutures complete, with the same result.
3054     * Otherwise, if it completed exceptionally, the returned
3055 dl 1.77 * CompletableFuture also does so, with a CompletionException
3056     * holding this exception as its cause. If no CompletableFutures
3057     * are provided, returns an incomplete CompletableFuture.
3058 dl 1.35 *
3059     * @param cfs the CompletableFutures
3060 dl 1.77 * @return a new CompletableFuture that is completed with the
3061     * result or exception of any of the given CompletableFutures when
3062     * one completes
3063 dl 1.35 * @throws NullPointerException if the array or any of its elements are
3064     * {@code null}
3065     */
3066 dl 1.77 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
3067 dl 1.35 int len = cfs.length; // Same idea as allOf
3068     if (len > 1)
3069     return anyTree(cfs, 0, len - 1);
3070     else {
3071 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3072 dl 1.35 CompletableFuture<?> f;
3073     if (len == 0)
3074 dl 1.48 ; // skip
3075 dl 1.35 else if ((f = cfs[0]) == null)
3076     throw new NullPointerException();
3077     else {
3078 dl 1.77 ThenCopy<Object> d = null;
3079 dl 1.35 CompletionNode p = null;
3080     Object r;
3081     while ((r = f.result) == null) {
3082     if (d == null)
3083 dl 1.77 d = new ThenCopy<Object>(f, dst);
3084 dl 1.35 else if (p == null)
3085     p = new CompletionNode(d);
3086     else if (UNSAFE.compareAndSwapObject
3087     (f, COMPLETIONS, p.next = f.completions, p))
3088     break;
3089     }
3090     if (r != null && (d == null || d.compareAndSet(0, 1))) {
3091     Throwable ex; Object t;
3092 dl 1.77 if (r instanceof AltResult) {
3093 dl 1.35 ex = ((AltResult)r).ex;
3094 dl 1.77 t = null;
3095     }
3096     else {
3097     ex = null;
3098     t = r;
3099     }
3100     dst.internalComplete(t, ex);
3101 dl 1.35 }
3102     f.helpPostComplete();
3103     }
3104     return dst;
3105     }
3106     }
3107    
3108     /**
3109 jsr166 1.44 * Recursively constructs an Or'ed tree of CompletableFutures.
3110 dl 1.35 */
3111 dl 1.77 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
3112 jsr166 1.79 int lo, int hi) {
3113 dl 1.35 CompletableFuture<?> fst, snd;
3114     int mid = (lo + hi) >>> 1;
3115     if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3116     (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3117     throw new NullPointerException();
3118 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3119 dl 1.35 OrCompletion d = null;
3120     CompletionNode p = null, q = null;
3121     Object r;
3122     while ((r = fst.result) == null && (r = snd.result) == null) {
3123     if (d == null)
3124     d = new OrCompletion(fst, snd, dst);
3125     else if (p == null)
3126     p = new CompletionNode(d);
3127     else if (q == null) {
3128     if (UNSAFE.compareAndSwapObject
3129     (fst, COMPLETIONS, p.next = fst.completions, p))
3130     q = new CompletionNode(d);
3131     }
3132     else if (UNSAFE.compareAndSwapObject
3133     (snd, COMPLETIONS, q.next = snd.completions, q))
3134     break;
3135     }
3136     if ((r != null || (r = fst.result) != null ||
3137     (r = snd.result) != null) &&
3138     (d == null || d.compareAndSet(0, 1))) {
3139 dl 1.77 Throwable ex; Object t;
3140 dl 1.35 if (r instanceof AltResult) {
3141     ex = ((AltResult)r).ex;
3142 dl 1.77 t = null;
3143 dl 1.35 }
3144 dl 1.77 else {
3145 dl 1.35 ex = null;
3146 dl 1.77 t = r;
3147     }
3148     dst.internalComplete(t, ex);
3149 dl 1.35 }
3150     fst.helpPostComplete();
3151     snd.helpPostComplete();
3152     return dst;
3153     }
3154    
3155     /* ------------- Control and status methods -------------- */
3156    
3157 dl 1.28 /**
3158 dl 1.37 * If not already completed, completes this CompletableFuture with
3159     * a {@link CancellationException}. Dependent CompletableFutures
3160     * that have not already completed will also complete
3161     * exceptionally, with a {@link CompletionException} caused by
3162     * this {@code CancellationException}.
3163 dl 1.28 *
3164     * @param mayInterruptIfRunning this value has no effect in this
3165     * implementation because interrupts are not used to control
3166     * processing.
3167     *
3168     * @return {@code true} if this task is now cancelled
3169     */
3170     public boolean cancel(boolean mayInterruptIfRunning) {
3171 dl 1.46 boolean cancelled = (result == null) &&
3172     UNSAFE.compareAndSwapObject
3173     (this, RESULT, null, new AltResult(new CancellationException()));
3174     postComplete();
3175 dl 1.48 return cancelled || isCancelled();
3176 dl 1.28 }
3177    
3178     /**
3179     * Returns {@code true} if this CompletableFuture was cancelled
3180     * before it completed normally.
3181     *
3182     * @return {@code true} if this CompletableFuture was cancelled
3183     * before it completed normally
3184     */
3185     public boolean isCancelled() {
3186     Object r;
3187 jsr166 1.43 return ((r = result) instanceof AltResult) &&
3188     (((AltResult)r).ex instanceof CancellationException);
3189 dl 1.28 }
3190    
3191     /**
3192     * Forcibly sets or resets the value subsequently returned by
3193 jsr166 1.42 * method {@link #get()} and related methods, whether or not
3194     * already completed. This method is designed for use only in
3195     * error recovery actions, and even in such situations may result
3196     * in ongoing dependent completions using established versus
3197 dl 1.30 * overwritten outcomes.
3198 dl 1.28 *
3199     * @param value the completion value
3200     */
3201     public void obtrudeValue(T value) {
3202     result = (value == null) ? NIL : value;
3203     postComplete();
3204     }
3205    
3206 dl 1.30 /**
3207 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
3208     * and related methods to throw the given exception, whether or
3209     * not already completed. This method is designed for use only in
3210 dl 1.30 * recovery actions, and even in such situations may result in
3211     * ongoing dependent completions using established versus
3212     * overwritten outcomes.
3213     *
3214     * @param ex the exception
3215     */
3216     public void obtrudeException(Throwable ex) {
3217     if (ex == null) throw new NullPointerException();
3218     result = new AltResult(ex);
3219     postComplete();
3220     }
3221    
3222 dl 1.35 /**
3223     * Returns the estimated number of CompletableFutures whose
3224     * completions are awaiting completion of this CompletableFuture.
3225     * This method is designed for use in monitoring system state, not
3226     * for synchronization control.
3227     *
3228     * @return the number of dependent CompletableFutures
3229     */
3230     public int getNumberOfDependents() {
3231     int count = 0;
3232     for (CompletionNode p = completions; p != null; p = p.next)
3233     ++count;
3234     return count;
3235     }
3236    
3237     /**
3238     * Returns a string identifying this CompletableFuture, as well as
3239 jsr166 1.40 * its completion state. The state, in brackets, contains the
3240 dl 1.35 * String {@code "Completed Normally"} or the String {@code
3241     * "Completed Exceptionally"}, or the String {@code "Not
3242     * completed"} followed by the number of CompletableFutures
3243     * dependent upon its completion, if any.
3244     *
3245     * @return a string identifying this CompletableFuture, as well as its state
3246     */
3247     public String toString() {
3248     Object r = result;
3249 jsr166 1.40 int count;
3250     return super.toString() +
3251     ((r == null) ?
3252     (((count = getNumberOfDependents()) == 0) ?
3253     "[Not completed]" :
3254     "[Not completed, " + count + " dependents]") :
3255     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3256     "[Completed exceptionally]" :
3257     "[Completed normally]"));
3258 dl 1.35 }
3259    
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" used for CAS operations.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    sun.misc.Unsafe unsafe;
    long resultOffset;
    long waitersOffset;
    long completionsOffset;
    try {
        unsafe = sun.misc.Unsafe.getUnsafe();
        Class<?> self = CompletableFuture.class;
        resultOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("result"));
        waitersOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        completionsOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
    } catch (Exception e) {
        // Any lookup failure is fatal for the class.
        throw new Error(e);
    }
    UNSAFE = unsafe;
    RESULT = resultOffset;
    WAITERS = waitersOffset;
    COMPLETIONS = completionsOffset;
}
3279     }