ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.69
Committed: Tue Mar 19 00:32:19 2013 UTC (11 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.68: +13 -9 lines
Log Message:
clarify exceptional behavior of anyOf and allOf

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22     import java.util.concurrent.atomic.AtomicInteger;
23     import java.util.concurrent.locks.LockSupport;
24    
25     /**
26     * A {@link Future} that may be explicitly completed (setting its
27     * value and status), and may include dependent functions and actions
28 dl 1.35 * that trigger upon its completion.
29 dl 1.1 *
30 jsr166 1.50 * <p>When two or more threads attempt to
31     * {@link #complete complete},
32 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
33 jsr166 1.50 * {@link #cancel cancel}
34     * a CompletableFuture, only one of them succeeds.
35 dl 1.19 *
36 jsr166 1.65 * <p>Methods are available for adding dependents based on
37     * user-provided Functions, Consumers, or Runnables. The appropriate
38     * form to use depends on whether actions require arguments and/or
39     * produce results. Completion of a dependent action will trigger the
40     * completion of another CompletableFuture. Actions may also be
41     * triggered after either or both the current and another
42 dl 1.35 * CompletableFuture complete. Multiple CompletableFutures may also
43 jsr166 1.50 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
44     * {@link #allOf(CompletableFuture...)}.
45 dl 1.35 *
46 jsr166 1.65 * <p>CompletableFutures themselves do not execute asynchronously.
47     * However, actions supplied for dependent completions of another
48     * CompletableFuture may do so, depending on whether they are provided
49     * via one of the <em>async</em> methods (that is, methods with names
50     * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
51     * methods provide a way to commence asynchronous processing of an
52     * action using either a given {@link Executor} or by default the
53     * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
54     * debugging, and tracking, all generated asynchronous tasks are
55     * instances of the marker interface {@link AsynchronousCompletionTask}.
56     *
57     * <p>Actions supplied for dependent completions of <em>non-async</em>
58     * methods may be performed by the thread that completes the current
59     * CompletableFuture, or by any other caller of these methods. There
60     * are no guarantees about the order of processing completions unless
61     * constrained by these methods.
62 dl 1.35 *
63 jsr166 1.55 * <p>Since (unlike {@link FutureTask}) this class has no direct
64     * control over the computation that causes it to be completed,
65     * cancellation is treated as just another form of exceptional completion.
66     * Method {@link #cancel cancel} has the same effect as
67     * {@code completeExceptionally(new CancellationException())}.
68     *
69 dl 1.48 * <p>Upon exceptional completion (including cancellation), or when a
70 jsr166 1.55 * completion entails an additional computation which terminates
71     * abruptly with an (unchecked) exception or error, then all of their
72     * dependent completions (and their dependents in turn) generally act
73     * as {@code completeExceptionally} with a {@link CompletionException}
74     * holding that exception as its cause. However, the {@link
75     * #exceptionally exceptionally} and {@link #handle handle}
76     * completions <em>are</em> able to handle exceptional completions of
77     * the CompletableFutures they depend on.
78     *
79     * <p>In case of exceptional completion with a CompletionException,
80     * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
81     * {@link ExecutionException} with the same cause as held in the
82     * corresponding CompletionException. However, in these cases,
83     * methods {@link #join()} and {@link #getNow} throw the
84     * CompletionException, which simplifies usage.
85 dl 1.1 *
86     * @author Doug Lea
87     * @since 1.8
88     */
89     public class CompletableFuture<T> implements Future<T> {
90 dl 1.28
91 dl 1.1 /*
92 dl 1.20 * Overview:
93 dl 1.1 *
94 jsr166 1.32 * 1. Non-nullness of field result (set via CAS) indicates done.
95     * An AltResult is used to box null as a result, as well as to
96     * hold exceptions. Using a single field makes completion fast
97 dl 1.20 * and simple to detect and trigger, at the expense of a lot of
98     * encoding and decoding that infiltrates many methods. One minor
99     * simplification relies on the (static) NIL (to box null results)
100     * being the only AltResult with a null exception field, so we
101 dl 1.28 * don't usually need explicit comparisons with NIL. The CF
102     * exception propagation mechanics surrounding decoding rely on
103     * unchecked casts of decoded results really being unchecked,
104     * where user type errors are caught at point of use, as is
105     * currently the case in Java. These are highlighted by using
106     * SuppressWarnings-annotated temporaries.
107 dl 1.1 *
108     * 2. Waiters are held in a Treiber stack similar to the one used
109 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
110 dl 1.28 * internal documentation for algorithmic details.
111 dl 1.1 *
112     * 3. Completions are also kept in a list/stack, and pulled off
113 dl 1.28 * and run when completion is triggered. (We could even use the
114 jsr166 1.24 * same stack as for waiters, but would give up the potential
115 dl 1.20 * parallelism obtained because woken waiters help release/run
116 dl 1.28 * others -- see method postComplete). Because post-processing
117     * may race with direct calls, class Completion opportunistically
118     * extends AtomicInteger so callers can claim the action via
119     * compareAndSet(0, 1). The Completion.run methods are all
120     * written in a boringly similar uniform way (that sometimes includes
121 jsr166 1.55 * unnecessary-looking checks, kept to maintain uniformity).
122     * There are enough dimensions upon which they differ that
123     * attempts to factor commonalities while maintaining efficiency
124     * require more lines of code than they would save.
125 dl 1.20 *
126     * 4. The exported then/and/or methods do support a bit of
127 dl 1.28 * factoring (see doThenApply etc). They must cope with the
128     * intrinsic races surrounding addition of a dependent action
129     * versus performing the action directly because the task is
130     * already complete. For example, a CF may not be complete upon
131     * entry, so a dependent completion is added, but by the time it
132     * is added, the target CF is complete, so must be directly
133 dl 1.20 * executed. This is all done while avoiding unnecessary object
134     * construction in safe-bypass cases.
135 dl 1.1 */
136    
137 dl 1.28 // preliminaries
138 dl 1.20
139 dl 1.1 static final class AltResult {
140     final Throwable ex; // null only for NIL
141 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
142 dl 1.1 }
143    
// Shared AltResult whose exception is null; per the overview comment it
// boxes a null result so that a null result field can mean "not done".
144     static final AltResult NIL = new AltResult(null);
145    
146 dl 1.20 // Fields
147    
// Stays null until completion; internalComplete installs the value with a
// CAS from null, so a non-null value means "done".
148     volatile Object result; // Either the result or boxed AltResult
// Head of the waiter stack; nodes are pushed in waitingGet/timedAwaitDone
// and popped in postComplete/removeWaiter.
149 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
// Head of the dependent-action stack drained by postComplete.
150     volatile CompletionNode completions; // list (Treiber stack) of completions
151    
152 dl 1.20 // Basic utilities for triggering and processing completions
153    
154     /**
155     * Removes and signals all waiting threads and runs all completions.
156     */
// Drains both stacks: first unparks every waiting thread, then runs every
// registered completion. Each pop is claimed with a CAS on the stack head,
// so only the caller whose CAS succeeds handles that node.
// NOTE(review): WAITERS and COMPLETIONS are presumably the Unsafe field
// offsets of `waiters` and `completions`; they are defined outside this view.
157     final void postComplete() {
158     WaitNode q; Thread t;
// Loop until the waiter stack is observed empty; a failed CAS simply
// re-reads the head on the next iteration.
159     while ((q = waiters) != null) {
160     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
161     (t = q.thread) != null) {
// Clear the thread slot first: WaitNode.isReleasable() treats a null
// thread as "released".
162     q.thread = null;
163     LockSupport.unpark(t);
164     }
165     }
166    
167     CompletionNode h; Completion c;
// Same pop-by-CAS pattern for dependent actions; each popped completion
// is run on the calling thread.
168     while ((h = completions) != null) {
169     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
170     (c = h.completion) != null)
171     c.run();
172     }
173     }
174    
175     /**
176     * Triggers completion with the encoding of the given arguments:
177     * if the exception is non-null, encodes it as a wrapped
178     * CompletionException unless it is one already. Otherwise uses
179     * the given result, boxed as NIL if null.
180     */
181     final void internalComplete(Object v, Throwable ex) {
182     if (result == null)
183     UNSAFE.compareAndSwapObject
184     (this, RESULT, null,
185     (ex == null) ? (v == null) ? NIL : v :
186     new AltResult((ex instanceof CompletionException) ? ex :
187     new CompletionException(ex)));
188     postComplete(); // help out even if not triggered
189     }
190    
191     /**
192 jsr166 1.25 * If triggered, helps release and/or process completions.
193 dl 1.20 */
194     final void helpPostComplete() {
195     if (result != null)
196     postComplete();
197     }
198    
199 dl 1.28 /* ------------- waiting for completions -------------- */
200 dl 1.20
201 dl 1.35 /** Number of processors, for spin control */
// Sampled once at class initialization; used below to decide whether
// spinning before blocking is enabled at all.
202     static final int NCPU = Runtime.getRuntime().availableProcessors();
203    
204 dl 1.1 /**
205 dl 1.28 * Heuristic spin value for waitingGet() before blocking on
206     * multiprocessors
207 dl 1.1 */
// 1 << 8 = 256 spin credits when more than one processor is reported,
// otherwise 0 (no spinning). waitingGet() starts its spin counter from this.
208 dl 1.35 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
209 dl 1.1
210     /**
211 dl 1.28 * Linked nodes to record waiting threads in a Treiber stack. See
212     * other classes such as Phaser and SynchronousQueue for more
213     * detailed explanation. This class implements ManagedBlocker to
214     * avoid starvation when blocking actions pile up in
215     * ForkJoinPools.
216     */
// One node per waiting thread. It doubles as the ManagedBlocker handed to
// ForkJoinPool.managedBlock(), so isReleasable()/block() hold the
// park / interrupt / timeout logic for a single waiter.
217     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
218     long nanos; // wait time if timed
219     final long deadline; // non-zero if timed
220     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
// Set to the creating thread; nulled when the node is released, timed out
// or removed.
221     volatile Thread thread;
222     volatile WaitNode next;
223     WaitNode(boolean interruptible, long nanos, long deadline) {
224     this.thread = Thread.currentThread();
225     this.interruptControl = interruptible ? 1 : 0;
226     this.nanos = nanos;
227     this.deadline = deadline;
228     }
// Returns true when the waiter no longer needs to block: the node was
// released (thread == null), an interrupt arrived while the wait was
// interruptible, or the deadline has passed.
229     public boolean isReleasable() {
230     if (thread == null)
231     return true;
// Thread.interrupted() clears the interrupt status, so the event is
// recorded as interruptControl = -1. Only a wait that was interruptible
// (previous value > 0) is released by it; otherwise waiting continues.
232     if (Thread.interrupted()) {
233     int i = interruptControl;
234     interruptControl = -1;
235     if (i > 0)
236     return true;
237     }
// Timed wait: refresh the remaining time. On expiry the thread slot is
// cleared so the node reads as released.
238     if (deadline != 0L &&
239     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
240     thread = null;
241     return true;
242     }
243     return false;
244     }
// Parks once (untimed when deadline == 0, otherwise for the remaining
// nanos) and reports whether the wait may end. A false return lets the
// caller invoke block() again.
245     public boolean block() {
246     if (isReleasable())
247     return true;
248     else if (deadline == 0L)
249     LockSupport.park(this);
250     else if (nanos > 0L)
251     LockSupport.parkNanos(this, nanos);
252     return isReleasable();
253     }
254 dl 1.1 }
255    
256     /**
257 dl 1.28 * Returns raw result after waiting, or null if interruptible and
258     * interrupted.
259 dl 1.1 */
// Untimed wait loop. Each iteration takes exactly one step, in this order:
// return if done -> spin -> allocate a node -> enqueue it -> bail out on
// interrupt (interruptible mode only) -> block.
260 dl 1.28 private Object waitingGet(boolean interruptible) {
261     WaitNode q = null;
262     boolean queued = false;
263 dl 1.35 int spins = SPINS;
264 dl 1.28 for (Object r;;) {
265     if ((r = result) != null) {
266     if (q != null) { // suppress unpark
267     q.thread = null;
// An interrupt was seen while waiting: in interruptible mode report it by
// returning null even though a result is now available; otherwise
// re-assert the interrupt status for the caller and still return the result.
268     if (q.interruptControl < 0) {
269     if (interruptible) {
270     removeWaiter(q);
271     return null;
272     }
273     Thread.currentThread().interrupt();
274     }
275     }
276     postComplete(); // help release others
277     return r;
278     }
// Spin phase: a spin credit is consumed only when the random value is
// non-negative, so the number of loop passes spent spinning is randomized.
279     else if (spins > 0) {
280 dl 1.35 int rnd = ThreadLocalRandom.nextSecondarySeed();
281     if (rnd == 0)
282     rnd = ThreadLocalRandom.current().nextInt();
283     if (rnd >= 0)
284 dl 1.28 --spins;
285     }
// Untimed node: nanos and deadline are both 0.
286     else if (q == null)
287     q = new WaitNode(interruptible, 0L, 0L);
// Push onto the waiter stack; on CAS failure `queued` stays false and the
// push is retried on the next pass.
288     else if (!queued)
289     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
290     q.next = waiters, q);
291     else if (interruptible && q.interruptControl < 0) {
292     removeWaiter(q);
293     return null;
294     }
// Block through ForkJoinPool.managedBlock; an InterruptedException from it
// is recorded in the node rather than propagated.
295     else if (q.thread != null && result == null) {
296     try {
297     ForkJoinPool.managedBlock(q);
298 jsr166 1.31 } catch (InterruptedException ex) {
299 dl 1.28 q.interruptControl = -1;
300     }
301     }
302     }
303 dl 1.1 }
304    
305     /**
306 dl 1.28 * Awaits completion or aborts on interrupt or timeout.
307 dl 1.1 *
308 dl 1.28 * @param nanos time to wait
309     * @return raw result
310 dl 1.1 */
// Timed, always-interruptible wait loop. Same one-step-per-iteration shape
// as waitingGet, without the spin phase and with an added timeout check.
// Throws InterruptedException if an interrupt was recorded on the node and
// TimeoutException if the time is used up with no result.
311 dl 1.28 private Object timedAwaitDone(long nanos)
312     throws InterruptedException, TimeoutException {
313     WaitNode q = null;
314     boolean queued = false;
315     for (Object r;;) {
316     if ((r = result) != null) {
317     if (q != null) {
318     q.thread = null;
// A recorded interrupt takes precedence over the available result.
319     if (q.interruptControl < 0) {
320     removeWaiter(q);
321     throw new InterruptedException();
322     }
323     }
324     postComplete();
325     return r;
326     }
327     else if (q == null) {
// Non-positive timeout and no result yet: fail without creating a node.
328     if (nanos <= 0L)
329     throw new TimeoutException();
330     long d = System.nanoTime() + nanos;
// WaitNode treats deadline == 0 as "untimed", so a computed deadline of
// exactly 0 is nudged to 1.
331 jsr166 1.31 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
332 dl 1.28 }
333     else if (!queued)
334     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
335     q.next = waiters, q);
336     else if (q.interruptControl < 0) {
337     removeWaiter(q);
338     throw new InterruptedException();
339     }
// q.nanos is refreshed by WaitNode.isReleasable(). Once it runs out,
// re-check result one last time before reporting a timeout; if a result
// appeared, fall through so the next pass returns it.
340     else if (q.nanos <= 0L) {
341     if (result == null) {
342     removeWaiter(q);
343     throw new TimeoutException();
344     }
345     }
346     else if (q.thread != null && result == null) {
347     try {
348     ForkJoinPool.managedBlock(q);
349 jsr166 1.31 } catch (InterruptedException ex) {
350 dl 1.28 q.interruptControl = -1;
351     }
352     }
353     }
354 dl 1.1 }
355    
356     /**
357 dl 1.28 * Tries to unlink a timed-out or interrupted wait node to avoid
358     * accumulating garbage. Internal nodes are simply unspliced
359     * without CAS since it is harmless if they are traversed anyway
360     * by releasers. To avoid effects of unsplicing from already
361     * removed nodes, the list is retraversed in case of an apparent
362     * race. This is slow when there are a lot of nodes, but we don't
363     * expect lists to be long enough to outweigh higher-overhead
364     * schemes.
365 dl 1.1 */
// Marks `node` as dead (thread = null), then sweeps the waiter stack and
// unlinks every node whose thread is null, not just `node` itself.
366 dl 1.28 private void removeWaiter(WaitNode node) {
367     if (node != null) {
368     node.thread = null;
369     retry:
370     for (;;) { // restart on removeWaiter race
// pred = last live node seen so far, q = current node, s = q's successor.
371     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
372     s = q.next;
373     if (q.thread != null)
374     pred = q;
// Dead interior node: splice it out with a plain write. If pred itself
// was cancelled in the meantime, the splice may have been applied to an
// already-removed node, so the sweep restarts from the head.
375     else if (pred != null) {
376     pred.next = s;
377     if (pred.thread == null) // check for race
378     continue retry;
379     }
// Dead head node: it must be replaced by CAS; restart if the head changed.
380     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
381     continue retry;
382     }
383     break;
384     }
385     }
386 dl 1.1 }
387    
388 dl 1.28 /* ------------- Async tasks -------------- */
389    
390 dl 1.1 /**
391 jsr166 1.56 * A marker interface identifying asynchronous tasks produced by
392 dl 1.28 * {@code async} methods. This may be useful for monitoring,
393     * debugging, and tracking asynchronous activities.
394 jsr166 1.57 *
395     * @since 1.8
396 dl 1.1 */
397 dl 1.28 public static interface AsynchronousCompletionTask {
398 dl 1.1 }
399    
400 dl 1.28 /** Base class can act as either FJ or plain Runnable */
401 jsr166 1.33 abstract static class Async extends ForkJoinTask<Void>
402 dl 1.28 implements Runnable, AsynchronousCompletionTask {
403     public final Void getRawResult() { return null; }
404     public final void setRawResult(Void v) { }
405     public final void run() { exec(); }
406 jsr166 1.26 }
407    
408 dl 1.28 static final class AsyncRun extends Async {
409     final Runnable fn;
410     final CompletableFuture<Void> dst;
411     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
412     this.fn = fn; this.dst = dst;
413     }
414     public final boolean exec() {
415     CompletableFuture<Void> d; Throwable ex;
416 dl 1.29 if ((d = this.dst) != null && d.result == null) {
417 dl 1.28 try {
418     fn.run();
419     ex = null;
420     } catch (Throwable rex) {
421     ex = rex;
422     }
423     d.internalComplete(null, ex);
424 dl 1.19 }
425 dl 1.28 return true;
426 dl 1.19 }
427 dl 1.28 private static final long serialVersionUID = 5232453952276885070L;
428 dl 1.19 }
429    
430 dl 1.28 static final class AsyncSupply<U> extends Async {
431     final Supplier<U> fn;
432     final CompletableFuture<U> dst;
433     AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
434     this.fn = fn; this.dst = dst;
435     }
436     public final boolean exec() {
437     CompletableFuture<U> d; U u; Throwable ex;
438 dl 1.29 if ((d = this.dst) != null && d.result == null) {
439 dl 1.28 try {
440     u = fn.get();
441     ex = null;
442     } catch (Throwable rex) {
443     ex = rex;
444     u = null;
445     }
446     d.internalComplete(u, ex);
447 dl 1.1 }
448     return true;
449     }
450     private static final long serialVersionUID = 5232453952276885070L;
451     }
452    
453 dl 1.28 static final class AsyncApply<T,U> extends Async {
454 jsr166 1.63 final T arg;
455 dl 1.28 final Function<? super T,? extends U> fn;
456 dl 1.1 final CompletableFuture<U> dst;
457 dl 1.28 AsyncApply(T arg, Function<? super T,? extends U> fn,
458 dl 1.35 CompletableFuture<U> dst) {
459 dl 1.1 this.arg = arg; this.fn = fn; this.dst = dst;
460     }
461     public final boolean exec() {
462 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
463 dl 1.29 if ((d = this.dst) != null && d.result == null) {
464 dl 1.21 try {
465     u = fn.apply(arg);
466     ex = null;
467     } catch (Throwable rex) {
468     ex = rex;
469     u = null;
470     }
471     d.internalComplete(u, ex);
472 dl 1.1 }
473     return true;
474     }
475     private static final long serialVersionUID = 5232453952276885070L;
476     }
477    
478 dl 1.28 static final class AsyncBiApply<T,U,V> extends Async {
479 dl 1.1 final T arg1;
480     final U arg2;
481 jsr166 1.63 final BiFunction<? super T,? super U,? extends V> fn;
482 dl 1.1 final CompletableFuture<V> dst;
483 dl 1.28 AsyncBiApply(T arg1, U arg2,
484 dl 1.35 BiFunction<? super T,? super U,? extends V> fn,
485     CompletableFuture<V> dst) {
486 dl 1.1 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
487     }
488     public final boolean exec() {
489 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
490 dl 1.29 if ((d = this.dst) != null && d.result == null) {
491 dl 1.21 try {
492     v = fn.apply(arg1, arg2);
493     ex = null;
494     } catch (Throwable rex) {
495     ex = rex;
496     v = null;
497     }
498     d.internalComplete(v, ex);
499 dl 1.1 }
500     return true;
501     }
502     private static final long serialVersionUID = 5232453952276885070L;
503     }
504    
505 dl 1.28 static final class AsyncAccept<T> extends Async {
506 jsr166 1.63 final T arg;
507 dl 1.34 final Consumer<? super T> fn;
508 dl 1.7 final CompletableFuture<Void> dst;
509 dl 1.34 AsyncAccept(T arg, Consumer<? super T> fn,
510 dl 1.35 CompletableFuture<Void> dst) {
511 dl 1.7 this.arg = arg; this.fn = fn; this.dst = dst;
512     }
513     public final boolean exec() {
514 dl 1.21 CompletableFuture<Void> d; Throwable ex;
515 dl 1.29 if ((d = this.dst) != null && d.result == null) {
516 dl 1.21 try {
517     fn.accept(arg);
518     ex = null;
519     } catch (Throwable rex) {
520     ex = rex;
521     }
522     d.internalComplete(null, ex);
523 dl 1.7 }
524     return true;
525     }
526     private static final long serialVersionUID = 5232453952276885070L;
527     }
528    
529 dl 1.28 static final class AsyncBiAccept<T,U> extends Async {
530 dl 1.7 final T arg1;
531     final U arg2;
532 jsr166 1.63 final BiConsumer<? super T,? super U> fn;
533 dl 1.7 final CompletableFuture<Void> dst;
534 dl 1.28 AsyncBiAccept(T arg1, U arg2,
535 dl 1.35 BiConsumer<? super T,? super U> fn,
536     CompletableFuture<Void> dst) {
537 dl 1.7 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
538     }
539     public final boolean exec() {
540 dl 1.21 CompletableFuture<Void> d; Throwable ex;
541 dl 1.29 if ((d = this.dst) != null && d.result == null) {
542 dl 1.21 try {
543     fn.accept(arg1, arg2);
544     ex = null;
545     } catch (Throwable rex) {
546     ex = rex;
547     }
548     d.internalComplete(null, ex);
549 dl 1.7 }
550     return true;
551     }
552     private static final long serialVersionUID = 5232453952276885070L;
553     }
554    
555 dl 1.37 static final class AsyncCompose<T,U> extends Async {
556 jsr166 1.63 final T arg;
557 dl 1.37 final Function<? super T, CompletableFuture<U>> fn;
558     final CompletableFuture<U> dst;
559     AsyncCompose(T arg,
560     Function<? super T, CompletableFuture<U>> fn,
561     CompletableFuture<U> dst) {
562     this.arg = arg; this.fn = fn; this.dst = dst;
563     }
564     public final boolean exec() {
565     CompletableFuture<U> d, fr; U u; Throwable ex;
566     if ((d = this.dst) != null && d.result == null) {
567     try {
568     fr = fn.apply(arg);
569 jsr166 1.61 ex = (fr == null) ? new NullPointerException() : null;
570 dl 1.37 } catch (Throwable rex) {
571     ex = rex;
572     fr = null;
573     }
574     if (ex != null)
575     u = null;
576     else {
577     Object r = fr.result;
578 dl 1.67 if (r == null)
579     r = fr.waitingGet(false);
580 dl 1.37 if (r instanceof AltResult) {
581     ex = ((AltResult)r).ex;
582     u = null;
583     }
584     else {
585     @SuppressWarnings("unchecked") U ur = (U) r;
586     u = ur;
587     }
588     }
589     d.internalComplete(u, ex);
590     }
591     return true;
592     }
593     private static final long serialVersionUID = 5232453952276885070L;
594     }
595    
596 dl 1.1 /* ------------- Completions -------------- */
597    
598 dl 1.28 /**
599     * Simple linked list nodes to record completions, used in
600     * basically the same way as WaitNodes. (We separate nodes from
601     * the Completions themselves mainly because for the And and Or
602     * methods, the same Completion object resides in two lists.)
603     */
604     static final class CompletionNode {
605     final Completion completion;
606     volatile CompletionNode next;
607     CompletionNode(Completion completion) { this.completion = completion; }
608     }
609    
610 dl 1.1 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
611 jsr166 1.33 abstract static class Completion extends AtomicInteger implements Runnable {
612 dl 1.1 }
613    
614 dl 1.28 static final class ApplyCompletion<T,U> extends Completion {
615 jsr166 1.2 final CompletableFuture<? extends T> src;
616     final Function<? super T,? extends U> fn;
617     final CompletableFuture<U> dst;
618 dl 1.1 final Executor executor;
619 dl 1.28 ApplyCompletion(CompletableFuture<? extends T> src,
620     Function<? super T,? extends U> fn,
621     CompletableFuture<U> dst, Executor executor) {
622 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
623     this.executor = executor;
624     }
625 jsr166 1.26 public final void run() {
626 dl 1.28 final CompletableFuture<? extends T> a;
627     final Function<? super T,? extends U> fn;
628     final CompletableFuture<U> dst;
629 jsr166 1.2 Object r; T t; Throwable ex;
630     if ((dst = this.dst) != null &&
631 dl 1.1 (fn = this.fn) != null &&
632     (a = this.src) != null &&
633     (r = a.result) != null &&
634     compareAndSet(0, 1)) {
635 jsr166 1.2 if (r instanceof AltResult) {
636 dl 1.19 ex = ((AltResult)r).ex;
637 dl 1.1 t = null;
638     }
639 dl 1.17 else {
640     ex = null;
641 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
642     t = tr;
643 dl 1.1 }
644 dl 1.20 Executor e = executor;
645     U u = null;
646 dl 1.17 if (ex == null) {
647     try {
648 dl 1.20 if (e != null)
649 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
650 dl 1.17 else
651 dl 1.20 u = fn.apply(t);
652 dl 1.17 } catch (Throwable rex) {
653     ex = rex;
654     }
655     }
656 dl 1.20 if (e == null || ex != null)
657     dst.internalComplete(u, ex);
658 dl 1.1 }
659     }
660 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
661 dl 1.1 }
662    
663 dl 1.28 static final class AcceptCompletion<T> extends Completion {
664 dl 1.7 final CompletableFuture<? extends T> src;
665 dl 1.34 final Consumer<? super T> fn;
666 dl 1.7 final CompletableFuture<Void> dst;
667     final Executor executor;
668 dl 1.28 AcceptCompletion(CompletableFuture<? extends T> src,
669 dl 1.34 Consumer<? super T> fn,
670 dl 1.28 CompletableFuture<Void> dst, Executor executor) {
671 dl 1.7 this.src = src; this.fn = fn; this.dst = dst;
672     this.executor = executor;
673     }
674 jsr166 1.26 public final void run() {
675 dl 1.28 final CompletableFuture<? extends T> a;
676 dl 1.34 final Consumer<? super T> fn;
677 dl 1.28 final CompletableFuture<Void> dst;
678 dl 1.7 Object r; T t; Throwable ex;
679     if ((dst = this.dst) != null &&
680     (fn = this.fn) != null &&
681     (a = this.src) != null &&
682     (r = a.result) != null &&
683     compareAndSet(0, 1)) {
684     if (r instanceof AltResult) {
685 dl 1.19 ex = ((AltResult)r).ex;
686 dl 1.7 t = null;
687     }
688 dl 1.17 else {
689     ex = null;
690 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
691     t = tr;
692 dl 1.17 }
693 dl 1.20 Executor e = executor;
694 dl 1.17 if (ex == null) {
695     try {
696 dl 1.20 if (e != null)
697 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
698 dl 1.20 else
699 dl 1.17 fn.accept(t);
700     } catch (Throwable rex) {
701     ex = rex;
702 dl 1.7 }
703     }
704 dl 1.20 if (e == null || ex != null)
705     dst.internalComplete(null, ex);
706 dl 1.7 }
707     }
708 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
709 dl 1.7 }
710    
711 dl 1.28 static final class RunCompletion<T> extends Completion {
712 jsr166 1.2 final CompletableFuture<? extends T> src;
713     final Runnable fn;
714     final CompletableFuture<Void> dst;
715 dl 1.1 final Executor executor;
716 dl 1.28 RunCompletion(CompletableFuture<? extends T> src,
717     Runnable fn,
718     CompletableFuture<Void> dst,
719     Executor executor) {
720 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
721     this.executor = executor;
722     }
723 jsr166 1.26 public final void run() {
724 dl 1.28 final CompletableFuture<? extends T> a;
725     final Runnable fn;
726     final CompletableFuture<Void> dst;
727 jsr166 1.2 Object r; Throwable ex;
728     if ((dst = this.dst) != null &&
729 dl 1.1 (fn = this.fn) != null &&
730     (a = this.src) != null &&
731     (r = a.result) != null &&
732     compareAndSet(0, 1)) {
733 dl 1.19 if (r instanceof AltResult)
734     ex = ((AltResult)r).ex;
735 dl 1.17 else
736     ex = null;
737 dl 1.20 Executor e = executor;
738 dl 1.17 if (ex == null) {
739     try {
740 dl 1.20 if (e != null)
741 dl 1.28 e.execute(new AsyncRun(fn, dst));
742 dl 1.20 else
743 dl 1.17 fn.run();
744     } catch (Throwable rex) {
745     ex = rex;
746 dl 1.1 }
747     }
748 dl 1.20 if (e == null || ex != null)
749     dst.internalComplete(null, ex);
750 dl 1.1 }
751     }
752 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
753 dl 1.1 }
754    
755 dl 1.28 static final class BiApplyCompletion<T,U,V> extends Completion {
756 jsr166 1.2 final CompletableFuture<? extends T> src;
757     final CompletableFuture<? extends U> snd;
758     final BiFunction<? super T,? super U,? extends V> fn;
759     final CompletableFuture<V> dst;
760 dl 1.1 final Executor executor;
761 dl 1.28 BiApplyCompletion(CompletableFuture<? extends T> src,
762     CompletableFuture<? extends U> snd,
763     BiFunction<? super T,? super U,? extends V> fn,
764     CompletableFuture<V> dst, Executor executor) {
765 dl 1.1 this.src = src; this.snd = snd;
766     this.fn = fn; this.dst = dst;
767     this.executor = executor;
768     }
769 jsr166 1.26 public final void run() {
770 dl 1.28 final CompletableFuture<? extends T> a;
771     final CompletableFuture<? extends U> b;
772     final BiFunction<? super T,? super U,? extends V> fn;
773     final CompletableFuture<V> dst;
774 jsr166 1.2 Object r, s; T t; U u; Throwable ex;
775     if ((dst = this.dst) != null &&
776 dl 1.1 (fn = this.fn) != null &&
777     (a = this.src) != null &&
778     (r = a.result) != null &&
779     (b = this.snd) != null &&
780     (s = b.result) != null &&
781     compareAndSet(0, 1)) {
782 jsr166 1.2 if (r instanceof AltResult) {
783 dl 1.19 ex = ((AltResult)r).ex;
784 jsr166 1.2 t = null;
785     }
786 dl 1.19 else {
787     ex = null;
788 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
789     t = tr;
790 dl 1.19 }
791     if (ex != null)
792     u = null;
793     else if (s instanceof AltResult) {
794     ex = ((AltResult)s).ex;
795 jsr166 1.2 u = null;
796     }
797 dl 1.28 else {
798     @SuppressWarnings("unchecked") U us = (U) s;
799     u = us;
800     }
801 dl 1.20 Executor e = executor;
802     V v = null;
803 dl 1.19 if (ex == null) {
804     try {
805 dl 1.20 if (e != null)
806 dl 1.28 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
807 dl 1.19 else
808 dl 1.20 v = fn.apply(t, u);
809 dl 1.19 } catch (Throwable rex) {
810     ex = rex;
811     }
812 dl 1.1 }
813 dl 1.20 if (e == null || ex != null)
814     dst.internalComplete(v, ex);
815 jsr166 1.2 }
816     }
817 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
818 dl 1.1 }
819    
820 dl 1.28 static final class BiAcceptCompletion<T,U> extends Completion {
821 dl 1.7 final CompletableFuture<? extends T> src;
822     final CompletableFuture<? extends U> snd;
823 dl 1.34 final BiConsumer<? super T,? super U> fn;
824 dl 1.7 final CompletableFuture<Void> dst;
825     final Executor executor;
826 dl 1.28 BiAcceptCompletion(CompletableFuture<? extends T> src,
827     CompletableFuture<? extends U> snd,
828 dl 1.34 BiConsumer<? super T,? super U> fn,
829 dl 1.28 CompletableFuture<Void> dst, Executor executor) {
830 dl 1.7 this.src = src; this.snd = snd;
831     this.fn = fn; this.dst = dst;
832     this.executor = executor;
833     }
834 jsr166 1.26 public final void run() {
835 dl 1.28 final CompletableFuture<? extends T> a;
836     final CompletableFuture<? extends U> b;
837 dl 1.34 final BiConsumer<? super T,? super U> fn;
838 dl 1.28 final CompletableFuture<Void> dst;
839 dl 1.7 Object r, s; T t; U u; Throwable ex;
840     if ((dst = this.dst) != null &&
841     (fn = this.fn) != null &&
842     (a = this.src) != null &&
843     (r = a.result) != null &&
844     (b = this.snd) != null &&
845     (s = b.result) != null &&
846     compareAndSet(0, 1)) {
847     if (r instanceof AltResult) {
848 dl 1.19 ex = ((AltResult)r).ex;
849 dl 1.7 t = null;
850     }
851 dl 1.19 else {
852     ex = null;
853 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
854     t = tr;
855 dl 1.19 }
856     if (ex != null)
857     u = null;
858     else if (s instanceof AltResult) {
859     ex = ((AltResult)s).ex;
860 dl 1.7 u = null;
861     }
862 dl 1.28 else {
863     @SuppressWarnings("unchecked") U us = (U) s;
864     u = us;
865     }
866 dl 1.20 Executor e = executor;
867 dl 1.19 if (ex == null) {
868     try {
869 dl 1.20 if (e != null)
870 dl 1.28 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
871 dl 1.20 else
872 dl 1.19 fn.accept(t, u);
873     } catch (Throwable rex) {
874     ex = rex;
875 dl 1.7 }
876     }
877 dl 1.20 if (e == null || ex != null)
878     dst.internalComplete(null, ex);
879 dl 1.7 }
880     }
881 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
882 dl 1.7 }
883    
884 dl 1.28 static final class BiRunCompletion<T> extends Completion {
885 jsr166 1.2 final CompletableFuture<? extends T> src;
886     final CompletableFuture<?> snd;
887     final Runnable fn;
888     final CompletableFuture<Void> dst;
889 dl 1.1 final Executor executor;
890 dl 1.28 BiRunCompletion(CompletableFuture<? extends T> src,
891     CompletableFuture<?> snd,
892     Runnable fn,
893     CompletableFuture<Void> dst, Executor executor) {
894 dl 1.1 this.src = src; this.snd = snd;
895     this.fn = fn; this.dst = dst;
896     this.executor = executor;
897     }
898 jsr166 1.26 public final void run() {
899 dl 1.1 final CompletableFuture<? extends T> a;
900     final CompletableFuture<?> b;
901     final Runnable fn;
902     final CompletableFuture<Void> dst;
903 dl 1.28 Object r, s; Throwable ex;
904 jsr166 1.2 if ((dst = this.dst) != null &&
905 dl 1.1 (fn = this.fn) != null &&
906     (a = this.src) != null &&
907     (r = a.result) != null &&
908     (b = this.snd) != null &&
909     (s = b.result) != null &&
910     compareAndSet(0, 1)) {
911 dl 1.19 if (r instanceof AltResult)
912     ex = ((AltResult)r).ex;
913     else
914     ex = null;
915     if (ex == null && (s instanceof AltResult))
916     ex = ((AltResult)s).ex;
917 dl 1.20 Executor e = executor;
918 dl 1.19 if (ex == null) {
919     try {
920 dl 1.20 if (e != null)
921 dl 1.28 e.execute(new AsyncRun(fn, dst));
922 dl 1.20 else
923 dl 1.19 fn.run();
924     } catch (Throwable rex) {
925     ex = rex;
926 dl 1.1 }
927 jsr166 1.2 }
928 dl 1.20 if (e == null || ex != null)
929     dst.internalComplete(null, ex);
930 jsr166 1.2 }
931     }
932 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
933 dl 1.1 }
934    
935 dl 1.35 static final class AndCompletion extends Completion {
936     final CompletableFuture<?> src;
937     final CompletableFuture<?> snd;
938     final CompletableFuture<Void> dst;
939     AndCompletion(CompletableFuture<?> src,
940     CompletableFuture<?> snd,
941     CompletableFuture<Void> dst) {
942     this.src = src; this.snd = snd; this.dst = dst;
943     }
944     public final void run() {
945     final CompletableFuture<?> a;
946     final CompletableFuture<?> b;
947     final CompletableFuture<Void> dst;
948     Object r, s; Throwable ex;
949     if ((dst = this.dst) != null &&
950     (a = this.src) != null &&
951     (r = a.result) != null &&
952     (b = this.snd) != null &&
953     (s = b.result) != null &&
954     compareAndSet(0, 1)) {
955     if (r instanceof AltResult)
956     ex = ((AltResult)r).ex;
957     else
958     ex = null;
959     if (ex == null && (s instanceof AltResult))
960     ex = ((AltResult)s).ex;
961     dst.internalComplete(null, ex);
962     }
963     }
964     private static final long serialVersionUID = 5232453952276885070L;
965     }
966    
967 dl 1.28 static final class OrApplyCompletion<T,U> extends Completion {
968 jsr166 1.2 final CompletableFuture<? extends T> src;
969     final CompletableFuture<? extends T> snd;
970     final Function<? super T,? extends U> fn;
971     final CompletableFuture<U> dst;
972 dl 1.1 final Executor executor;
973 dl 1.28 OrApplyCompletion(CompletableFuture<? extends T> src,
974     CompletableFuture<? extends T> snd,
975     Function<? super T,? extends U> fn,
976     CompletableFuture<U> dst, Executor executor) {
977 dl 1.1 this.src = src; this.snd = snd;
978     this.fn = fn; this.dst = dst;
979     this.executor = executor;
980     }
981 jsr166 1.26 public final void run() {
982 dl 1.28 final CompletableFuture<? extends T> a;
983     final CompletableFuture<? extends T> b;
984     final Function<? super T,? extends U> fn;
985     final CompletableFuture<U> dst;
986 jsr166 1.2 Object r; T t; Throwable ex;
987     if ((dst = this.dst) != null &&
988 dl 1.1 (fn = this.fn) != null &&
989     (((a = this.src) != null && (r = a.result) != null) ||
990     ((b = this.snd) != null && (r = b.result) != null)) &&
991     compareAndSet(0, 1)) {
992 jsr166 1.2 if (r instanceof AltResult) {
993 dl 1.19 ex = ((AltResult)r).ex;
994 jsr166 1.2 t = null;
995     }
996 dl 1.19 else {
997     ex = null;
998 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
999     t = tr;
1000 dl 1.1 }
1001 dl 1.20 Executor e = executor;
1002     U u = null;
1003 dl 1.19 if (ex == null) {
1004     try {
1005 dl 1.20 if (e != null)
1006 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
1007 dl 1.19 else
1008 dl 1.20 u = fn.apply(t);
1009 dl 1.19 } catch (Throwable rex) {
1010     ex = rex;
1011     }
1012     }
1013 dl 1.20 if (e == null || ex != null)
1014     dst.internalComplete(u, ex);
1015 jsr166 1.2 }
1016     }
1017 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1018 dl 1.1 }
1019    
1020 dl 1.28 static final class OrAcceptCompletion<T> extends Completion {
1021 dl 1.7 final CompletableFuture<? extends T> src;
1022     final CompletableFuture<? extends T> snd;
1023 dl 1.34 final Consumer<? super T> fn;
1024 dl 1.7 final CompletableFuture<Void> dst;
1025     final Executor executor;
1026 dl 1.28 OrAcceptCompletion(CompletableFuture<? extends T> src,
1027     CompletableFuture<? extends T> snd,
1028 dl 1.34 Consumer<? super T> fn,
1029 dl 1.28 CompletableFuture<Void> dst, Executor executor) {
1030 dl 1.7 this.src = src; this.snd = snd;
1031     this.fn = fn; this.dst = dst;
1032     this.executor = executor;
1033     }
1034 jsr166 1.26 public final void run() {
1035 dl 1.28 final CompletableFuture<? extends T> a;
1036     final CompletableFuture<? extends T> b;
1037 dl 1.34 final Consumer<? super T> fn;
1038 dl 1.28 final CompletableFuture<Void> dst;
1039 dl 1.7 Object r; T t; Throwable ex;
1040     if ((dst = this.dst) != null &&
1041     (fn = this.fn) != null &&
1042     (((a = this.src) != null && (r = a.result) != null) ||
1043     ((b = this.snd) != null && (r = b.result) != null)) &&
1044     compareAndSet(0, 1)) {
1045     if (r instanceof AltResult) {
1046 dl 1.19 ex = ((AltResult)r).ex;
1047 dl 1.7 t = null;
1048     }
1049 dl 1.19 else {
1050     ex = null;
1051 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1052     t = tr;
1053 dl 1.19 }
1054 dl 1.20 Executor e = executor;
1055 dl 1.19 if (ex == null) {
1056     try {
1057 dl 1.20 if (e != null)
1058 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
1059 dl 1.20 else
1060 dl 1.19 fn.accept(t);
1061     } catch (Throwable rex) {
1062     ex = rex;
1063 dl 1.7 }
1064     }
1065 dl 1.20 if (e == null || ex != null)
1066     dst.internalComplete(null, ex);
1067 dl 1.7 }
1068     }
1069 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1070 dl 1.7 }
1071    
1072 dl 1.28 static final class OrRunCompletion<T> extends Completion {
1073 jsr166 1.2 final CompletableFuture<? extends T> src;
1074     final CompletableFuture<?> snd;
1075     final Runnable fn;
1076     final CompletableFuture<Void> dst;
1077 dl 1.1 final Executor executor;
1078 dl 1.28 OrRunCompletion(CompletableFuture<? extends T> src,
1079     CompletableFuture<?> snd,
1080     Runnable fn,
1081     CompletableFuture<Void> dst, Executor executor) {
1082 dl 1.1 this.src = src; this.snd = snd;
1083     this.fn = fn; this.dst = dst;
1084     this.executor = executor;
1085     }
1086 jsr166 1.26 public final void run() {
1087 dl 1.28 final CompletableFuture<? extends T> a;
1088 dl 1.1 final CompletableFuture<?> b;
1089     final Runnable fn;
1090     final CompletableFuture<Void> dst;
1091 dl 1.28 Object r; Throwable ex;
1092 jsr166 1.2 if ((dst = this.dst) != null &&
1093 dl 1.1 (fn = this.fn) != null &&
1094     (((a = this.src) != null && (r = a.result) != null) ||
1095     ((b = this.snd) != null && (r = b.result) != null)) &&
1096     compareAndSet(0, 1)) {
1097 dl 1.19 if (r instanceof AltResult)
1098     ex = ((AltResult)r).ex;
1099     else
1100     ex = null;
1101 dl 1.20 Executor e = executor;
1102 dl 1.19 if (ex == null) {
1103 dl 1.1 try {
1104 dl 1.20 if (e != null)
1105 dl 1.28 e.execute(new AsyncRun(fn, dst));
1106 dl 1.20 else
1107 dl 1.1 fn.run();
1108     } catch (Throwable rex) {
1109 dl 1.19 ex = rex;
1110 dl 1.1 }
1111     }
1112 dl 1.20 if (e == null || ex != null)
1113     dst.internalComplete(null, ex);
1114 jsr166 1.2 }
1115     }
1116 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1117 dl 1.1 }
1118    
1119 dl 1.35 static final class OrCompletion extends Completion {
1120     final CompletableFuture<?> src;
1121     final CompletableFuture<?> snd;
1122     final CompletableFuture<?> dst;
1123     OrCompletion(CompletableFuture<?> src,
1124     CompletableFuture<?> snd,
1125     CompletableFuture<?> dst) {
1126     this.src = src; this.snd = snd; this.dst = dst;
1127     }
1128     public final void run() {
1129     final CompletableFuture<?> a;
1130     final CompletableFuture<?> b;
1131     final CompletableFuture<?> dst;
1132     Object r, t; Throwable ex;
1133     if ((dst = this.dst) != null &&
1134     (((a = this.src) != null && (r = a.result) != null) ||
1135     ((b = this.snd) != null && (r = b.result) != null)) &&
1136     compareAndSet(0, 1)) {
1137     if (r instanceof AltResult) {
1138     ex = ((AltResult)r).ex;
1139     t = null;
1140     }
1141     else {
1142     ex = null;
1143     t = r;
1144     }
1145     dst.internalComplete(t, ex);
1146     }
1147     }
1148     private static final long serialVersionUID = 5232453952276885070L;
1149     }
1150    
1151 dl 1.28 static final class ExceptionCompletion<T> extends Completion {
1152 jsr166 1.2 final CompletableFuture<? extends T> src;
1153 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1154     final CompletableFuture<T> dst;
1155 dl 1.28 ExceptionCompletion(CompletableFuture<? extends T> src,
1156     Function<? super Throwable, ? extends T> fn,
1157     CompletableFuture<T> dst) {
1158 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1159     }
1160 jsr166 1.26 public final void run() {
1161 dl 1.28 final CompletableFuture<? extends T> a;
1162     final Function<? super Throwable, ? extends T> fn;
1163     final CompletableFuture<T> dst;
1164 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1165 jsr166 1.2 if ((dst = this.dst) != null &&
1166 dl 1.1 (fn = this.fn) != null &&
1167     (a = this.src) != null &&
1168     (r = a.result) != null &&
1169     compareAndSet(0, 1)) {
1170 dl 1.20 if ((r instanceof AltResult) &&
1171     (ex = ((AltResult)r).ex) != null) {
1172     try {
1173     t = fn.apply(ex);
1174     } catch (Throwable rex) {
1175     dx = rex;
1176 dl 1.1 }
1177     }
1178 dl 1.28 else {
1179     @SuppressWarnings("unchecked") T tr = (T) r;
1180     t = tr;
1181     }
1182 dl 1.20 dst.internalComplete(t, dx);
1183 dl 1.1 }
1184     }
1185 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1186 dl 1.1 }
1187    
1188 dl 1.35 static final class ThenCopy extends Completion {
1189     final CompletableFuture<?> src;
1190     final CompletableFuture<?> dst;
1191     ThenCopy(CompletableFuture<?> src,
1192     CompletableFuture<?> dst) {
1193 dl 1.17 this.src = src; this.dst = dst;
1194     }
1195 jsr166 1.26 public final void run() {
1196 dl 1.35 final CompletableFuture<?> a;
1197     final CompletableFuture<?> dst;
1198 dl 1.20 Object r; Object t; Throwable ex;
1199 dl 1.17 if ((dst = this.dst) != null &&
1200     (a = this.src) != null &&
1201     (r = a.result) != null &&
1202     compareAndSet(0, 1)) {
1203 dl 1.20 if (r instanceof AltResult) {
1204     ex = ((AltResult)r).ex;
1205     t = null;
1206     }
1207     else {
1208     ex = null;
1209     t = r;
1210     }
1211     dst.internalComplete(t, ex);
1212 dl 1.17 }
1213     }
1214 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1215 dl 1.17 }
1216    
1217 dl 1.28 static final class HandleCompletion<T,U> extends Completion {
1218 dl 1.17 final CompletableFuture<? extends T> src;
1219     final BiFunction<? super T, Throwable, ? extends U> fn;
1220     final CompletableFuture<U> dst;
1221 dl 1.28 HandleCompletion(CompletableFuture<? extends T> src,
1222     BiFunction<? super T, Throwable, ? extends U> fn,
1223 jsr166 1.64 CompletableFuture<U> dst) {
1224 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1225     }
1226 jsr166 1.26 public final void run() {
1227 dl 1.28 final CompletableFuture<? extends T> a;
1228     final BiFunction<? super T, Throwable, ? extends U> fn;
1229     final CompletableFuture<U> dst;
1230 dl 1.17 Object r; T t; Throwable ex;
1231     if ((dst = this.dst) != null &&
1232     (fn = this.fn) != null &&
1233     (a = this.src) != null &&
1234     (r = a.result) != null &&
1235     compareAndSet(0, 1)) {
1236     if (r instanceof AltResult) {
1237     ex = ((AltResult)r).ex;
1238     t = null;
1239     }
1240     else {
1241     ex = null;
1242 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1243     t = tr;
1244 dl 1.17 }
1245 dl 1.20 U u = null; Throwable dx = null;
1246 dl 1.17 try {
1247 dl 1.20 u = fn.apply(t, ex);
1248 dl 1.17 } catch (Throwable rex) {
1249 dl 1.20 dx = rex;
1250 dl 1.17 }
1251 dl 1.20 dst.internalComplete(u, dx);
1252 dl 1.17 }
1253     }
1254 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1255 dl 1.17 }
1256    
1257 dl 1.28 static final class ComposeCompletion<T,U> extends Completion {
1258 dl 1.17 final CompletableFuture<? extends T> src;
1259     final Function<? super T, CompletableFuture<U>> fn;
1260     final CompletableFuture<U> dst;
1261 dl 1.37 final Executor executor;
1262 dl 1.28 ComposeCompletion(CompletableFuture<? extends T> src,
1263     Function<? super T, CompletableFuture<U>> fn,
1264 jsr166 1.64 CompletableFuture<U> dst, Executor executor) {
1265 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1266 dl 1.37 this.executor = executor;
1267 dl 1.17 }
1268 jsr166 1.26 public final void run() {
1269 dl 1.28 final CompletableFuture<? extends T> a;
1270     final Function<? super T, CompletableFuture<U>> fn;
1271     final CompletableFuture<U> dst;
1272 dl 1.37 Object r; T t; Throwable ex; Executor e;
1273 dl 1.17 if ((dst = this.dst) != null &&
1274     (fn = this.fn) != null &&
1275     (a = this.src) != null &&
1276     (r = a.result) != null &&
1277     compareAndSet(0, 1)) {
1278     if (r instanceof AltResult) {
1279     ex = ((AltResult)r).ex;
1280     t = null;
1281     }
1282     else {
1283     ex = null;
1284 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1285     t = tr;
1286 dl 1.17 }
1287     CompletableFuture<U> c = null;
1288 dl 1.20 U u = null;
1289     boolean complete = false;
1290 dl 1.17 if (ex == null) {
1291 dl 1.37 if ((e = executor) != null)
1292     e.execute(new AsyncCompose<T,U>(t, fn, dst));
1293     else {
1294     try {
1295     if ((c = fn.apply(t)) == null)
1296     ex = new NullPointerException();
1297     } catch (Throwable rex) {
1298     ex = rex;
1299     }
1300 dl 1.17 }
1301     }
1302 dl 1.37 if (c != null) {
1303 dl 1.35 ThenCopy d = null;
1304 dl 1.18 Object s;
1305     if ((s = c.result) == null) {
1306     CompletionNode p = new CompletionNode
1307 dl 1.35 (d = new ThenCopy(c, dst));
1308 dl 1.18 while ((s = c.result) == null) {
1309     if (UNSAFE.compareAndSwapObject
1310     (c, COMPLETIONS, p.next = c.completions, p))
1311     break;
1312     }
1313 dl 1.17 }
1314 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1315 dl 1.20 complete = true;
1316 dl 1.18 if (s instanceof AltResult) {
1317     ex = ((AltResult)s).ex; // no rewrap
1318     u = null;
1319 dl 1.28 }
1320     else {
1321     @SuppressWarnings("unchecked") U us = (U) s;
1322     u = us;
1323     }
1324     }
1325     }
1326     if (complete || ex != null)
1327     dst.internalComplete(u, ex);
1328     if (c != null)
1329     c.helpPostComplete();
1330     }
1331     }
1332     private static final long serialVersionUID = 5232453952276885070L;
1333     }
1334    
1335     // public methods
1336    
1337     /**
1338     * Creates a new incomplete CompletableFuture.
1339     */
1340     public CompletableFuture() {
1341     }
1342    
1343     /**
1344 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1345     * by a task running in the {@link ForkJoinPool#commonPool()} with
1346     * the value obtained by calling the given Supplier.
1347 dl 1.28 *
1348     * @param supplier a function returning the value to be used
1349     * to complete the returned CompletableFuture
1350 jsr166 1.58 * @return the new CompletableFuture
1351 dl 1.28 */
1352     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1353     if (supplier == null) throw new NullPointerException();
1354     CompletableFuture<U> f = new CompletableFuture<U>();
1355     ForkJoinPool.commonPool().
1356     execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1357     return f;
1358     }
1359    
1360     /**
1361 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1362     * by a task running in the given executor with the value obtained
1363     * by calling the given Supplier.
1364 dl 1.28 *
1365     * @param supplier a function returning the value to be used
1366     * to complete the returned CompletableFuture
1367     * @param executor the executor to use for asynchronous execution
1368 jsr166 1.58 * @return the new CompletableFuture
1369 dl 1.28 */
1370     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1371     Executor executor) {
1372     if (executor == null || supplier == null)
1373     throw new NullPointerException();
1374     CompletableFuture<U> f = new CompletableFuture<U>();
1375     executor.execute(new AsyncSupply<U>(supplier, f));
1376     return f;
1377     }
1378    
1379     /**
1380 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1381     * by a task running in the {@link ForkJoinPool#commonPool()} after
1382     * it runs the given action.
1383 dl 1.28 *
1384     * @param runnable the action to run before completing the
1385     * returned CompletableFuture
1386 jsr166 1.58 * @return the new CompletableFuture
1387 dl 1.28 */
1388     public static CompletableFuture<Void> runAsync(Runnable runnable) {
1389     if (runnable == null) throw new NullPointerException();
1390     CompletableFuture<Void> f = new CompletableFuture<Void>();
1391     ForkJoinPool.commonPool().
1392     execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1393     return f;
1394     }
1395    
1396     /**
1397 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1398     * by a task running in the given executor after it runs the given
1399     * action.
1400 dl 1.28 *
1401     * @param runnable the action to run before completing the
1402     * returned CompletableFuture
1403     * @param executor the executor to use for asynchronous execution
1404 jsr166 1.58 * @return the new CompletableFuture
1405 dl 1.28 */
1406     public static CompletableFuture<Void> runAsync(Runnable runnable,
1407     Executor executor) {
1408     if (executor == null || runnable == null)
1409     throw new NullPointerException();
1410     CompletableFuture<Void> f = new CompletableFuture<Void>();
1411     executor.execute(new AsyncRun(runnable, f));
1412     return f;
1413     }
1414    
1415     /**
1416     * Returns {@code true} if completed in any fashion: normally,
1417     * exceptionally, or via cancellation.
1418     *
1419     * @return {@code true} if completed
1420     */
1421     public boolean isDone() {
1422     return result != null;
1423     }
1424    
1425     /**
1426 dl 1.49 * Waits if necessary for this future to complete, and then
1427 dl 1.48 * returns its result.
1428 dl 1.28 *
1429 dl 1.48 * @return the result value
1430     * @throws CancellationException if this future was cancelled
1431     * @throws ExecutionException if this future completed exceptionally
1432 dl 1.28 * @throws InterruptedException if the current thread was interrupted
1433     * while waiting
1434     */
1435     public T get() throws InterruptedException, ExecutionException {
1436     Object r; Throwable ex, cause;
1437     if ((r = result) == null && (r = waitingGet(true)) == null)
1438     throw new InterruptedException();
1439 jsr166 1.45 if (!(r instanceof AltResult)) {
1440     @SuppressWarnings("unchecked") T tr = (T) r;
1441     return tr;
1442     }
1443     if ((ex = ((AltResult)r).ex) == null)
1444 dl 1.28 return null;
1445 jsr166 1.45 if (ex instanceof CancellationException)
1446     throw (CancellationException)ex;
1447     if ((ex instanceof CompletionException) &&
1448     (cause = ex.getCause()) != null)
1449     ex = cause;
1450     throw new ExecutionException(ex);
1451 dl 1.28 }
1452    
1453     /**
1454 dl 1.49 * Waits if necessary for at most the given time for this future
1455     * to complete, and then returns its result, if available.
1456 dl 1.28 *
1457     * @param timeout the maximum time to wait
1458     * @param unit the time unit of the timeout argument
1459 dl 1.48 * @return the result value
1460     * @throws CancellationException if this future was cancelled
1461     * @throws ExecutionException if this future completed exceptionally
1462 dl 1.28 * @throws InterruptedException if the current thread was interrupted
1463     * while waiting
1464     * @throws TimeoutException if the wait timed out
1465     */
1466     public T get(long timeout, TimeUnit unit)
1467     throws InterruptedException, ExecutionException, TimeoutException {
1468     Object r; Throwable ex, cause;
1469     long nanos = unit.toNanos(timeout);
1470     if (Thread.interrupted())
1471     throw new InterruptedException();
1472     if ((r = result) == null)
1473     r = timedAwaitDone(nanos);
1474 jsr166 1.45 if (!(r instanceof AltResult)) {
1475     @SuppressWarnings("unchecked") T tr = (T) r;
1476     return tr;
1477     }
1478     if ((ex = ((AltResult)r).ex) == null)
1479 dl 1.28 return null;
1480 jsr166 1.45 if (ex instanceof CancellationException)
1481     throw (CancellationException)ex;
1482     if ((ex instanceof CompletionException) &&
1483     (cause = ex.getCause()) != null)
1484     ex = cause;
1485     throw new ExecutionException(ex);
1486 dl 1.28 }
1487    
1488     /**
1489     * Returns the result value when complete, or throws an
1490     * (unchecked) exception if completed exceptionally. To better
1491     * conform with the use of common functional forms, if a
1492     * computation involved in the completion of this
1493     * CompletableFuture threw an exception, this method throws an
1494     * (unchecked) {@link CompletionException} with the underlying
1495     * exception as its cause.
1496     *
1497     * @return the result value
1498     * @throws CancellationException if the computation was cancelled
1499 jsr166 1.55 * @throws CompletionException if this future completed
1500     * exceptionally or a completion computation threw an exception
1501 dl 1.28 */
1502     public T join() {
1503     Object r; Throwable ex;
1504     if ((r = result) == null)
1505     r = waitingGet(false);
1506 jsr166 1.45 if (!(r instanceof AltResult)) {
1507     @SuppressWarnings("unchecked") T tr = (T) r;
1508     return tr;
1509     }
1510     if ((ex = ((AltResult)r).ex) == null)
1511 dl 1.28 return null;
1512 jsr166 1.45 if (ex instanceof CancellationException)
1513     throw (CancellationException)ex;
1514     if (ex instanceof CompletionException)
1515     throw (CompletionException)ex;
1516     throw new CompletionException(ex);
1517 dl 1.28 }
1518    
1519     /**
1520     * Returns the result value (or throws any encountered exception)
1521     * if completed, else returns the given valueIfAbsent.
1522     *
1523     * @param valueIfAbsent the value to return if not completed
1524     * @return the result value, if completed, else the given valueIfAbsent
1525     * @throws CancellationException if the computation was cancelled
1526 jsr166 1.55 * @throws CompletionException if this future completed
1527     * exceptionally or a completion computation threw an exception
1528 dl 1.28 */
1529     public T getNow(T valueIfAbsent) {
1530     Object r; Throwable ex;
1531     if ((r = result) == null)
1532     return valueIfAbsent;
1533 jsr166 1.45 if (!(r instanceof AltResult)) {
1534     @SuppressWarnings("unchecked") T tr = (T) r;
1535     return tr;
1536     }
1537     if ((ex = ((AltResult)r).ex) == null)
1538 dl 1.28 return null;
1539 jsr166 1.45 if (ex instanceof CancellationException)
1540     throw (CancellationException)ex;
1541     if (ex instanceof CompletionException)
1542     throw (CompletionException)ex;
1543     throw new CompletionException(ex);
1544 dl 1.28 }
1545    
1546     /**
1547     * If not already completed, sets the value returned by {@link
1548     * #get()} and related methods to the given value.
1549     *
1550     * @param value the result value
1551     * @return {@code true} if this invocation caused this CompletableFuture
1552     * to transition to a completed state, else {@code false}
1553     */
1554     public boolean complete(T value) {
1555     boolean triggered = result == null &&
1556     UNSAFE.compareAndSwapObject(this, RESULT, null,
1557     value == null ? NIL : value);
1558     postComplete();
1559     return triggered;
1560     }
1561    
1562     /**
1563     * If not already completed, causes invocations of {@link #get()}
1564     * and related methods to throw the given exception.
1565     *
1566     * @param ex the exception
1567     * @return {@code true} if this invocation caused this CompletableFuture
1568     * to transition to a completed state, else {@code false}
1569     */
1570     public boolean completeExceptionally(Throwable ex) {
1571     if (ex == null) throw new NullPointerException();
1572     boolean triggered = result == null &&
1573     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1574     postComplete();
1575     return triggered;
1576     }
1577    
1578     /**
1579 jsr166 1.66 * Returns a new CompletableFuture that is completed
1580     * when this CompletableFuture completes, with the result of the
1581     * given function of this CompletableFuture's result.
1582     *
1583     * <p>If this CompletableFuture completes exceptionally,
1584     * then the returned CompletableFuture also does so, with a
1585     * CompletionException holding this exception as its cause.
1586 dl 1.28 *
1587     * @param fn the function to use to compute the value of
1588     * the returned CompletableFuture
1589     * @return the new CompletableFuture
1590     */
1591     public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1592     return doThenApply(fn, null);
1593     }
1594    
1595     /**
1596 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1597     * when this CompletableFuture completes, with the result of the
1598     * given function of this CompletableFuture's result, called from a
1599     * task running in the {@link ForkJoinPool#commonPool()}.
1600     *
1601     * <p>If this CompletableFuture completes exceptionally,
1602     * then the returned CompletableFuture also does so, with a
1603 dl 1.28 * CompletionException holding this exception as its cause.
1604     *
1605     * @param fn the function to use to compute the value of
1606     * the returned CompletableFuture
1607     * @return the new CompletableFuture
1608     */
1609 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
1610     (Function<? super T,? extends U> fn) {
1611 dl 1.28 return doThenApply(fn, ForkJoinPool.commonPool());
1612 dl 1.17 }
1613    
1614 dl 1.28 /**
1615 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1616     * when this CompletableFuture completes, with the result of the
1617     * given function of this CompletableFuture's result, called from a
1618     * task running in the given executor.
1619     *
1620     * <p>If this CompletableFuture completes exceptionally,
1621     * then the returned CompletableFuture also does so, with a
1622     * CompletionException holding this exception as its cause.
1623 dl 1.28 *
1624     * @param fn the function to use to compute the value of
1625     * the returned CompletableFuture
1626     * @param executor the executor to use for asynchronous execution
1627     * @return the new CompletableFuture
1628     */
1629 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
1630     (Function<? super T,? extends U> fn,
1631     Executor executor) {
1632 dl 1.28 if (executor == null) throw new NullPointerException();
1633     return doThenApply(fn, executor);
1634     }
1635 dl 1.1
1636 dl 1.48 private <U> CompletableFuture<U> doThenApply
1637     (Function<? super T,? extends U> fn,
1638     Executor e) {
1639 dl 1.1 if (fn == null) throw new NullPointerException();
1640     CompletableFuture<U> dst = new CompletableFuture<U>();
1641 dl 1.28 ApplyCompletion<T,U> d = null;
1642 jsr166 1.2 Object r;
1643     if ((r = result) == null) {
1644 dl 1.1 CompletionNode p = new CompletionNode
1645 dl 1.28 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1646 dl 1.1 while ((r = result) == null) {
1647     if (UNSAFE.compareAndSwapObject
1648     (this, COMPLETIONS, p.next = completions, p))
1649     break;
1650     }
1651     }
1652 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1653 dl 1.19 T t; Throwable ex;
1654 jsr166 1.2 if (r instanceof AltResult) {
1655 dl 1.19 ex = ((AltResult)r).ex;
1656 jsr166 1.2 t = null;
1657 dl 1.1 }
1658 dl 1.19 else {
1659     ex = null;
1660 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1661     t = tr;
1662 dl 1.19 }
1663 dl 1.20 U u = null;
1664 jsr166 1.2 if (ex == null) {
1665 dl 1.1 try {
1666 dl 1.20 if (e != null)
1667 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
1668 dl 1.1 else
1669 dl 1.20 u = fn.apply(t);
1670 dl 1.1 } catch (Throwable rex) {
1671 dl 1.19 ex = rex;
1672 dl 1.1 }
1673     }
1674 dl 1.20 if (e == null || ex != null)
1675     dst.internalComplete(u, ex);
1676 dl 1.17 }
1677 dl 1.20 helpPostComplete();
1678 dl 1.1 return dst;
1679     }
1680    
1681 dl 1.28 /**
1682 jsr166 1.66 * Returns a new CompletableFuture that is completed
1683     * when this CompletableFuture completes, after performing the given
1684     * action with this CompletableFuture's result.
1685     *
1686     * <p>If this CompletableFuture completes exceptionally,
1687     * then the returned CompletableFuture also does so, with a
1688     * CompletionException holding this exception as its cause.
1689 dl 1.28 *
1690     * @param block the action to perform before completing the
1691     * returned CompletableFuture
1692     * @return the new CompletableFuture
1693     */
1694 dl 1.34 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1695 dl 1.28 return doThenAccept(block, null);
1696     }
1697    
1698     /**
1699 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1700     * when this CompletableFuture completes, after performing the given
1701     * action with this CompletableFuture's result from a task running
1702     * in the {@link ForkJoinPool#commonPool()}.
1703     *
1704     * <p>If this CompletableFuture completes exceptionally,
1705     * then the returned CompletableFuture also does so, with a
1706     * CompletionException holding this exception as its cause.
1707 dl 1.28 *
1708     * @param block the action to perform before completing the
1709     * returned CompletableFuture
1710     * @return the new CompletableFuture
1711     */
1712 dl 1.34 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1713 dl 1.28 return doThenAccept(block, ForkJoinPool.commonPool());
1714     }
1715    
1716     /**
1717 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1718     * when this CompletableFuture completes, after performing the given
1719     * action with this CompletableFuture's result from a task running
1720     * in the given executor.
1721     *
1722     * <p>If this CompletableFuture completes exceptionally,
1723     * then the returned CompletableFuture also does so, with a
1724     * CompletionException holding this exception as its cause.
1725 dl 1.28 *
1726     * @param block the action to perform before completing the
1727     * returned CompletableFuture
1728     * @param executor the executor to use for asynchronous execution
1729     * @return the new CompletableFuture
1730     */
1731 dl 1.34 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1732 dl 1.28 Executor executor) {
1733     if (executor == null) throw new NullPointerException();
1734     return doThenAccept(block, executor);
1735     }
1736    
1737 dl 1.34 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1738 dl 1.28 Executor e) {
1739 dl 1.7 if (fn == null) throw new NullPointerException();
1740     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1741 dl 1.28 AcceptCompletion<T> d = null;
1742 dl 1.7 Object r;
1743     if ((r = result) == null) {
1744     CompletionNode p = new CompletionNode
1745 dl 1.28 (d = new AcceptCompletion<T>(this, fn, dst, e));
1746 dl 1.7 while ((r = result) == null) {
1747     if (UNSAFE.compareAndSwapObject
1748     (this, COMPLETIONS, p.next = completions, p))
1749     break;
1750     }
1751     }
1752     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1753 dl 1.19 T t; Throwable ex;
1754 dl 1.7 if (r instanceof AltResult) {
1755 dl 1.19 ex = ((AltResult)r).ex;
1756 dl 1.7 t = null;
1757     }
1758 dl 1.19 else {
1759     ex = null;
1760 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1761     t = tr;
1762 dl 1.19 }
1763 dl 1.7 if (ex == null) {
1764     try {
1765 dl 1.20 if (e != null)
1766 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
1767 dl 1.20 else
1768 dl 1.7 fn.accept(t);
1769     } catch (Throwable rex) {
1770 dl 1.19 ex = rex;
1771 dl 1.7 }
1772     }
1773 dl 1.20 if (e == null || ex != null)
1774     dst.internalComplete(null, ex);
1775 dl 1.17 }
1776 dl 1.20 helpPostComplete();
1777 dl 1.7 return dst;
1778     }
1779    
1780 dl 1.28 /**
1781 jsr166 1.66 * Returns a new CompletableFuture that is completed
1782     * when this CompletableFuture completes, after performing the given
1783     * action.
1784     *
1785     * <p>If this CompletableFuture completes exceptionally,
1786 dl 1.28 * then the returned CompletableFuture also does so, with a
1787     * CompletionException holding this exception as its cause.
1788     *
1789     * @param action the action to perform before completing the
1790     * returned CompletableFuture
1791     * @return the new CompletableFuture
1792     */
1793     public CompletableFuture<Void> thenRun(Runnable action) {
1794     return doThenRun(action, null);
1795     }
1796    
1797     /**
1798 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1799     * when this CompletableFuture completes, after performing the given
1800     * action from a task running in the {@link ForkJoinPool#commonPool()}.
1801     *
1802     * <p>If this CompletableFuture completes exceptionally,
1803 dl 1.28 * then the returned CompletableFuture also does so, with a
1804     * CompletionException holding this exception as its cause.
1805     *
1806     * @param action the action to perform before completing the
1807     * returned CompletableFuture
1808     * @return the new CompletableFuture
1809     */
1810     public CompletableFuture<Void> thenRunAsync(Runnable action) {
1811     return doThenRun(action, ForkJoinPool.commonPool());
1812     }
1813    
1814     /**
1815 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1816     * when this CompletableFuture completes, after performing the given
1817     * action from a task running in the given executor.
1818     *
1819     * <p>If this CompletableFuture completes exceptionally,
1820     * then the returned CompletableFuture also does so, with a
1821     * CompletionException holding this exception as its cause.
1822 dl 1.28 *
1823     * @param action the action to perform before completing the
1824     * returned CompletableFuture
1825     * @param executor the executor to use for asynchronous execution
1826     * @return the new CompletableFuture
1827     */
1828     public CompletableFuture<Void> thenRunAsync(Runnable action,
1829     Executor executor) {
1830     if (executor == null) throw new NullPointerException();
1831     return doThenRun(action, executor);
1832     }
1833    
1834     private CompletableFuture<Void> doThenRun(Runnable action,
1835     Executor e) {
1836 dl 1.1 if (action == null) throw new NullPointerException();
1837     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1838 dl 1.28 RunCompletion<T> d = null;
1839 jsr166 1.2 Object r;
1840     if ((r = result) == null) {
1841 dl 1.1 CompletionNode p = new CompletionNode
1842 dl 1.28 (d = new RunCompletion<T>(this, action, dst, e));
1843 dl 1.1 while ((r = result) == null) {
1844     if (UNSAFE.compareAndSwapObject
1845     (this, COMPLETIONS, p.next = completions, p))
1846     break;
1847     }
1848     }
1849 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1850 dl 1.19 Throwable ex;
1851     if (r instanceof AltResult)
1852 dl 1.28 ex = ((AltResult)r).ex;
1853     else
1854     ex = null;
1855     if (ex == null) {
1856     try {
1857     if (e != null)
1858     e.execute(new AsyncRun(action, dst));
1859     else
1860     action.run();
1861     } catch (Throwable rex) {
1862     ex = rex;
1863     }
1864     }
1865     if (e == null || ex != null)
1866     dst.internalComplete(null, ex);
1867     }
1868     helpPostComplete();
1869     return dst;
1870     }
1871    
1872     /**
1873 jsr166 1.66 * Returns a new CompletableFuture that is completed
1874     * when both this and the other given CompletableFuture complete,
1875     * with the result of the given function of the results of the two
1876     * CompletableFutures.
1877     *
1878     * <p>If this and/or the other CompletableFuture complete exceptionally,
1879     * then the returned CompletableFuture also does so, with a
1880 dl 1.28 * CompletionException holding the exception as its cause.
1881     *
1882     * @param other the other CompletableFuture
1883     * @param fn the function to use to compute the value of
1884     * the returned CompletableFuture
1885     * @return the new CompletableFuture
1886     */
1887 dl 1.48 public <U,V> CompletableFuture<V> thenCombine
1888     (CompletableFuture<? extends U> other,
1889     BiFunction<? super T,? super U,? extends V> fn) {
1890 dl 1.28 return doThenBiApply(other, fn, null);
1891     }
1892    
1893     /**
1894 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1895     * when both this and the other given CompletableFuture complete,
1896     * with the result of the given function of the results of the two
1897     * CompletableFutures, called from a task running in the
1898     * {@link ForkJoinPool#commonPool()}.
1899     *
1900     * <p>If this and/or the other CompletableFuture complete exceptionally,
1901     * then the returned CompletableFuture also does so, with a
1902 dl 1.28 * CompletionException holding the exception as its cause.
1903     *
1904     * @param other the other CompletableFuture
1905     * @param fn the function to use to compute the value of
1906     * the returned CompletableFuture
1907     * @return the new CompletableFuture
1908     */
1909 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
1910     (CompletableFuture<? extends U> other,
1911     BiFunction<? super T,? super U,? extends V> fn) {
1912 dl 1.28 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1913     }
1914    
1915     /**
1916 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
1917     * when both this and the other given CompletableFuture complete,
1918     * with the result of the given function of the results of the two
1919     * CompletableFutures, called from a task running in the
1920     * given executor.
1921     *
1922     * <p>If this and/or the other CompletableFuture complete exceptionally,
1923     * then the returned CompletableFuture also does so, with a
1924 dl 1.28 * CompletionException holding the exception as its cause.
1925     *
1926     * @param other the other CompletableFuture
1927     * @param fn the function to use to compute the value of
1928     * the returned CompletableFuture
1929     * @param executor the executor to use for asynchronous execution
1930     * @return the new CompletableFuture
1931     */
1932 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
1933     (CompletableFuture<? extends U> other,
1934     BiFunction<? super T,? super U,? extends V> fn,
1935     Executor executor) {
1936 dl 1.28 if (executor == null) throw new NullPointerException();
1937     return doThenBiApply(other, fn, executor);
1938 dl 1.1 }
1939    
1940 dl 1.48 private <U,V> CompletableFuture<V> doThenBiApply
1941     (CompletableFuture<? extends U> other,
1942     BiFunction<? super T,? super U,? extends V> fn,
1943     Executor e) {
1944 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
1945 jsr166 1.2 CompletableFuture<V> dst = new CompletableFuture<V>();
1946 dl 1.28 BiApplyCompletion<T,U,V> d = null;
1947 jsr166 1.2 Object r, s = null;
1948     if ((r = result) == null || (s = other.result) == null) {
1949 dl 1.28 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1950 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
1951     while ((r == null && (r = result) == null) ||
1952     (s == null && (s = other.result) == null)) {
1953     if (q != null) {
1954     if (s != null ||
1955     UNSAFE.compareAndSwapObject
1956     (other, COMPLETIONS, q.next = other.completions, q))
1957     break;
1958     }
1959     else if (r != null ||
1960     UNSAFE.compareAndSwapObject
1961     (this, COMPLETIONS, p.next = completions, p)) {
1962     if (s != null)
1963     break;
1964     q = new CompletionNode(d);
1965     }
1966     }
1967     }
1968 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1969 dl 1.19 T t; U u; Throwable ex;
1970 jsr166 1.2 if (r instanceof AltResult) {
1971 dl 1.19 ex = ((AltResult)r).ex;
1972 jsr166 1.2 t = null;
1973 dl 1.1 }
1974 dl 1.19 else {
1975     ex = null;
1976 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1977     t = tr;
1978 dl 1.19 }
1979 dl 1.1 if (ex != null)
1980     u = null;
1981     else if (s instanceof AltResult) {
1982 dl 1.19 ex = ((AltResult)s).ex;
1983 dl 1.1 u = null;
1984     }
1985 dl 1.28 else {
1986     @SuppressWarnings("unchecked") U us = (U) s;
1987     u = us;
1988     }
1989 dl 1.20 V v = null;
1990 jsr166 1.2 if (ex == null) {
1991 dl 1.1 try {
1992 dl 1.20 if (e != null)
1993 dl 1.28 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1994 dl 1.1 else
1995 dl 1.20 v = fn.apply(t, u);
1996 dl 1.1 } catch (Throwable rex) {
1997 dl 1.19 ex = rex;
1998 dl 1.1 }
1999     }
2000 dl 1.20 if (e == null || ex != null)
2001     dst.internalComplete(v, ex);
2002 jsr166 1.2 }
2003 dl 1.20 helpPostComplete();
2004     other.helpPostComplete();
2005 jsr166 1.2 return dst;
2006 dl 1.1 }
2007    
2008 dl 1.28 /**
2009 jsr166 1.66 * Returns a new CompletableFuture that is completed
2010     * when both this and the other given CompletableFuture complete,
2011     * after performing the given action with the results of the two
2012     * CompletableFutures.
2013     *
2014     * <p>If this and/or the other CompletableFuture complete exceptionally,
2015     * then the returned CompletableFuture also does so, with a
2016     * CompletionException holding one of these exceptions as its cause.
2017 dl 1.28 *
2018     * @param other the other CompletableFuture
2019     * @param block the action to perform before completing the
2020     * returned CompletableFuture
2021     * @return the new CompletableFuture
2022     */
2023 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBoth
2024     (CompletableFuture<? extends U> other,
2025     BiConsumer<? super T, ? super U> block) {
2026 dl 1.28 return doThenBiAccept(other, block, null);
2027     }
2028    
2029     /**
2030 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2031     * when both this and the other given CompletableFuture complete,
2032     * after performing the given action with the results of the two
2033     * CompletableFutures from a task running in the {@link
2034     * ForkJoinPool#commonPool()}.
2035     *
2036     * <p>If this and/or the other CompletableFuture complete exceptionally,
2037     * then the returned CompletableFuture also does so, with a
2038     * CompletionException holding one of these exceptions as its cause.
2039 dl 1.28 *
2040     * @param other the other CompletableFuture
2041     * @param block the action to perform before completing the
2042     * returned CompletableFuture
2043     * @return the new CompletableFuture
2044     */
2045 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2046     (CompletableFuture<? extends U> other,
2047     BiConsumer<? super T, ? super U> block) {
2048 dl 1.28 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2049     }
2050    
2051     /**
2052 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2053     * when both this and the other given CompletableFuture complete,
2054     * after performing the given action with the results of the two
2055     * CompletableFutures from a task running in the given executor.
2056     *
2057     * <p>If this and/or the other CompletableFuture complete exceptionally,
2058 jsr166 1.47 * then the returned CompletableFuture also does so, with a
2059     * CompletionException holding one of these exceptions as its cause.
2060 dl 1.28 *
2061     * @param other the other CompletableFuture
2062     * @param block the action to perform before completing the
2063     * returned CompletableFuture
2064     * @param executor the executor to use for asynchronous execution
2065     * @return the new CompletableFuture
2066     */
2067 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2068     (CompletableFuture<? extends U> other,
2069     BiConsumer<? super T, ? super U> block,
2070     Executor executor) {
2071 dl 1.28 if (executor == null) throw new NullPointerException();
2072     return doThenBiAccept(other, block, executor);
2073     }
2074    
2075 dl 1.48 private <U> CompletableFuture<Void> doThenBiAccept
2076     (CompletableFuture<? extends U> other,
2077     BiConsumer<? super T,? super U> fn,
2078     Executor e) {
2079 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2080     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2081 dl 1.28 BiAcceptCompletion<T,U> d = null;
2082 dl 1.7 Object r, s = null;
2083     if ((r = result) == null || (s = other.result) == null) {
2084 dl 1.28 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2085 dl 1.7 CompletionNode q = null, p = new CompletionNode(d);
2086     while ((r == null && (r = result) == null) ||
2087     (s == null && (s = other.result) == null)) {
2088     if (q != null) {
2089     if (s != null ||
2090     UNSAFE.compareAndSwapObject
2091     (other, COMPLETIONS, q.next = other.completions, q))
2092     break;
2093     }
2094     else if (r != null ||
2095     UNSAFE.compareAndSwapObject
2096     (this, COMPLETIONS, p.next = completions, p)) {
2097     if (s != null)
2098     break;
2099     q = new CompletionNode(d);
2100     }
2101     }
2102     }
2103     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2104 dl 1.19 T t; U u; Throwable ex;
2105 dl 1.7 if (r instanceof AltResult) {
2106 dl 1.19 ex = ((AltResult)r).ex;
2107 dl 1.7 t = null;
2108     }
2109 dl 1.19 else {
2110     ex = null;
2111 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2112     t = tr;
2113 dl 1.19 }
2114 dl 1.7 if (ex != null)
2115     u = null;
2116     else if (s instanceof AltResult) {
2117 dl 1.19 ex = ((AltResult)s).ex;
2118 dl 1.7 u = null;
2119     }
2120 dl 1.28 else {
2121     @SuppressWarnings("unchecked") U us = (U) s;
2122     u = us;
2123     }
2124 dl 1.7 if (ex == null) {
2125     try {
2126 dl 1.20 if (e != null)
2127 dl 1.28 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2128 dl 1.20 else
2129 dl 1.7 fn.accept(t, u);
2130     } catch (Throwable rex) {
2131 dl 1.19 ex = rex;
2132 dl 1.7 }
2133     }
2134 dl 1.20 if (e == null || ex != null)
2135     dst.internalComplete(null, ex);
2136 dl 1.7 }
2137 dl 1.20 helpPostComplete();
2138     other.helpPostComplete();
2139 dl 1.7 return dst;
2140     }
2141    
2142 dl 1.28 /**
2143 jsr166 1.66 * Returns a new CompletableFuture that is completed
2144     * when both this and the other given CompletableFuture complete,
2145     * after performing the given action.
2146     *
2147     * <p>If this and/or the other CompletableFuture complete exceptionally,
2148 jsr166 1.47 * then the returned CompletableFuture also does so, with a
2149     * CompletionException holding one of these exceptions as its cause.
2150 dl 1.28 *
2151     * @param other the other CompletableFuture
2152     * @param action the action to perform before completing the
2153     * returned CompletableFuture
2154     * @return the new CompletableFuture
2155     */
2156     public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2157     Runnable action) {
2158     return doThenBiRun(other, action, null);
2159     }
2160    
2161     /**
2162 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2163     * when both this and the other given CompletableFuture complete,
2164     * after performing the given action from a task running in the
2165     * {@link ForkJoinPool#commonPool()}.
2166     *
2167     * <p>If this and/or the other CompletableFuture complete exceptionally,
2168 jsr166 1.47 * then the returned CompletableFuture also does so, with a
2169     * CompletionException holding one of these exceptions as its cause.
2170 dl 1.28 *
2171     * @param other the other CompletableFuture
2172     * @param action the action to perform before completing the
2173     * returned CompletableFuture
2174     * @return the new CompletableFuture
2175     */
2176     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2177     Runnable action) {
2178     return doThenBiRun(other, action, ForkJoinPool.commonPool());
2179     }
2180    
2181     /**
2182 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2183     * when both this and the other given CompletableFuture complete,
2184     * after performing the given action from a task running in the
2185     * given executor.
2186     *
2187     * <p>If this and/or the other CompletableFuture complete exceptionally,
2188 jsr166 1.47 * then the returned CompletableFuture also does so, with a
2189     * CompletionException holding one of these exceptions as its cause.
2190 dl 1.28 *
2191     * @param other the other CompletableFuture
2192     * @param action the action to perform before completing the
2193     * returned CompletableFuture
2194     * @param executor the executor to use for asynchronous execution
2195     * @return the new CompletableFuture
2196     */
2197     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2198     Runnable action,
2199     Executor executor) {
2200     if (executor == null) throw new NullPointerException();
2201     return doThenBiRun(other, action, executor);
2202     }
2203    
2204     private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2205     Runnable action,
2206     Executor e) {
2207 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2208 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2209 dl 1.28 BiRunCompletion<T> d = null;
2210 jsr166 1.2 Object r, s = null;
2211     if ((r = result) == null || (s = other.result) == null) {
2212 dl 1.28 d = new BiRunCompletion<T>(this, other, action, dst, e);
2213 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2214     while ((r == null && (r = result) == null) ||
2215     (s == null && (s = other.result) == null)) {
2216     if (q != null) {
2217     if (s != null ||
2218     UNSAFE.compareAndSwapObject
2219     (other, COMPLETIONS, q.next = other.completions, q))
2220     break;
2221     }
2222     else if (r != null ||
2223     UNSAFE.compareAndSwapObject
2224     (this, COMPLETIONS, p.next = completions, p)) {
2225     if (s != null)
2226     break;
2227     q = new CompletionNode(d);
2228     }
2229     }
2230     }
2231 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2232 dl 1.19 Throwable ex;
2233     if (r instanceof AltResult)
2234     ex = ((AltResult)r).ex;
2235     else
2236     ex = null;
2237     if (ex == null && (s instanceof AltResult))
2238     ex = ((AltResult)s).ex;
2239     if (ex == null) {
2240 dl 1.1 try {
2241 dl 1.20 if (e != null)
2242 dl 1.28 e.execute(new AsyncRun(action, dst));
2243 dl 1.20 else
2244 dl 1.1 action.run();
2245     } catch (Throwable rex) {
2246 dl 1.19 ex = rex;
2247 dl 1.1 }
2248     }
2249 dl 1.20 if (e == null || ex != null)
2250     dst.internalComplete(null, ex);
2251 jsr166 1.2 }
2252 dl 1.20 helpPostComplete();
2253     other.helpPostComplete();
2254 jsr166 1.2 return dst;
2255 dl 1.1 }
2256    
2257 dl 1.28 /**
2258 jsr166 1.66 * Returns a new CompletableFuture that is completed
2259     * when either this or the other given CompletableFuture completes,
2260     * with the result of the given function of either this or the other
2261     * CompletableFuture's result.
2262     *
2263     * <p>If this and/or the other CompletableFuture complete exceptionally,
2264 dl 1.28 * then the returned CompletableFuture may also do so, with a
2265     * CompletionException holding one of these exceptions as its cause.
2266 jsr166 1.66 * No guarantees are made about which result or exception is used in
2267     * the returned CompletableFuture.
2268 dl 1.28 *
2269     * @param other the other CompletableFuture
2270     * @param fn the function to use to compute the value of
2271     * the returned CompletableFuture
2272     * @return the new CompletableFuture
2273     */
2274 dl 1.48 public <U> CompletableFuture<U> applyToEither
2275     (CompletableFuture<? extends T> other,
2276     Function<? super T, U> fn) {
2277 dl 1.28 return doOrApply(other, fn, null);
2278     }
2279    
2280     /**
2281 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2282     * when either this or the other given CompletableFuture completes,
2283     * with the result of the given function of either this or the other
2284     * CompletableFuture's result, called from a task running in the
2285     * {@link ForkJoinPool#commonPool()}.
2286     *
2287     * <p>If this and/or the other CompletableFuture complete exceptionally,
2288 dl 1.28 * then the returned CompletableFuture may also do so, with a
2289     * CompletionException holding one of these exceptions as its cause.
2290 jsr166 1.66 * No guarantees are made about which result or exception is used in
2291     * the returned CompletableFuture.
2292 dl 1.28 *
2293     * @param other the other CompletableFuture
2294     * @param fn the function to use to compute the value of
2295     * the returned CompletableFuture
2296     * @return the new CompletableFuture
2297     */
2298 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2299     (CompletableFuture<? extends T> other,
2300     Function<? super T, U> fn) {
2301 dl 1.28 return doOrApply(other, fn, ForkJoinPool.commonPool());
2302     }
2303    
2304     /**
2305 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2306     * when either this or the other given CompletableFuture completes,
2307     * with the result of the given function of either this or the other
2308     * CompletableFuture's result, called from a task running in the
2309     * given executor.
2310     *
2311     * <p>If this and/or the other CompletableFuture complete exceptionally,
2312     * then the returned CompletableFuture may also do so, with a
2313 dl 1.28 * CompletionException holding one of these exceptions as its cause.
2314 jsr166 1.66 * No guarantees are made about which result or exception is used in
2315     * the returned CompletableFuture.
2316 dl 1.28 *
2317     * @param other the other CompletableFuture
2318     * @param fn the function to use to compute the value of
2319     * the returned CompletableFuture
2320     * @param executor the executor to use for asynchronous execution
2321     * @return the new CompletableFuture
2322     */
2323 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2324     (CompletableFuture<? extends T> other,
2325     Function<? super T, U> fn,
2326     Executor executor) {
2327 dl 1.28 if (executor == null) throw new NullPointerException();
2328     return doOrApply(other, fn, executor);
2329     }
2330    
2331 dl 1.48 private <U> CompletableFuture<U> doOrApply
2332     (CompletableFuture<? extends T> other,
2333     Function<? super T, U> fn,
2334     Executor e) {
2335 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2336 jsr166 1.2 CompletableFuture<U> dst = new CompletableFuture<U>();
2337 dl 1.28 OrApplyCompletion<T,U> d = null;
2338 jsr166 1.2 Object r;
2339     if ((r = result) == null && (r = other.result) == null) {
2340 dl 1.28 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2341 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2342     while ((r = result) == null && (r = other.result) == null) {
2343     if (q != null) {
2344     if (UNSAFE.compareAndSwapObject
2345     (other, COMPLETIONS, q.next = other.completions, q))
2346     break;
2347     }
2348     else if (UNSAFE.compareAndSwapObject
2349     (this, COMPLETIONS, p.next = completions, p))
2350     q = new CompletionNode(d);
2351     }
2352 jsr166 1.2 }
2353     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2354 dl 1.19 T t; Throwable ex;
2355 jsr166 1.2 if (r instanceof AltResult) {
2356 dl 1.19 ex = ((AltResult)r).ex;
2357 jsr166 1.2 t = null;
2358 dl 1.1 }
2359 dl 1.19 else {
2360     ex = null;
2361 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2362     t = tr;
2363 dl 1.19 }
2364 dl 1.20 U u = null;
2365 jsr166 1.2 if (ex == null) {
2366 dl 1.1 try {
2367 dl 1.20 if (e != null)
2368 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
2369 dl 1.1 else
2370 dl 1.20 u = fn.apply(t);
2371 dl 1.1 } catch (Throwable rex) {
2372 dl 1.19 ex = rex;
2373 dl 1.1 }
2374     }
2375 dl 1.20 if (e == null || ex != null)
2376     dst.internalComplete(u, ex);
2377 jsr166 1.2 }
2378 dl 1.20 helpPostComplete();
2379     other.helpPostComplete();
2380 jsr166 1.2 return dst;
2381 dl 1.1 }
2382    
2383 dl 1.28 /**
2384 jsr166 1.66 * Returns a new CompletableFuture that is completed
2385     * when either this or the other given CompletableFuture completes,
2386     * after performing the given action with the result of either this
2387     * or the other CompletableFuture's result.
2388     *
2389     * <p>If this and/or the other CompletableFuture complete exceptionally,
2390 jsr166 1.47 * then the returned CompletableFuture may also do so, with a
2391     * CompletionException holding one of these exceptions as its cause.
2392 jsr166 1.66 * No guarantees are made about which result or exception is used in
2393     * the returned CompletableFuture.
2394 dl 1.28 *
2395     * @param other the other CompletableFuture
2396     * @param block the action to perform before completing the
2397     * returned CompletableFuture
2398     * @return the new CompletableFuture
2399     */
2400 dl 1.48 public CompletableFuture<Void> acceptEither
2401     (CompletableFuture<? extends T> other,
2402     Consumer<? super T> block) {
2403 dl 1.28 return doOrAccept(other, block, null);
2404     }
2405    
2406     /**
2407 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2408     * when either this or the other given CompletableFuture completes,
2409     * after performing the given action with the result of either this
2410     * or the other CompletableFuture's result from a task running in
2411     * the {@link ForkJoinPool#commonPool()}.
2412     *
2413     * <p>If this and/or the other CompletableFuture complete exceptionally,
2414 jsr166 1.47 * then the returned CompletableFuture may also do so, with a
2415     * CompletionException holding one of these exceptions as its cause.
2416 jsr166 1.66 * No guarantees are made about which result or exception is used in
2417     * the returned CompletableFuture.
2418 dl 1.28 *
2419     * @param other the other CompletableFuture
2420     * @param block the action to perform before completing the
2421     * returned CompletableFuture
2422     * @return the new CompletableFuture
2423     */
2424 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2425     (CompletableFuture<? extends T> other,
2426     Consumer<? super T> block) {
2427 dl 1.28 return doOrAccept(other, block, ForkJoinPool.commonPool());
2428     }
2429    
2430     /**
2431 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2432     * when either this or the other given CompletableFuture completes,
2433     * after performing the given action with the result of either this
2434     * or the other CompletableFuture's result from a task running in
2435     * the given executor.
2436     *
2437     * <p>If this and/or the other CompletableFuture complete exceptionally,
2438 jsr166 1.47 * then the returned CompletableFuture may also do so, with a
2439     * CompletionException holding one of these exceptions as its cause.
2440 jsr166 1.66 * No guarantees are made about which result or exception is used in
2441     * the returned CompletableFuture.
2442 dl 1.28 *
2443     * @param other the other CompletableFuture
2444     * @param block the action to perform before completing the
2445     * returned CompletableFuture
2446     * @param executor the executor to use for asynchronous execution
2447     * @return the new CompletableFuture
2448     */
2449 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2450     (CompletableFuture<? extends T> other,
2451     Consumer<? super T> block,
2452     Executor executor) {
2453 dl 1.28 if (executor == null) throw new NullPointerException();
2454     return doOrAccept(other, block, executor);
2455     }
2456    
/**
 * Shared implementation of the acceptEitherAsync overloads: returns a
 * new future that is completed after {@code fn} has been applied to the
 * result of whichever of {@code this} / {@code other} has a result first.
 *
 * @param other the alternative source; must not be null
 * @param fn the action to run on the chosen result; must not be null
 * @param e executor used to run the action, or null to call
 *        {@code fn.accept} directly in the current thread
 * @return the dependent future
 * @throws NullPointerException if {@code other} or {@code fn} is null
 */
