ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.35
Committed: Sat Feb 2 16:49:54 2013 UTC (11 years, 4 months ago) by dl
Branch: MAIN
Changes since 1.34: +370 -51 lines
Log Message:
Add control, status, grouping methods

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22     import java.util.concurrent.atomic.AtomicInteger;
23     import java.util.concurrent.locks.LockSupport;
24    
25     /**
26     * A {@link Future} that may be explicitly completed (setting its
27     * value and status), and may include dependent functions and actions
28 dl 1.35 * that trigger upon its completion.
29 dl 1.1 *
30 jsr166 1.4 * <p>When two or more threads attempt to {@link #complete} or {@link
31 dl 1.17 * #completeExceptionally} a CompletableFuture, only one of them
32 dl 1.19 * succeeds.
33     *
34 dl 1.35 * <p>Methods are available for adding dependents based on Functions,
35     * Consumers, and Runnables. The appropriate form to use depends on
36     * whether actions require arguments and/or produce results. Actions
37     * may also be triggered after either or both the current and another
38     * CompletableFuture complete. Multiple CompletableFutures may also
39     * be grouped as one using {@link #anyOf} and {@link #allOf}.
40     *
41     * <p>Actions supplied for dependent completions (mainly using methods
42     * with prefix {@code then}) may be performed by the thread that
43     * completes the current CompletableFuture, or by any other caller of
44     * these methods. There are no guarantees about the order of
45     * processing completions unless constrained by these methods.
46     *
47 jsr166 1.23 * <p>Upon exceptional completion, or when a completion entails
48 dl 1.35 * computation, and it terminates abruptly with an (unchecked)
49     * exception or error, then further completions act as {@code
50     * completeExceptionally} with a {@link CompletionException} holding
51     * that exception as its cause. If a CompletableFuture completes
52     * exceptionally, and is not followed by a {@link #exceptionally} or
53     * {@link #handle} completion, then all of its dependents (and their
54     * dependents) also complete exceptionally with CompletionExceptions
55     * holding the ultimate cause. In case of a CompletionException,
56     * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
57     * {@link ExecutionException} with the same cause as would be held in
58     * the corresponding CompletionException. However, in these cases,
59     * methods {@link #join()} and {@link #getNow} throw the
60     * CompletionException, which simplifies usage especially within other
61     * completion functions.
62 dl 1.1 *
63     * <p>CompletableFutures themselves do not execute asynchronously.
64     * However, the {@code async} methods provide commonly useful ways to
65 jsr166 1.8 * commence asynchronous processing, using either a given {@link
66 dl 1.1 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
67     * function or action that will result in the completion of a new
68 dl 1.28 * CompletableFuture. To simplify monitoring, debugging, and tracking,
69     * all generated asynchronous tasks are instances of the tagging
70     * interface {@link AsynchronousCompletionTask}.
71 dl 1.1 *
72     * @author Doug Lea
73     * @since 1.8
74     */
75     public class CompletableFuture<T> implements Future<T> {
76 dl 1.28
77 dl 1.1 /*
78 dl 1.20 * Overview:
79 dl 1.1 *
80 jsr166 1.32 * 1. Non-nullness of field result (set via CAS) indicates done.
81     * An AltResult is used to box null as a result, as well as to
82     * hold exceptions. Using a single field makes completion fast
83 dl 1.20 * and simple to detect and trigger, at the expense of a lot of
84     * encoding and decoding that infiltrates many methods. One minor
85     * simplification relies on the (static) NIL (to box null results)
86     * being the only AltResult with a null exception field, so we
87 dl 1.28 * don't usually need explicit comparisons with NIL. The CF
88     * exception propagation mechanics surrounding decoding rely on
89     * unchecked casts of decoded results really being unchecked,
90     * where user type errors are caught at point of use, as is
91     * currently the case in Java. These are highlighted by using
92     * SuppressWarnings-annotated temporaries.
93 dl 1.1 *
94     * 2. Waiters are held in a Treiber stack similar to the one used
95 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
96 dl 1.28 * internal documentation for algorithmic details.
97 dl 1.1 *
98     * 3. Completions are also kept in a list/stack, and pulled off
99 dl 1.28 * and run when completion is triggered. (We could even use the
100 jsr166 1.24 * same stack as for waiters, but would give up the potential
101 dl 1.20 * parallelism obtained because woken waiters help release/run
102 dl 1.28 * others -- see method postComplete). Because post-processing
103     * may race with direct calls, class Completion opportunistically
104     * extends AtomicInteger so callers can claim the action via
105     * compareAndSet(0, 1). The Completion.run methods are all
106     * written a boringly similar uniform way (that sometimes includes
107     * unnecessary-looking checks, kept to maintain uniformity). There
108 dl 1.35 * are enough dimensions upon which they differ that attempts to
109     * factor commonalities while maintaining efficiency require more
110     * lines of code than they would save.
111 dl 1.20 *
112     * 4. The exported then/and/or methods do support a bit of
113 dl 1.28 * factoring (see doThenApply etc). They must cope with the
114     * intrinsic races surrounding addition of a dependent action
115     * versus performing the action directly because the task is
116     * already complete. For example, a CF may not be complete upon
117     * entry, so a dependent completion is added, but by the time it
118     * is added, the target CF is complete, so must be directly
119 dl 1.20 * executed. This is all done while avoiding unnecessary object
120     * construction in safe-bypass cases.
121 dl 1.1 */
122    
123 dl 1.28 // preliminaries
124 dl 1.20
125 dl 1.1 static final class AltResult {
126     final Throwable ex; // null only for NIL
127 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
128 dl 1.1 }
129    
130     static final AltResult NIL = new AltResult(null);
131    
132 dl 1.20 // Fields
133    
134     volatile Object result; // Either the result or boxed AltResult
135 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
136     volatile CompletionNode completions; // list (Treiber stack) of completions
137    
138 dl 1.20 // Basic utilities for triggering and processing completions
139    
140     /**
141     * Removes and signals all waiting threads and runs all completions.
142     */
143     final void postComplete() {
144     WaitNode q; Thread t;
145     while ((q = waiters) != null) {
146     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
147     (t = q.thread) != null) {
148     q.thread = null;
149     LockSupport.unpark(t);
150     }
151     }
152    
153     CompletionNode h; Completion c;
154     while ((h = completions) != null) {
155     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
156     (c = h.completion) != null)
157     c.run();
158     }
159     }
160    
161     /**
162     * Triggers completion with the encoding of the given arguments:
163     * if the exception is non-null, encodes it as a wrapped
164     * CompletionException unless it is one already. Otherwise uses
165     * the given result, boxed as NIL if null.
166     */
167     final void internalComplete(Object v, Throwable ex) {
168     if (result == null)
169     UNSAFE.compareAndSwapObject
170     (this, RESULT, null,
171     (ex == null) ? (v == null) ? NIL : v :
172     new AltResult((ex instanceof CompletionException) ? ex :
173     new CompletionException(ex)));
174     postComplete(); // help out even if not triggered
175     }
176    
177     /**
178 jsr166 1.25 * If triggered, helps release and/or process completions.
179 dl 1.20 */
180     final void helpPostComplete() {
181     if (result != null)
182     postComplete();
183     }
184    
185 dl 1.28 /* ------------- waiting for completions -------------- */
186 dl 1.20
187 dl 1.35 /** Number of processors, for spin control */
188     static final int NCPU = Runtime.getRuntime().availableProcessors();
189    
190 dl 1.1 /**
191 dl 1.28 * Heuristic spin value for waitingGet() before blocking on
192     * multiprocessors
193 dl 1.1 */
194 dl 1.35 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
195 dl 1.1
196     /**
197 dl 1.28 * Linked nodes to record waiting threads in a Treiber stack. See
198     * other classes such as Phaser and SynchronousQueue for more
199     * detailed explanation. This class implements ManagedBlocker to
200     * avoid starvation when blocking actions pile up in
201     * ForkJoinPools.
202     */
203     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
204     long nanos; // wait time if timed
205     final long deadline; // non-zero if timed
206     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
207     volatile Thread thread;
208     volatile WaitNode next;
209     WaitNode(boolean interruptible, long nanos, long deadline) {
210     this.thread = Thread.currentThread();
211     this.interruptControl = interruptible ? 1 : 0;
212     this.nanos = nanos;
213     this.deadline = deadline;
214     }
215     public boolean isReleasable() {
216     if (thread == null)
217     return true;
218     if (Thread.interrupted()) {
219     int i = interruptControl;
220     interruptControl = -1;
221     if (i > 0)
222     return true;
223     }
224     if (deadline != 0L &&
225     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
226     thread = null;
227     return true;
228     }
229     return false;
230     }
231     public boolean block() {
232     if (isReleasable())
233     return true;
234     else if (deadline == 0L)
235     LockSupport.park(this);
236     else if (nanos > 0L)
237     LockSupport.parkNanos(this, nanos);
238     return isReleasable();
239     }
240 dl 1.1 }
241    
242     /**
243 dl 1.28 * Returns raw result after waiting, or null if interruptible and
244     * interrupted.
245 dl 1.1 */
246 dl 1.28 private Object waitingGet(boolean interruptible) {
247     WaitNode q = null;
248     boolean queued = false;
249 dl 1.35 int spins = SPINS;
250 dl 1.28 for (Object r;;) {
251     if ((r = result) != null) {
252     if (q != null) { // suppress unpark
253     q.thread = null;
254     if (q.interruptControl < 0) {
255     if (interruptible) {
256     removeWaiter(q);
257     return null;
258     }
259     Thread.currentThread().interrupt();
260     }
261     }
262     postComplete(); // help release others
263     return r;
264     }
265     else if (spins > 0) {
266 dl 1.35 int rnd = ThreadLocalRandom.nextSecondarySeed();
267     if (rnd == 0)
268     rnd = ThreadLocalRandom.current().nextInt();
269     if (rnd >= 0)
270 dl 1.28 --spins;
271     }
272     else if (q == null)
273     q = new WaitNode(interruptible, 0L, 0L);
274     else if (!queued)
275     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
276     q.next = waiters, q);
277     else if (interruptible && q.interruptControl < 0) {
278     removeWaiter(q);
279     return null;
280     }
281     else if (q.thread != null && result == null) {
282     try {
283     ForkJoinPool.managedBlock(q);
284 jsr166 1.31 } catch (InterruptedException ex) {
285 dl 1.28 q.interruptControl = -1;
286     }
287     }
288     }
289 dl 1.1 }
290    
291     /**
292 dl 1.28 * Awaits completion or aborts on interrupt or timeout.
293 dl 1.1 *
294 dl 1.28 * @param nanos time to wait
295     * @return raw result
296 dl 1.1 */
297 dl 1.28 private Object timedAwaitDone(long nanos)
298     throws InterruptedException, TimeoutException {
299     WaitNode q = null;
300     boolean queued = false;
301     for (Object r;;) {
302     if ((r = result) != null) {
303     if (q != null) {
304     q.thread = null;
305     if (q.interruptControl < 0) {
306     removeWaiter(q);
307     throw new InterruptedException();
308     }
309     }
310     postComplete();
311     return r;
312     }
313     else if (q == null) {
314     if (nanos <= 0L)
315     throw new TimeoutException();
316     long d = System.nanoTime() + nanos;
317 jsr166 1.31 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
318 dl 1.28 }
319     else if (!queued)
320     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
321     q.next = waiters, q);
322     else if (q.interruptControl < 0) {
323     removeWaiter(q);
324     throw new InterruptedException();
325     }
326     else if (q.nanos <= 0L) {
327     if (result == null) {
328     removeWaiter(q);
329     throw new TimeoutException();
330     }
331     }
332     else if (q.thread != null && result == null) {
333     try {
334     ForkJoinPool.managedBlock(q);
335 jsr166 1.31 } catch (InterruptedException ex) {
336 dl 1.28 q.interruptControl = -1;
337     }
338     }
339     }
340 dl 1.1 }
341    
342     /**
343 dl 1.28 * Tries to unlink a timed-out or interrupted wait node to avoid
344     * accumulating garbage. Internal nodes are simply unspliced
345     * without CAS since it is harmless if they are traversed anyway
346     * by releasers. To avoid effects of unsplicing from already
347     * removed nodes, the list is retraversed in case of an apparent
348     * race. This is slow when there are a lot of nodes, but we don't
349     * expect lists to be long enough to outweigh higher-overhead
350     * schemes.
351 dl 1.1 */
352 dl 1.28 private void removeWaiter(WaitNode node) {
353     if (node != null) {
354     node.thread = null;
355     retry:
356     for (;;) { // restart on removeWaiter race
357     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
358     s = q.next;
359     if (q.thread != null)
360     pred = q;
361     else if (pred != null) {
362     pred.next = s;
363     if (pred.thread == null) // check for race
364     continue retry;
365     }
366     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
367     continue retry;
368     }
369     break;
370     }
371     }
372 dl 1.1 }
373    
374 dl 1.28 /* ------------- Async tasks -------------- */
375    
376 dl 1.1 /**
377 dl 1.28 * A tagging interface identifying asynchronous tasks produced by
378     * {@code async} methods. This may be useful for monitoring,
379     * debugging, and tracking asynchronous activities.
380 dl 1.1 */
381 dl 1.28 public static interface AsynchronousCompletionTask {
382 dl 1.1 }
383    
384 dl 1.28 /** Base class can act as either FJ or plain Runnable */
385 jsr166 1.33 abstract static class Async extends ForkJoinTask<Void>
386 dl 1.28 implements Runnable, AsynchronousCompletionTask {
387     public final Void getRawResult() { return null; }
388     public final void setRawResult(Void v) { }
389     public final void run() { exec(); }
390 jsr166 1.26 }
391    
392 dl 1.28 static final class AsyncRun extends Async {
393     final Runnable fn;
394     final CompletableFuture<Void> dst;
395     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
396     this.fn = fn; this.dst = dst;
397     }
398     public final boolean exec() {
399     CompletableFuture<Void> d; Throwable ex;
400 dl 1.29 if ((d = this.dst) != null && d.result == null) {
401 dl 1.28 try {
402     fn.run();
403     ex = null;
404     } catch (Throwable rex) {
405     ex = rex;
406     }
407     d.internalComplete(null, ex);
408 dl 1.19 }
409 dl 1.28 return true;
410 dl 1.19 }
411 dl 1.28 private static final long serialVersionUID = 5232453952276885070L;
412 dl 1.19 }
413    
414 dl 1.28 static final class AsyncSupply<U> extends Async {
415     final Supplier<U> fn;
416     final CompletableFuture<U> dst;
417     AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
418     this.fn = fn; this.dst = dst;
419     }
420     public final boolean exec() {
421     CompletableFuture<U> d; U u; Throwable ex;
422 dl 1.29 if ((d = this.dst) != null && d.result == null) {
423 dl 1.28 try {
424     u = fn.get();
425     ex = null;
426     } catch (Throwable rex) {
427     ex = rex;
428     u = null;
429     }
430     d.internalComplete(u, ex);
431 dl 1.1 }
432     return true;
433     }
434     private static final long serialVersionUID = 5232453952276885070L;
435     }
436    
437 dl 1.28 static final class AsyncApply<T,U> extends Async {
438     final Function<? super T,? extends U> fn;
439     final T arg;
440 dl 1.1 final CompletableFuture<U> dst;
441 dl 1.28 AsyncApply(T arg, Function<? super T,? extends U> fn,
442 dl 1.35 CompletableFuture<U> dst) {
443 dl 1.1 this.arg = arg; this.fn = fn; this.dst = dst;
444     }
445     public final boolean exec() {
446 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
447 dl 1.29 if ((d = this.dst) != null && d.result == null) {
448 dl 1.21 try {
449     u = fn.apply(arg);
450     ex = null;
451     } catch (Throwable rex) {
452     ex = rex;
453     u = null;
454     }
455     d.internalComplete(u, ex);
456 dl 1.1 }
457     return true;
458     }
459     private static final long serialVersionUID = 5232453952276885070L;
460     }
461    
462 dl 1.28 static final class AsyncBiApply<T,U,V> extends Async {
463 dl 1.1 final BiFunction<? super T,? super U,? extends V> fn;
464     final T arg1;
465     final U arg2;
466     final CompletableFuture<V> dst;
467 dl 1.28 AsyncBiApply(T arg1, U arg2,
468 dl 1.35 BiFunction<? super T,? super U,? extends V> fn,
469     CompletableFuture<V> dst) {
470 dl 1.1 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
471     }
472     public final boolean exec() {
473 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
474 dl 1.29 if ((d = this.dst) != null && d.result == null) {
475 dl 1.21 try {
476     v = fn.apply(arg1, arg2);
477     ex = null;
478     } catch (Throwable rex) {
479     ex = rex;
480     v = null;
481     }
482     d.internalComplete(v, ex);
483 dl 1.1 }
484     return true;
485     }
486     private static final long serialVersionUID = 5232453952276885070L;
487     }
488    
489 dl 1.28 static final class AsyncAccept<T> extends Async {
490 dl 1.34 final Consumer<? super T> fn;
491 dl 1.28 final T arg;
492 dl 1.7 final CompletableFuture<Void> dst;
493 dl 1.34 AsyncAccept(T arg, Consumer<? super T> fn,
494 dl 1.35 CompletableFuture<Void> dst) {
495 dl 1.7 this.arg = arg; this.fn = fn; this.dst = dst;
496     }
497     public final boolean exec() {
498 dl 1.21 CompletableFuture<Void> d; Throwable ex;
499 dl 1.29 if ((d = this.dst) != null && d.result == null) {
500 dl 1.21 try {
501     fn.accept(arg);
502     ex = null;
503     } catch (Throwable rex) {
504     ex = rex;
505     }
506     d.internalComplete(null, ex);
507 dl 1.7 }
508     return true;
509     }
510     private static final long serialVersionUID = 5232453952276885070L;
511     }
512    
513 dl 1.28 static final class AsyncBiAccept<T,U> extends Async {
514 dl 1.34 final BiConsumer<? super T,? super U> fn;
515 dl 1.7 final T arg1;
516     final U arg2;
517     final CompletableFuture<Void> dst;
518 dl 1.28 AsyncBiAccept(T arg1, U arg2,
519 dl 1.35 BiConsumer<? super T,? super U> fn,
520     CompletableFuture<Void> dst) {
521 dl 1.7 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
522     }
523     public final boolean exec() {
524 dl 1.21 CompletableFuture<Void> d; Throwable ex;
525 dl 1.29 if ((d = this.dst) != null && d.result == null) {
526 dl 1.21 try {
527     fn.accept(arg1, arg2);
528     ex = null;
529     } catch (Throwable rex) {
530     ex = rex;
531     }
532     d.internalComplete(null, ex);
533 dl 1.7 }
534     return true;
535     }
536     private static final long serialVersionUID = 5232453952276885070L;
537     }
538    
539 dl 1.1 /* ------------- Completions -------------- */
540    
541 dl 1.28 /**
542     * Simple linked list nodes to record completions, used in
543     * basically the same way as WaitNodes. (We separate nodes from
544     * the Completions themselves mainly because for the And and Or
545     * methods, the same Completion object resides in two lists.)
546     */
547     static final class CompletionNode {
548     final Completion completion;
549     volatile CompletionNode next;
550     CompletionNode(Completion completion) { this.completion = completion; }
551     }
552    
553 dl 1.1 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
554 jsr166 1.33 abstract static class Completion extends AtomicInteger implements Runnable {
555 dl 1.1 }
556    
557 dl 1.28 static final class ApplyCompletion<T,U> extends Completion {
558 jsr166 1.2 final CompletableFuture<? extends T> src;
559     final Function<? super T,? extends U> fn;
560     final CompletableFuture<U> dst;
561 dl 1.1 final Executor executor;
562 dl 1.28 ApplyCompletion(CompletableFuture<? extends T> src,
563     Function<? super T,? extends U> fn,
564     CompletableFuture<U> dst, Executor executor) {
565 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
566     this.executor = executor;
567     }
568 jsr166 1.26 public final void run() {
569 dl 1.28 final CompletableFuture<? extends T> a;
570     final Function<? super T,? extends U> fn;
571     final CompletableFuture<U> dst;
572 jsr166 1.2 Object r; T t; Throwable ex;
573     if ((dst = this.dst) != null &&
574 dl 1.1 (fn = this.fn) != null &&
575     (a = this.src) != null &&
576     (r = a.result) != null &&
577     compareAndSet(0, 1)) {
578 jsr166 1.2 if (r instanceof AltResult) {
579 dl 1.19 ex = ((AltResult)r).ex;
580 dl 1.1 t = null;
581     }
582 dl 1.17 else {
583     ex = null;
584 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
585     t = tr;
586 dl 1.1 }
587 dl 1.20 Executor e = executor;
588     U u = null;
589 dl 1.17 if (ex == null) {
590     try {
591 dl 1.20 if (e != null)
592 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
593 dl 1.17 else
594 dl 1.20 u = fn.apply(t);
595 dl 1.17 } catch (Throwable rex) {
596     ex = rex;
597     }
598     }
599 dl 1.20 if (e == null || ex != null)
600     dst.internalComplete(u, ex);
601 dl 1.1 }
602     }
603 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
604 dl 1.1 }
605    
606 dl 1.28 static final class AcceptCompletion<T> extends Completion {
607 dl 1.7 final CompletableFuture<? extends T> src;
608 dl 1.34 final Consumer<? super T> fn;
609 dl 1.7 final CompletableFuture<Void> dst;
610     final Executor executor;
611 dl 1.28 AcceptCompletion(CompletableFuture<? extends T> src,
612 dl 1.34 Consumer<? super T> fn,
613 dl 1.28 CompletableFuture<Void> dst, Executor executor) {
614 dl 1.7 this.src = src; this.fn = fn; this.dst = dst;
615     this.executor = executor;
616     }
617 jsr166 1.26 public final void run() {
618 dl 1.28 final CompletableFuture<? extends T> a;
619 dl 1.34 final Consumer<? super T> fn;
620 dl 1.28 final CompletableFuture<Void> dst;
621 dl 1.7 Object r; T t; Throwable ex;
622     if ((dst = this.dst) != null &&
623     (fn = this.fn) != null &&
624     (a = this.src) != null &&
625     (r = a.result) != null &&
626     compareAndSet(0, 1)) {
627     if (r instanceof AltResult) {
628 dl 1.19 ex = ((AltResult)r).ex;
629 dl 1.7 t = null;
630     }
631 dl 1.17 else {
632     ex = null;
633 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
634     t = tr;
635 dl 1.17 }
636 dl 1.20 Executor e = executor;
637 dl 1.17 if (ex == null) {
638     try {
639 dl 1.20 if (e != null)
640 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
641 dl 1.20 else
642 dl 1.17 fn.accept(t);
643     } catch (Throwable rex) {
644     ex = rex;
645 dl 1.7 }
646     }
647 dl 1.20 if (e == null || ex != null)
648     dst.internalComplete(null, ex);
649 dl 1.7 }
650     }
651 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
652 dl 1.7 }
653    
654 dl 1.28 static final class RunCompletion<T> extends Completion {
655 jsr166 1.2 final CompletableFuture<? extends T> src;
656     final Runnable fn;
657     final CompletableFuture<Void> dst;
658 dl 1.1 final Executor executor;
659 dl 1.28 RunCompletion(CompletableFuture<? extends T> src,
660     Runnable fn,
661     CompletableFuture<Void> dst,
662     Executor executor) {
663 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
664     this.executor = executor;
665     }
666 jsr166 1.26 public final void run() {
667 dl 1.28 final CompletableFuture<? extends T> a;
668     final Runnable fn;
669     final CompletableFuture<Void> dst;
670 jsr166 1.2 Object r; Throwable ex;
671     if ((dst = this.dst) != null &&
672 dl 1.1 (fn = this.fn) != null &&
673     (a = this.src) != null &&
674     (r = a.result) != null &&
675     compareAndSet(0, 1)) {
676 dl 1.19 if (r instanceof AltResult)
677     ex = ((AltResult)r).ex;
678 dl 1.17 else
679     ex = null;
680 dl 1.20 Executor e = executor;
681 dl 1.17 if (ex == null) {
682     try {
683 dl 1.20 if (e != null)
684 dl 1.28 e.execute(new AsyncRun(fn, dst));
685 dl 1.20 else
686 dl 1.17 fn.run();
687     } catch (Throwable rex) {
688     ex = rex;
689 dl 1.1 }
690     }
691 dl 1.20 if (e == null || ex != null)
692     dst.internalComplete(null, ex);
693 dl 1.1 }
694     }
695 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
696 dl 1.1 }
697    
698 dl 1.28 static final class BiApplyCompletion<T,U,V> extends Completion {
699 jsr166 1.2 final CompletableFuture<? extends T> src;
700     final CompletableFuture<? extends U> snd;
701     final BiFunction<? super T,? super U,? extends V> fn;
702     final CompletableFuture<V> dst;
703 dl 1.1 final Executor executor;
704 dl 1.28 BiApplyCompletion(CompletableFuture<? extends T> src,
705     CompletableFuture<? extends U> snd,
706     BiFunction<? super T,? super U,? extends V> fn,
707     CompletableFuture<V> dst, Executor executor) {
708 dl 1.1 this.src = src; this.snd = snd;
709     this.fn = fn; this.dst = dst;
710     this.executor = executor;
711     }
712 jsr166 1.26 public final void run() {
713 dl 1.28 final CompletableFuture<? extends T> a;
714     final CompletableFuture<? extends U> b;
715     final BiFunction<? super T,? super U,? extends V> fn;
716     final CompletableFuture<V> dst;
717 jsr166 1.2 Object r, s; T t; U u; Throwable ex;
718     if ((dst = this.dst) != null &&
719 dl 1.1 (fn = this.fn) != null &&
720     (a = this.src) != null &&
721     (r = a.result) != null &&
722     (b = this.snd) != null &&
723     (s = b.result) != null &&
724     compareAndSet(0, 1)) {
725 jsr166 1.2 if (r instanceof AltResult) {
726 dl 1.19 ex = ((AltResult)r).ex;
727 jsr166 1.2 t = null;
728     }
729 dl 1.19 else {
730     ex = null;
731 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
732     t = tr;
733 dl 1.19 }
734     if (ex != null)
735     u = null;
736     else if (s instanceof AltResult) {
737     ex = ((AltResult)s).ex;
738 jsr166 1.2 u = null;
739     }
740 dl 1.28 else {
741     @SuppressWarnings("unchecked") U us = (U) s;
742     u = us;
743     }
744 dl 1.20 Executor e = executor;
745     V v = null;
746 dl 1.19 if (ex == null) {
747     try {
748 dl 1.20 if (e != null)
749 dl 1.28 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
750 dl 1.19 else
751 dl 1.20 v = fn.apply(t, u);
752 dl 1.19 } catch (Throwable rex) {
753     ex = rex;
754     }
755 dl 1.1 }
756 dl 1.20 if (e == null || ex != null)
757     dst.internalComplete(v, ex);
758 jsr166 1.2 }
759     }
760 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
761 dl 1.1 }
762    
763 dl 1.28 static final class BiAcceptCompletion<T,U> extends Completion {
764 dl 1.7 final CompletableFuture<? extends T> src;
765     final CompletableFuture<? extends U> snd;
766 dl 1.34 final BiConsumer<? super T,? super U> fn;
767 dl 1.7 final CompletableFuture<Void> dst;
768     final Executor executor;
769 dl 1.28 BiAcceptCompletion(CompletableFuture<? extends T> src,
770     CompletableFuture<? extends U> snd,
771 dl 1.34 BiConsumer<? super T,? super U> fn,
772 dl 1.28 CompletableFuture<Void> dst, Executor executor) {
773 dl 1.7 this.src = src; this.snd = snd;
774     this.fn = fn; this.dst = dst;
775     this.executor = executor;
776     }
777 jsr166 1.26 public final void run() {
778 dl 1.28 final CompletableFuture<? extends T> a;
779     final CompletableFuture<? extends U> b;
780 dl 1.34 final BiConsumer<? super T,? super U> fn;
781 dl 1.28 final CompletableFuture<Void> dst;
782 dl 1.7 Object r, s; T t; U u; Throwable ex;
783     if ((dst = this.dst) != null &&
784     (fn = this.fn) != null &&
785     (a = this.src) != null &&
786     (r = a.result) != null &&
787     (b = this.snd) != null &&
788     (s = b.result) != null &&
789     compareAndSet(0, 1)) {
790     if (r instanceof AltResult) {
791 dl 1.19 ex = ((AltResult)r).ex;
792 dl 1.7 t = null;
793     }
794 dl 1.19 else {
795     ex = null;
796 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
797     t = tr;
798 dl 1.19 }
799     if (ex != null)
800     u = null;
801     else if (s instanceof AltResult) {
802     ex = ((AltResult)s).ex;
803 dl 1.7 u = null;
804     }
805 dl 1.28 else {
806     @SuppressWarnings("unchecked") U us = (U) s;
807     u = us;
808     }
809 dl 1.20 Executor e = executor;
810 dl 1.19 if (ex == null) {
811     try {
812 dl 1.20 if (e != null)
813 dl 1.28 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
814 dl 1.20 else
815 dl 1.19 fn.accept(t, u);
816     } catch (Throwable rex) {
817     ex = rex;
818 dl 1.7 }
819     }
820 dl 1.20 if (e == null || ex != null)
821     dst.internalComplete(null, ex);
822 dl 1.7 }
823     }
824 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
825 dl 1.7 }
826    
827 dl 1.28 static final class BiRunCompletion<T> extends Completion {
828 jsr166 1.2 final CompletableFuture<? extends T> src;
829     final CompletableFuture<?> snd;
830     final Runnable fn;
831     final CompletableFuture<Void> dst;
832 dl 1.1 final Executor executor;
833 dl 1.28 BiRunCompletion(CompletableFuture<? extends T> src,
834     CompletableFuture<?> snd,
835     Runnable fn,
836     CompletableFuture<Void> dst, Executor executor) {
837 dl 1.1 this.src = src; this.snd = snd;
838     this.fn = fn; this.dst = dst;
839     this.executor = executor;
840     }
841 jsr166 1.26 public final void run() {
842 dl 1.1 final CompletableFuture<? extends T> a;
843     final CompletableFuture<?> b;
844     final Runnable fn;
845     final CompletableFuture<Void> dst;
846 dl 1.28 Object r, s; Throwable ex;
847 jsr166 1.2 if ((dst = this.dst) != null &&
848 dl 1.1 (fn = this.fn) != null &&
849     (a = this.src) != null &&
850     (r = a.result) != null &&
851     (b = this.snd) != null &&
852     (s = b.result) != null &&
853     compareAndSet(0, 1)) {
854 dl 1.19 if (r instanceof AltResult)
855     ex = ((AltResult)r).ex;
856     else
857     ex = null;
858     if (ex == null && (s instanceof AltResult))
859     ex = ((AltResult)s).ex;
860 dl 1.20 Executor e = executor;
861 dl 1.19 if (ex == null) {
862     try {
863 dl 1.20 if (e != null)
864 dl 1.28 e.execute(new AsyncRun(fn, dst));
865 dl 1.20 else
866 dl 1.19 fn.run();
867     } catch (Throwable rex) {
868     ex = rex;
869 dl 1.1 }
870 jsr166 1.2 }
871 dl 1.20 if (e == null || ex != null)
872     dst.internalComplete(null, ex);
873 jsr166 1.2 }
874     }
875 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
876 dl 1.1 }
877    
878 dl 1.35 static final class AndCompletion extends Completion {
879     final CompletableFuture<?> src;
880     final CompletableFuture<?> snd;
881     final CompletableFuture<Void> dst;
882     AndCompletion(CompletableFuture<?> src,
883     CompletableFuture<?> snd,
884     CompletableFuture<Void> dst) {
885     this.src = src; this.snd = snd; this.dst = dst;
886     }
887     public final void run() {
888     final CompletableFuture<?> a;
889     final CompletableFuture<?> b;
890     final CompletableFuture<Void> dst;
891     Object r, s; Throwable ex;
892     if ((dst = this.dst) != null &&
893     (a = this.src) != null &&
894     (r = a.result) != null &&
895     (b = this.snd) != null &&
896     (s = b.result) != null &&
897     compareAndSet(0, 1)) {
898     if (r instanceof AltResult)
899     ex = ((AltResult)r).ex;
900     else
901     ex = null;
902     if (ex == null && (s instanceof AltResult))
903     ex = ((AltResult)s).ex;
904     dst.internalComplete(null, ex);
905     }
906     }
907     private static final long serialVersionUID = 5232453952276885070L;
908     }
909    
910 dl 1.28 static final class OrApplyCompletion<T,U> extends Completion {
911 jsr166 1.2 final CompletableFuture<? extends T> src;
912     final CompletableFuture<? extends T> snd;
913     final Function<? super T,? extends U> fn;
914     final CompletableFuture<U> dst;
915 dl 1.1 final Executor executor;
916 dl 1.28 OrApplyCompletion(CompletableFuture<? extends T> src,
917     CompletableFuture<? extends T> snd,
918     Function<? super T,? extends U> fn,
919     CompletableFuture<U> dst, Executor executor) {
920 dl 1.1 this.src = src; this.snd = snd;
921     this.fn = fn; this.dst = dst;
922     this.executor = executor;
923     }
924 jsr166 1.26 public final void run() {
925 dl 1.28 final CompletableFuture<? extends T> a;
926     final CompletableFuture<? extends T> b;
927     final Function<? super T,? extends U> fn;
928     final CompletableFuture<U> dst;
929 jsr166 1.2 Object r; T t; Throwable ex;
930     if ((dst = this.dst) != null &&
931 dl 1.1 (fn = this.fn) != null &&
932     (((a = this.src) != null && (r = a.result) != null) ||
933     ((b = this.snd) != null && (r = b.result) != null)) &&
934     compareAndSet(0, 1)) {
935 jsr166 1.2 if (r instanceof AltResult) {
936 dl 1.19 ex = ((AltResult)r).ex;
937 jsr166 1.2 t = null;
938     }
939 dl 1.19 else {
940     ex = null;
941 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
942     t = tr;
943 dl 1.1 }
944 dl 1.20 Executor e = executor;
945     U u = null;
946 dl 1.19 if (ex == null) {
947     try {
948 dl 1.20 if (e != null)
949 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
950 dl 1.19 else
951 dl 1.20 u = fn.apply(t);
952 dl 1.19 } catch (Throwable rex) {
953     ex = rex;
954     }
955     }
956 dl 1.20 if (e == null || ex != null)
957     dst.internalComplete(u, ex);
958 jsr166 1.2 }
959     }
960 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
961 dl 1.1 }
962    
963 dl 1.28 static final class OrAcceptCompletion<T> extends Completion {
964 dl 1.7 final CompletableFuture<? extends T> src;
965     final CompletableFuture<? extends T> snd;
966 dl 1.34 final Consumer<? super T> fn;
967 dl 1.7 final CompletableFuture<Void> dst;
968     final Executor executor;
969 dl 1.28 OrAcceptCompletion(CompletableFuture<? extends T> src,
970     CompletableFuture<? extends T> snd,
971 dl 1.34 Consumer<? super T> fn,
972 dl 1.28 CompletableFuture<Void> dst, Executor executor) {
973 dl 1.7 this.src = src; this.snd = snd;
974     this.fn = fn; this.dst = dst;
975     this.executor = executor;
976     }
977 jsr166 1.26 public final void run() {
978 dl 1.28 final CompletableFuture<? extends T> a;
979     final CompletableFuture<? extends T> b;
980 dl 1.34 final Consumer<? super T> fn;
981 dl 1.28 final CompletableFuture<Void> dst;
982 dl 1.7 Object r; T t; Throwable ex;
983     if ((dst = this.dst) != null &&
984     (fn = this.fn) != null &&
985     (((a = this.src) != null && (r = a.result) != null) ||
986     ((b = this.snd) != null && (r = b.result) != null)) &&
987     compareAndSet(0, 1)) {
988     if (r instanceof AltResult) {
989 dl 1.19 ex = ((AltResult)r).ex;
990 dl 1.7 t = null;
991     }
992 dl 1.19 else {
993     ex = null;
994 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
995     t = tr;
996 dl 1.19 }
997 dl 1.20 Executor e = executor;
998 dl 1.19 if (ex == null) {
999     try {
1000 dl 1.20 if (e != null)
1001 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
1002 dl 1.20 else
1003 dl 1.19 fn.accept(t);
1004     } catch (Throwable rex) {
1005     ex = rex;
1006 dl 1.7 }
1007     }
1008 dl 1.20 if (e == null || ex != null)
1009     dst.internalComplete(null, ex);
1010 dl 1.7 }
1011     }
1012 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1013 dl 1.7 }
1014    
1015 dl 1.28 static final class OrRunCompletion<T> extends Completion {
1016 jsr166 1.2 final CompletableFuture<? extends T> src;
1017     final CompletableFuture<?> snd;
1018     final Runnable fn;
1019     final CompletableFuture<Void> dst;
1020 dl 1.1 final Executor executor;
1021 dl 1.28 OrRunCompletion(CompletableFuture<? extends T> src,
1022     CompletableFuture<?> snd,
1023     Runnable fn,
1024     CompletableFuture<Void> dst, Executor executor) {
1025 dl 1.1 this.src = src; this.snd = snd;
1026     this.fn = fn; this.dst = dst;
1027     this.executor = executor;
1028     }
1029 jsr166 1.26 public final void run() {
1030 dl 1.28 final CompletableFuture<? extends T> a;
1031 dl 1.1 final CompletableFuture<?> b;
1032     final Runnable fn;
1033     final CompletableFuture<Void> dst;
1034 dl 1.28 Object r; Throwable ex;
1035 jsr166 1.2 if ((dst = this.dst) != null &&
1036 dl 1.1 (fn = this.fn) != null &&
1037     (((a = this.src) != null && (r = a.result) != null) ||
1038     ((b = this.snd) != null && (r = b.result) != null)) &&
1039     compareAndSet(0, 1)) {
1040 dl 1.19 if (r instanceof AltResult)
1041     ex = ((AltResult)r).ex;
1042     else
1043     ex = null;
1044 dl 1.20 Executor e = executor;
1045 dl 1.19 if (ex == null) {
1046 dl 1.1 try {
1047 dl 1.20 if (e != null)
1048 dl 1.28 e.execute(new AsyncRun(fn, dst));
1049 dl 1.20 else
1050 dl 1.1 fn.run();
1051     } catch (Throwable rex) {
1052 dl 1.19 ex = rex;
1053 dl 1.1 }
1054     }
1055 dl 1.20 if (e == null || ex != null)
1056     dst.internalComplete(null, ex);
1057 jsr166 1.2 }
1058     }
1059 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1060 dl 1.1 }
1061    
1062 dl 1.35 static final class OrCompletion extends Completion {
1063     final CompletableFuture<?> src;
1064     final CompletableFuture<?> snd;
1065     final CompletableFuture<?> dst;
1066     OrCompletion(CompletableFuture<?> src,
1067     CompletableFuture<?> snd,
1068     CompletableFuture<?> dst) {
1069     this.src = src; this.snd = snd; this.dst = dst;
1070     }
1071     public final void run() {
1072     final CompletableFuture<?> a;
1073     final CompletableFuture<?> b;
1074     final CompletableFuture<?> dst;
1075     Object r, t; Throwable ex;
1076     if ((dst = this.dst) != null &&
1077     (((a = this.src) != null && (r = a.result) != null) ||
1078     ((b = this.snd) != null && (r = b.result) != null)) &&
1079     compareAndSet(0, 1)) {
1080     if (r instanceof AltResult) {
1081     ex = ((AltResult)r).ex;
1082     t = null;
1083     }
1084     else {
1085     ex = null;
1086     t = r;
1087     }
1088     dst.internalComplete(t, ex);
1089     }
1090     }
1091     private static final long serialVersionUID = 5232453952276885070L;
1092     }
1093    
1094 dl 1.28 static final class ExceptionCompletion<T> extends Completion {
1095 jsr166 1.2 final CompletableFuture<? extends T> src;
1096 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1097     final CompletableFuture<T> dst;
1098 dl 1.28 ExceptionCompletion(CompletableFuture<? extends T> src,
1099     Function<? super Throwable, ? extends T> fn,
1100     CompletableFuture<T> dst) {
1101 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1102     }
1103 jsr166 1.26 public final void run() {
1104 dl 1.28 final CompletableFuture<? extends T> a;
1105     final Function<? super Throwable, ? extends T> fn;
1106     final CompletableFuture<T> dst;
1107 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1108 jsr166 1.2 if ((dst = this.dst) != null &&
1109 dl 1.1 (fn = this.fn) != null &&
1110     (a = this.src) != null &&
1111     (r = a.result) != null &&
1112     compareAndSet(0, 1)) {
1113 dl 1.20 if ((r instanceof AltResult) &&
1114     (ex = ((AltResult)r).ex) != null) {
1115     try {
1116     t = fn.apply(ex);
1117     } catch (Throwable rex) {
1118     dx = rex;
1119 dl 1.1 }
1120     }
1121 dl 1.28 else {
1122     @SuppressWarnings("unchecked") T tr = (T) r;
1123     t = tr;
1124     }
1125 dl 1.20 dst.internalComplete(t, dx);
1126 dl 1.1 }
1127     }
1128 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1129 dl 1.1 }
1130    
1131 dl 1.35 static final class ThenCopy extends Completion {
1132     final CompletableFuture<?> src;
1133     final CompletableFuture<?> dst;
1134     ThenCopy(CompletableFuture<?> src,
1135     CompletableFuture<?> dst) {
1136 dl 1.17 this.src = src; this.dst = dst;
1137     }
1138 jsr166 1.26 public final void run() {
1139 dl 1.35 final CompletableFuture<?> a;
1140     final CompletableFuture<?> dst;
1141 dl 1.20 Object r; Object t; Throwable ex;
1142 dl 1.17 if ((dst = this.dst) != null &&
1143     (a = this.src) != null &&
1144     (r = a.result) != null &&
1145     compareAndSet(0, 1)) {
1146 dl 1.20 if (r instanceof AltResult) {
1147     ex = ((AltResult)r).ex;
1148     t = null;
1149     }
1150     else {
1151     ex = null;
1152     t = r;
1153     }
1154     dst.internalComplete(t, ex);
1155 dl 1.17 }
1156     }
1157 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1158 dl 1.17 }
1159    
1160 dl 1.28 static final class HandleCompletion<T,U> extends Completion {
1161 dl 1.17 final CompletableFuture<? extends T> src;
1162     final BiFunction<? super T, Throwable, ? extends U> fn;
1163     final CompletableFuture<U> dst;
1164 dl 1.28 HandleCompletion(CompletableFuture<? extends T> src,
1165     BiFunction<? super T, Throwable, ? extends U> fn,
1166     final CompletableFuture<U> dst) {
1167 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1168     }
1169 jsr166 1.26 public final void run() {
1170 dl 1.28 final CompletableFuture<? extends T> a;
1171     final BiFunction<? super T, Throwable, ? extends U> fn;
1172     final CompletableFuture<U> dst;
1173 dl 1.17 Object r; T t; Throwable ex;
1174     if ((dst = this.dst) != null &&
1175     (fn = this.fn) != null &&
1176     (a = this.src) != null &&
1177     (r = a.result) != null &&
1178     compareAndSet(0, 1)) {
1179     if (r instanceof AltResult) {
1180     ex = ((AltResult)r).ex;
1181     t = null;
1182     }
1183     else {
1184     ex = null;
1185 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1186     t = tr;
1187 dl 1.17 }
1188 dl 1.20 U u = null; Throwable dx = null;
1189 dl 1.17 try {
1190 dl 1.20 u = fn.apply(t, ex);
1191 dl 1.17 } catch (Throwable rex) {
1192 dl 1.20 dx = rex;
1193 dl 1.17 }
1194 dl 1.20 dst.internalComplete(u, dx);
1195 dl 1.17 }
1196     }
1197 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1198 dl 1.17 }
1199    
1200 dl 1.28 static final class ComposeCompletion<T,U> extends Completion {
1201 dl 1.17 final CompletableFuture<? extends T> src;
1202     final Function<? super T, CompletableFuture<U>> fn;
1203     final CompletableFuture<U> dst;
1204 dl 1.28 ComposeCompletion(CompletableFuture<? extends T> src,
1205     Function<? super T, CompletableFuture<U>> fn,
1206     final CompletableFuture<U> dst) {
1207 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1208     }
1209 jsr166 1.26 public final void run() {
1210 dl 1.28 final CompletableFuture<? extends T> a;
1211     final Function<? super T, CompletableFuture<U>> fn;
1212     final CompletableFuture<U> dst;
1213 dl 1.17 Object r; T t; Throwable ex;
1214     if ((dst = this.dst) != null &&
1215     (fn = this.fn) != null &&
1216     (a = this.src) != null &&
1217     (r = a.result) != null &&
1218     compareAndSet(0, 1)) {
1219     if (r instanceof AltResult) {
1220     ex = ((AltResult)r).ex;
1221     t = null;
1222     }
1223     else {
1224     ex = null;
1225 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1226     t = tr;
1227 dl 1.17 }
1228     CompletableFuture<U> c = null;
1229 dl 1.20 U u = null;
1230     boolean complete = false;
1231 dl 1.17 if (ex == null) {
1232     try {
1233     c = fn.apply(t);
1234     } catch (Throwable rex) {
1235     ex = rex;
1236     }
1237     }
1238     if (ex != null || c == null) {
1239     if (ex == null)
1240     ex = new NullPointerException();
1241     }
1242 dl 1.18 else {
1243 dl 1.35 ThenCopy d = null;
1244 dl 1.18 Object s;
1245     if ((s = c.result) == null) {
1246     CompletionNode p = new CompletionNode
1247 dl 1.35 (d = new ThenCopy(c, dst));
1248 dl 1.18 while ((s = c.result) == null) {
1249     if (UNSAFE.compareAndSwapObject
1250     (c, COMPLETIONS, p.next = c.completions, p))
1251     break;
1252     }
1253 dl 1.17 }
1254 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1255 dl 1.20 complete = true;
1256 dl 1.18 if (s instanceof AltResult) {
1257     ex = ((AltResult)s).ex; // no rewrap
1258     u = null;
1259 dl 1.28 }
1260     else {
1261     @SuppressWarnings("unchecked") U us = (U) s;
1262     u = us;
1263     }
1264     }
1265     }
1266     if (complete || ex != null)
1267     dst.internalComplete(u, ex);
1268     if (c != null)
1269     c.helpPostComplete();
1270     }
1271     }
1272     private static final long serialVersionUID = 5232453952276885070L;
1273     }
1274    
1275     // public methods
1276    
1277     /**
1278     * Creates a new incomplete CompletableFuture.
1279     */
1280     public CompletableFuture() {
1281     }
1282    
1283     /**
1284     * Asynchronously executes in the {@link
1285     * ForkJoinPool#commonPool()}, a task that completes the returned
1286     * CompletableFuture with the result of the given Supplier.
1287     *
1288     * @param supplier a function returning the value to be used
1289     * to complete the returned CompletableFuture
1290     * @return the CompletableFuture
1291     */
1292     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1293     if (supplier == null) throw new NullPointerException();
1294     CompletableFuture<U> f = new CompletableFuture<U>();
1295     ForkJoinPool.commonPool().
1296     execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1297     return f;
1298     }
1299    
1300     /**
1301     * Asynchronously executes using the given executor, a task that
1302     * completes the returned CompletableFuture with the result of the
1303     * given Supplier.
1304     *
1305     * @param supplier a function returning the value to be used
1306     * to complete the returned CompletableFuture
1307     * @param executor the executor to use for asynchronous execution
1308     * @return the CompletableFuture
1309     */
1310     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1311     Executor executor) {
1312     if (executor == null || supplier == null)
1313     throw new NullPointerException();
1314     CompletableFuture<U> f = new CompletableFuture<U>();
1315     executor.execute(new AsyncSupply<U>(supplier, f));
1316     return f;
1317     }
1318    
1319     /**
1320     * Asynchronously executes in the {@link
1321     * ForkJoinPool#commonPool()} a task that runs the given action,
1322     * and then completes the returned CompletableFuture.
1323     *
1324     * @param runnable the action to run before completing the
1325     * returned CompletableFuture
1326     * @return the CompletableFuture
1327     */
1328     public static CompletableFuture<Void> runAsync(Runnable runnable) {
1329     if (runnable == null) throw new NullPointerException();
1330     CompletableFuture<Void> f = new CompletableFuture<Void>();
1331     ForkJoinPool.commonPool().
1332     execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1333     return f;
1334     }
1335    
1336     /**
1337     * Asynchronously executes using the given executor, a task that
1338     * runs the given action, and then completes the returned
1339     * CompletableFuture.
1340     *
1341     * @param runnable the action to run before completing the
1342     * returned CompletableFuture
1343     * @param executor the executor to use for asynchronous execution
1344     * @return the CompletableFuture
1345     */
1346     public static CompletableFuture<Void> runAsync(Runnable runnable,
1347     Executor executor) {
1348     if (executor == null || runnable == null)
1349     throw new NullPointerException();
1350     CompletableFuture<Void> f = new CompletableFuture<Void>();
1351     executor.execute(new AsyncRun(runnable, f));
1352     return f;
1353     }
1354    
1355     /**
1356     * Returns {@code true} if completed in any fashion: normally,
1357     * exceptionally, or via cancellation.
1358     *
1359     * @return {@code true} if completed
1360     */
1361     public boolean isDone() {
1362     return result != null;
1363     }
1364    
1365     /**
1366     * Waits if necessary for the computation to complete, and then
1367     * retrieves its result.
1368     *
1369     * @return the computed result
1370     * @throws CancellationException if the computation was cancelled
1371     * @throws ExecutionException if the computation threw an
1372     * exception
1373     * @throws InterruptedException if the current thread was interrupted
1374     * while waiting
1375     */
1376     public T get() throws InterruptedException, ExecutionException {
1377     Object r; Throwable ex, cause;
1378     if ((r = result) == null && (r = waitingGet(true)) == null)
1379     throw new InterruptedException();
1380     if (r instanceof AltResult) {
1381     if ((ex = ((AltResult)r).ex) != null) {
1382     if (ex instanceof CancellationException)
1383     throw (CancellationException)ex;
1384     if ((ex instanceof CompletionException) &&
1385     (cause = ex.getCause()) != null)
1386     ex = cause;
1387     throw new ExecutionException(ex);
1388     }
1389     return null;
1390     }
1391     @SuppressWarnings("unchecked") T tr = (T) r;
1392     return tr;
1393     }
1394    
1395     /**
1396     * Waits if necessary for at most the given time for completion,
1397     * and then retrieves its result, if available.
1398     *
1399     * @param timeout the maximum time to wait
1400     * @param unit the time unit of the timeout argument
1401     * @return the computed result
1402     * @throws CancellationException if the computation was cancelled
1403     * @throws ExecutionException if the computation threw an
1404     * exception
1405     * @throws InterruptedException if the current thread was interrupted
1406     * while waiting
1407     * @throws TimeoutException if the wait timed out
1408     */
1409     public T get(long timeout, TimeUnit unit)
1410     throws InterruptedException, ExecutionException, TimeoutException {
1411     Object r; Throwable ex, cause;
1412     long nanos = unit.toNanos(timeout);
1413     if (Thread.interrupted())
1414     throw new InterruptedException();
1415     if ((r = result) == null)
1416     r = timedAwaitDone(nanos);
1417     if (r instanceof AltResult) {
1418     if ((ex = ((AltResult)r).ex) != null) {
1419     if (ex instanceof CancellationException)
1420     throw (CancellationException)ex;
1421     if ((ex instanceof CompletionException) &&
1422     (cause = ex.getCause()) != null)
1423     ex = cause;
1424     throw new ExecutionException(ex);
1425     }
1426     return null;
1427     }
1428     @SuppressWarnings("unchecked") T tr = (T) r;
1429     return tr;
1430     }
1431    
1432     /**
1433     * Returns the result value when complete, or throws an
1434     * (unchecked) exception if completed exceptionally. To better
1435     * conform with the use of common functional forms, if a
1436     * computation involved in the completion of this
1437     * CompletableFuture threw an exception, this method throws an
1438     * (unchecked) {@link CompletionException} with the underlying
1439     * exception as its cause.
1440     *
1441     * @return the result value
1442     * @throws CancellationException if the computation was cancelled
1443     * @throws CompletionException if a completion computation threw
1444     * an exception
1445     */
1446     public T join() {
1447     Object r; Throwable ex;
1448     if ((r = result) == null)
1449     r = waitingGet(false);
1450     if (r instanceof AltResult) {
1451     if ((ex = ((AltResult)r).ex) != null) {
1452     if (ex instanceof CancellationException)
1453     throw (CancellationException)ex;
1454     if (ex instanceof CompletionException)
1455     throw (CompletionException)ex;
1456     throw new CompletionException(ex);
1457     }
1458     return null;
1459     }
1460     @SuppressWarnings("unchecked") T tr = (T) r;
1461     return tr;
1462     }
1463    
1464     /**
1465     * Returns the result value (or throws any encountered exception)
1466     * if completed, else returns the given valueIfAbsent.
1467     *
1468     * @param valueIfAbsent the value to return if not completed
1469     * @return the result value, if completed, else the given valueIfAbsent
1470     * @throws CancellationException if the computation was cancelled
1471     * @throws CompletionException if a completion computation threw
1472     * an exception
1473     */
1474     public T getNow(T valueIfAbsent) {
1475     Object r; Throwable ex;
1476     if ((r = result) == null)
1477     return valueIfAbsent;
1478     if (r instanceof AltResult) {
1479     if ((ex = ((AltResult)r).ex) != null) {
1480     if (ex instanceof CancellationException)
1481     throw (CancellationException)ex;
1482     if (ex instanceof CompletionException)
1483     throw (CompletionException)ex;
1484     throw new CompletionException(ex);
1485 dl 1.17 }
1486 dl 1.28 return null;
1487 dl 1.17 }
1488 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1489     return tr;
1490     }
1491    
1492     /**
1493     * If not already completed, sets the value returned by {@link
1494     * #get()} and related methods to the given value.
1495     *
1496     * @param value the result value
1497     * @return {@code true} if this invocation caused this CompletableFuture
1498     * to transition to a completed state, else {@code false}
1499     */
1500     public boolean complete(T value) {
1501     boolean triggered = result == null &&
1502     UNSAFE.compareAndSwapObject(this, RESULT, null,
1503     value == null ? NIL : value);
1504     postComplete();
1505     return triggered;
1506     }
1507    
1508     /**
1509     * If not already completed, causes invocations of {@link #get()}
1510     * and related methods to throw the given exception.
1511     *
1512     * @param ex the exception
1513     * @return {@code true} if this invocation caused this CompletableFuture
1514     * to transition to a completed state, else {@code false}
1515     */
1516     public boolean completeExceptionally(Throwable ex) {
1517     if (ex == null) throw new NullPointerException();
1518     boolean triggered = result == null &&
1519     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1520     postComplete();
1521     return triggered;
1522     }
1523    
1524     /**
1525     * Creates and returns a CompletableFuture that is completed with
1526     * the result of the given function of this CompletableFuture.
1527     * If this CompletableFuture completes exceptionally,
1528     * then the returned CompletableFuture also does so,
1529     * with a CompletionException holding this exception as
1530     * its cause.
1531     *
1532     * @param fn the function to use to compute the value of
1533     * the returned CompletableFuture
1534     * @return the new CompletableFuture
1535     */
1536     public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1537     return doThenApply(fn, null);
1538     }
1539    
1540     /**
1541     * Creates and returns a CompletableFuture that is asynchronously
1542     * completed using the {@link ForkJoinPool#commonPool()} with the
1543     * result of the given function of this CompletableFuture. If
1544     * this CompletableFuture completes exceptionally, then the
1545     * returned CompletableFuture also does so, with a
1546     * CompletionException holding this exception as its cause.
1547     *
1548     * @param fn the function to use to compute the value of
1549     * the returned CompletableFuture
1550     * @return the new CompletableFuture
1551     */
1552     public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
1553     return doThenApply(fn, ForkJoinPool.commonPool());
1554 dl 1.17 }
1555    
1556 dl 1.28 /**
1557     * Creates and returns a CompletableFuture that is asynchronously
1558     * completed using the given executor with the result of the given
1559     * function of this CompletableFuture. If this CompletableFuture
1560     * completes exceptionally, then the returned CompletableFuture
1561     * also does so, with a CompletionException holding this exception as
1562     * its cause.
1563     *
1564     * @param fn the function to use to compute the value of
1565     * the returned CompletableFuture
1566     * @param executor the executor to use for asynchronous execution
1567     * @return the new CompletableFuture
1568     */
1569     public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
1570     Executor executor) {
1571     if (executor == null) throw new NullPointerException();
1572     return doThenApply(fn, executor);
1573     }
1574 dl 1.1
1575 dl 1.28 private <U> CompletableFuture<U> doThenApply(Function<? super T,? extends U> fn,
1576     Executor e) {
1577 dl 1.1 if (fn == null) throw new NullPointerException();
1578     CompletableFuture<U> dst = new CompletableFuture<U>();
1579 dl 1.28 ApplyCompletion<T,U> d = null;
1580 jsr166 1.2 Object r;
1581     if ((r = result) == null) {
1582 dl 1.1 CompletionNode p = new CompletionNode
1583 dl 1.28 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1584 dl 1.1 while ((r = result) == null) {
1585     if (UNSAFE.compareAndSwapObject
1586     (this, COMPLETIONS, p.next = completions, p))
1587     break;
1588     }
1589     }
1590 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1591 dl 1.19 T t; Throwable ex;
1592 jsr166 1.2 if (r instanceof AltResult) {
1593 dl 1.19 ex = ((AltResult)r).ex;
1594 jsr166 1.2 t = null;
1595 dl 1.1 }
1596 dl 1.19 else {
1597     ex = null;
1598 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1599     t = tr;
1600 dl 1.19 }
1601 dl 1.20 U u = null;
1602 jsr166 1.2 if (ex == null) {
1603 dl 1.1 try {
1604 dl 1.20 if (e != null)
1605 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
1606 dl 1.1 else
1607 dl 1.20 u = fn.apply(t);
1608 dl 1.1 } catch (Throwable rex) {
1609 dl 1.19 ex = rex;
1610 dl 1.1 }
1611     }
1612 dl 1.20 if (e == null || ex != null)
1613     dst.internalComplete(u, ex);
1614 dl 1.17 }
1615 dl 1.20 helpPostComplete();
1616 dl 1.1 return dst;
1617     }
1618    
1619 dl 1.28 /**
1620     * Creates and returns a CompletableFuture that is completed after
1621     * performing the given action with this CompletableFuture's
1622     * result when it completes. If this CompletableFuture
1623     * completes exceptionally, then the returned CompletableFuture
1624     * also does so, with a CompletionException holding this exception as
1625     * its cause.
1626     *
1627     * @param block the action to perform before completing the
1628     * returned CompletableFuture
1629     * @return the new CompletableFuture
1630     */
1631 dl 1.34 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1632 dl 1.28 return doThenAccept(block, null);
1633     }
1634    
1635     /**
1636     * Creates and returns a CompletableFuture that is asynchronously
1637     * completed using the {@link ForkJoinPool#commonPool()} with this
1638     * CompletableFuture's result when it completes. If this
1639     * CompletableFuture completes exceptionally, then the returned
1640     * CompletableFuture also does so, with a CompletionException holding
1641     * this exception as its cause.
1642     *
1643     * @param block the action to perform before completing the
1644     * returned CompletableFuture
1645     * @return the new CompletableFuture
1646     */
1647 dl 1.34 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1648 dl 1.28 return doThenAccept(block, ForkJoinPool.commonPool());
1649     }
1650    
1651     /**
1652     * Creates and returns a CompletableFuture that is asynchronously
1653     * completed using the given executor with this
1654     * CompletableFuture's result when it completes. If this
1655     * CompletableFuture completes exceptionally, then the returned
1656     * CompletableFuture also does so, with a CompletionException holding
1657     * this exception as its cause.
1658     *
1659     * @param block the action to perform before completing the
1660     * returned CompletableFuture
1661     * @param executor the executor to use for asynchronous execution
1662     * @return the new CompletableFuture
1663     */
1664 dl 1.34 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1665 dl 1.28 Executor executor) {
1666     if (executor == null) throw new NullPointerException();
1667     return doThenAccept(block, executor);
1668     }
1669    
1670 dl 1.34 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1671 dl 1.28 Executor e) {
1672 dl 1.7 if (fn == null) throw new NullPointerException();
1673     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1674 dl 1.28 AcceptCompletion<T> d = null;
1675 dl 1.7 Object r;
1676     if ((r = result) == null) {
1677     CompletionNode p = new CompletionNode
1678 dl 1.28 (d = new AcceptCompletion<T>(this, fn, dst, e));
1679 dl 1.7 while ((r = result) == null) {
1680     if (UNSAFE.compareAndSwapObject
1681     (this, COMPLETIONS, p.next = completions, p))
1682     break;
1683     }
1684     }
1685     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1686 dl 1.19 T t; Throwable ex;
1687 dl 1.7 if (r instanceof AltResult) {
1688 dl 1.19 ex = ((AltResult)r).ex;
1689 dl 1.7 t = null;
1690     }
1691 dl 1.19 else {
1692     ex = null;
1693 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1694     t = tr;
1695 dl 1.19 }
1696 dl 1.7 if (ex == null) {
1697     try {
1698 dl 1.20 if (e != null)
1699 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
1700 dl 1.20 else
1701 dl 1.7 fn.accept(t);
1702     } catch (Throwable rex) {
1703 dl 1.19 ex = rex;
1704 dl 1.7 }
1705     }
1706 dl 1.20 if (e == null || ex != null)
1707     dst.internalComplete(null, ex);
1708 dl 1.17 }
1709 dl 1.20 helpPostComplete();
1710 dl 1.7 return dst;
1711     }
1712    
1713 dl 1.28 /**
1714     * Creates and returns a CompletableFuture that is completed after
1715     * performing the given action when this CompletableFuture
1716     * completes. If this CompletableFuture completes exceptionally,
1717     * then the returned CompletableFuture also does so, with a
1718     * CompletionException holding this exception as its cause.
1719     *
1720     * @param action the action to perform before completing the
1721     * returned CompletableFuture
1722     * @return the new CompletableFuture
1723     */
1724     public CompletableFuture<Void> thenRun(Runnable action) {
1725     return doThenRun(action, null);
1726     }
1727    
1728     /**
1729     * Creates and returns a CompletableFuture that is asynchronously
1730     * completed using the {@link ForkJoinPool#commonPool()} after
1731     * performing the given action when this CompletableFuture
1732     * completes. If this CompletableFuture completes exceptionally,
1733     * then the returned CompletableFuture also does so, with a
1734     * CompletionException holding this exception as its cause.
1735     *
1736     * @param action the action to perform before completing the
1737     * returned CompletableFuture
1738     * @return the new CompletableFuture
1739     */
1740     public CompletableFuture<Void> thenRunAsync(Runnable action) {
1741     return doThenRun(action, ForkJoinPool.commonPool());
1742     }
1743    
1744     /**
1745     * Creates and returns a CompletableFuture that is asynchronously
1746     * completed using the given executor after performing the given
1747     * action when this CompletableFuture completes. If this
1748     * CompletableFuture completes exceptionally, then the returned
1749     * CompletableFuture also does so, with a CompletionException holding
1750     * this exception as its cause.
1751     *
1752     * @param action the action to perform before completing the
1753     * returned CompletableFuture
1754     * @param executor the executor to use for asynchronous execution
1755     * @return the new CompletableFuture
1756     */
1757     public CompletableFuture<Void> thenRunAsync(Runnable action,
1758     Executor executor) {
1759     if (executor == null) throw new NullPointerException();
1760     return doThenRun(action, executor);
1761     }
1762    
1763     private CompletableFuture<Void> doThenRun(Runnable action,
1764     Executor e) {
1765 dl 1.1 if (action == null) throw new NullPointerException();
1766     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1767 dl 1.28 RunCompletion<T> d = null;
1768 jsr166 1.2 Object r;
1769     if ((r = result) == null) {
1770 dl 1.1 CompletionNode p = new CompletionNode
1771 dl 1.28 (d = new RunCompletion<T>(this, action, dst, e));
1772 dl 1.1 while ((r = result) == null) {
1773     if (UNSAFE.compareAndSwapObject
1774     (this, COMPLETIONS, p.next = completions, p))
1775     break;
1776     }
1777     }
1778 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1779 dl 1.19 Throwable ex;
1780     if (r instanceof AltResult)
1781 dl 1.28 ex = ((AltResult)r).ex;
1782     else
1783     ex = null;
1784     if (ex == null) {
1785     try {
1786     if (e != null)
1787     e.execute(new AsyncRun(action, dst));
1788     else
1789     action.run();
1790     } catch (Throwable rex) {
1791     ex = rex;
1792     }
1793     }
1794     if (e == null || ex != null)
1795     dst.internalComplete(null, ex);
1796     }
1797     helpPostComplete();
1798     return dst;
1799     }
1800    
1801     /**
1802     * Creates and returns a CompletableFuture that is completed with
1803     * the result of the given function of this and the other given
1804     * CompletableFuture's results when both complete. If this or
1805     * the other CompletableFuture complete exceptionally, then the
1806     * returned CompletableFuture also does so, with a
1807     * CompletionException holding the exception as its cause.
1808     *
1809     * @param other the other CompletableFuture
1810     * @param fn the function to use to compute the value of
1811     * the returned CompletableFuture
1812     * @return the new CompletableFuture
1813     */
1814     public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
1815     BiFunction<? super T,? super U,? extends V> fn) {
1816     return doThenBiApply(other, fn, null);
1817     }
1818    
1819     /**
1820     * Creates and returns a CompletableFuture that is asynchronously
1821     * completed using the {@link ForkJoinPool#commonPool()} with
1822     * the result of the given function of this and the other given
1823     * CompletableFuture's results when both complete. If this or
1824     * the other CompletableFuture complete exceptionally, then the
1825     * returned CompletableFuture also does so, with a
1826     * CompletionException holding the exception as its cause.
1827     *
1828     * @param other the other CompletableFuture
1829     * @param fn the function to use to compute the value of
1830     * the returned CompletableFuture
1831     * @return the new CompletableFuture
1832     */
1833     public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1834     BiFunction<? super T,? super U,? extends V> fn) {
1835     return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1836     }
1837    
1838     /**
1839     * Creates and returns a CompletableFuture that is
1840     * asynchronously completed using the given executor with the
1841     * result of the given function of this and the other given
1842     * CompletableFuture's results when both complete. If this or
1843     * the other CompletableFuture complete exceptionally, then the
1844     * returned CompletableFuture also does so, with a
1845     * CompletionException holding the exception as its cause.
1846     *
1847     * @param other the other CompletableFuture
1848     * @param fn the function to use to compute the value of
1849     * the returned CompletableFuture
1850     * @param executor the executor to use for asynchronous execution
1851     * @return the new CompletableFuture
1852     */
1853    
1854     public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1855     BiFunction<? super T,? super U,? extends V> fn,
1856     Executor executor) {
1857     if (executor == null) throw new NullPointerException();
1858     return doThenBiApply(other, fn, executor);
1859 dl 1.1 }
1860    
1861 dl 1.28 private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
1862     BiFunction<? super T,? super U,? extends V> fn,
1863     Executor e) {
1864 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
1865 jsr166 1.2 CompletableFuture<V> dst = new CompletableFuture<V>();
1866 dl 1.28 BiApplyCompletion<T,U,V> d = null;
1867 jsr166 1.2 Object r, s = null;
1868     if ((r = result) == null || (s = other.result) == null) {
1869 dl 1.28 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1870 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
1871     while ((r == null && (r = result) == null) ||
1872     (s == null && (s = other.result) == null)) {
1873     if (q != null) {
1874     if (s != null ||
1875     UNSAFE.compareAndSwapObject
1876     (other, COMPLETIONS, q.next = other.completions, q))
1877     break;
1878     }
1879     else if (r != null ||
1880     UNSAFE.compareAndSwapObject
1881     (this, COMPLETIONS, p.next = completions, p)) {
1882     if (s != null)
1883     break;
1884     q = new CompletionNode(d);
1885     }
1886     }
1887     }
1888 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1889 dl 1.19 T t; U u; Throwable ex;
1890 jsr166 1.2 if (r instanceof AltResult) {
1891 dl 1.19 ex = ((AltResult)r).ex;
1892 jsr166 1.2 t = null;
1893 dl 1.1 }
1894 dl 1.19 else {
1895     ex = null;
1896 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1897     t = tr;
1898 dl 1.19 }
1899 dl 1.1 if (ex != null)
1900     u = null;
1901     else if (s instanceof AltResult) {
1902 dl 1.19 ex = ((AltResult)s).ex;
1903 dl 1.1 u = null;
1904     }
1905 dl 1.28 else {
1906     @SuppressWarnings("unchecked") U us = (U) s;
1907     u = us;
1908     }
1909 dl 1.20 V v = null;
1910 jsr166 1.2 if (ex == null) {
1911 dl 1.1 try {
1912 dl 1.20 if (e != null)
1913 dl 1.28 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1914 dl 1.1 else
1915 dl 1.20 v = fn.apply(t, u);
1916 dl 1.1 } catch (Throwable rex) {
1917 dl 1.19 ex = rex;
1918 dl 1.1 }
1919     }
1920 dl 1.20 if (e == null || ex != null)
1921     dst.internalComplete(v, ex);
1922 jsr166 1.2 }
1923 dl 1.20 helpPostComplete();
1924     other.helpPostComplete();
1925 jsr166 1.2 return dst;
1926 dl 1.1 }
1927    
1928 dl 1.28 /**
1929     * Creates and returns a CompletableFuture that is completed with
1930     * the results of this and the other given CompletableFuture if
1931     * both complete. If this and/or the other CompletableFuture
1932     * complete exceptionally, then the returned CompletableFuture
1933     * also does so, with a CompletionException holding one of these
1934     * exceptions as its cause.
1935     *
1936     * @param other the other CompletableFuture
1937     * @param block the action to perform before completing the
1938     * returned CompletableFuture
1939     * @return the new CompletableFuture
1940     */
1941     public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1942 dl 1.34 BiConsumer<? super T, ? super U> block) {
1943 dl 1.28 return doThenBiAccept(other, block, null);
1944     }
1945    
1946     /**
1947     * Creates and returns a CompletableFuture that is completed
1948     * asynchronously using the {@link ForkJoinPool#commonPool()} with
1949     * the results of this and the other given CompletableFuture when
1950     * both complete. If this and/or the other CompletableFuture
1951     * complete exceptionally, then the returned CompletableFuture
1952     * also does so, with a CompletionException holding one of these
1953     * exceptions as its cause.
1954     *
1955     * @param other the other CompletableFuture
1956     * @param block the action to perform before completing the
1957     * returned CompletableFuture
1958     * @return the new CompletableFuture
1959     */
1960     public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1961 dl 1.34 BiConsumer<? super T, ? super U> block) {
1962 dl 1.28 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
1963     }
1964    
1965     /**
1966     * Creates and returns a CompletableFuture that is completed
1967     * asynchronously using the given executor with the results of
1968     * this and the other given CompletableFuture when both complete.
1969     * If this and/or the other CompletableFuture complete
1970     * exceptionally, then the returned CompletableFuture also does
1971     * so, with a CompletionException holding one of these exceptions as
1972     * its cause.
1973     *
1974     * @param other the other CompletableFuture
1975     * @param block the action to perform before completing the
1976     * returned CompletableFuture
1977     * @param executor the executor to use for asynchronous execution
1978     * @return the new CompletableFuture
1979     */
1980     public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1981 dl 1.34 BiConsumer<? super T, ? super U> block,
1982 dl 1.28 Executor executor) {
1983     if (executor == null) throw new NullPointerException();
1984     return doThenBiAccept(other, block, executor);
1985     }
1986    
1987     private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other,
1988 dl 1.34 BiConsumer<? super T,? super U> fn,
1989 dl 1.28 Executor e) {
1990 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
1991     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1992 dl 1.28 BiAcceptCompletion<T,U> d = null;
1993 dl 1.7 Object r, s = null;
1994     if ((r = result) == null || (s = other.result) == null) {
1995 dl 1.28 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
1996 dl 1.7 CompletionNode q = null, p = new CompletionNode(d);
1997     while ((r == null && (r = result) == null) ||
1998     (s == null && (s = other.result) == null)) {
1999     if (q != null) {
2000     if (s != null ||
2001     UNSAFE.compareAndSwapObject
2002     (other, COMPLETIONS, q.next = other.completions, q))
2003     break;
2004     }
2005     else if (r != null ||
2006     UNSAFE.compareAndSwapObject
2007     (this, COMPLETIONS, p.next = completions, p)) {
2008     if (s != null)
2009     break;
2010     q = new CompletionNode(d);
2011     }
2012     }
2013     }
2014     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2015 dl 1.19 T t; U u; Throwable ex;
2016 dl 1.7 if (r instanceof AltResult) {
2017 dl 1.19 ex = ((AltResult)r).ex;
2018 dl 1.7 t = null;
2019     }
2020 dl 1.19 else {
2021     ex = null;
2022 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2023     t = tr;
2024 dl 1.19 }
2025 dl 1.7 if (ex != null)
2026     u = null;
2027     else if (s instanceof AltResult) {
2028 dl 1.19 ex = ((AltResult)s).ex;
2029 dl 1.7 u = null;
2030     }
2031 dl 1.28 else {
2032     @SuppressWarnings("unchecked") U us = (U) s;
2033     u = us;
2034     }
2035 dl 1.7 if (ex == null) {
2036     try {
2037 dl 1.20 if (e != null)
2038 dl 1.28 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2039 dl 1.20 else
2040 dl 1.7 fn.accept(t, u);
2041     } catch (Throwable rex) {
2042 dl 1.19 ex = rex;
2043 dl 1.7 }
2044     }
2045 dl 1.20 if (e == null || ex != null)
2046     dst.internalComplete(null, ex);
2047 dl 1.7 }
2048 dl 1.20 helpPostComplete();
2049     other.helpPostComplete();
2050 dl 1.7 return dst;
2051     }
2052    
2053 dl 1.28 /**
2054     * Creates and returns a CompletableFuture that is completed
2055     * when this and the other given CompletableFuture both
2056     * complete. If this and/or the other CompletableFuture complete
2057     * exceptionally, then the returned CompletableFuture also does
2058     * so, with a CompletionException holding one of these exceptions as
2059     * its cause.
2060     *
2061     * @param other the other CompletableFuture
2062     * @param action the action to perform before completing the
2063     * returned CompletableFuture
2064     * @return the new CompletableFuture
2065     */
2066     public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2067     Runnable action) {
2068     return doThenBiRun(other, action, null);
2069     }
2070    
2071     /**
2072     * Creates and returns a CompletableFuture that is completed
2073     * asynchronously using the {@link ForkJoinPool#commonPool()}
2074     * when this and the other given CompletableFuture both
2075     * complete. If this and/or the other CompletableFuture complete
2076     * exceptionally, then the returned CompletableFuture also does
2077     * so, with a CompletionException holding one of these exceptions as
2078     * its cause.
2079     *
2080     * @param other the other CompletableFuture
2081     * @param action the action to perform before completing the
2082     * returned CompletableFuture
2083     * @return the new CompletableFuture
2084     */
2085     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2086     Runnable action) {
2087     return doThenBiRun(other, action, ForkJoinPool.commonPool());
2088     }
2089    
2090     /**
2091     * Creates and returns a CompletableFuture that is completed
2092     * asynchronously using the given executor
2093     * when this and the other given CompletableFuture both
2094     * complete. If this and/or the other CompletableFuture complete
2095     * exceptionally, then the returned CompletableFuture also does
2096     * so, with a CompletionException holding one of these exceptions as
2097     * its cause.
2098     *
2099     * @param other the other CompletableFuture
2100     * @param action the action to perform before completing the
2101     * returned CompletableFuture
2102     * @param executor the executor to use for asynchronous execution
2103     * @return the new CompletableFuture
2104     */
2105     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2106     Runnable action,
2107     Executor executor) {
2108     if (executor == null) throw new NullPointerException();
2109     return doThenBiRun(other, action, executor);
2110     }
2111    
2112     private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2113     Runnable action,
2114     Executor e) {
2115 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2116 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2117 dl 1.28 BiRunCompletion<T> d = null;
2118 jsr166 1.2 Object r, s = null;
2119     if ((r = result) == null || (s = other.result) == null) {
2120 dl 1.28 d = new BiRunCompletion<T>(this, other, action, dst, e);
2121 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2122     while ((r == null && (r = result) == null) ||
2123     (s == null && (s = other.result) == null)) {
2124     if (q != null) {
2125     if (s != null ||
2126     UNSAFE.compareAndSwapObject
2127     (other, COMPLETIONS, q.next = other.completions, q))
2128     break;
2129     }
2130     else if (r != null ||
2131     UNSAFE.compareAndSwapObject
2132     (this, COMPLETIONS, p.next = completions, p)) {
2133     if (s != null)
2134     break;
2135     q = new CompletionNode(d);
2136     }
2137     }
2138     }
2139 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2140 dl 1.19 Throwable ex;
2141     if (r instanceof AltResult)
2142     ex = ((AltResult)r).ex;
2143     else
2144     ex = null;
2145     if (ex == null && (s instanceof AltResult))
2146     ex = ((AltResult)s).ex;
2147     if (ex == null) {
2148 dl 1.1 try {
2149 dl 1.20 if (e != null)
2150 dl 1.28 e.execute(new AsyncRun(action, dst));
2151 dl 1.20 else
2152 dl 1.1 action.run();
2153     } catch (Throwable rex) {
2154 dl 1.19 ex = rex;
2155 dl 1.1 }
2156     }
2157 dl 1.20 if (e == null || ex != null)
2158     dst.internalComplete(null, ex);
2159 jsr166 1.2 }
2160 dl 1.20 helpPostComplete();
2161     other.helpPostComplete();
2162 jsr166 1.2 return dst;
2163 dl 1.1 }
2164    
2165 dl 1.28 /**
2166     * Creates and returns a CompletableFuture that is completed with
2167     * the result of the given function of either this or the other
2168     * given CompletableFuture's results when either complete. If
2169     * this and/or the other CompletableFuture complete exceptionally,
2170     * then the returned CompletableFuture may also do so, with a
2171     * CompletionException holding one of these exceptions as its cause.
2172     * No guarantees are made about which result or exception is used
2173     * in the returned CompletableFuture.
2174     *
2175     * @param other the other CompletableFuture
2176     * @param fn the function to use to compute the value of
2177     * the returned CompletableFuture
2178     * @return the new CompletableFuture
2179     */
2180     public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2181     Function<? super T, U> fn) {
2182     return doOrApply(other, fn, null);
2183     }
2184    
2185     /**
2186     * Creates and returns a CompletableFuture that is completed
2187     * asynchronously using the {@link ForkJoinPool#commonPool()} with
2188     * the result of the given function of either this or the other
2189     * given CompletableFuture's results when either complete. If
2190     * this and/or the other CompletableFuture complete exceptionally,
2191     * then the returned CompletableFuture may also do so, with a
2192     * CompletionException holding one of these exceptions as its cause.
2193     * No guarantees are made about which result or exception is used
2194     * in the returned CompletableFuture.
2195     *
2196     * @param other the other CompletableFuture
2197     * @param fn the function to use to compute the value of
2198     * the returned CompletableFuture
2199     * @return the new CompletableFuture
2200     */
2201     public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2202     Function<? super T, U> fn) {
2203     return doOrApply(other, fn, ForkJoinPool.commonPool());
2204     }
2205    
2206     /**
2207     * Creates and returns a CompletableFuture that is completed
2208     * asynchronously using the given executor with the result of the
2209     * given function of either this or the other given
2210     * CompletableFuture's results when either complete. If this
2211     * and/or the other CompletableFuture complete exceptionally, then
2212     * the returned CompletableFuture may also do so, with a
2213     * CompletionException holding one of these exceptions as its cause.
2214     * No guarantees are made about which result or exception is used
2215     * in the returned CompletableFuture.
2216     *
2217     * @param other the other CompletableFuture
2218     * @param fn the function to use to compute the value of
2219     * the returned CompletableFuture
2220     * @param executor the executor to use for asynchronous execution
2221     * @return the new CompletableFuture
2222     */
2223     public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2224     Function<? super T, U> fn,
2225     Executor executor) {
2226     if (executor == null) throw new NullPointerException();
2227     return doOrApply(other, fn, executor);
2228     }
2229    
2230     private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other,
2231     Function<? super T, U> fn,
2232     Executor e) {
2233 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2234 jsr166 1.2 CompletableFuture<U> dst = new CompletableFuture<U>();
2235 dl 1.28 OrApplyCompletion<T,U> d = null;
2236 jsr166 1.2 Object r;
2237     if ((r = result) == null && (r = other.result) == null) {
2238 dl 1.28 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2239 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2240     while ((r = result) == null && (r = other.result) == null) {
2241     if (q != null) {
2242     if (UNSAFE.compareAndSwapObject
2243     (other, COMPLETIONS, q.next = other.completions, q))
2244     break;
2245     }
2246     else if (UNSAFE.compareAndSwapObject
2247     (this, COMPLETIONS, p.next = completions, p))
2248     q = new CompletionNode(d);
2249     }
2250 jsr166 1.2 }
2251     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2252 dl 1.19 T t; Throwable ex;
2253 jsr166 1.2 if (r instanceof AltResult) {
2254 dl 1.19 ex = ((AltResult)r).ex;
2255 jsr166 1.2 t = null;
2256 dl 1.1 }
2257 dl 1.19 else {
2258     ex = null;
2259 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2260     t = tr;
2261 dl 1.19 }
2262 dl 1.20 U u = null;
2263 jsr166 1.2 if (ex == null) {
2264 dl 1.1 try {
2265 dl 1.20 if (e != null)
2266 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
2267 dl 1.1 else
2268 dl 1.20 u = fn.apply(t);
2269 dl 1.1 } catch (Throwable rex) {
2270 dl 1.19 ex = rex;
2271 dl 1.1 }
2272     }
2273 dl 1.20 if (e == null || ex != null)
2274     dst.internalComplete(u, ex);
2275 jsr166 1.2 }
2276 dl 1.20 helpPostComplete();
2277     other.helpPostComplete();
2278 jsr166 1.2 return dst;
2279 dl 1.1 }
2280    
2281 dl 1.28 /**
2282     * Creates and returns a CompletableFuture that is completed after
2283     * performing the given action with the result of either this or the
2284     * other given CompletableFuture's result, when either complete.
2285     * If this and/or the other CompletableFuture complete
2286     * exceptionally, then the returned CompletableFuture may also do
2287     * so, with a CompletionException holding one of these exceptions as
2288     * its cause. No guarantees are made about which exception is
2289     * used in the returned CompletableFuture.
2290     *
2291     * @param other the other CompletableFuture
2292     * @param block the action to perform before completing the
2293     * returned CompletableFuture
2294     * @return the new CompletableFuture
2295     */
2296     public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2297 dl 1.34 Consumer<? super T> block) {
2298 dl 1.28 return doOrAccept(other, block, null);
2299     }
2300    
2301     /**
2302     * Creates and returns a CompletableFuture that is completed
2303     * asynchronously using the {@link ForkJoinPool#commonPool()},
2304     * performing the given action with the result of either this or
2305     * the other given CompletableFuture's result, when either
2306     * complete. If this and/or the other CompletableFuture complete
2307     * exceptionally, then the returned CompletableFuture may also do
2308     * so, with a CompletionException holding one of these exceptions as
2309     * its cause. No guarantees are made about which exception is
2310     * used in the returned CompletableFuture.
2311     *
2312     * @param other the other CompletableFuture
2313     * @param block the action to perform before completing the
2314     * returned CompletableFuture
2315     * @return the new CompletableFuture
2316     */
2317     public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2318 dl 1.34 Consumer<? super T> block) {
2319 dl 1.28 return doOrAccept(other, block, ForkJoinPool.commonPool());
2320     }
2321    
2322     /**
2323     * Creates and returns a CompletableFuture that is completed
2324     * asynchronously using the given executor,
2325     * performing the given action with the result of either this or
2326     * the other given CompletableFuture's result, when either
2327     * complete. If this and/or the other CompletableFuture complete
2328     * exceptionally, then the returned CompletableFuture may also do
2329     * so, with a CompletionException holding one of these exceptions as
2330     * its cause. No guarantees are made about which exception is
2331     * used in the returned CompletableFuture.
2332     *
2333     * @param other the other CompletableFuture
2334     * @param block the action to perform before completing the
2335     * returned CompletableFuture
2336     * @param executor the executor to use for asynchronous execution
2337     * @return the new CompletableFuture
2338     */
2339     public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2340 dl 1.34 Consumer<? super T> block,
2341 dl 1.28 Executor executor) {
2342     if (executor == null) throw new NullPointerException();
2343     return doOrAccept(other, block, executor);
2344     }
2345    
2346     private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other,
2347 dl 1.34 Consumer<? super T> fn,
2348 dl 1.28 Executor e) {
2349 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2350     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2351 dl 1.28 OrAcceptCompletion<T> d = null;
2352 dl 1.7 Object r;
2353     if ((r = result) == null && (r = other.result) == null) {
2354 dl 1.28 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2355 dl 1.7 CompletionNode q = null, p = new CompletionNode(d);
2356     while ((r = result) == null && (r = other.result) == null) {
2357     if (q != null) {
2358     if (UNSAFE.compareAndSwapObject
2359     (other, COMPLETIONS, q.next = other.completions, q))
2360     break;
2361     }
2362     else if (UNSAFE.compareAndSwapObject
2363     (this, COMPLETIONS, p.next = completions, p))
2364     q = new CompletionNode(d);
2365     }
2366     }
2367     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2368 dl 1.19 T t; Throwable ex;
2369 dl 1.7 if (r instanceof AltResult) {
2370 dl 1.19 ex = ((AltResult)r).ex;
2371 dl 1.7 t = null;
2372     }
2373 dl 1.19 else {
2374     ex = null;
2375 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2376     t = tr;
2377 dl 1.19 }
2378 dl 1.7 if (ex == null) {
2379     try {
2380 dl 1.20 if (e != null)
2381 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
2382 dl 1.20 else
2383 dl 1.7 fn.accept(t);
2384     } catch (Throwable rex) {
2385 dl 1.19 ex = rex;
2386 dl 1.7 }
2387     }
2388 dl 1.20 if (e == null || ex != null)
2389     dst.internalComplete(null, ex);
2390 dl 1.7 }
2391 dl 1.20 helpPostComplete();
2392     other.helpPostComplete();
2393 dl 1.7 return dst;
2394     }
2395    
2396 dl 1.28 /**
2397     * Creates and returns a CompletableFuture that is completed
2398     * after this or the other given CompletableFuture complete. If
2399     * this and/or the other CompletableFuture complete exceptionally,
2400     * then the returned CompletableFuture may also do so, with a
2401     * CompletionException holding one of these exceptions as its cause.
2402     * No guarantees are made about which exception is used in the
2403     * returned CompletableFuture.
2404     *
2405     * @param other the other CompletableFuture
2406     * @param action the action to perform before completing the
2407     * returned CompletableFuture
2408     * @return the new CompletableFuture
2409     */
2410     public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2411     Runnable action) {
2412     return doOrRun(other, action, null);
2413     }
2414    
2415     /**
2416     * Creates and returns a CompletableFuture that is completed
2417     * asynchronously using the {@link ForkJoinPool#commonPool()}
2418     * after this or the other given CompletableFuture complete. If
2419     * this and/or the other CompletableFuture complete exceptionally,
2420     * then the returned CompletableFuture may also do so, with a
2421     * CompletionException holding one of these exceptions as its cause.
2422     * No guarantees are made about which exception is used in the
2423     * returned CompletableFuture.
2424     *
2425     * @param other the other CompletableFuture
2426     * @param action the action to perform before completing the
2427     * returned CompletableFuture
2428     * @return the new CompletableFuture
2429     */
2430     public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2431     Runnable action) {
2432     return doOrRun(other, action, ForkJoinPool.commonPool());
2433     }
2434    
2435     /**
2436     * Creates and returns a CompletableFuture that is completed
2437     * asynchronously using the given executor after this or the other
2438     * given CompletableFuture complete. If this and/or the other
2439     * CompletableFuture complete exceptionally, then the returned
2440     * CompletableFuture may also do so, with a CompletionException
2441     * holding one of these exceptions as its cause. No guarantees are
2442     * made about which exception is used in the returned
2443     * CompletableFuture.
2444     *
2445     * @param other the other CompletableFuture
2446     * @param action the action to perform before completing the
2447     * returned CompletableFuture
2448     * @param executor the executor to use for asynchronous execution
2449     * @return the new CompletableFuture
2450     */
2451     public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2452     Runnable action,
2453     Executor executor) {
2454     if (executor == null) throw new NullPointerException();
2455     return doOrRun(other, action, executor);
2456     }
2457    
2458     private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2459     Runnable action,
2460     Executor e) {
2461 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2462 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2463 dl 1.28 OrRunCompletion<T> d = null;
2464 jsr166 1.2 Object r;
2465     if ((r = result) == null && (r = other.result) == null) {
2466 dl 1.28 d = new OrRunCompletion<T>(this, other, action, dst, e);
2467 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2468     while ((r = result) == null && (r = other.result) == null) {
2469     if (q != null) {
2470     if (UNSAFE.compareAndSwapObject
2471     (other, COMPLETIONS, q.next = other.completions, q))
2472     break;
2473     }
2474     else if (UNSAFE.compareAndSwapObject
2475     (this, COMPLETIONS, p.next = completions, p))
2476     q = new CompletionNode(d);
2477     }
2478 jsr166 1.2 }
2479     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2480 dl 1.19 Throwable ex;
2481     if (r instanceof AltResult)
2482     ex = ((AltResult)r).ex;
2483     else
2484     ex = null;
2485     if (ex == null) {
2486 dl 1.1 try {
2487 dl 1.20 if (e != null)
2488 dl 1.28 e.execute(new AsyncRun(action, dst));
2489 dl 1.20 else
2490 dl 1.1 action.run();
2491     } catch (Throwable rex) {
2492 dl 1.19 ex = rex;
2493 dl 1.1 }
2494     }
2495 dl 1.20 if (e == null || ex != null)
2496     dst.internalComplete(null, ex);
2497 jsr166 1.2 }
2498 dl 1.20 helpPostComplete();
2499     other.helpPostComplete();
2500 jsr166 1.2 return dst;
2501 dl 1.1 }
2502    
2503 dl 1.28 /**
2504     * Returns a CompletableFuture (or an equivalent one) produced by
2505     * the given function of the result of this CompletableFuture when
2506     * completed. If this CompletableFuture completes exceptionally,
2507     * then the returned CompletableFuture also does so, with a
2508     * CompletionException holding this exception as its cause.
2509     *
2510     * @param fn the function returning a new CompletableFuture.
2511     * @return the CompletableFuture, that {@code isDone()} upon
2512     * return if completed by the given function, or an exception
2513     * occurs.
2514     */
2515     public <U> CompletableFuture<U> thenCompose(Function<? super T,
2516     CompletableFuture<U>> fn) {
2517     if (fn == null) throw new NullPointerException();
2518     CompletableFuture<U> dst = null;
2519     ComposeCompletion<T,U> d = null;
2520     Object r;
2521     if ((r = result) == null) {
2522     dst = new CompletableFuture<U>();
2523     CompletionNode p = new CompletionNode
2524     (d = new ComposeCompletion<T,U>(this, fn, dst));
2525     while ((r = result) == null) {
2526     if (UNSAFE.compareAndSwapObject
2527     (this, COMPLETIONS, p.next = completions, p))
2528     break;
2529     }
2530     }
2531     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2532     T t; Throwable ex;
2533     if (r instanceof AltResult) {
2534     ex = ((AltResult)r).ex;
2535     t = null;
2536     }
2537     else {
2538     ex = null;
2539     @SuppressWarnings("unchecked") T tr = (T) r;
2540     t = tr;
2541     }
2542     if (ex == null) {
2543     try {
2544     dst = fn.apply(t);
2545     } catch (Throwable rex) {
2546     ex = rex;
2547     }
2548     }
2549     if (dst == null) {
2550     dst = new CompletableFuture<U>();
2551     if (ex == null)
2552     ex = new NullPointerException();
2553     }
2554     if (ex != null)
2555     dst.internalComplete(null, ex);
2556     }
2557     helpPostComplete();
2558     dst.helpPostComplete();
2559     return dst;
2560     }
2561    
2562     /**
2563     * Creates and returns a CompletableFuture that is completed with
2564     * the result of the given function of the exception triggering
2565     * this CompletableFuture's completion when it completes
2566     * exceptionally; Otherwise, if this CompletableFuture completes
2567     * normally, then the returned CompletableFuture also completes
2568     * normally with the same value.
2569     *
2570     * @param fn the function to use to compute the value of the
2571     * returned CompletableFuture if this CompletableFuture completed
2572     * exceptionally
2573     * @return the new CompletableFuture
2574     */
2575     public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
2576     if (fn == null) throw new NullPointerException();
2577     CompletableFuture<T> dst = new CompletableFuture<T>();
2578     ExceptionCompletion<T> d = null;
2579     Object r;
2580     if ((r = result) == null) {
2581     CompletionNode p =
2582     new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2583     while ((r = result) == null) {
2584     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2585     p.next = completions, p))
2586     break;
2587     }
2588     }
2589     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2590     T t = null; Throwable ex, dx = null;
2591     if (r instanceof AltResult) {
2592     if ((ex = ((AltResult)r).ex) != null) {
2593     try {
2594     t = fn.apply(ex);
2595     } catch (Throwable rex) {
2596     dx = rex;
2597     }
2598     }
2599     }
2600     else {
2601     @SuppressWarnings("unchecked") T tr = (T) r;
2602     t = tr;
2603     }
2604     dst.internalComplete(t, dx);
2605     }
2606     helpPostComplete();
2607     return dst;
2608     }
2609    
2610     /**
2611     * Creates and returns a CompletableFuture that is completed with
2612     * the result of the given function of the result and exception of
2613     * this CompletableFuture's completion when it completes. The
2614     * given function is invoked with the result (or {@code null} if
2615     * none) and the exception (or {@code null} if none) of this
2616     * CompletableFuture when complete.
2617     *
2618     * @param fn the function to use to compute the value of the
2619     * returned CompletableFuture
2620    
2621     * @return the new CompletableFuture
2622     */
2623     public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
2624     if (fn == null) throw new NullPointerException();
2625     CompletableFuture<U> dst = new CompletableFuture<U>();
2626     HandleCompletion<T,U> d = null;
2627     Object r;
2628     if ((r = result) == null) {
2629     CompletionNode p =
2630     new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2631     while ((r = result) == null) {
2632     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2633     p.next = completions, p))
2634     break;
2635     }
2636     }
2637     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2638     T t; Throwable ex;
2639     if (r instanceof AltResult) {
2640     ex = ((AltResult)r).ex;
2641     t = null;
2642     }
2643     else {
2644     ex = null;
2645     @SuppressWarnings("unchecked") T tr = (T) r;
2646     t = tr;
2647     }
2648     U u; Throwable dx;
2649     try {
2650     u = fn.apply(t, ex);
2651     dx = null;
2652     } catch (Throwable rex) {
2653     dx = rex;
2654     u = null;
2655     }
2656     dst.internalComplete(u, dx);
2657     }
2658     helpPostComplete();
2659     return dst;
2660     }
2661    
2662 dl 1.35
2663     /* ------------- Arbitrary-arity constructions -------------- */
2664    
2665     /*
2666     * The basic plan of attack is to recursively form binary
2667     * completion trees of elements. This can be overkill for small
2668     * sets, but scales nicely. The And/All vs Or/Any forms use the
2669     * same idea, but details differ.
2670     */
2671    
2672     /**
2673     * Returns a new CompletableFuture that is completed when all of
2674     * the given CompletableFutures complete. If any of the component
2675     * CompletableFuture complete exceptionally, then so does the
2676     * returned CompletableFuture. Otherwise, the results, if any, of
2677     * the component CompletableFutures are not reflected in the
2678     * returned CompletableFuture, but may be obtained by inspecting
2679     * them individually. If the number of components is zero, returns
2680     * a completed CompletableFuture.
2681     *
2682     * <p>Among the applications of this method is to await completion
2683     * of a set of independent CompletableFutures before continuing a
2684     * program, as in: {@code CompletableFuture.allOf(c1, c2,
2685     * c3).join();}.
2686     *
2687     * @param cfs the CompletableFutures
2688     * @return a CompletableFuture that is complete when all of the
2689     * given CompletableFutures complete
2690     * @throws NullPointerException if the array or any of its elements are
2691     * {@code null}
2692     */
2693     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2694     int len = cfs.length; // Directly handle empty and singleton cases
2695     if (len > 1)
2696     return allTree(cfs, 0, len - 1);
2697     else {
2698     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2699     CompletableFuture<?> f;
2700     if (len == 0)
2701     dst.result = NIL;
2702     else if ((f = cfs[0]) == null)
2703     throw new NullPointerException();
2704     else {
2705     ThenCopy d = null;
2706     CompletionNode p = null;
2707     Object r;
2708     while ((r = f.result) == null) {
2709     if (d == null)
2710     d = new ThenCopy(f, dst);
2711     else if (p == null)
2712     p = new CompletionNode(d);
2713     else if (UNSAFE.compareAndSwapObject
2714     (f, COMPLETIONS, p.next = f.completions, p))
2715     break;
2716     }
2717     if (r != null && (d == null || d.compareAndSet(0, 1)))
2718     dst.internalComplete(null, (r instanceof AltResult) ?
2719     ((AltResult)r).ex : null);
2720     f.helpPostComplete();
2721     }
2722     return dst;
2723     }
2724     }
2725    
2726     /**
2727     * Recursively constructs an And'ed tree of CompletableFutures.
2728     * Called only when array known to have at least two elements.
2729     */
2730     private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2731     int lo, int hi) {
2732     CompletableFuture<?> fst, snd;
2733     int mid = (lo + hi) >>> 1;
2734     if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2735     (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2736     throw new NullPointerException();
2737     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2738     AndCompletion d = null;
2739     CompletionNode p = null, q = null;
2740     Object r = null, s = null;
2741     while ((r = fst.result) == null || (s = snd.result) == null) {
2742     if (d == null)
2743     d = new AndCompletion(fst, snd, dst);
2744     else if (p == null)
2745     p = new CompletionNode(d);
2746     else if (q == null) {
2747     if (UNSAFE.compareAndSwapObject
2748     (fst, COMPLETIONS, p.next = fst.completions, p))
2749     q = new CompletionNode(d);
2750     }
2751     else if (UNSAFE.compareAndSwapObject
2752     (snd, COMPLETIONS, q.next = snd.completions, q))
2753     break;
2754     }
2755     if ((r != null || (r = fst.result) != null) &&
2756     (s != null || (s = snd.result) != null) &&
2757     (d == null || d.compareAndSet(0, 1))) {
2758     Throwable ex;
2759     if (r instanceof AltResult)
2760     ex = ((AltResult)r).ex;
2761     else
2762     ex = null;
2763     if (ex == null && (s instanceof AltResult))
2764     ex = ((AltResult)s).ex;
2765     dst.internalComplete(null, ex);
2766     }
2767     fst.helpPostComplete();
2768     snd.helpPostComplete();
2769     return dst;
2770     }
2771    
2772     /**
2773     * Returns a new CompletableFuture that is completed when any of
2774     * the component CompletableFutures complete; with the same result if
2775     * it completed normally, otherwise exceptionally. If the number
2776     * of components is zero, returns a completed CompletableFuture.
2777     *
2778     * @param cfs the CompletableFutures
2779     * @return a CompletableFuture that is complete when any of the
2780     * given CompletableFutures complete
2781     * @throws NullPointerException if the array or any of its elements are
2782     * {@code null}
2783     */
2784     public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
2785     int len = cfs.length; // Same idea as allOf
2786     if (len > 1)
2787     return anyTree(cfs, 0, len - 1);
2788     else {
2789     CompletableFuture<?> dst = new CompletableFuture<Object>();
2790     CompletableFuture<?> f;
2791     if (len == 0)
2792     dst.result = NIL;
2793     else if ((f = cfs[0]) == null)
2794     throw new NullPointerException();
2795     else {
2796     ThenCopy d = null;
2797     CompletionNode p = null;
2798     Object r;
2799     while ((r = f.result) == null) {
2800     if (d == null)
2801     d = new ThenCopy(f, dst);
2802     else if (p == null)
2803     p = new CompletionNode(d);
2804     else if (UNSAFE.compareAndSwapObject
2805     (f, COMPLETIONS, p.next = f.completions, p))
2806     break;
2807     }
2808     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2809     Throwable ex; Object t;
2810     if (r instanceof AltResult) {
2811     ex = ((AltResult)r).ex;
2812     t = null;
2813     }
2814     else {
2815     ex = null;
2816     t = r;
2817     }
2818     dst.internalComplete(t, ex);
2819     }
2820     f.helpPostComplete();
2821     }
2822     return dst;
2823     }
2824     }
2825    
2826     /**
2827     * Recursively constructs an Or'ed tree of CompletableFutures
2828     */
2829     private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
2830     int lo, int hi) {
2831     CompletableFuture<?> fst, snd;
2832     int mid = (lo + hi) >>> 1;
2833     if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2834     (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2835     throw new NullPointerException();
2836     CompletableFuture<?> dst = new CompletableFuture<Object>();
2837     OrCompletion d = null;
2838     CompletionNode p = null, q = null;
2839     Object r;
2840     while ((r = fst.result) == null && (r = snd.result) == null) {
2841     if (d == null)
2842     d = new OrCompletion(fst, snd, dst);
2843     else if (p == null)
2844     p = new CompletionNode(d);
2845     else if (q == null) {
2846     if (UNSAFE.compareAndSwapObject
2847     (fst, COMPLETIONS, p.next = fst.completions, p))
2848     q = new CompletionNode(d);
2849     }
2850     else if (UNSAFE.compareAndSwapObject
2851     (snd, COMPLETIONS, q.next = snd.completions, q))
2852     break;
2853     }
2854     if ((r != null || (r = fst.result) != null ||
2855     (r = snd.result) != null) &&
2856     (d == null || d.compareAndSet(0, 1))) {
2857     Throwable ex; Object t;
2858     if (r instanceof AltResult) {
2859     ex = ((AltResult)r).ex;
2860     t = null;
2861     }
2862     else {
2863     ex = null;
2864     t = r;
2865     }
2866     dst.internalComplete(t, ex);
2867     }
2868     fst.helpPostComplete();
2869     snd.helpPostComplete();
2870     return dst;
2871     }
2872    
2873    
2874     /* ------------- Control and status methods -------------- */
2875    
2876 dl 1.28 /**
2877     * Attempts to complete this CompletableFuture with
2878     * a {@link CancellationException}.
2879     *
2880     * @param mayInterruptIfRunning this value has no effect in this
2881     * implementation because interrupts are not used to control
2882     * processing.
2883     *
2884     * @return {@code true} if this task is now cancelled
2885     */
2886     public boolean cancel(boolean mayInterruptIfRunning) {
2887     Object r;
2888     while ((r = result) == null) {
2889     r = new AltResult(new CancellationException());
2890     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
2891     postComplete();
2892     return true;
2893     }
2894     }
2895     return ((r instanceof AltResult) &&
2896     (((AltResult)r).ex instanceof CancellationException));
2897     }
2898    
2899     /**
2900     * Returns {@code true} if this CompletableFuture was cancelled
2901     * before it completed normally.
2902     *
2903     * @return {@code true} if this CompletableFuture was cancelled
2904     * before it completed normally
2905     */
2906     public boolean isCancelled() {
2907     Object r;
2908     return ((r = result) != null &&
2909     (r instanceof AltResult) &&
2910     (((AltResult)r).ex instanceof CancellationException));
2911     }
2912    
2913     /**
2914     * Forcibly sets or resets the value subsequently returned by
2915     * method get() and related methods, whether or not already
2916     * completed. This method is designed for use only in error
2917     * recovery actions, and even in such situations may result in
2918     * ongoing dependent completions using established versus
2919 dl 1.30 * overwritten outcomes.
2920 dl 1.28 *
2921     * @param value the completion value
2922     */
2923     public void obtrudeValue(T value) {
2924     result = (value == null) ? NIL : value;
2925     postComplete();
2926     }
2927    
2928 dl 1.30 /**
2929     * Forcibly causes subsequent invocations of method get() and
2930     * related methods to throw the given exception, whether or not
2931     * already completed. This method is designed for use only in
2932     * recovery actions, and even in such situations may result in
2933     * ongoing dependent completions using established versus
2934     * overwritten outcomes.
2935     *
2936     * @param ex the exception
2937     */
2938     public void obtrudeException(Throwable ex) {
2939     if (ex == null) throw new NullPointerException();
2940     result = new AltResult(ex);
2941     postComplete();
2942     }
2943    
2944 dl 1.35 /**
2945     * Returns the estimated number of CompletableFutures whose
2946     * completions are awaiting completion of this CompletableFuture.
2947     * This method is designed for use in monitoring system state, not
2948     * for synchronization control.
2949     *
2950     * @return the number of dependent CompletableFutures
2951     */
2952     public int getNumberOfDependents() {
2953     int count = 0;
2954     for (CompletionNode p = completions; p != null; p = p.next)
2955     ++count;
2956     return count;
2957     }
2958    
2959     /**
2960     * Returns a string identifying this CompletableFuture, as well as
2961     * its completion state. The state, in brackets, includes the
2962     * String {@code "Completed Normally"} or the String {@code
2963     * "Completed Exceptionally"}, or the String {@code "Not
2964     * completed"} followed by the number of CompletableFutures
2965     * dependent upon its completion, if any.
2966     *
2967     * @return a string identifying this CompletableFuture, as well as its state
2968     */
2969     public String toString() {
2970     String id = super.toString();
2971     int count = getNumberOfDependents();
2972     Object r = result;
2973     Throwable ex = (r != null && (r instanceof AltResult) ?
2974     ((AltResult)r).ex : null);
2975     return id + (ex != null? "[Completed exceptionally]" :
2976     (r != null ? "[Completed normally]" :
2977     (count == 0 ? "[Not completed]" :
2978     ("[Not completed, " + count + " dependents]"))));
2979     }
2980    
2981 dl 1.1 // Unsafe mechanics
2982     private static final sun.misc.Unsafe UNSAFE;
2983     private static final long RESULT;
2984     private static final long WAITERS;
2985     private static final long COMPLETIONS;
2986     static {
2987     try {
2988     UNSAFE = sun.misc.Unsafe.getUnsafe();
2989     Class<?> k = CompletableFuture.class;
2990     RESULT = UNSAFE.objectFieldOffset
2991     (k.getDeclaredField("result"));
2992     WAITERS = UNSAFE.objectFieldOffset
2993     (k.getDeclaredField("waiters"));
2994     COMPLETIONS = UNSAFE.objectFieldOffset
2995     (k.getDeclaredField("completions"));
2996     } catch (Exception e) {
2997     throw new Error(e);
2998     }
2999     }
3000     }