2457 dl 1.48 private CompletableFuture<Void> doOrAccept
2458     (CompletableFuture<? extends T> other,
2459     Consumer<? super T> fn,
2460     Executor e) {
2461 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2462     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2463 dl 1.28 OrAcceptCompletion<T> d = null;
2464 dl 1.7 Object r;
     // Neither source has a result yet: create one completion object and
     // try to link a node for it onto the completions list of BOTH sources.
2465     if ((r = result) == null && (r = other.result) == null) {
2466 dl 1.28 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
     // p is the node for this.completions; q (created only after p has
     // been linked) is the node for other.completions.
2467 dl 1.7 CompletionNode q = null, p = new CompletionNode(d);
     // Both results are re-read on every iteration, so the loop ends as
     // soon as either source completes or both nodes have been linked.
2468     while ((r = result) == null && (r = other.result) == null) {
2469     if (q != null) {
2470     if (UNSAFE.compareAndSwapObject
2471     (other, COMPLETIONS, q.next = other.completions, q))
2472     break;
2473     }
2474     else if (UNSAFE.compareAndSwapObject
2475     (this, COMPLETIONS, p.next = completions, p))
2476     q = new CompletionNode(d);
2477     }
2478     }
     // A result was observed. Act on it here only if nothing was registered
     // (d == null) or this thread wins the 0 -> 1 claim on d.
     // NOTE(review): presumably the registered completion uses the same
     // claim so the action runs at most once -- confirm in OrAcceptCompletion.
2479     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2480 dl 1.19 T t; Throwable ex;
     // An AltResult carries the exception (its ex field may itself be
     // null); any other object is the plain value.
2481 dl 1.7 if (r instanceof AltResult) {
2482 dl 1.19 ex = ((AltResult)r).ex;
2483 dl 1.7 t = null;
2484     }
2485 dl 1.19 else {
2486     ex = null;
2487 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2488     t = tr;
2489 dl 1.19 }
2490 dl 1.7 if (ex == null) {
     // Anything thrown by the action, or by execute() itself, is
     // captured and becomes the outcome of dst.
2491     try {
2492 dl 1.20 if (e != null)
2493 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
2494 dl 1.20 else
2495 dl 1.7 fn.accept(t);
2496     } catch (Throwable rex) {
2497 dl 1.19 ex = rex;
2498 dl 1.7 }
2499     }
     // Complete dst now unless an async task was submitted successfully.
     // NOTE(review): presumably AsyncAccept completes dst itself -- confirm.
2500 dl 1.20 if (e == null || ex != null)
2501     dst.internalComplete(null, ex);
2502 dl 1.7 }
     // NOTE(review): helpPostComplete is defined elsewhere; it appears to
     // help run completions of sources that finished during registration.
2503 dl 1.20 helpPostComplete();
2504     other.helpPostComplete();
2505 dl 1.7 return dst;
2506     }
2507    
2508 dl 1.28 /**
2509 jsr166 1.60 * Returns a new CompletableFuture that is completed
2510 jsr166 1.66 * when either this or the other given CompletableFuture completes,
2511     * after performing the given action.
2512     *
2513     * <p>If this and/or the other CompletableFuture complete exceptionally,
2514 dl 1.28 * then the returned CompletableFuture may also do so, with a
2515     * CompletionException holding one of these exceptions as its cause.
2516 jsr166 1.66 * No guarantees are made about which result or exception is used in
2517     * the returned CompletableFuture.
2518 dl 1.28 *
2519     * @param other the other CompletableFuture
2520     * @param action the action to perform before completing the
2521     * returned CompletableFuture
2522     * @return the new CompletableFuture
2523     */
2524     public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2525     Runnable action) {
     // Synchronous variant: with a null executor doOrRun calls
     // action.run() itself instead of submitting a task.
2526     return doOrRun(other, action, null);
2527     }
2528    
2529     /**
2530 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2531     * when either this or the other given CompletableFuture completes,
2532     * after performing the given action from a task running in the
2533     * {@link ForkJoinPool#commonPool()}.
2534     *
2535     * <p>If this and/or the other CompletableFuture complete exceptionally,
2536 dl 1.28 * then the returned CompletableFuture may also do so, with a
2537     * CompletionException holding one of these exceptions as its cause.
2538 jsr166 1.66 * No guarantees are made about which result or exception is used in
2539     * the returned CompletableFuture.
2540 dl 1.28 *
2541     * @param other the other CompletableFuture
2542     * @param action the action to perform before completing the
2543     * returned CompletableFuture
2544     * @return the new CompletableFuture
2545     */
2546 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2547     (CompletableFuture<?> other,
2548     Runnable action) {
     // Default-executor overload: the action is handed to the common
     // ForkJoinPool. Null checks on other/action are done by doOrRun.
2549 dl 1.28 return doOrRun(other, action, ForkJoinPool.commonPool());
2550     }
2551    
2552     /**
2553 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2554     * when either this or the other given CompletableFuture completes,
2555     * after performing the given action from a task running in the
2556     * given executor.
2557     *
2558     * <p>If this and/or the other CompletableFuture complete exceptionally,
2559 jsr166 1.47 * then the returned CompletableFuture may also do so, with a
2560     * CompletionException holding one of these exceptions as its cause.
2561 jsr166 1.66 * No guarantees are made about which result or exception is used in
2562     * the returned CompletableFuture.
2563 dl 1.28 *
2564     * @param other the other CompletableFuture
2565     * @param action the action to perform before completing the
2566     * returned CompletableFuture
2567     * @param executor the executor to use for asynchronous execution
2568     * @return the new CompletableFuture
2569     */
2570 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2571     (CompletableFuture<?> other,
2572     Runnable action,
2573     Executor executor) {
     // A null executor must be rejected here: doOrRun interprets a null
     // executor as "run the action directly" (see runAfterEither).
2574 dl 1.28 if (executor == null) throw new NullPointerException();
2575     return doOrRun(other, action, executor);
2576     }
2577    
/**
 * Shared implementation of runAfterEither / runAfterEitherAsync: returns
 * a new future that is completed after {@code action} has run, triggered
 * by whichever of {@code this} / {@code other} has a result first.
 *
 * @param other the alternative source; must not be null
 * @param action the action to run; must not be null
 * @param e executor used to run the action, or null to call
 *        {@code action.run()} directly in the current thread
 * @return the dependent future
 * @throws NullPointerException if {@code other} or {@code action} is null
 */
2578     private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2579     Runnable action,
2580     Executor e) {
2581 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2582 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2583 dl 1.28 OrRunCompletion<T> d = null;
2584 jsr166 1.2 Object r;
     // Neither source has a result yet: create one completion object and
     // try to link a node for it onto the completions list of BOTH sources.
2585     if ((r = result) == null && (r = other.result) == null) {
2586 dl 1.28 d = new OrRunCompletion<T>(this, other, action, dst, e);
     // p is linked onto this.completions first; q is only created, and
     // then linked onto other.completions, after p has been linked.
2587 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2588     while ((r = result) == null && (r = other.result) == null) {
2589     if (q != null) {
2590     if (UNSAFE.compareAndSwapObject
2591     (other, COMPLETIONS, q.next = other.completions, q))
2592     break;
2593     }
2594     else if (UNSAFE.compareAndSwapObject
2595     (this, COMPLETIONS, p.next = completions, p))
2596     q = new CompletionNode(d);
2597     }
2598 jsr166 1.2 }
     // A result was observed. Act on it here only if nothing was registered
     // (d == null) or this thread wins the 0 -> 1 claim on d.
2599     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2600 dl 1.19 Throwable ex;
     // Only the exception matters here; the value of the source is unused.
2601     if (r instanceof AltResult)
2602     ex = ((AltResult)r).ex;
2603     else
2604     ex = null;
2605     if (ex == null) {
     // Anything thrown by the action, or by execute() itself, is
     // captured and becomes the outcome of dst.
2606 dl 1.1 try {
2607 dl 1.20 if (e != null)
2608 dl 1.28 e.execute(new AsyncRun(action, dst));
2609 dl 1.20 else
2610 dl 1.1 action.run();
2611     } catch (Throwable rex) {
2612 dl 1.19 ex = rex;
2613 dl 1.1 }
2614     }
     // Complete dst now unless an async task was submitted successfully.
     // NOTE(review): presumably AsyncRun completes dst itself -- confirm.
2615 dl 1.20 if (e == null || ex != null)
2616     dst.internalComplete(null, ex);
2617 jsr166 1.2 }
2618 dl 1.20 helpPostComplete();
2619     other.helpPostComplete();
2620 jsr166 1.2 return dst;
2621 dl 1.1 }
2622    
2623 dl 1.28 /**
2624 dl 1.68 * Returns a CompletableFuture that upon completion, has the same
2625     * value as produced by the given function of the result of this
2626     * CompletableFuture.
2627 jsr166 1.66 *
2628 dl 1.68 * <p>If this CompletableFuture completes exceptionally, then the
2629     * returned CompletableFuture also does so, with a
2630 dl 1.28 * CompletionException holding this exception as its cause.
2631 dl 1.68 * Similarly, if the computed CompletableFuture completes
2632     * exceptionally, then so does the returned CompletableFuture.
2633 dl 1.28 *
2634 jsr166 1.39 * @param fn the function returning a new CompletableFuture
2635 dl 1.68 * @return the CompletableFuture
2636 dl 1.28 */
2637 dl 1.48 public <U> CompletableFuture<U> thenCompose
2638     (Function<? super T, CompletableFuture<U>> fn) {
     // Synchronous variant: with a null executor doCompose applies fn
     // itself rather than submitting an AsyncCompose task.
2639 dl 1.37 return doCompose(fn, null);
2640     }
2641    
2642     /**
2643 dl 1.68 * Returns a CompletableFuture that upon completion, has the same
2644     * value as that produced asynchronously using the {@link
2645     * ForkJoinPool#commonPool()} by the given function of the result
2646     * of this CompletableFuture.
2647 jsr166 1.66 *
2648 dl 1.68 * <p>If this CompletableFuture completes exceptionally, then the
2649     * returned CompletableFuture also does so, with a
2650 dl 1.37 * CompletionException holding this exception as its cause.
2651 dl 1.68 * Similarly, if the computed CompletableFuture completes
2652     * exceptionally, then so does the returned CompletableFuture.
2653 dl 1.37 *
2654 jsr166 1.39 * @param fn the function returning a new CompletableFuture
2655 dl 1.68 * @return the CompletableFuture
2656 dl 1.37 */
2657 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2658     (Function<? super T, CompletableFuture<U>> fn) {
     // Default-executor overload: fn is run via the common ForkJoinPool.
     // The null check on fn is done by doCompose.
2659 dl 1.37 return doCompose(fn, ForkJoinPool.commonPool());
2660     }
2661    
2662     /**
2663 dl 1.68 * Returns a CompletableFuture that upon completion, has the same
2664     * value as that produced asynchronously using the given executor
2665     * by the given function of this CompletableFuture.
2666 jsr166 1.66 *
2667 dl 1.68 * <p>If this CompletableFuture completes exceptionally, then the
2668     * returned CompletableFuture also does so, with a
2669 dl 1.37 * CompletionException holding this exception as its cause.
2670 dl 1.68 * Similarly, if the computed CompletableFuture completes
2671     * exceptionally, then so does the returned CompletableFuture.
2672 dl 1.37 *
2673 jsr166 1.39 * @param fn the function returning a new CompletableFuture
2674 dl 1.37 * @param executor the executor to use for asynchronous execution
2675     * @return the CompletableFuture, that {@code isDone()} upon
2676     * return if completed by the given function, or an exception
2677 jsr166 1.39 * occurs
2678 dl 1.37 */
2679 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2680     (Function<? super T, CompletableFuture<U>> fn,
2681     Executor executor) {
     // A null executor must be rejected here: doCompose interprets a
     // null executor as "apply fn directly" (see thenCompose).
2682 dl 1.37 if (executor == null) throw new NullPointerException();
2683     return doCompose(fn, executor);
2684     }
2685    
2686 dl 1.48 private <U> CompletableFuture<U> doCompose
2687     (Function<? super T, CompletableFuture<U>> fn,
2688     Executor e) {
2689 dl 1.28 if (fn == null) throw new NullPointerException();
2690     CompletableFuture<U> dst = null;
2691     ComposeCompletion<T,U> d = null;
2692     Object r;
2693     if ((r = result) == null) {
2694     dst = new CompletableFuture<U>();
2695     CompletionNode p = new CompletionNode
2696 dl 1.37 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2697 dl 1.28 while ((r = result) == null) {
2698     if (UNSAFE.compareAndSwapObject
2699     (this, COMPLETIONS, p.next = completions, p))
2700     break;
2701     }
2702     }
2703     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2704     T t; Throwable ex;
2705     if (r instanceof AltResult) {
2706     ex = ((AltResult)r).ex;
2707     t = null;
2708     }
2709     else {
2710     ex = null;
2711     @SuppressWarnings("unchecked") T tr = (T) r;
2712     t = tr;
2713     }
2714     if (ex == null) {
2715 dl 1.37 if (e != null) {
2716     if (dst == null)
2717     dst = new CompletableFuture<U>();
2718     e.execute(new AsyncCompose<T,U>(t, fn, dst));
2719     }
2720     else {
2721     try {
2722 jsr166 1.61 if ((dst = fn.apply(t)) == null)
2723     ex = new NullPointerException();
2724 dl 1.37 } catch (Throwable rex) {
2725     ex = rex;
2726     }
2727 jsr166 1.61 if (dst == null)
2728 dl 1.37 dst = new CompletableFuture<U>();
2729 dl 1.28 }
2730     }
2731 dl 1.37 if (e == null && ex != null)
2732 dl 1.28 dst.internalComplete(null, ex);
2733     }
2734     helpPostComplete();
2735     dst.helpPostComplete();
2736     return dst;
2737     }
2738    
2739     /**
2740 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
2741     * CompletableFuture completes, with the result of the given
2742     * function of the exception triggering this CompletableFuture's
2743     * completion when it completes exceptionally; otherwise, if this
2744     * CompletableFuture completes normally, then the returned
2745     * CompletableFuture also completes normally with the same value.
2746 dl 1.28 *
2747     * @param fn the function to use to compute the value of the
2748     * returned CompletableFuture if this CompletableFuture completed
2749     * exceptionally
2750     * @return the new CompletableFuture
2751     */
2752 dl 1.48 public CompletableFuture<T> exceptionally
2753     (Function<Throwable, ? extends T> fn) {
2754 dl 1.28 if (fn == null) throw new NullPointerException();
2755     CompletableFuture<T> dst = new CompletableFuture<T>();
2756     ExceptionCompletion<T> d = null;
2757     Object r;
     // Not complete yet: link a completion node onto this.completions,
     // re-reading result on each retry so a concurrent completion is seen.
2758     if ((r = result) == null) {
2759     CompletionNode p =
2760     new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2761     while ((r = result) == null) {
2762     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2763     p.next = completions, p))
2764     break;
2765     }
2766     }
     // Act here only if nothing was registered (d == null) or this thread
     // wins the 0 -> 1 claim on the registered completion.
2767     if (r != null && (d == null || d.compareAndSet(0, 1))) {
     // t: value for dst; dx: exception thrown by fn itself, if any.
2768     T t = null; Throwable ex, dx = null;
2769     if (r instanceof AltResult) {
     // fn is applied only for a real exception. An AltResult whose ex is
     // null leaves t == null and dx == null.
2770     if ((ex = ((AltResult)r).ex) != null) {
2771     try {
2772     t = fn.apply(ex);
2773     } catch (Throwable rex) {
2774     dx = rex;
2775     }
2776     }
2777     }
2778     else {
     // Normal completion: pass the value through unchanged.
2779     @SuppressWarnings("unchecked") T tr = (T) r;
2780     t = tr;
2781     }
2782     dst.internalComplete(t, dx);
2783     }
2784     helpPostComplete();
2785     return dst;
2786     }
2787    
2788     /**
2789 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
2790     * CompletableFuture completes, with the result of the given
2791     * function of the result and exception of this CompletableFuture's
2792     * completion. The given function is invoked with the result (or
2793     * {@code null} if none) and the exception (or {@code null} if none)
2794     * of this CompletableFuture when complete.
2795 dl 1.28 *
2796     * @param fn the function to use to compute the value of the
2797     * returned CompletableFuture
2798     * @return the new CompletableFuture
2799     */
2800 dl 1.48 public <U> CompletableFuture<U> handle
2801     (BiFunction<? super T, Throwable, ? extends U> fn) {
2802 dl 1.28 if (fn == null) throw new NullPointerException();
2803     CompletableFuture<U> dst = new CompletableFuture<U>();
2804     HandleCompletion<T,U> d = null;
2805     Object r;
     // Not complete yet: link a completion node onto this.completions,
     // re-reading result on each retry so a concurrent completion is seen.
2806     if ((r = result) == null) {
2807     CompletionNode p =
2808     new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2809     while ((r = result) == null) {
2810     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2811     p.next = completions, p))
2812     break;
2813     }
2814     }
     // Act here only if nothing was registered (d == null) or this thread
     // wins the 0 -> 1 claim on the registered completion.
2815     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2816     T t; Throwable ex;
     // Split the stored result into (value, exception); an AltResult
     // yields a null value and whatever exception it carries.
2817     if (r instanceof AltResult) {
2818     ex = ((AltResult)r).ex;
2819     t = null;
2820     }
2821     else {
2822     ex = null;
2823     @SuppressWarnings("unchecked") T tr = (T) r;
2824     t = tr;
2825     }
     // Unlike exceptionally(), fn is always invoked; an exception thrown
     // by fn (dx) becomes the exceptional outcome of dst.
2826     U u; Throwable dx;
2827     try {
2828     u = fn.apply(t, ex);
2829     dx = null;
2830     } catch (Throwable rex) {
2831     dx = rex;
2832     u = null;
2833     }
2834     dst.internalComplete(u, dx);
2835     }
2836     helpPostComplete();
2837     return dst;
2838     }
2839    
2840 dl 1.35
2841     /* ------------- Arbitrary-arity constructions -------------- */
2842    
2843     /*
2844     * The basic plan of attack is to recursively form binary
2845     * completion trees of elements. This can be overkill for small
2846     * sets, but scales nicely. The And/All vs Or/Any forms use the
2847     * same idea, but details differ.
2848     */
2849    
2850     /**
2851     * Returns a new CompletableFuture that is completed when all of
2852 jsr166 1.66 * the given CompletableFutures complete. If any of the given
2853 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
2854     * CompletableFuture also does so, with a CompletionException
2855     * holding this exception as its cause. Otherwise, the results,
2856     * if any, of the given CompletableFutures are not reflected in
2857     * the returned CompletableFuture, but may be obtained by
2858     * inspecting them individually. If no CompletableFutures are
2859     * provided, returns a CompletableFuture completed with the value
2860     * {@code null}.
2861 dl 1.35 *
2862     * <p>Among the applications of this method is to await completion
2863     * of a set of independent CompletableFutures before continuing a
2864     * program, as in: {@code CompletableFuture.allOf(c1, c2,
2865     * c3).join();}.
2866     *
2867     * @param cfs the CompletableFutures
2868 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
2869 dl 1.35 * given CompletableFutures complete
2870     * @throws NullPointerException if the array or any of its elements are
2871     * {@code null}
2872     */
2873     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
     // A null array fails right here, on cfs.length, with the documented
     // NullPointerException.
2874     int len = cfs.length; // Directly handle empty and singleton cases
     // Two or more inputs are combined pairwise by allTree.
2875     if (len > 1)
2876     return allTree(cfs, 0, len - 1);
2877     else {
2878     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2879     CompletableFuture<?> f;
     // No inputs: dst is created already completed, by storing the NIL
     // marker (the encoding of a null value) directly into its result.
2880     if (len == 0)
2881     dst.result = NIL;
2882     else if ((f = cfs[0]) == null)
2883     throw new NullPointerException();
2884     else {
2885     ThenCopy d = null;
2886     CompletionNode p = null;
2887     Object r;
     // Single input. Each pass re-reads f.result and performs at most one
     // step: allocate d, then allocate p, then try to link p onto
     // f.completions. Nothing is allocated if f is already complete.
2888     while ((r = f.result) == null) {
2889     if (d == null)
2890     d = new ThenCopy(f, dst);
2891     else if (p == null)
2892     p = new CompletionNode(d);
2893     else if (UNSAFE.compareAndSwapObject
2894     (f, COMPLETIONS, p.next = f.completions, p))
2895     break;
2896     }
     // f has a result: complete dst here if no completion object exists or
     // this thread claims it. Only f's exception (if any) is forwarded;
     // the value passed on is always null.
2897     if (r != null && (d == null || d.compareAndSet(0, 1)))
2898     dst.internalComplete(null, (r instanceof AltResult) ?
2899     ((AltResult)r).ex : null);
2900     f.helpPostComplete();
2901     }
2902     return dst;
2903     }
2904     }
2905    
2906     /**
2907     * Recursively constructs an And'ed tree of CompletableFutures.
2908     * Called only when array known to have at least two elements.
     *
     * <p>The inclusive range [lo, hi] is split at its midpoint. Each
     * half is either a single array element or the future returned by
     * the recursive call for that half. The returned future completes
     * once both halves have a result.
     *
     * @param cfs the input futures
     * @param lo index of the first element of the range (inclusive)
     * @param hi index of the last element of the range (inclusive)
     * @return a future completed when both halves have completed
     * @throws NullPointerException if an element reached as a leaf is null
2909     */
2910     private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2911     int lo, int hi) {
2912     CompletableFuture<?> fst, snd;
2913     int mid = (lo + hi) >>> 1;
     // fst covers [lo, mid], snd covers [mid+1, hi]; a one-element half is
     // taken straight from the array, so this is where nulls are detected.
2914     if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2915     (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2916     throw new NullPointerException();
2917     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2918     AndCompletion d = null;
2919     CompletionNode p = null, q = null;
2920     Object r = null, s = null;
     // While at least one half lacks a result, perform one step per pass:
     // allocate d, allocate p, link p onto fst.completions (then allocate
     // q), and finally link q onto snd.completions and stop.
2921     while ((r = fst.result) == null || (s = snd.result) == null) {
2922     if (d == null)
2923     d = new AndCompletion(fst, snd, dst);
2924     else if (p == null)
2925     p = new CompletionNode(d);
2926     else if (q == null) {
2927     if (UNSAFE.compareAndSwapObject
2928     (fst, COMPLETIONS, p.next = fst.completions, p))
2929     q = new CompletionNode(d);
2930     }
2931     else if (UNSAFE.compareAndSwapObject
2932     (snd, COMPLETIONS, q.next = snd.completions, q))
2933     break;
2934     }
     // After a break, r and/or s may still be null, so both results are
     // re-read. dst is completed here only when both are present and this
     // thread may act (no completion object, or it wins the claim).
2935     if ((r != null || (r = fst.result) != null) &&
2936     (s != null || (s = snd.result) != null) &&
2937     (d == null || d.compareAndSet(0, 1))) {
2938     Throwable ex;
     // The exception of fst takes precedence; snd's is used only if fst
     // carried none.
2939     if (r instanceof AltResult)
2940     ex = ((AltResult)r).ex;
2941     else
2942     ex = null;
2943     if (ex == null && (s instanceof AltResult))
2944     ex = ((AltResult)s).ex;
2945     dst.internalComplete(null, ex);
2946     }
2947     fst.helpPostComplete();
2948     snd.helpPostComplete();
2949     return dst;
2950     }
2951    
2952     /**
2953 jsr166 1.69 * Returns a new CompletableFuture that is completed when any of the
2954     * given CompletableFutures complete, with the same result if it
2955     * completed normally. Otherwise, if it completed exceptionally,
2956     * the returned CompletableFuture also does so, with a
2957     * CompletionException holding this exception as its cause. If no
2958 jsr166 1.66 * CompletableFutures are provided, returns an incomplete
2959     * CompletableFuture.
2960 dl 1.35 *
2961     * @param cfs the CompletableFutures
2962 jsr166 1.59 * @return a new CompletableFuture that is completed when any of the
2963 dl 1.35 * given CompletableFutures complete
2964     * @throws NullPointerException if the array or any of its elements are
2965     * {@code null}
2966     */
2967     public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
     // A null array fails right here, on cfs.length, with the documented
     // NullPointerException.
2968     int len = cfs.length; // Same idea as allOf
     // Two or more inputs are combined pairwise by anyTree.
2969     if (len > 1)
2970     return anyTree(cfs, 0, len - 1);
2971     else {
2972     CompletableFuture<?> dst = new CompletableFuture<Object>();
2973     CompletableFuture<?> f;
     // No inputs: dst is returned as is, i.e. never completed by this method.
2974     if (len == 0)
2975 dl 1.48 ; // skip
2976 dl 1.35 else if ((f = cfs[0]) == null)
2977     throw new NullPointerException();
2978     else {
2979     ThenCopy d = null;
2980     CompletionNode p = null;
2981     Object r;
     // Single input. Each pass re-reads f.result and performs at most one
     // step: allocate d, then allocate p, then try to link p onto
     // f.completions.
2982     while ((r = f.result) == null) {
2983     if (d == null)
2984     d = new ThenCopy(f, dst);
2985     else if (p == null)
2986     p = new CompletionNode(d);
2987     else if (UNSAFE.compareAndSwapObject
2988     (f, COMPLETIONS, p.next = f.completions, p))
2989     break;
2990     }
     // f has a result: copy it into dst here if no completion object
     // exists or this thread claims it. Unlike allOf, the value itself is
     // forwarded, not just the exception.
2991     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2992     Throwable ex; Object t;
2993     if (r instanceof AltResult) {
2994     ex = ((AltResult)r).ex;
2995     t = null;
2996     }
2997     else {
2998     ex = null;
2999     t = r;
3000     }
3001     dst.internalComplete(t, ex);
3002     }
3003     f.helpPostComplete();
3004     }
3005     return dst;
3006     }
3007     }
3008    
3009     /**
3010 jsr166 1.44 * Recursively constructs an Or'ed tree of CompletableFutures.
     *
     * <p>The inclusive range [lo, hi] is split at its midpoint. Each
     * half is either a single array element or the future returned by
     * the recursive call for that half. The returned future takes the
     * outcome of whichever half is first observed to have a result.
     *
     * @param cfs the input futures
     * @param lo index of the first element of the range (inclusive)
     * @param hi index of the last element of the range (inclusive)
     * @return a future completed when either half has completed
     * @throws NullPointerException if an element reached as a leaf is null
3011 dl 1.35 */
3012     private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
3013     int lo, int hi) {
3014     CompletableFuture<?> fst, snd;
3015     int mid = (lo + hi) >>> 1;
     // fst covers [lo, mid], snd covers [mid+1, hi]; a one-element half is
     // taken straight from the array, so this is where nulls are detected.
3016     if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3017     (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3018     throw new NullPointerException();
3019     CompletableFuture<?> dst = new CompletableFuture<Object>();
3020     OrCompletion d = null;
3021     CompletionNode p = null, q = null;
3022     Object r;
     // While neither half has a result, perform one step per pass:
     // allocate d, allocate p, link p onto fst.completions (then allocate
     // q), and finally link q onto snd.completions and stop.
3023     while ((r = fst.result) == null && (r = snd.result) == null) {
3024     if (d == null)
3025     d = new OrCompletion(fst, snd, dst);
3026     else if (p == null)
3027     p = new CompletionNode(d);
3028     else if (q == null) {
3029     if (UNSAFE.compareAndSwapObject
3030     (fst, COMPLETIONS, p.next = fst.completions, p))
3031     q = new CompletionNode(d);
3032     }
3033     else if (UNSAFE.compareAndSwapObject
3034     (snd, COMPLETIONS, q.next = snd.completions, q))
3035     break;
3036     }
     // r is null after a break, so both halves are re-read (fst first).
     // dst is completed here only if a result is present and this thread
     // may act (no completion object, or it wins the claim).
3037     if ((r != null || (r = fst.result) != null ||
3038     (r = snd.result) != null) &&
3039     (d == null || d.compareAndSet(0, 1))) {
3040     Throwable ex; Object t;
3041     if (r instanceof AltResult) {
3042     ex = ((AltResult)r).ex;
3043     t = null;
3044     }
3045     else {
3046     ex = null;
3047     t = r;
3048     }
3049     dst.internalComplete(t, ex);
3050     }
3051     fst.helpPostComplete();
3052     snd.helpPostComplete();
3053     return dst;
3054     }
3055    
3056    
3057     /* ------------- Control and status methods -------------- */
3058    
3059 dl 1.28 /**
3060 dl 1.37 * If not already completed, completes this CompletableFuture with
3061     * a {@link CancellationException}. Dependent CompletableFutures
3062     * that have not already completed will also complete
3063     * exceptionally, with a {@link CompletionException} caused by
3064     * this {@code CancellationException}.
3065 dl 1.28 *
3066     * @param mayInterruptIfRunning this value has no effect in this
3067     * implementation because interrupts are not used to control
3068     * processing.
3069     *
3070     * @return {@code true} if this task is now cancelled
3071     */
3072     public boolean cancel(boolean mayInterruptIfRunning) {
3073 dl 1.46 boolean cancelled = (result == null) &&
3074     UNSAFE.compareAndSwapObject
3075     (this, RESULT, null, new AltResult(new CancellationException()));
3076     postComplete();
3077 dl 1.48 return cancelled || isCancelled();
3078 dl 1.28 }
3079    
3080     /**
3081     * Returns {@code true} if this CompletableFuture was cancelled
3082     * before it completed normally.
3083     *
3084     * @return {@code true} if this CompletableFuture was cancelled
3085     * before it completed normally
3086     */
3087     public boolean isCancelled() {
3088     Object r;
3089 jsr166 1.43 return ((r = result) instanceof AltResult) &&
3090     (((AltResult)r).ex instanceof CancellationException);
3091 dl 1.28 }
3092    
3093     /**
3094     * Forcibly sets or resets the value subsequently returned by
3095 jsr166 1.42 * method {@link #get()} and related methods, whether or not
3096     * already completed. This method is designed for use only in
3097     * error recovery actions, and even in such situations may result
3098     * in ongoing dependent completions using established versus
3099 dl 1.30 * overwritten outcomes.
3100 dl 1.28 *
3101     * @param value the completion value
3102     */
3103     public void obtrudeValue(T value) {
3104     result = (value == null) ? NIL : value;
3105     postComplete();
3106     }
3107    
3108 dl 1.30 /**
3109 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
3110     * and related methods to throw the given exception, whether or
3111     * not already completed. This method is designed for use only in
3112 dl 1.30 * recovery actions, and even in such situations may result in
3113     * ongoing dependent completions using established versus
3114     * overwritten outcomes.
3115     *
3116     * @param ex the exception
3117     */
3118     public void obtrudeException(Throwable ex) {
3119     if (ex == null) throw new NullPointerException();
3120     result = new AltResult(ex);
3121     postComplete();
3122     }
3123    
3124 dl 1.35 /**
3125     * Returns the estimated number of CompletableFutures whose
3126     * completions are awaiting completion of this CompletableFuture.
3127     * This method is designed for use in monitoring system state, not
3128     * for synchronization control.
3129     *
3130     * @return the number of dependent CompletableFutures
3131     */
3132     public int getNumberOfDependents() {
3133     int count = 0;
3134     for (CompletionNode p = completions; p != null; p = p.next)
3135     ++count;
3136     return count;
3137     }
3138    
3139     /**
3140     * Returns a string identifying this CompletableFuture, as well as
3141 jsr166 1.40 * its completion state. The state, in brackets, contains the
3142 dl 1.35 * String {@code "Completed Normally"} or the String {@code
3143     * "Completed Exceptionally"}, or the String {@code "Not
3144     * completed"} followed by the number of CompletableFutures
3145     * dependent upon its completion, if any.
3146     *
3147     * @return a string identifying this CompletableFuture, as well as its state
3148     */
3149     public String toString() {
3150     Object r = result;
3151 jsr166 1.40 int count;
3152     return super.toString() +
3153     ((r == null) ?
3154     (((count = getNumberOfDependents()) == 0) ?
3155     "[Not completed]" :
3156     "[Not completed, " + count + " dependents]") :
3157     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3158     "[Completed exceptionally]" :
3159     "[Completed normally]"));
3160 dl 1.35 }
3161    
3162 dl 1.1 // Unsafe mechanics
     // UNSAFE is the handle used for the compareAndSwapObject calls in this
     // class; RESULT, WAITERS and COMPLETIONS are the raw field offsets of
     // the instance fields "result", "waiters" and "completions".
3163     private static final sun.misc.Unsafe UNSAFE;
3164     private static final long RESULT;
3165     private static final long WAITERS;
3166     private static final long COMPLETIONS;
3167     static {
3168     try {
3169     UNSAFE = sun.misc.Unsafe.getUnsafe();
3170     Class<?> k = CompletableFuture.class;
     // Offsets are looked up reflectively by field name, so renaming one
     // of these fields without updating the string fails at class load.
3171     RESULT = UNSAFE.objectFieldOffset
3172     (k.getDeclaredField("result"));
3173     WAITERS = UNSAFE.objectFieldOffset
3174     (k.getDeclaredField("waiters"));
3175     COMPLETIONS = UNSAFE.objectFieldOffset
3176     (k.getDeclaredField("completions"));
3177     } catch (Exception e) {
     // Any lookup failure is fatal: it is rethrown as an Error with the
     // original exception as its cause.
3178     throw new Error(e);
3179     }
3180     }
3181     }