ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.96
Committed: Wed Jul 24 15:25:27 2013 UTC (10 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.95: +48 -35 lines
Log Message:
Incorporate review suggeastions; cope with disabled commonPool

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.88 import java.util.concurrent.CompletionException;
23     import java.util.concurrent.CompletionStage;
24 dl 1.1 import java.util.concurrent.atomic.AtomicInteger;
25     import java.util.concurrent.locks.LockSupport;
26    
27     /**
28     * A {@link Future} that may be explicitly completed (setting its
29 dl 1.88 * value and status), and may be used as a {@link CompletionStage},
30     * supporting dependent functions and actions that trigger upon its
31     * completion.
32 dl 1.1 *
33 jsr166 1.50 * <p>When two or more threads attempt to
34     * {@link #complete complete},
35 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
36 jsr166 1.50 * {@link #cancel cancel}
37     * a CompletableFuture, only one of them succeeds.
38 dl 1.19 *
39 dl 1.91 * <p>In addition to these and related methods for directly
40     * manipulating status and results, CompletableFuture implements
41     * interface {@link CompletionStage} with the following policies: <ul>
42 dl 1.35 *
43 dl 1.88 * <li>Actions supplied for dependent completions of
44     * <em>non-async</em> methods may be performed by the thread that
45     * completes the current CompletableFuture, or by any other caller of
46 dl 1.96 * a completion method.</li>
47 jsr166 1.65 *
48 dl 1.88 * <li>All <em>async</em> methods without an explicit Executor
49 dl 1.96 * argument are performed using the {@link ForkJoinPool#commonPool()}
50     * (unless it does not support a parallelism level of at least two, in
51     * which case, a new Thread is used). To simplify monitoring,
52     * debugging, and tracking, all generated asynchronous tasks are
53     * instances of the marker interface {@link
54     * AsynchronousCompletionTask}. </li>
55 dl 1.35 *
56 dl 1.88 * <li>All CompletionStage methods are implemented independently of
57     * other public methods, so the behavior of one method is not impacted
58     * by overrides of others in subclasses. </li> </ul>
59     *
60 dl 1.91 * <p>CompletableFuture also implements {@link Future} with the following
61 dl 1.88 * policies: <ul>
62     *
63     * <li>Since (unlike {@link FutureTask}) this class has no direct
64 jsr166 1.55 * control over the computation that causes it to be completed,
65 dl 1.88 * cancellation is treated as just another form of exceptional
66     * completion. Method {@link #cancel cancel} has the same effect as
67     * {@code completeExceptionally(new CancellationException())}. Method
68     * {@link #isCompletedExceptionally} can be used to determine if a
69     * CompletableFuture completed in any exceptional fashion.</li>
70 jsr166 1.55 *
71 dl 1.88 * <li>In case of exceptional completion with a CompletionException,
72 jsr166 1.55 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
73     * {@link ExecutionException} with the same cause as held in the
74 dl 1.88 * corresponding CompletionException. To simplify usage in most
75     * contexts, this class also defines methods {@link #join()} and
76     * {@link #getNow} that instead throw the CompletionException directly
77     * in these cases.</li> </ul>
78 jsr166 1.80 *
79 dl 1.1 * @author Doug Lea
80     * @since 1.8
81     */
82 dl 1.88 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
83 dl 1.28
84 dl 1.1 /*
85 dl 1.20 * Overview:
86 dl 1.1 *
87 jsr166 1.32 * 1. Non-nullness of field result (set via CAS) indicates done.
88     * An AltResult is used to box null as a result, as well as to
89     * hold exceptions. Using a single field makes completion fast
90 dl 1.20 * and simple to detect and trigger, at the expense of a lot of
91     * encoding and decoding that infiltrates many methods. One minor
92     * simplification relies on the (static) NIL (to box null results)
93     * being the only AltResult with a null exception field, so we
94 dl 1.28 * don't usually need explicit comparisons with NIL. The CF
95     * exception propagation mechanics surrounding decoding rely on
96     * unchecked casts of decoded results really being unchecked,
97     * where user type errors are caught at point of use, as is
98     * currently the case in Java. These are highlighted by using
99     * SuppressWarnings-annotated temporaries.
100 dl 1.1 *
101     * 2. Waiters are held in a Treiber stack similar to the one used
102 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
103 dl 1.28 * internal documentation for algorithmic details.
104 dl 1.1 *
105     * 3. Completions are also kept in a list/stack, and pulled off
106 dl 1.28 * and run when completion is triggered. (We could even use the
107 jsr166 1.24 * same stack as for waiters, but would give up the potential
108 dl 1.20 * parallelism obtained because woken waiters help release/run
109 dl 1.28 * others -- see method postComplete). Because post-processing
110     * may race with direct calls, class Completion opportunistically
111     * extends AtomicInteger so callers can claim the action via
112     * compareAndSet(0, 1). The Completion.run methods are all
113     * written a boringly similar uniform way (that sometimes includes
114 jsr166 1.55 * unnecessary-looking checks, kept to maintain uniformity).
115     * There are enough dimensions upon which they differ that
116     * attempts to factor commonalities while maintaining efficiency
117     * require more lines of code than they would save.
118 dl 1.20 *
119     * 4. The exported then/and/or methods do support a bit of
120 dl 1.28 * factoring (see doThenApply etc). They must cope with the
121     * intrinsic races surrounding addition of a dependent action
122     * versus performing the action directly because the task is
123     * already complete. For example, a CF may not be complete upon
124     * entry, so a dependent completion is added, but by the time it
125     * is added, the target CF is complete, so must be directly
126 dl 1.20 * executed. This is all done while avoiding unnecessary object
127     * construction in safe-bypass cases.
128 dl 1.1 */
129    
130 dl 1.28 // preliminaries
131 dl 1.20
132 dl 1.1 static final class AltResult {
133     final Throwable ex; // null only for NIL
134 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
135 dl 1.1 }
136    
137     static final AltResult NIL = new AltResult(null);
138    
139 dl 1.20 // Fields
140    
141     volatile Object result; // Either the result or boxed AltResult
142 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
143     volatile CompletionNode completions; // list (Treiber stack) of completions
144    
145 dl 1.20 // Basic utilities for triggering and processing completions
146    
147     /**
148     * Removes and signals all waiting threads and runs all completions.
149     */
150     final void postComplete() {
151     WaitNode q; Thread t;
152     while ((q = waiters) != null) {
153     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
154     (t = q.thread) != null) {
155     q.thread = null;
156     LockSupport.unpark(t);
157     }
158     }
159    
160     CompletionNode h; Completion c;
161     while ((h = completions) != null) {
162     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
163     (c = h.completion) != null)
164     c.run();
165     }
166     }
167    
168     /**
169     * Triggers completion with the encoding of the given arguments:
170     * if the exception is non-null, encodes it as a wrapped
171     * CompletionException unless it is one already. Otherwise uses
172     * the given result, boxed as NIL if null.
173     */
174 dl 1.75 final void internalComplete(T v, Throwable ex) {
175 dl 1.20 if (result == null)
176     UNSAFE.compareAndSwapObject
177     (this, RESULT, null,
178     (ex == null) ? (v == null) ? NIL : v :
179     new AltResult((ex instanceof CompletionException) ? ex :
180     new CompletionException(ex)));
181     postComplete(); // help out even if not triggered
182     }
183    
184     /**
185 jsr166 1.25 * If triggered, helps release and/or process completions.
186 dl 1.20 */
187     final void helpPostComplete() {
188     if (result != null)
189     postComplete();
190     }
191    
192 dl 1.28 /* ------------- waiting for completions -------------- */
193 dl 1.20
194 dl 1.35 /** Number of processors, for spin control */
195     static final int NCPU = Runtime.getRuntime().availableProcessors();
196    
197 dl 1.1 /**
198 dl 1.28 * Heuristic spin value for waitingGet() before blocking on
199     * multiprocessors
200 dl 1.1 */
201 dl 1.35 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
202 dl 1.1
203     /**
204 dl 1.28 * Linked nodes to record waiting threads in a Treiber stack. See
205     * other classes such as Phaser and SynchronousQueue for more
206     * detailed explanation. This class implements ManagedBlocker to
207     * avoid starvation when blocking actions pile up in
208     * ForkJoinPools.
209     */
210     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
211     long nanos; // wait time if timed
212     final long deadline; // non-zero if timed
213     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
214     volatile Thread thread;
215     volatile WaitNode next;
216     WaitNode(boolean interruptible, long nanos, long deadline) {
217     this.thread = Thread.currentThread();
218     this.interruptControl = interruptible ? 1 : 0;
219     this.nanos = nanos;
220     this.deadline = deadline;
221     }
222     public boolean isReleasable() {
223     if (thread == null)
224     return true;
225     if (Thread.interrupted()) {
226     int i = interruptControl;
227     interruptControl = -1;
228     if (i > 0)
229     return true;
230     }
231     if (deadline != 0L &&
232     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
233     thread = null;
234     return true;
235     }
236     return false;
237     }
238     public boolean block() {
239     if (isReleasable())
240     return true;
241     else if (deadline == 0L)
242     LockSupport.park(this);
243     else if (nanos > 0L)
244     LockSupport.parkNanos(this, nanos);
245     return isReleasable();
246     }
247 dl 1.1 }
248    
249     /**
250 dl 1.28 * Returns raw result after waiting, or null if interruptible and
251     * interrupted.
252 dl 1.1 */
253 dl 1.28 private Object waitingGet(boolean interruptible) {
254     WaitNode q = null;
255     boolean queued = false;
256 dl 1.35 int spins = SPINS;
257 dl 1.28 for (Object r;;) {
258     if ((r = result) != null) {
259     if (q != null) { // suppress unpark
260     q.thread = null;
261     if (q.interruptControl < 0) {
262     if (interruptible) {
263     removeWaiter(q);
264     return null;
265     }
266     Thread.currentThread().interrupt();
267     }
268     }
269     postComplete(); // help release others
270     return r;
271     }
272     else if (spins > 0) {
273 dl 1.35 int rnd = ThreadLocalRandom.nextSecondarySeed();
274     if (rnd == 0)
275     rnd = ThreadLocalRandom.current().nextInt();
276     if (rnd >= 0)
277 dl 1.28 --spins;
278     }
279     else if (q == null)
280     q = new WaitNode(interruptible, 0L, 0L);
281     else if (!queued)
282     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
283     q.next = waiters, q);
284     else if (interruptible && q.interruptControl < 0) {
285     removeWaiter(q);
286     return null;
287     }
288     else if (q.thread != null && result == null) {
289     try {
290     ForkJoinPool.managedBlock(q);
291 jsr166 1.31 } catch (InterruptedException ex) {
292 dl 1.28 q.interruptControl = -1;
293     }
294     }
295     }
296 dl 1.1 }
297    
298     /**
299 dl 1.28 * Awaits completion or aborts on interrupt or timeout.
300 dl 1.1 *
301 dl 1.28 * @param nanos time to wait
302     * @return raw result
303 dl 1.1 */
304 dl 1.28 private Object timedAwaitDone(long nanos)
305     throws InterruptedException, TimeoutException {
306     WaitNode q = null;
307     boolean queued = false;
308     for (Object r;;) {
309     if ((r = result) != null) {
310     if (q != null) {
311     q.thread = null;
312     if (q.interruptControl < 0) {
313     removeWaiter(q);
314     throw new InterruptedException();
315     }
316     }
317     postComplete();
318     return r;
319     }
320     else if (q == null) {
321     if (nanos <= 0L)
322     throw new TimeoutException();
323     long d = System.nanoTime() + nanos;
324 jsr166 1.31 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
325 dl 1.28 }
326     else if (!queued)
327     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
328     q.next = waiters, q);
329     else if (q.interruptControl < 0) {
330     removeWaiter(q);
331     throw new InterruptedException();
332     }
333     else if (q.nanos <= 0L) {
334     if (result == null) {
335     removeWaiter(q);
336     throw new TimeoutException();
337     }
338     }
339     else if (q.thread != null && result == null) {
340     try {
341     ForkJoinPool.managedBlock(q);
342 jsr166 1.31 } catch (InterruptedException ex) {
343 dl 1.28 q.interruptControl = -1;
344     }
345     }
346     }
347 dl 1.1 }
348    
349     /**
350 dl 1.28 * Tries to unlink a timed-out or interrupted wait node to avoid
351     * accumulating garbage. Internal nodes are simply unspliced
352     * without CAS since it is harmless if they are traversed anyway
353     * by releasers. To avoid effects of unsplicing from already
354     * removed nodes, the list is retraversed in case of an apparent
355     * race. This is slow when there are a lot of nodes, but we don't
356     * expect lists to be long enough to outweigh higher-overhead
357     * schemes.
358 dl 1.1 */
359 dl 1.28 private void removeWaiter(WaitNode node) {
360     if (node != null) {
361     node.thread = null;
362     retry:
363     for (;;) { // restart on removeWaiter race
364     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
365     s = q.next;
366     if (q.thread != null)
367     pred = q;
368     else if (pred != null) {
369     pred.next = s;
370     if (pred.thread == null) // check for race
371     continue retry;
372     }
373     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
374     continue retry;
375     }
376     break;
377     }
378     }
379 dl 1.1 }
380    
381 dl 1.28 /* ------------- Async tasks -------------- */
382    
383 dl 1.1 /**
384 jsr166 1.56 * A marker interface identifying asynchronous tasks produced by
385 dl 1.28 * {@code async} methods. This may be useful for monitoring,
386     * debugging, and tracking asynchronous activities.
387 jsr166 1.57 *
388     * @since 1.8
389 dl 1.1 */
390 dl 1.28 public static interface AsynchronousCompletionTask {
391 dl 1.1 }
392    
393 dl 1.28 /** Base class can act as either FJ or plain Runnable */
394 jsr166 1.33 abstract static class Async extends ForkJoinTask<Void>
395 dl 1.28 implements Runnable, AsynchronousCompletionTask {
396     public final Void getRawResult() { return null; }
397     public final void setRawResult(Void v) { }
398     public final void run() { exec(); }
399 jsr166 1.26 }
400    
401 dl 1.96 /**
402     * Starts the given async task using the given executor, unless
403     * the executor is ForkJoinPool.commonPool and it has been
404     * disabled, in which case starts a new thread.
405     */
406     static void execAsync(Executor e, Async r) {
407     if (e == ForkJoinPool.commonPool() &&
408     ForkJoinPool.getCommonPoolParallelism() <= 1)
409     new Thread(r).start();
410     else
411     e.execute(r);
412     }
413    
414 dl 1.28 static final class AsyncRun extends Async {
415     final Runnable fn;
416     final CompletableFuture<Void> dst;
417     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
418     this.fn = fn; this.dst = dst;
419     }
420     public final boolean exec() {
421     CompletableFuture<Void> d; Throwable ex;
422 dl 1.29 if ((d = this.dst) != null && d.result == null) {
423 dl 1.28 try {
424     fn.run();
425     ex = null;
426     } catch (Throwable rex) {
427     ex = rex;
428     }
429     d.internalComplete(null, ex);
430 dl 1.19 }
431 dl 1.28 return true;
432 dl 1.19 }
433 dl 1.28 private static final long serialVersionUID = 5232453952276885070L;
434 dl 1.19 }
435    
436 dl 1.28 static final class AsyncSupply<U> extends Async {
437     final Supplier<U> fn;
438     final CompletableFuture<U> dst;
439     AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
440     this.fn = fn; this.dst = dst;
441     }
442     public final boolean exec() {
443     CompletableFuture<U> d; U u; Throwable ex;
444 dl 1.29 if ((d = this.dst) != null && d.result == null) {
445 dl 1.28 try {
446     u = fn.get();
447     ex = null;
448     } catch (Throwable rex) {
449     ex = rex;
450     u = null;
451     }
452     d.internalComplete(u, ex);
453 dl 1.1 }
454     return true;
455     }
456     private static final long serialVersionUID = 5232453952276885070L;
457     }
458    
459 dl 1.28 static final class AsyncApply<T,U> extends Async {
460 jsr166 1.63 final T arg;
461 dl 1.28 final Function<? super T,? extends U> fn;
462 dl 1.1 final CompletableFuture<U> dst;
463 dl 1.28 AsyncApply(T arg, Function<? super T,? extends U> fn,
464 dl 1.35 CompletableFuture<U> dst) {
465 dl 1.1 this.arg = arg; this.fn = fn; this.dst = dst;
466     }
467     public final boolean exec() {
468 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
469 dl 1.29 if ((d = this.dst) != null && d.result == null) {
470 dl 1.21 try {
471     u = fn.apply(arg);
472     ex = null;
473     } catch (Throwable rex) {
474     ex = rex;
475     u = null;
476     }
477     d.internalComplete(u, ex);
478 dl 1.1 }
479     return true;
480     }
481     private static final long serialVersionUID = 5232453952276885070L;
482     }
483    
484 jsr166 1.81 static final class AsyncCombine<T,U,V> extends Async {
485 dl 1.1 final T arg1;
486     final U arg2;
487 jsr166 1.63 final BiFunction<? super T,? super U,? extends V> fn;
488 dl 1.1 final CompletableFuture<V> dst;
489 jsr166 1.81 AsyncCombine(T arg1, U arg2,
490 dl 1.35 BiFunction<? super T,? super U,? extends V> fn,
491     CompletableFuture<V> dst) {
492 dl 1.1 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
493     }
494     public final boolean exec() {
495 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
496 dl 1.29 if ((d = this.dst) != null && d.result == null) {
497 dl 1.21 try {
498     v = fn.apply(arg1, arg2);
499     ex = null;
500     } catch (Throwable rex) {
501     ex = rex;
502     v = null;
503     }
504     d.internalComplete(v, ex);
505 dl 1.1 }
506     return true;
507     }
508     private static final long serialVersionUID = 5232453952276885070L;
509     }
510    
511 dl 1.28 static final class AsyncAccept<T> extends Async {
512 jsr166 1.63 final T arg;
513 dl 1.34 final Consumer<? super T> fn;
514 dl 1.88 final CompletableFuture<?> dst;
515 dl 1.34 AsyncAccept(T arg, Consumer<? super T> fn,
516 dl 1.88 CompletableFuture<?> dst) {
517 dl 1.7 this.arg = arg; this.fn = fn; this.dst = dst;
518     }
519     public final boolean exec() {
520 dl 1.88 CompletableFuture<?> d; Throwable ex;
521 dl 1.29 if ((d = this.dst) != null && d.result == null) {
522 dl 1.21 try {
523     fn.accept(arg);
524     ex = null;
525     } catch (Throwable rex) {
526     ex = rex;
527     }
528     d.internalComplete(null, ex);
529 dl 1.7 }
530     return true;
531     }
532     private static final long serialVersionUID = 5232453952276885070L;
533     }
534    
535 jsr166 1.81 static final class AsyncAcceptBoth<T,U> extends Async {
536 dl 1.7 final T arg1;
537     final U arg2;
538 jsr166 1.63 final BiConsumer<? super T,? super U> fn;
539 dl 1.88 final CompletableFuture<?> dst;
540 jsr166 1.81 AsyncAcceptBoth(T arg1, U arg2,
541     BiConsumer<? super T,? super U> fn,
542 dl 1.88 CompletableFuture<?> dst) {
543 dl 1.7 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
544     }
545     public final boolean exec() {
546 dl 1.88 CompletableFuture<?> d; Throwable ex;
547 dl 1.29 if ((d = this.dst) != null && d.result == null) {
548 dl 1.21 try {
549     fn.accept(arg1, arg2);
550     ex = null;
551     } catch (Throwable rex) {
552     ex = rex;
553     }
554     d.internalComplete(null, ex);
555 dl 1.7 }
556     return true;
557     }
558     private static final long serialVersionUID = 5232453952276885070L;
559     }
560    
561 dl 1.37 static final class AsyncCompose<T,U> extends Async {
562 jsr166 1.63 final T arg;
563 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
564 dl 1.37 final CompletableFuture<U> dst;
565     AsyncCompose(T arg,
566 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
567 dl 1.37 CompletableFuture<U> dst) {
568     this.arg = arg; this.fn = fn; this.dst = dst;
569     }
570     public final boolean exec() {
571     CompletableFuture<U> d, fr; U u; Throwable ex;
572     if ((d = this.dst) != null && d.result == null) {
573     try {
574 dl 1.88 CompletionStage<U> cs = fn.apply(arg);
575     fr = (cs == null) ? null : cs.toCompletableFuture();
576 jsr166 1.61 ex = (fr == null) ? new NullPointerException() : null;
577 dl 1.37 } catch (Throwable rex) {
578     ex = rex;
579     fr = null;
580     }
581     if (ex != null)
582     u = null;
583     else {
584     Object r = fr.result;
585 dl 1.67 if (r == null)
586     r = fr.waitingGet(false);
587 dl 1.37 if (r instanceof AltResult) {
588     ex = ((AltResult)r).ex;
589     u = null;
590     }
591     else {
592     @SuppressWarnings("unchecked") U ur = (U) r;
593     u = ur;
594     }
595     }
596     d.internalComplete(u, ex);
597     }
598     return true;
599     }
600     private static final long serialVersionUID = 5232453952276885070L;
601     }
602    
603 dl 1.91 static final class AsyncWhenComplete<T> extends Async {
604     final T arg1;
605     final Throwable arg2;
606     final BiConsumer<? super T,? super Throwable> fn;
607     final CompletableFuture<T> dst;
608     AsyncWhenComplete(T arg1, Throwable arg2,
609     BiConsumer<? super T,? super Throwable> fn,
610     CompletableFuture<T> dst) {
611     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
612     }
613     public final boolean exec() {
614 jsr166 1.92 CompletableFuture<T> d;
615 dl 1.91 if ((d = this.dst) != null && d.result == null) {
616     Throwable ex = arg2;
617     try {
618     fn.accept(arg1, ex);
619     } catch (Throwable rex) {
620     if (ex == null)
621     ex = rex;
622     }
623     d.internalComplete(arg1, ex);
624     }
625     return true;
626     }
627     private static final long serialVersionUID = 5232453952276885070L;
628     }
629    
630 dl 1.1 /* ------------- Completions -------------- */
631    
632 dl 1.28 /**
633     * Simple linked list nodes to record completions, used in
634     * basically the same way as WaitNodes. (We separate nodes from
635     * the Completions themselves mainly because for the And and Or
636     * methods, the same Completion object resides in two lists.)
637     */
638     static final class CompletionNode {
639     final Completion completion;
640     volatile CompletionNode next;
641     CompletionNode(Completion completion) { this.completion = completion; }
642     }
643    
644 dl 1.1 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
645 jsr166 1.33 abstract static class Completion extends AtomicInteger implements Runnable {
646 dl 1.1 }
647    
648 jsr166 1.81 static final class ThenApply<T,U> extends Completion {
649 jsr166 1.2 final CompletableFuture<? extends T> src;
650     final Function<? super T,? extends U> fn;
651     final CompletableFuture<U> dst;
652 dl 1.1 final Executor executor;
653 jsr166 1.81 ThenApply(CompletableFuture<? extends T> src,
654     Function<? super T,? extends U> fn,
655     CompletableFuture<U> dst,
656     Executor executor) {
657 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
658     this.executor = executor;
659     }
660 jsr166 1.26 public final void run() {
661 dl 1.28 final CompletableFuture<? extends T> a;
662     final Function<? super T,? extends U> fn;
663     final CompletableFuture<U> dst;
664 jsr166 1.2 Object r; T t; Throwable ex;
665     if ((dst = this.dst) != null &&
666 dl 1.1 (fn = this.fn) != null &&
667     (a = this.src) != null &&
668     (r = a.result) != null &&
669     compareAndSet(0, 1)) {
670 jsr166 1.2 if (r instanceof AltResult) {
671 dl 1.19 ex = ((AltResult)r).ex;
672 dl 1.1 t = null;
673     }
674 dl 1.17 else {
675     ex = null;
676 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
677     t = tr;
678 dl 1.1 }
679 dl 1.20 Executor e = executor;
680     U u = null;
681 dl 1.17 if (ex == null) {
682     try {
683 dl 1.20 if (e != null)
684 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
685 dl 1.17 else
686 dl 1.20 u = fn.apply(t);
687 dl 1.17 } catch (Throwable rex) {
688     ex = rex;
689     }
690     }
691 dl 1.20 if (e == null || ex != null)
692     dst.internalComplete(u, ex);
693 dl 1.1 }
694     }
695 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
696 dl 1.1 }
697    
698 jsr166 1.81 static final class ThenAccept<T> extends Completion {
699 dl 1.7 final CompletableFuture<? extends T> src;
700 dl 1.34 final Consumer<? super T> fn;
701 dl 1.88 final CompletableFuture<?> dst;
702 dl 1.7 final Executor executor;
703 jsr166 1.81 ThenAccept(CompletableFuture<? extends T> src,
704     Consumer<? super T> fn,
705 dl 1.88 CompletableFuture<?> dst,
706 jsr166 1.81 Executor executor) {
707 dl 1.7 this.src = src; this.fn = fn; this.dst = dst;
708     this.executor = executor;
709     }
710 jsr166 1.26 public final void run() {
711 dl 1.28 final CompletableFuture<? extends T> a;
712 dl 1.34 final Consumer<? super T> fn;
713 dl 1.88 final CompletableFuture<?> dst;
714 dl 1.7 Object r; T t; Throwable ex;
715     if ((dst = this.dst) != null &&
716     (fn = this.fn) != null &&
717     (a = this.src) != null &&
718     (r = a.result) != null &&
719     compareAndSet(0, 1)) {
720     if (r instanceof AltResult) {
721 dl 1.19 ex = ((AltResult)r).ex;
722 dl 1.7 t = null;
723     }
724 dl 1.17 else {
725     ex = null;
726 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
727     t = tr;
728 dl 1.17 }
729 dl 1.20 Executor e = executor;
730 dl 1.17 if (ex == null) {
731     try {
732 dl 1.20 if (e != null)
733 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
734 dl 1.20 else
735 dl 1.17 fn.accept(t);
736     } catch (Throwable rex) {
737     ex = rex;
738 dl 1.7 }
739     }
740 dl 1.20 if (e == null || ex != null)
741     dst.internalComplete(null, ex);
742 dl 1.7 }
743     }
744 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
745 dl 1.7 }
746    
747 jsr166 1.82 static final class ThenRun extends Completion {
748     final CompletableFuture<?> src;
749 jsr166 1.2 final Runnable fn;
750     final CompletableFuture<Void> dst;
751 dl 1.1 final Executor executor;
752 jsr166 1.82 ThenRun(CompletableFuture<?> src,
753 jsr166 1.81 Runnable fn,
754     CompletableFuture<Void> dst,
755     Executor executor) {
756 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
757     this.executor = executor;
758     }
759 jsr166 1.26 public final void run() {
760 jsr166 1.82 final CompletableFuture<?> a;
761 dl 1.28 final Runnable fn;
762     final CompletableFuture<Void> dst;
763 jsr166 1.2 Object r; Throwable ex;
764     if ((dst = this.dst) != null &&
765 dl 1.1 (fn = this.fn) != null &&
766     (a = this.src) != null &&
767     (r = a.result) != null &&
768     compareAndSet(0, 1)) {
769 dl 1.19 if (r instanceof AltResult)
770     ex = ((AltResult)r).ex;
771 dl 1.17 else
772     ex = null;
773 dl 1.20 Executor e = executor;
774 dl 1.17 if (ex == null) {
775     try {
776 dl 1.20 if (e != null)
777 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
778 dl 1.20 else
779 dl 1.17 fn.run();
780     } catch (Throwable rex) {
781     ex = rex;
782 dl 1.1 }
783     }
784 dl 1.20 if (e == null || ex != null)
785     dst.internalComplete(null, ex);
786 dl 1.1 }
787     }
788 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
789 dl 1.1 }
790    
791 jsr166 1.81 static final class ThenCombine<T,U,V> extends Completion {
792 jsr166 1.2 final CompletableFuture<? extends T> src;
793     final CompletableFuture<? extends U> snd;
794     final BiFunction<? super T,? super U,? extends V> fn;
795     final CompletableFuture<V> dst;
796 dl 1.1 final Executor executor;
797 jsr166 1.81 ThenCombine(CompletableFuture<? extends T> src,
798     CompletableFuture<? extends U> snd,
799     BiFunction<? super T,? super U,? extends V> fn,
800     CompletableFuture<V> dst,
801     Executor executor) {
802 dl 1.1 this.src = src; this.snd = snd;
803     this.fn = fn; this.dst = dst;
804     this.executor = executor;
805     }
806 jsr166 1.26 public final void run() {
807 dl 1.28 final CompletableFuture<? extends T> a;
808     final CompletableFuture<? extends U> b;
809     final BiFunction<? super T,? super U,? extends V> fn;
810     final CompletableFuture<V> dst;
811 dl 1.85 Object r, s; T t; U u; Throwable ex;
812     if ((dst = this.dst) != null &&
813     (fn = this.fn) != null &&
814     (a = this.src) != null &&
815     (r = a.result) != null &&
816     (b = this.snd) != null &&
817     (s = b.result) != null &&
818     compareAndSet(0, 1)) {
819     if (r instanceof AltResult) {
820     ex = ((AltResult)r).ex;
821     t = null;
822     }
823     else {
824     ex = null;
825     @SuppressWarnings("unchecked") T tr = (T) r;
826     t = tr;
827     }
828     if (ex != null)
829     u = null;
830     else if (s instanceof AltResult) {
831     ex = ((AltResult)s).ex;
832     u = null;
833     }
834     else {
835     @SuppressWarnings("unchecked") U us = (U) s;
836     u = us;
837     }
838 dl 1.20 Executor e = executor;
839     V v = null;
840 dl 1.19 if (ex == null) {
841     try {
842 dl 1.20 if (e != null)
843 dl 1.96 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
844 dl 1.19 else
845 dl 1.20 v = fn.apply(t, u);
846 dl 1.19 } catch (Throwable rex) {
847     ex = rex;
848     }
849 dl 1.1 }
850 dl 1.20 if (e == null || ex != null)
851     dst.internalComplete(v, ex);
852 jsr166 1.2 }
853     }
854 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
855 dl 1.1 }
856    
857 jsr166 1.81 static final class ThenAcceptBoth<T,U> extends Completion {
858 dl 1.7 final CompletableFuture<? extends T> src;
859     final CompletableFuture<? extends U> snd;
860 dl 1.34 final BiConsumer<? super T,? super U> fn;
861 dl 1.7 final CompletableFuture<Void> dst;
862     final Executor executor;
863 jsr166 1.81 ThenAcceptBoth(CompletableFuture<? extends T> src,
864     CompletableFuture<? extends U> snd,
865     BiConsumer<? super T,? super U> fn,
866     CompletableFuture<Void> dst,
867     Executor executor) {
868 dl 1.7 this.src = src; this.snd = snd;
869     this.fn = fn; this.dst = dst;
870     this.executor = executor;
871     }
872 jsr166 1.26 public final void run() {
873 dl 1.28 final CompletableFuture<? extends T> a;
874     final CompletableFuture<? extends U> b;
875 dl 1.34 final BiConsumer<? super T,? super U> fn;
876 dl 1.28 final CompletableFuture<Void> dst;
877 dl 1.85 Object r, s; T t; U u; Throwable ex;
878     if ((dst = this.dst) != null &&
879     (fn = this.fn) != null &&
880     (a = this.src) != null &&
881     (r = a.result) != null &&
882     (b = this.snd) != null &&
883     (s = b.result) != null &&
884     compareAndSet(0, 1)) {
885     if (r instanceof AltResult) {
886     ex = ((AltResult)r).ex;
887     t = null;
888     }
889     else {
890     ex = null;
891     @SuppressWarnings("unchecked") T tr = (T) r;
892     t = tr;
893     }
894     if (ex != null)
895     u = null;
896     else if (s instanceof AltResult) {
897     ex = ((AltResult)s).ex;
898     u = null;
899     }
900     else {
901     @SuppressWarnings("unchecked") U us = (U) s;
902     u = us;
903     }
904 dl 1.20 Executor e = executor;
905 dl 1.19 if (ex == null) {
906     try {
907 dl 1.20 if (e != null)
908 dl 1.96 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
909 dl 1.20 else
910 dl 1.19 fn.accept(t, u);
911     } catch (Throwable rex) {
912     ex = rex;
913 dl 1.7 }
914     }
915 dl 1.20 if (e == null || ex != null)
916     dst.internalComplete(null, ex);
917 dl 1.7 }
918     }
919 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
920 dl 1.7 }
921    
922 jsr166 1.82 static final class RunAfterBoth extends Completion {
923     final CompletableFuture<?> src;
924 jsr166 1.2 final CompletableFuture<?> snd;
925     final Runnable fn;
926     final CompletableFuture<Void> dst;
927 dl 1.1 final Executor executor;
928 jsr166 1.82 RunAfterBoth(CompletableFuture<?> src,
929 jsr166 1.81 CompletableFuture<?> snd,
930     Runnable fn,
931     CompletableFuture<Void> dst,
932     Executor executor) {
933 dl 1.1 this.src = src; this.snd = snd;
934     this.fn = fn; this.dst = dst;
935     this.executor = executor;
936     }
937 jsr166 1.26 public final void run() {
938 jsr166 1.82 final CompletableFuture<?> a;
939 dl 1.1 final CompletableFuture<?> b;
940     final Runnable fn;
941     final CompletableFuture<Void> dst;
942 dl 1.85 Object r, s; Throwable ex;
943     if ((dst = this.dst) != null &&
944     (fn = this.fn) != null &&
945     (a = this.src) != null &&
946     (r = a.result) != null &&
947     (b = this.snd) != null &&
948     (s = b.result) != null &&
949     compareAndSet(0, 1)) {
950     if (r instanceof AltResult)
951     ex = ((AltResult)r).ex;
952     else
953     ex = null;
954     if (ex == null && (s instanceof AltResult))
955     ex = ((AltResult)s).ex;
956 dl 1.20 Executor e = executor;
957 dl 1.19 if (ex == null) {
958     try {
959 dl 1.20 if (e != null)
960 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
961 dl 1.20 else
962 dl 1.19 fn.run();
963     } catch (Throwable rex) {
964     ex = rex;
965 dl 1.1 }
966 jsr166 1.2 }
967 dl 1.20 if (e == null || ex != null)
968     dst.internalComplete(null, ex);
969 jsr166 1.2 }
970     }
971 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
972 dl 1.1 }
973    
974 dl 1.35 static final class AndCompletion extends Completion {
975     final CompletableFuture<?> src;
976     final CompletableFuture<?> snd;
977     final CompletableFuture<Void> dst;
978     AndCompletion(CompletableFuture<?> src,
979     CompletableFuture<?> snd,
980     CompletableFuture<Void> dst) {
981     this.src = src; this.snd = snd; this.dst = dst;
982     }
983     public final void run() {
984     final CompletableFuture<?> a;
985     final CompletableFuture<?> b;
986     final CompletableFuture<Void> dst;
987     Object r, s; Throwable ex;
988     if ((dst = this.dst) != null &&
989     (a = this.src) != null &&
990     (r = a.result) != null &&
991     (b = this.snd) != null &&
992     (s = b.result) != null &&
993     compareAndSet(0, 1)) {
994     if (r instanceof AltResult)
995     ex = ((AltResult)r).ex;
996     else
997     ex = null;
998     if (ex == null && (s instanceof AltResult))
999     ex = ((AltResult)s).ex;
1000     dst.internalComplete(null, ex);
1001     }
1002     }
1003     private static final long serialVersionUID = 5232453952276885070L;
1004     }
1005    
1006 jsr166 1.81 static final class ApplyToEither<T,U> extends Completion {
1007 jsr166 1.2 final CompletableFuture<? extends T> src;
1008     final CompletableFuture<? extends T> snd;
1009     final Function<? super T,? extends U> fn;
1010     final CompletableFuture<U> dst;
1011 dl 1.1 final Executor executor;
1012 jsr166 1.81 ApplyToEither(CompletableFuture<? extends T> src,
1013     CompletableFuture<? extends T> snd,
1014     Function<? super T,? extends U> fn,
1015     CompletableFuture<U> dst,
1016     Executor executor) {
1017 dl 1.1 this.src = src; this.snd = snd;
1018     this.fn = fn; this.dst = dst;
1019     this.executor = executor;
1020     }
1021 jsr166 1.26 public final void run() {
1022 dl 1.28 final CompletableFuture<? extends T> a;
1023     final CompletableFuture<? extends T> b;
1024     final Function<? super T,? extends U> fn;
1025     final CompletableFuture<U> dst;
1026 jsr166 1.2 Object r; T t; Throwable ex;
1027     if ((dst = this.dst) != null &&
1028 dl 1.1 (fn = this.fn) != null &&
1029     (((a = this.src) != null && (r = a.result) != null) ||
1030     ((b = this.snd) != null && (r = b.result) != null)) &&
1031     compareAndSet(0, 1)) {
1032 jsr166 1.2 if (r instanceof AltResult) {
1033 dl 1.19 ex = ((AltResult)r).ex;
1034 jsr166 1.2 t = null;
1035     }
1036 dl 1.19 else {
1037     ex = null;
1038 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1039     t = tr;
1040 dl 1.1 }
1041 dl 1.20 Executor e = executor;
1042     U u = null;
1043 dl 1.19 if (ex == null) {
1044     try {
1045 dl 1.20 if (e != null)
1046 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1047 dl 1.19 else
1048 dl 1.20 u = fn.apply(t);
1049 dl 1.19 } catch (Throwable rex) {
1050     ex = rex;
1051     }
1052     }
1053 dl 1.20 if (e == null || ex != null)
1054     dst.internalComplete(u, ex);
1055 jsr166 1.2 }
1056     }
1057 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1058 dl 1.1 }
1059    
1060 jsr166 1.81 static final class AcceptEither<T> extends Completion {
1061 dl 1.7 final CompletableFuture<? extends T> src;
1062     final CompletableFuture<? extends T> snd;
1063 dl 1.34 final Consumer<? super T> fn;
1064 dl 1.7 final CompletableFuture<Void> dst;
1065     final Executor executor;
1066 jsr166 1.81 AcceptEither(CompletableFuture<? extends T> src,
1067     CompletableFuture<? extends T> snd,
1068     Consumer<? super T> fn,
1069     CompletableFuture<Void> dst,
1070     Executor executor) {
1071 dl 1.7 this.src = src; this.snd = snd;
1072     this.fn = fn; this.dst = dst;
1073     this.executor = executor;
1074     }
1075 jsr166 1.26 public final void run() {
1076 dl 1.28 final CompletableFuture<? extends T> a;
1077     final CompletableFuture<? extends T> b;
1078 dl 1.34 final Consumer<? super T> fn;
1079 dl 1.28 final CompletableFuture<Void> dst;
1080 dl 1.7 Object r; T t; Throwable ex;
1081     if ((dst = this.dst) != null &&
1082     (fn = this.fn) != null &&
1083     (((a = this.src) != null && (r = a.result) != null) ||
1084     ((b = this.snd) != null && (r = b.result) != null)) &&
1085     compareAndSet(0, 1)) {
1086     if (r instanceof AltResult) {
1087 dl 1.19 ex = ((AltResult)r).ex;
1088 dl 1.7 t = null;
1089     }
1090 dl 1.19 else {
1091     ex = null;
1092 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1093     t = tr;
1094 dl 1.19 }
1095 dl 1.20 Executor e = executor;
1096 dl 1.19 if (ex == null) {
1097     try {
1098 dl 1.20 if (e != null)
1099 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1100 dl 1.20 else
1101 dl 1.19 fn.accept(t);
1102     } catch (Throwable rex) {
1103     ex = rex;
1104 dl 1.7 }
1105     }
1106 dl 1.20 if (e == null || ex != null)
1107     dst.internalComplete(null, ex);
1108 dl 1.7 }
1109     }
1110 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1111 dl 1.7 }
1112    
1113 jsr166 1.82 static final class RunAfterEither extends Completion {
1114     final CompletableFuture<?> src;
1115 jsr166 1.2 final CompletableFuture<?> snd;
1116     final Runnable fn;
1117     final CompletableFuture<Void> dst;
1118 dl 1.1 final Executor executor;
1119 jsr166 1.82 RunAfterEither(CompletableFuture<?> src,
1120 jsr166 1.81 CompletableFuture<?> snd,
1121     Runnable fn,
1122     CompletableFuture<Void> dst,
1123     Executor executor) {
1124 dl 1.1 this.src = src; this.snd = snd;
1125     this.fn = fn; this.dst = dst;
1126     this.executor = executor;
1127     }
1128 jsr166 1.26 public final void run() {
1129 jsr166 1.82 final CompletableFuture<?> a;
1130 dl 1.1 final CompletableFuture<?> b;
1131     final Runnable fn;
1132     final CompletableFuture<Void> dst;
1133 dl 1.28 Object r; Throwable ex;
1134 jsr166 1.2 if ((dst = this.dst) != null &&
1135 dl 1.1 (fn = this.fn) != null &&
1136     (((a = this.src) != null && (r = a.result) != null) ||
1137     ((b = this.snd) != null && (r = b.result) != null)) &&
1138     compareAndSet(0, 1)) {
1139 dl 1.19 if (r instanceof AltResult)
1140     ex = ((AltResult)r).ex;
1141     else
1142     ex = null;
1143 dl 1.20 Executor e = executor;
1144 dl 1.19 if (ex == null) {
1145 dl 1.1 try {
1146 dl 1.20 if (e != null)
1147 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
1148 dl 1.20 else
1149 dl 1.1 fn.run();
1150     } catch (Throwable rex) {
1151 dl 1.19 ex = rex;
1152 dl 1.1 }
1153     }
1154 dl 1.20 if (e == null || ex != null)
1155     dst.internalComplete(null, ex);
1156 jsr166 1.2 }
1157     }
1158 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1159 dl 1.1 }
1160    
1161 dl 1.35 static final class OrCompletion extends Completion {
1162     final CompletableFuture<?> src;
1163     final CompletableFuture<?> snd;
1164 dl 1.77 final CompletableFuture<Object> dst;
1165 dl 1.35 OrCompletion(CompletableFuture<?> src,
1166     CompletableFuture<?> snd,
1167 dl 1.77 CompletableFuture<Object> dst) {
1168 dl 1.35 this.src = src; this.snd = snd; this.dst = dst;
1169     }
1170     public final void run() {
1171     final CompletableFuture<?> a;
1172     final CompletableFuture<?> b;
1173 dl 1.77 final CompletableFuture<Object> dst;
1174     Object r, t; Throwable ex;
1175 dl 1.35 if ((dst = this.dst) != null &&
1176     (((a = this.src) != null && (r = a.result) != null) ||
1177     ((b = this.snd) != null && (r = b.result) != null)) &&
1178     compareAndSet(0, 1)) {
1179 dl 1.77 if (r instanceof AltResult) {
1180 dl 1.35 ex = ((AltResult)r).ex;
1181 dl 1.77 t = null;
1182     }
1183     else {
1184 dl 1.35 ex = null;
1185 dl 1.77 t = r;
1186     }
1187     dst.internalComplete(t, ex);
1188 dl 1.35 }
1189     }
1190     private static final long serialVersionUID = 5232453952276885070L;
1191     }
1192    
1193 dl 1.28 static final class ExceptionCompletion<T> extends Completion {
1194 jsr166 1.2 final CompletableFuture<? extends T> src;
1195 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1196     final CompletableFuture<T> dst;
1197 dl 1.28 ExceptionCompletion(CompletableFuture<? extends T> src,
1198     Function<? super Throwable, ? extends T> fn,
1199     CompletableFuture<T> dst) {
1200 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1201     }
1202 jsr166 1.26 public final void run() {
1203 dl 1.28 final CompletableFuture<? extends T> a;
1204     final Function<? super Throwable, ? extends T> fn;
1205     final CompletableFuture<T> dst;
1206 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1207 jsr166 1.2 if ((dst = this.dst) != null &&
1208 dl 1.1 (fn = this.fn) != null &&
1209     (a = this.src) != null &&
1210     (r = a.result) != null &&
1211     compareAndSet(0, 1)) {
1212 dl 1.20 if ((r instanceof AltResult) &&
1213 jsr166 1.87 (ex = ((AltResult)r).ex) != null) {
1214 dl 1.20 try {
1215     t = fn.apply(ex);
1216     } catch (Throwable rex) {
1217     dx = rex;
1218 dl 1.1 }
1219     }
1220 dl 1.28 else {
1221     @SuppressWarnings("unchecked") T tr = (T) r;
1222     t = tr;
1223     }
1224 dl 1.20 dst.internalComplete(t, dx);
1225 dl 1.1 }
1226     }
1227 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1228 dl 1.1 }
1229    
1230 dl 1.88 static final class WhenCompleteCompletion<T> extends Completion {
1231     final CompletableFuture<? extends T> src;
1232     final BiConsumer<? super T, ? super Throwable> fn;
1233     final CompletableFuture<T> dst;
1234     final Executor executor;
1235     WhenCompleteCompletion(CompletableFuture<? extends T> src,
1236     BiConsumer<? super T, ? super Throwable> fn,
1237     CompletableFuture<T> dst,
1238     Executor executor) {
1239     this.src = src; this.fn = fn; this.dst = dst;
1240     this.executor = executor;
1241     }
1242     public final void run() {
1243     final CompletableFuture<? extends T> a;
1244     final BiConsumer<? super T, ? super Throwable> fn;
1245     final CompletableFuture<T> dst;
1246     Object r; T t; Throwable ex;
1247     if ((dst = this.dst) != null &&
1248     (fn = this.fn) != null &&
1249     (a = this.src) != null &&
1250     (r = a.result) != null &&
1251     compareAndSet(0, 1)) {
1252     if (r instanceof AltResult) {
1253     ex = ((AltResult)r).ex;
1254     t = null;
1255     }
1256     else {
1257     ex = null;
1258     @SuppressWarnings("unchecked") T tr = (T) r;
1259     t = tr;
1260     }
1261     Executor e = executor;
1262     Throwable dx = null;
1263     try {
1264     if (e != null)
1265 dl 1.96 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
1266 dl 1.88 else
1267     fn.accept(t, ex);
1268     } catch (Throwable rex) {
1269     dx = rex;
1270     }
1271     if (e == null || dx != null)
1272     dst.internalComplete(t, ex != null ? ex : dx);
1273     }
1274     }
1275     private static final long serialVersionUID = 5232453952276885070L;
1276     }
1277    
1278 dl 1.75 static final class ThenCopy<T> extends Completion {
1279 dl 1.77 final CompletableFuture<?> src;
1280 dl 1.75 final CompletableFuture<T> dst;
1281 dl 1.77 ThenCopy(CompletableFuture<?> src,
1282 dl 1.75 CompletableFuture<T> dst) {
1283 dl 1.17 this.src = src; this.dst = dst;
1284     }
1285 jsr166 1.26 public final void run() {
1286 dl 1.77 final CompletableFuture<?> a;
1287 dl 1.75 final CompletableFuture<T> dst;
1288     Object r; T t; Throwable ex;
1289 dl 1.17 if ((dst = this.dst) != null &&
1290     (a = this.src) != null &&
1291     (r = a.result) != null &&
1292     compareAndSet(0, 1)) {
1293 dl 1.20 if (r instanceof AltResult) {
1294     ex = ((AltResult)r).ex;
1295     t = null;
1296     }
1297     else {
1298     ex = null;
1299 dl 1.75 @SuppressWarnings("unchecked") T tr = (T) r;
1300     t = tr;
1301 dl 1.20 }
1302     dst.internalComplete(t, ex);
1303 dl 1.17 }
1304     }
1305 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1306 dl 1.17 }
1307    
1308 dl 1.75 // version of ThenCopy for CompletableFuture<Void> dst
1309     static final class ThenPropagate extends Completion {
1310     final CompletableFuture<?> src;
1311     final CompletableFuture<Void> dst;
1312     ThenPropagate(CompletableFuture<?> src,
1313     CompletableFuture<Void> dst) {
1314     this.src = src; this.dst = dst;
1315     }
1316     public final void run() {
1317     final CompletableFuture<?> a;
1318     final CompletableFuture<Void> dst;
1319     Object r; Throwable ex;
1320     if ((dst = this.dst) != null &&
1321     (a = this.src) != null &&
1322     (r = a.result) != null &&
1323     compareAndSet(0, 1)) {
1324     if (r instanceof AltResult)
1325     ex = ((AltResult)r).ex;
1326     else
1327     ex = null;
1328     dst.internalComplete(null, ex);
1329     }
1330     }
1331     private static final long serialVersionUID = 5232453952276885070L;
1332     }
1333    
1334 dl 1.28 static final class HandleCompletion<T,U> extends Completion {
1335 dl 1.17 final CompletableFuture<? extends T> src;
1336     final BiFunction<? super T, Throwable, ? extends U> fn;
1337     final CompletableFuture<U> dst;
1338 dl 1.88 final Executor executor;
1339 dl 1.28 HandleCompletion(CompletableFuture<? extends T> src,
1340     BiFunction<? super T, Throwable, ? extends U> fn,
1341 dl 1.88 CompletableFuture<U> dst,
1342     Executor executor) {
1343 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1344 dl 1.88 this.executor = executor;
1345 dl 1.17 }
1346 jsr166 1.26 public final void run() {
1347 dl 1.28 final CompletableFuture<? extends T> a;
1348     final BiFunction<? super T, Throwable, ? extends U> fn;
1349     final CompletableFuture<U> dst;
1350 dl 1.17 Object r; T t; Throwable ex;
1351     if ((dst = this.dst) != null &&
1352     (fn = this.fn) != null &&
1353     (a = this.src) != null &&
1354     (r = a.result) != null &&
1355     compareAndSet(0, 1)) {
1356     if (r instanceof AltResult) {
1357     ex = ((AltResult)r).ex;
1358     t = null;
1359     }
1360     else {
1361     ex = null;
1362 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1363     t = tr;
1364 dl 1.17 }
1365 dl 1.88 Executor e = executor;
1366     U u = null;
1367     Throwable dx = null;
1368 dl 1.17 try {
1369 dl 1.88 if (e != null)
1370 dl 1.96 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1371 dl 1.88 else
1372     u = fn.apply(t, ex);
1373 dl 1.17 } catch (Throwable rex) {
1374 dl 1.20 dx = rex;
1375 dl 1.17 }
1376 dl 1.88 if (e == null || dx != null)
1377     dst.internalComplete(u, dx);
1378 dl 1.17 }
1379     }
1380 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1381 dl 1.17 }
1382    
1383 jsr166 1.81 static final class ThenCompose<T,U> extends Completion {
1384 dl 1.17 final CompletableFuture<? extends T> src;
1385 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1386 dl 1.17 final CompletableFuture<U> dst;
1387 dl 1.37 final Executor executor;
1388 jsr166 1.81 ThenCompose(CompletableFuture<? extends T> src,
1389 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
1390 jsr166 1.81 CompletableFuture<U> dst,
1391     Executor executor) {
1392 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1393 dl 1.37 this.executor = executor;
1394 dl 1.17 }
1395 jsr166 1.26 public final void run() {
1396 dl 1.28 final CompletableFuture<? extends T> a;
1397 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1398 dl 1.28 final CompletableFuture<U> dst;
1399 dl 1.37 Object r; T t; Throwable ex; Executor e;
1400 dl 1.17 if ((dst = this.dst) != null &&
1401     (fn = this.fn) != null &&
1402     (a = this.src) != null &&
1403     (r = a.result) != null &&
1404     compareAndSet(0, 1)) {
1405     if (r instanceof AltResult) {
1406     ex = ((AltResult)r).ex;
1407     t = null;
1408     }
1409     else {
1410     ex = null;
1411 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1412     t = tr;
1413 dl 1.17 }
1414     CompletableFuture<U> c = null;
1415 dl 1.20 U u = null;
1416     boolean complete = false;
1417 dl 1.17 if (ex == null) {
1418 dl 1.37 if ((e = executor) != null)
1419 dl 1.96 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1420 dl 1.37 else {
1421     try {
1422 dl 1.88 CompletionStage<U> cs = fn.apply(t);
1423     c = (cs == null) ? null : cs.toCompletableFuture();
1424     if (c == null)
1425 dl 1.37 ex = new NullPointerException();
1426     } catch (Throwable rex) {
1427     ex = rex;
1428     }
1429 dl 1.17 }
1430     }
1431 dl 1.37 if (c != null) {
1432 dl 1.75 ThenCopy<U> d = null;
1433 dl 1.18 Object s;
1434     if ((s = c.result) == null) {
1435     CompletionNode p = new CompletionNode
1436 dl 1.75 (d = new ThenCopy<U>(c, dst));
1437 dl 1.18 while ((s = c.result) == null) {
1438     if (UNSAFE.compareAndSwapObject
1439     (c, COMPLETIONS, p.next = c.completions, p))
1440     break;
1441     }
1442 dl 1.17 }
1443 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1444 dl 1.20 complete = true;
1445 dl 1.18 if (s instanceof AltResult) {
1446     ex = ((AltResult)s).ex; // no rewrap
1447     u = null;
1448 dl 1.28 }
1449     else {
1450     @SuppressWarnings("unchecked") U us = (U) s;
1451     u = us;
1452     }
1453     }
1454     }
1455     if (complete || ex != null)
1456     dst.internalComplete(u, ex);
1457     if (c != null)
1458     c.helpPostComplete();
1459     }
1460     }
1461     private static final long serialVersionUID = 5232453952276885070L;
1462     }
1463    
1464 dl 1.88 // Implementations of stage methods with (plain, async, Executor) forms
1465 dl 1.28
1466 dl 1.88 private <U> CompletableFuture<U> doThenApply
1467     (Function<? super T,? extends U> fn,
1468     Executor e) {
1469     if (fn == null) throw new NullPointerException();
1470     CompletableFuture<U> dst = new CompletableFuture<U>();
1471     ThenApply<T,U> d = null;
1472     Object r;
1473     if ((r = result) == null) {
1474     CompletionNode p = new CompletionNode
1475     (d = new ThenApply<T,U>(this, fn, dst, e));
1476     while ((r = result) == null) {
1477     if (UNSAFE.compareAndSwapObject
1478     (this, COMPLETIONS, p.next = completions, p))
1479     break;
1480     }
1481     }
1482     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1483     T t; Throwable ex;
1484     if (r instanceof AltResult) {
1485     ex = ((AltResult)r).ex;
1486     t = null;
1487     }
1488     else {
1489     ex = null;
1490     @SuppressWarnings("unchecked") T tr = (T) r;
1491     t = tr;
1492     }
1493     U u = null;
1494     if (ex == null) {
1495     try {
1496     if (e != null)
1497 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1498 dl 1.88 else
1499     u = fn.apply(t);
1500     } catch (Throwable rex) {
1501     ex = rex;
1502     }
1503     }
1504     if (e == null || ex != null)
1505     dst.internalComplete(u, ex);
1506     }
1507     helpPostComplete();
1508     return dst;
1509 dl 1.28 }
1510    
1511 dl 1.88 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1512     Executor e) {
1513     if (fn == null) throw new NullPointerException();
1514     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1515     ThenAccept<T> d = null;
1516     Object r;
1517     if ((r = result) == null) {
1518     CompletionNode p = new CompletionNode
1519     (d = new ThenAccept<T>(this, fn, dst, e));
1520     while ((r = result) == null) {
1521     if (UNSAFE.compareAndSwapObject
1522     (this, COMPLETIONS, p.next = completions, p))
1523     break;
1524     }
1525     }
1526     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1527     T t; Throwable ex;
1528     if (r instanceof AltResult) {
1529     ex = ((AltResult)r).ex;
1530     t = null;
1531     }
1532     else {
1533     ex = null;
1534     @SuppressWarnings("unchecked") T tr = (T) r;
1535     t = tr;
1536     }
1537     if (ex == null) {
1538     try {
1539     if (e != null)
1540 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1541 dl 1.88 else
1542     fn.accept(t);
1543     } catch (Throwable rex) {
1544     ex = rex;
1545     }
1546     }
1547     if (e == null || ex != null)
1548     dst.internalComplete(null, ex);
1549     }
1550     helpPostComplete();
1551     return dst;
1552 dl 1.28 }
1553    
1554 dl 1.88 private CompletableFuture<Void> doThenRun(Runnable action,
1555     Executor e) {
1556     if (action == null) throw new NullPointerException();
1557     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1558     ThenRun d = null;
1559     Object r;
1560     if ((r = result) == null) {
1561     CompletionNode p = new CompletionNode
1562     (d = new ThenRun(this, action, dst, e));
1563     while ((r = result) == null) {
1564     if (UNSAFE.compareAndSwapObject
1565     (this, COMPLETIONS, p.next = completions, p))
1566     break;
1567     }
1568     }
1569     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1570     Throwable ex;
1571     if (r instanceof AltResult)
1572     ex = ((AltResult)r).ex;
1573     else
1574     ex = null;
1575     if (ex == null) {
1576     try {
1577     if (e != null)
1578 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1579 dl 1.88 else
1580     action.run();
1581     } catch (Throwable rex) {
1582     ex = rex;
1583     }
1584     }
1585     if (e == null || ex != null)
1586     dst.internalComplete(null, ex);
1587     }
1588     helpPostComplete();
1589     return dst;
1590     }
1591    
1592     private <U,V> CompletableFuture<V> doThenCombine
1593     (CompletableFuture<? extends U> other,
1594     BiFunction<? super T,? super U,? extends V> fn,
1595     Executor e) {
1596     if (other == null || fn == null) throw new NullPointerException();
1597     CompletableFuture<V> dst = new CompletableFuture<V>();
1598     ThenCombine<T,U,V> d = null;
1599     Object r, s = null;
1600     if ((r = result) == null || (s = other.result) == null) {
1601     d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
1602     CompletionNode q = null, p = new CompletionNode(d);
1603     while ((r == null && (r = result) == null) ||
1604     (s == null && (s = other.result) == null)) {
1605     if (q != null) {
1606     if (s != null ||
1607     UNSAFE.compareAndSwapObject
1608     (other, COMPLETIONS, q.next = other.completions, q))
1609     break;
1610     }
1611     else if (r != null ||
1612     UNSAFE.compareAndSwapObject
1613     (this, COMPLETIONS, p.next = completions, p)) {
1614     if (s != null)
1615     break;
1616     q = new CompletionNode(d);
1617     }
1618     }
1619     }
1620     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1621     T t; U u; Throwable ex;
1622     if (r instanceof AltResult) {
1623     ex = ((AltResult)r).ex;
1624     t = null;
1625     }
1626     else {
1627     ex = null;
1628     @SuppressWarnings("unchecked") T tr = (T) r;
1629     t = tr;
1630     }
1631     if (ex != null)
1632     u = null;
1633     else if (s instanceof AltResult) {
1634     ex = ((AltResult)s).ex;
1635     u = null;
1636     }
1637     else {
1638     @SuppressWarnings("unchecked") U us = (U) s;
1639     u = us;
1640     }
1641     V v = null;
1642     if (ex == null) {
1643     try {
1644     if (e != null)
1645 dl 1.96 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
1646 dl 1.88 else
1647     v = fn.apply(t, u);
1648     } catch (Throwable rex) {
1649     ex = rex;
1650     }
1651     }
1652     if (e == null || ex != null)
1653     dst.internalComplete(v, ex);
1654     }
1655     helpPostComplete();
1656     other.helpPostComplete();
1657     return dst;
1658     }
1659    
1660     private <U> CompletableFuture<Void> doThenAcceptBoth
1661     (CompletableFuture<? extends U> other,
1662     BiConsumer<? super T,? super U> fn,
1663     Executor e) {
1664     if (other == null || fn == null) throw new NullPointerException();
1665     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1666     ThenAcceptBoth<T,U> d = null;
1667     Object r, s = null;
1668     if ((r = result) == null || (s = other.result) == null) {
1669     d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1670     CompletionNode q = null, p = new CompletionNode(d);
1671     while ((r == null && (r = result) == null) ||
1672     (s == null && (s = other.result) == null)) {
1673     if (q != null) {
1674     if (s != null ||
1675     UNSAFE.compareAndSwapObject
1676     (other, COMPLETIONS, q.next = other.completions, q))
1677     break;
1678     }
1679     else if (r != null ||
1680     UNSAFE.compareAndSwapObject
1681     (this, COMPLETIONS, p.next = completions, p)) {
1682     if (s != null)
1683     break;
1684     q = new CompletionNode(d);
1685     }
1686     }
1687     }
1688     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1689     T t; U u; Throwable ex;
1690     if (r instanceof AltResult) {
1691     ex = ((AltResult)r).ex;
1692     t = null;
1693     }
1694     else {
1695     ex = null;
1696     @SuppressWarnings("unchecked") T tr = (T) r;
1697     t = tr;
1698     }
1699     if (ex != null)
1700     u = null;
1701     else if (s instanceof AltResult) {
1702     ex = ((AltResult)s).ex;
1703     u = null;
1704     }
1705     else {
1706     @SuppressWarnings("unchecked") U us = (U) s;
1707     u = us;
1708     }
1709     if (ex == null) {
1710     try {
1711     if (e != null)
1712 dl 1.96 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1713 dl 1.88 else
1714     fn.accept(t, u);
1715     } catch (Throwable rex) {
1716     ex = rex;
1717     }
1718     }
1719     if (e == null || ex != null)
1720     dst.internalComplete(null, ex);
1721     }
1722     helpPostComplete();
1723     other.helpPostComplete();
1724     return dst;
1725     }
1726    
1727     private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1728     Runnable action,
1729     Executor e) {
1730     if (other == null || action == null) throw new NullPointerException();
1731     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1732     RunAfterBoth d = null;
1733     Object r, s = null;
1734     if ((r = result) == null || (s = other.result) == null) {
1735     d = new RunAfterBoth(this, other, action, dst, e);
1736     CompletionNode q = null, p = new CompletionNode(d);
1737     while ((r == null && (r = result) == null) ||
1738     (s == null && (s = other.result) == null)) {
1739     if (q != null) {
1740     if (s != null ||
1741     UNSAFE.compareAndSwapObject
1742     (other, COMPLETIONS, q.next = other.completions, q))
1743     break;
1744     }
1745     else if (r != null ||
1746     UNSAFE.compareAndSwapObject
1747     (this, COMPLETIONS, p.next = completions, p)) {
1748     if (s != null)
1749     break;
1750     q = new CompletionNode(d);
1751     }
1752     }
1753     }
1754     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1755     Throwable ex;
1756     if (r instanceof AltResult)
1757     ex = ((AltResult)r).ex;
1758     else
1759     ex = null;
1760     if (ex == null && (s instanceof AltResult))
1761     ex = ((AltResult)s).ex;
1762     if (ex == null) {
1763     try {
1764     if (e != null)
1765 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1766 dl 1.88 else
1767     action.run();
1768     } catch (Throwable rex) {
1769     ex = rex;
1770     }
1771     }
1772     if (e == null || ex != null)
1773     dst.internalComplete(null, ex);
1774     }
1775     helpPostComplete();
1776     other.helpPostComplete();
1777     return dst;
1778     }
1779    
1780     private <U> CompletableFuture<U> doApplyToEither
1781     (CompletableFuture<? extends T> other,
1782     Function<? super T, U> fn,
1783     Executor e) {
1784     if (other == null || fn == null) throw new NullPointerException();
1785     CompletableFuture<U> dst = new CompletableFuture<U>();
1786     ApplyToEither<T,U> d = null;
1787     Object r;
1788     if ((r = result) == null && (r = other.result) == null) {
1789     d = new ApplyToEither<T,U>(this, other, fn, dst, e);
1790     CompletionNode q = null, p = new CompletionNode(d);
1791     while ((r = result) == null && (r = other.result) == null) {
1792     if (q != null) {
1793     if (UNSAFE.compareAndSwapObject
1794     (other, COMPLETIONS, q.next = other.completions, q))
1795     break;
1796     }
1797     else if (UNSAFE.compareAndSwapObject
1798     (this, COMPLETIONS, p.next = completions, p))
1799     q = new CompletionNode(d);
1800     }
1801     }
1802     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1803     T t; Throwable ex;
1804     if (r instanceof AltResult) {
1805     ex = ((AltResult)r).ex;
1806     t = null;
1807     }
1808     else {
1809     ex = null;
1810     @SuppressWarnings("unchecked") T tr = (T) r;
1811     t = tr;
1812     }
1813     U u = null;
1814     if (ex == null) {
1815     try {
1816     if (e != null)
1817 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1818 dl 1.88 else
1819     u = fn.apply(t);
1820     } catch (Throwable rex) {
1821     ex = rex;
1822     }
1823     }
1824     if (e == null || ex != null)
1825     dst.internalComplete(u, ex);
1826     }
1827     helpPostComplete();
1828     other.helpPostComplete();
1829     return dst;
1830     }
1831    
1832     private CompletableFuture<Void> doAcceptEither
1833     (CompletableFuture<? extends T> other,
1834     Consumer<? super T> fn,
1835     Executor e) {
1836     if (other == null || fn == null) throw new NullPointerException();
1837     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1838     AcceptEither<T> d = null;
1839     Object r;
1840     if ((r = result) == null && (r = other.result) == null) {
1841     d = new AcceptEither<T>(this, other, fn, dst, e);
1842     CompletionNode q = null, p = new CompletionNode(d);
1843     while ((r = result) == null && (r = other.result) == null) {
1844     if (q != null) {
1845     if (UNSAFE.compareAndSwapObject
1846     (other, COMPLETIONS, q.next = other.completions, q))
1847     break;
1848     }
1849     else if (UNSAFE.compareAndSwapObject
1850     (this, COMPLETIONS, p.next = completions, p))
1851     q = new CompletionNode(d);
1852     }
1853     }
1854     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1855     T t; Throwable ex;
1856     if (r instanceof AltResult) {
1857     ex = ((AltResult)r).ex;
1858     t = null;
1859     }
1860     else {
1861     ex = null;
1862     @SuppressWarnings("unchecked") T tr = (T) r;
1863     t = tr;
1864     }
1865     if (ex == null) {
1866     try {
1867     if (e != null)
1868 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1869 dl 1.88 else
1870     fn.accept(t);
1871     } catch (Throwable rex) {
1872     ex = rex;
1873     }
1874     }
1875     if (e == null || ex != null)
1876     dst.internalComplete(null, ex);
1877     }
1878     helpPostComplete();
1879     other.helpPostComplete();
1880     return dst;
1881     }
1882    
1883     private CompletableFuture<Void> doRunAfterEither
1884     (CompletableFuture<?> other,
1885     Runnable action,
1886     Executor e) {
1887     if (other == null || action == null) throw new NullPointerException();
1888     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1889     RunAfterEither d = null;
1890     Object r;
1891     if ((r = result) == null && (r = other.result) == null) {
1892     d = new RunAfterEither(this, other, action, dst, e);
1893     CompletionNode q = null, p = new CompletionNode(d);
1894     while ((r = result) == null && (r = other.result) == null) {
1895     if (q != null) {
1896     if (UNSAFE.compareAndSwapObject
1897     (other, COMPLETIONS, q.next = other.completions, q))
1898     break;
1899     }
1900     else if (UNSAFE.compareAndSwapObject
1901     (this, COMPLETIONS, p.next = completions, p))
1902     q = new CompletionNode(d);
1903     }
1904     }
1905     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1906     Throwable ex;
1907     if (r instanceof AltResult)
1908     ex = ((AltResult)r).ex;
1909     else
1910     ex = null;
1911     if (ex == null) {
1912     try {
1913     if (e != null)
1914 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1915 dl 1.88 else
1916     action.run();
1917     } catch (Throwable rex) {
1918     ex = rex;
1919     }
1920     }
1921     if (e == null || ex != null)
1922     dst.internalComplete(null, ex);
1923     }
1924     helpPostComplete();
1925     other.helpPostComplete();
1926     return dst;
1927     }
1928    
1929     private <U> CompletableFuture<U> doThenCompose
1930     (Function<? super T, ? extends CompletionStage<U>> fn,
1931     Executor e) {
1932     if (fn == null) throw new NullPointerException();
1933     CompletableFuture<U> dst = null;
1934     ThenCompose<T,U> d = null;
1935     Object r;
1936     if ((r = result) == null) {
1937     dst = new CompletableFuture<U>();
1938     CompletionNode p = new CompletionNode
1939     (d = new ThenCompose<T,U>(this, fn, dst, e));
1940     while ((r = result) == null) {
1941     if (UNSAFE.compareAndSwapObject
1942     (this, COMPLETIONS, p.next = completions, p))
1943     break;
1944     }
1945     }
1946     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1947     T t; Throwable ex;
1948     if (r instanceof AltResult) {
1949     ex = ((AltResult)r).ex;
1950     t = null;
1951     }
1952     else {
1953     ex = null;
1954     @SuppressWarnings("unchecked") T tr = (T) r;
1955     t = tr;
1956     }
1957     if (ex == null) {
1958     if (e != null) {
1959     if (dst == null)
1960     dst = new CompletableFuture<U>();
1961 dl 1.96 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1962 dl 1.88 }
1963     else {
1964     try {
1965     CompletionStage<U> cs = fn.apply(t);
1966     if (cs == null ||
1967     (dst = cs.toCompletableFuture()) == null)
1968     ex = new NullPointerException();
1969     } catch (Throwable rex) {
1970     ex = rex;
1971     }
1972     }
1973     }
1974     if (dst == null)
1975     dst = new CompletableFuture<U>();
1976     if (e == null || ex != null)
1977     dst.internalComplete(null, ex);
1978     }
1979     helpPostComplete();
1980     dst.helpPostComplete();
1981     return dst;
1982     }
1983    
1984     private CompletableFuture<T> doWhenComplete
1985     (BiConsumer<? super T, ? super Throwable> fn,
1986     Executor e) {
1987     if (fn == null) throw new NullPointerException();
1988     CompletableFuture<T> dst = new CompletableFuture<T>();
1989     WhenCompleteCompletion<T> d = null;
1990     Object r;
1991     if ((r = result) == null) {
1992     CompletionNode p =
1993     new CompletionNode(d = new WhenCompleteCompletion<T>
1994     (this, fn, dst, e));
1995     while ((r = result) == null) {
1996     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1997     p.next = completions, p))
1998     break;
1999     }
2000     }
2001     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2002     T t; Throwable ex;
2003     if (r instanceof AltResult) {
2004     ex = ((AltResult)r).ex;
2005     t = null;
2006     }
2007     else {
2008     ex = null;
2009     @SuppressWarnings("unchecked") T tr = (T) r;
2010     t = tr;
2011     }
2012     Throwable dx = null;
2013     try {
2014     if (e != null)
2015 dl 1.96 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
2016 dl 1.88 else
2017     fn.accept(t, ex);
2018     } catch (Throwable rex) {
2019     dx = rex;
2020     }
2021     if (e == null || dx != null)
2022     dst.internalComplete(t, ex != null ? ex : dx);
2023     }
2024     helpPostComplete();
2025     return dst;
2026     }
2027    
2028     private <U> CompletableFuture<U> doHandle
2029     (BiFunction<? super T, Throwable, ? extends U> fn,
2030     Executor e) {
2031     if (fn == null) throw new NullPointerException();
2032     CompletableFuture<U> dst = new CompletableFuture<U>();
2033     HandleCompletion<T,U> d = null;
2034     Object r;
2035     if ((r = result) == null) {
2036     CompletionNode p =
2037     new CompletionNode(d = new HandleCompletion<T,U>
2038     (this, fn, dst, e));
2039     while ((r = result) == null) {
2040     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2041     p.next = completions, p))
2042     break;
2043     }
2044     }
2045     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2046     T t; Throwable ex;
2047     if (r instanceof AltResult) {
2048     ex = ((AltResult)r).ex;
2049     t = null;
2050     }
2051     else {
2052     ex = null;
2053     @SuppressWarnings("unchecked") T tr = (T) r;
2054     t = tr;
2055     }
2056 jsr166 1.90 U u = null;
2057 dl 1.88 Throwable dx = null;
2058     try {
2059     if (e != null)
2060 dl 1.96 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2061 dl 1.88 else {
2062     u = fn.apply(t, ex);
2063     dx = null;
2064     }
2065     } catch (Throwable rex) {
2066     dx = rex;
2067     u = null;
2068     }
2069     if (e == null || dx != null)
2070     dst.internalComplete(u, dx);
2071     }
2072     helpPostComplete();
2073     return dst;
2074     }
2075    
2076    
2077     // public methods
2078    
2079     /**
2080     * Creates a new incomplete CompletableFuture.
2081     */
2082     public CompletableFuture() {
2083     }
2084    
2085     /**
2086     * Returns a new CompletableFuture that is asynchronously completed
2087     * by a task running in the {@link ForkJoinPool#commonPool()} with
2088     * the value obtained by calling the given Supplier.
2089     *
2090     * @param supplier a function returning the value to be used
2091     * to complete the returned CompletableFuture
2092 jsr166 1.95 * @param <U> the function's return type
2093 dl 1.88 * @return the new CompletableFuture
2094     */
2095     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2096     if (supplier == null) throw new NullPointerException();
2097     CompletableFuture<U> f = new CompletableFuture<U>();
2098 dl 1.96 execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
2099 dl 1.88 return f;
2100     }
2101    
2102     /**
2103     * Returns a new CompletableFuture that is asynchronously completed
2104     * by a task running in the given executor with the value obtained
2105     * by calling the given Supplier.
2106     *
2107     * @param supplier a function returning the value to be used
2108     * to complete the returned CompletableFuture
2109     * @param executor the executor to use for asynchronous execution
2110 jsr166 1.95 * @param <U> the function's return type
2111 dl 1.88 * @return the new CompletableFuture
2112     */
2113     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2114     Executor executor) {
2115     if (executor == null || supplier == null)
2116     throw new NullPointerException();
2117     CompletableFuture<U> f = new CompletableFuture<U>();
2118 dl 1.96 execAsync(executor, new AsyncSupply<U>(supplier, f));
2119 dl 1.88 return f;
2120 dl 1.28 }
2121    
2122     /**
2123 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2124     * by a task running in the {@link ForkJoinPool#commonPool()} after
2125     * it runs the given action.
2126 dl 1.28 *
2127     * @param runnable the action to run before completing the
2128     * returned CompletableFuture
2129 jsr166 1.58 * @return the new CompletableFuture
2130 dl 1.28 */
2131     public static CompletableFuture<Void> runAsync(Runnable runnable) {
2132     if (runnable == null) throw new NullPointerException();
2133     CompletableFuture<Void> f = new CompletableFuture<Void>();
2134 dl 1.96 execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f));
2135 dl 1.28 return f;
2136     }
2137    
2138     /**
2139 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2140     * by a task running in the given executor after it runs the given
2141     * action.
2142 dl 1.28 *
2143     * @param runnable the action to run before completing the
2144     * returned CompletableFuture
2145     * @param executor the executor to use for asynchronous execution
2146 jsr166 1.58 * @return the new CompletableFuture
2147 dl 1.28 */
2148     public static CompletableFuture<Void> runAsync(Runnable runnable,
2149     Executor executor) {
2150     if (executor == null || runnable == null)
2151     throw new NullPointerException();
2152     CompletableFuture<Void> f = new CompletableFuture<Void>();
2153 dl 1.96 execAsync(executor, new AsyncRun(runnable, f));
2154 dl 1.28 return f;
2155     }
2156    
2157     /**
2158 dl 1.77 * Returns a new CompletableFuture that is already completed with
2159     * the given value.
2160     *
2161     * @param value the value
2162 jsr166 1.95 * @param <U> the type of the value
2163 dl 1.77 * @return the completed CompletableFuture
2164     */
2165     public static <U> CompletableFuture<U> completedFuture(U value) {
2166     CompletableFuture<U> f = new CompletableFuture<U>();
2167 jsr166 1.78 f.result = (value == null) ? NIL : value;
2168 dl 1.77 return f;
2169     }
2170    
2171     /**
2172 dl 1.28 * Returns {@code true} if completed in any fashion: normally,
2173     * exceptionally, or via cancellation.
2174     *
2175     * @return {@code true} if completed
2176     */
2177     public boolean isDone() {
2178     return result != null;
2179     }
2180    
2181     /**
2182 dl 1.49 * Waits if necessary for this future to complete, and then
2183 dl 1.48 * returns its result.
2184 dl 1.28 *
2185 dl 1.48 * @return the result value
2186     * @throws CancellationException if this future was cancelled
2187     * @throws ExecutionException if this future completed exceptionally
2188 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2189     * while waiting
2190     */
2191     public T get() throws InterruptedException, ExecutionException {
2192     Object r; Throwable ex, cause;
2193     if ((r = result) == null && (r = waitingGet(true)) == null)
2194     throw new InterruptedException();
2195 jsr166 1.45 if (!(r instanceof AltResult)) {
2196     @SuppressWarnings("unchecked") T tr = (T) r;
2197     return tr;
2198     }
2199     if ((ex = ((AltResult)r).ex) == null)
2200 dl 1.28 return null;
2201 jsr166 1.45 if (ex instanceof CancellationException)
2202     throw (CancellationException)ex;
2203     if ((ex instanceof CompletionException) &&
2204     (cause = ex.getCause()) != null)
2205     ex = cause;
2206     throw new ExecutionException(ex);
2207 dl 1.28 }
2208    
2209     /**
2210 dl 1.49 * Waits if necessary for at most the given time for this future
2211     * to complete, and then returns its result, if available.
2212 dl 1.28 *
2213     * @param timeout the maximum time to wait
2214     * @param unit the time unit of the timeout argument
2215 dl 1.48 * @return the result value
2216     * @throws CancellationException if this future was cancelled
2217     * @throws ExecutionException if this future completed exceptionally
2218 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2219     * while waiting
2220     * @throws TimeoutException if the wait timed out
2221     */
2222     public T get(long timeout, TimeUnit unit)
2223     throws InterruptedException, ExecutionException, TimeoutException {
2224     Object r; Throwable ex, cause;
2225     long nanos = unit.toNanos(timeout);
2226     if (Thread.interrupted())
2227     throw new InterruptedException();
2228     if ((r = result) == null)
2229     r = timedAwaitDone(nanos);
2230 jsr166 1.45 if (!(r instanceof AltResult)) {
2231     @SuppressWarnings("unchecked") T tr = (T) r;
2232     return tr;
2233     }
2234     if ((ex = ((AltResult)r).ex) == null)
2235 dl 1.28 return null;
2236 jsr166 1.45 if (ex instanceof CancellationException)
2237     throw (CancellationException)ex;
2238     if ((ex instanceof CompletionException) &&
2239     (cause = ex.getCause()) != null)
2240     ex = cause;
2241     throw new ExecutionException(ex);
2242 dl 1.28 }
2243    
2244     /**
2245     * Returns the result value when complete, or throws an
2246     * (unchecked) exception if completed exceptionally. To better
2247     * conform with the use of common functional forms, if a
2248     * computation involved in the completion of this
2249     * CompletableFuture threw an exception, this method throws an
2250     * (unchecked) {@link CompletionException} with the underlying
2251     * exception as its cause.
2252     *
2253     * @return the result value
2254     * @throws CancellationException if the computation was cancelled
2255 jsr166 1.55 * @throws CompletionException if this future completed
2256     * exceptionally or a completion computation threw an exception
2257 dl 1.28 */
2258     public T join() {
2259     Object r; Throwable ex;
2260     if ((r = result) == null)
2261     r = waitingGet(false);
2262 jsr166 1.45 if (!(r instanceof AltResult)) {
2263     @SuppressWarnings("unchecked") T tr = (T) r;
2264     return tr;
2265     }
2266     if ((ex = ((AltResult)r).ex) == null)
2267 dl 1.28 return null;
2268 jsr166 1.45 if (ex instanceof CancellationException)
2269     throw (CancellationException)ex;
2270     if (ex instanceof CompletionException)
2271     throw (CompletionException)ex;
2272     throw new CompletionException(ex);
2273 dl 1.28 }
2274    
2275     /**
2276     * Returns the result value (or throws any encountered exception)
2277     * if completed, else returns the given valueIfAbsent.
2278     *
2279     * @param valueIfAbsent the value to return if not completed
2280     * @return the result value, if completed, else the given valueIfAbsent
2281     * @throws CancellationException if the computation was cancelled
2282 jsr166 1.55 * @throws CompletionException if this future completed
2283     * exceptionally or a completion computation threw an exception
2284 dl 1.28 */
2285     public T getNow(T valueIfAbsent) {
2286     Object r; Throwable ex;
2287     if ((r = result) == null)
2288     return valueIfAbsent;
2289 jsr166 1.45 if (!(r instanceof AltResult)) {
2290     @SuppressWarnings("unchecked") T tr = (T) r;
2291     return tr;
2292     }
2293     if ((ex = ((AltResult)r).ex) == null)
2294 dl 1.28 return null;
2295 jsr166 1.45 if (ex instanceof CancellationException)
2296     throw (CancellationException)ex;
2297     if (ex instanceof CompletionException)
2298     throw (CompletionException)ex;
2299     throw new CompletionException(ex);
2300 dl 1.28 }
2301    
2302     /**
2303     * If not already completed, sets the value returned by {@link
2304     * #get()} and related methods to the given value.
2305     *
2306     * @param value the result value
2307     * @return {@code true} if this invocation caused this CompletableFuture
2308     * to transition to a completed state, else {@code false}
2309     */
2310     public boolean complete(T value) {
2311     boolean triggered = result == null &&
2312     UNSAFE.compareAndSwapObject(this, RESULT, null,
2313     value == null ? NIL : value);
2314     postComplete();
2315     return triggered;
2316     }
2317    
2318     /**
2319     * If not already completed, causes invocations of {@link #get()}
2320     * and related methods to throw the given exception.
2321     *
2322     * @param ex the exception
2323     * @return {@code true} if this invocation caused this CompletableFuture
2324     * to transition to a completed state, else {@code false}
2325     */
2326     public boolean completeExceptionally(Throwable ex) {
2327     if (ex == null) throw new NullPointerException();
2328     boolean triggered = result == null &&
2329     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2330     postComplete();
2331     return triggered;
2332     }
2333    
2334 dl 1.88 // CompletionStage methods
2335    
2336     public <U> CompletableFuture<U> thenApply
2337     (Function<? super T,? extends U> fn) {
2338 dl 1.28 return doThenApply(fn, null);
2339     }
2340    
2341 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2342     (Function<? super T,? extends U> fn) {
2343 dl 1.28 return doThenApply(fn, ForkJoinPool.commonPool());
2344 dl 1.17 }
2345    
2346 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2347     (Function<? super T,? extends U> fn,
2348     Executor executor) {
2349 dl 1.28 if (executor == null) throw new NullPointerException();
2350     return doThenApply(fn, executor);
2351     }
2352 dl 1.1
2353 dl 1.88 public CompletableFuture<Void> thenAccept
2354     (Consumer<? super T> action) {
2355     return doThenAccept(action, null);
2356 dl 1.28 }
2357    
2358 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2359     (Consumer<? super T> action) {
2360     return doThenAccept(action, ForkJoinPool.commonPool());
2361 dl 1.28 }
2362    
2363 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2364     (Consumer<? super T> action,
2365     Executor executor) {
2366 dl 1.28 if (executor == null) throw new NullPointerException();
2367 dl 1.88 return doThenAccept(action, executor);
2368 dl 1.7 }
2369    
2370 dl 1.88 public CompletableFuture<Void> thenRun
2371     (Runnable action) {
2372 dl 1.28 return doThenRun(action, null);
2373     }
2374    
2375 dl 1.88 public CompletableFuture<Void> thenRunAsync
2376     (Runnable action) {
2377 dl 1.28 return doThenRun(action, ForkJoinPool.commonPool());
2378     }
2379    
2380 dl 1.88 public CompletableFuture<Void> thenRunAsync
2381     (Runnable action,
2382     Executor executor) {
2383 dl 1.28 if (executor == null) throw new NullPointerException();
2384     return doThenRun(action, executor);
2385     }
2386    
2387 dl 1.48 public <U,V> CompletableFuture<V> thenCombine
2388 dl 1.88 (CompletionStage<? extends U> other,
2389 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2390 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, null);
2391 dl 1.28 }
2392    
2393 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2394 dl 1.88 (CompletionStage<? extends U> other,
2395 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2396 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn,
2397     ForkJoinPool.commonPool());
2398 dl 1.28 }
2399    
2400 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2401 dl 1.88 (CompletionStage<? extends U> other,
2402 dl 1.48 BiFunction<? super T,? super U,? extends V> fn,
2403     Executor executor) {
2404 dl 1.28 if (executor == null) throw new NullPointerException();
2405 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, executor);
2406 dl 1.1 }
2407    
2408 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBoth
2409 dl 1.88 (CompletionStage<? extends U> other,
2410     BiConsumer<? super T, ? super U> action) {
2411     return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2412 dl 1.28 }
2413    
2414 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2415 dl 1.88 (CompletionStage<? extends U> other,
2416     BiConsumer<? super T, ? super U> action) {
2417     return doThenAcceptBoth(other.toCompletableFuture(), action,
2418     ForkJoinPool.commonPool());
2419 dl 1.28 }
2420    
2421 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2422 dl 1.88 (CompletionStage<? extends U> other,
2423     BiConsumer<? super T, ? super U> action,
2424 dl 1.48 Executor executor) {
2425 dl 1.28 if (executor == null) throw new NullPointerException();
2426 dl 1.88 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2427 dl 1.28 }
2428    
2429 dl 1.88 public CompletableFuture<Void> runAfterBoth
2430     (CompletionStage<?> other,
2431     Runnable action) {
2432     return doRunAfterBoth(other.toCompletableFuture(), action, null);
2433 dl 1.7 }
2434    
2435 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2436     (CompletionStage<?> other,
2437     Runnable action) {
2438     return doRunAfterBoth(other.toCompletableFuture(), action,
2439     ForkJoinPool.commonPool());
2440 dl 1.28 }
2441    
2442 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2443     (CompletionStage<?> other,
2444     Runnable action,
2445     Executor executor) {
2446 dl 1.28 if (executor == null) throw new NullPointerException();
2447 dl 1.88 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2448 dl 1.28 }
2449    
2450 dl 1.1
2451 dl 1.48 public <U> CompletableFuture<U> applyToEither
2452 dl 1.88 (CompletionStage<? extends T> other,
2453 dl 1.48 Function<? super T, U> fn) {
2454 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, null);
2455 dl 1.28 }
2456    
2457 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2458 dl 1.88 (CompletionStage<? extends T> other,
2459 dl 1.48 Function<? super T, U> fn) {
2460 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn,
2461     ForkJoinPool.commonPool());
2462 dl 1.28 }
2463    
2464 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2465 dl 1.88 (CompletionStage<? extends T> other,
2466 dl 1.48 Function<? super T, U> fn,
2467     Executor executor) {
2468 dl 1.28 if (executor == null) throw new NullPointerException();
2469 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2470 dl 1.1 }
2471    
2472 dl 1.48 public CompletableFuture<Void> acceptEither
2473 dl 1.88 (CompletionStage<? extends T> other,
2474     Consumer<? super T> action) {
2475     return doAcceptEither(other.toCompletableFuture(), action, null);
2476 dl 1.28 }
2477    
2478 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2479 dl 1.88 (CompletionStage<? extends T> other,
2480     Consumer<? super T> action) {
2481     return doAcceptEither(other.toCompletableFuture(), action,
2482     ForkJoinPool.commonPool());
2483 dl 1.28 }
2484    
2485 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2486 dl 1.88 (CompletionStage<? extends T> other,
2487     Consumer<? super T> action,
2488 dl 1.48 Executor executor) {
2489 dl 1.28 if (executor == null) throw new NullPointerException();
2490 dl 1.88 return doAcceptEither(other.toCompletableFuture(), action, executor);
2491 dl 1.7 }
2492    
2493 dl 1.88 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2494 dl 1.28 Runnable action) {
2495 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, null);
2496 dl 1.28 }
2497    
2498 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2499 dl 1.88 (CompletionStage<?> other,
2500 dl 1.48 Runnable action) {
2501 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action,
2502     ForkJoinPool.commonPool());
2503 dl 1.28 }
2504    
2505 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2506 dl 1.88 (CompletionStage<?> other,
2507 dl 1.48 Runnable action,
2508     Executor executor) {
2509 dl 1.28 if (executor == null) throw new NullPointerException();
2510 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2511 dl 1.1 }
2512    
2513 dl 1.48 public <U> CompletableFuture<U> thenCompose
2514 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2515 jsr166 1.81 return doThenCompose(fn, null);
2516 dl 1.37 }
2517    
2518 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2519 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2520 jsr166 1.81 return doThenCompose(fn, ForkJoinPool.commonPool());
2521 dl 1.37 }
2522    
2523 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2524 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn,
2525 dl 1.48 Executor executor) {
2526 dl 1.37 if (executor == null) throw new NullPointerException();
2527 jsr166 1.81 return doThenCompose(fn, executor);
2528 dl 1.37 }
2529    
2530 dl 1.88 public CompletableFuture<T> whenComplete
2531     (BiConsumer<? super T, ? super Throwable> action) {
2532     return doWhenComplete(action, null);
2533     }
2534    
2535     public CompletableFuture<T> whenCompleteAsync
2536     (BiConsumer<? super T, ? super Throwable> action) {
2537     return doWhenComplete(action, ForkJoinPool.commonPool());
2538     }
2539    
2540     public CompletableFuture<T> whenCompleteAsync
2541     (BiConsumer<? super T, ? super Throwable> action,
2542     Executor executor) {
2543     if (executor == null) throw new NullPointerException();
2544     return doWhenComplete(action, executor);
2545     }
2546    
2547     public <U> CompletableFuture<U> handle
2548     (BiFunction<? super T, Throwable, ? extends U> fn) {
2549     return doHandle(fn, null);
2550     }
2551    
2552     public <U> CompletableFuture<U> handleAsync
2553     (BiFunction<? super T, Throwable, ? extends U> fn) {
2554     return doHandle(fn, ForkJoinPool.commonPool());
2555     }
2556    
2557     public <U> CompletableFuture<U> handleAsync
2558     (BiFunction<? super T, Throwable, ? extends U> fn,
2559     Executor executor) {
2560     if (executor == null) throw new NullPointerException();
2561     return doHandle(fn, executor);
2562     }
2563    
2564     /**
2565     * Returns this CompletableFuture
2566     *
2567     * @return this CompletableFuture
2568     */
2569     public CompletableFuture<T> toCompletableFuture() {
2570     return this;
2571 dl 1.28 }
2572    
2573 dl 1.88 // not in interface CompletionStage
2574    
2575 dl 1.28 /**
2576 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
2577     * CompletableFuture completes, with the result of the given
2578     * function of the exception triggering this CompletableFuture's
2579     * completion when it completes exceptionally; otherwise, if this
2580     * CompletableFuture completes normally, then the returned
2581     * CompletableFuture also completes normally with the same value.
2582 dl 1.88 * Note: More flexible versions of this functionality are
2583     * available using methods {@code whenComplete} and {@code handle}.
2584 dl 1.28 *
2585     * @param fn the function to use to compute the value of the
2586     * returned CompletableFuture if this CompletableFuture completed
2587     * exceptionally
2588     * @return the new CompletableFuture
2589     */
2590 dl 1.48 public CompletableFuture<T> exceptionally
2591     (Function<Throwable, ? extends T> fn) {
2592 dl 1.28 if (fn == null) throw new NullPointerException();
2593     CompletableFuture<T> dst = new CompletableFuture<T>();
2594     ExceptionCompletion<T> d = null;
2595     Object r;
2596     if ((r = result) == null) {
2597     CompletionNode p =
2598 dl 1.88 new CompletionNode(d = new ExceptionCompletion<T>
2599     (this, fn, dst));
2600 dl 1.28 while ((r = result) == null) {
2601     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2602     p.next = completions, p))
2603     break;
2604     }
2605     }
2606     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2607     T t = null; Throwable ex, dx = null;
2608     if (r instanceof AltResult) {
2609 jsr166 1.87 if ((ex = ((AltResult)r).ex) != null) {
2610 dl 1.28 try {
2611     t = fn.apply(ex);
2612     } catch (Throwable rex) {
2613     dx = rex;
2614     }
2615     }
2616     }
2617     else {
2618     @SuppressWarnings("unchecked") T tr = (T) r;
2619     t = tr;
2620     }
2621     dst.internalComplete(t, dx);
2622     }
2623     helpPostComplete();
2624     return dst;
2625     }
2626    
2627 dl 1.35 /* ------------- Arbitrary-arity constructions -------------- */
2628    
2629     /*
2630     * The basic plan of attack is to recursively form binary
2631     * completion trees of elements. This can be overkill for small
2632     * sets, but scales nicely. The And/All vs Or/Any forms use the
2633     * same idea, but details differ.
2634     */
2635    
2636     /**
2637     * Returns a new CompletableFuture that is completed when all of
2638 jsr166 1.66 * the given CompletableFutures complete. If any of the given
2639 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
2640     * CompletableFuture also does so, with a CompletionException
2641     * holding this exception as its cause. Otherwise, the results,
2642     * if any, of the given CompletableFutures are not reflected in
2643     * the returned CompletableFuture, but may be obtained by
2644     * inspecting them individually. If no CompletableFutures are
2645     * provided, returns a CompletableFuture completed with the value
2646     * {@code null}.
2647 dl 1.35 *
2648     * <p>Among the applications of this method is to await completion
2649     * of a set of independent CompletableFutures before continuing a
2650     * program, as in: {@code CompletableFuture.allOf(c1, c2,
2651     * c3).join();}.
2652     *
2653     * @param cfs the CompletableFutures
2654 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
2655 dl 1.35 * given CompletableFutures complete
2656     * @throws NullPointerException if the array or any of its elements are
2657     * {@code null}
2658     */
2659     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2660     int len = cfs.length; // Directly handle empty and singleton cases
2661     if (len > 1)
2662     return allTree(cfs, 0, len - 1);
2663     else {
2664     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2665     CompletableFuture<?> f;
2666     if (len == 0)
2667     dst.result = NIL;
2668     else if ((f = cfs[0]) == null)
2669     throw new NullPointerException();
2670     else {
2671 dl 1.75 ThenPropagate d = null;
2672 dl 1.35 CompletionNode p = null;
2673     Object r;
2674     while ((r = f.result) == null) {
2675     if (d == null)
2676 dl 1.75 d = new ThenPropagate(f, dst);
2677 dl 1.35 else if (p == null)
2678     p = new CompletionNode(d);
2679     else if (UNSAFE.compareAndSwapObject
2680     (f, COMPLETIONS, p.next = f.completions, p))
2681     break;
2682     }
2683     if (r != null && (d == null || d.compareAndSet(0, 1)))
2684     dst.internalComplete(null, (r instanceof AltResult) ?
2685     ((AltResult)r).ex : null);
2686     f.helpPostComplete();
2687     }
2688     return dst;
2689     }
2690     }
2691    
2692     /**
2693     * Recursively constructs an And'ed tree of CompletableFutures.
2694     * Called only when array known to have at least two elements.
2695     */
2696     private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2697     int lo, int hi) {
2698     CompletableFuture<?> fst, snd;
2699     int mid = (lo + hi) >>> 1;
2700     if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2701     (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2702     throw new NullPointerException();
2703     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2704     AndCompletion d = null;
2705     CompletionNode p = null, q = null;
2706     Object r = null, s = null;
2707     while ((r = fst.result) == null || (s = snd.result) == null) {
2708     if (d == null)
2709     d = new AndCompletion(fst, snd, dst);
2710     else if (p == null)
2711     p = new CompletionNode(d);
2712     else if (q == null) {
2713     if (UNSAFE.compareAndSwapObject
2714     (fst, COMPLETIONS, p.next = fst.completions, p))
2715     q = new CompletionNode(d);
2716     }
2717     else if (UNSAFE.compareAndSwapObject
2718     (snd, COMPLETIONS, q.next = snd.completions, q))
2719     break;
2720     }
2721     if ((r != null || (r = fst.result) != null) &&
2722     (s != null || (s = snd.result) != null) &&
2723     (d == null || d.compareAndSet(0, 1))) {
2724     Throwable ex;
2725     if (r instanceof AltResult)
2726     ex = ((AltResult)r).ex;
2727     else
2728     ex = null;
2729     if (ex == null && (s instanceof AltResult))
2730     ex = ((AltResult)s).ex;
2731     dst.internalComplete(null, ex);
2732     }
2733     fst.helpPostComplete();
2734     snd.helpPostComplete();
2735     return dst;
2736     }
2737    
2738     /**
2739 dl 1.76 * Returns a new CompletableFuture that is completed when any of
2740 jsr166 1.79 * the given CompletableFutures complete, with the same result.
2741     * Otherwise, if it completed exceptionally, the returned
2742 dl 1.77 * CompletableFuture also does so, with a CompletionException
2743     * holding this exception as its cause. If no CompletableFutures
2744     * are provided, returns an incomplete CompletableFuture.
2745 dl 1.35 *
2746     * @param cfs the CompletableFutures
2747 dl 1.77 * @return a new CompletableFuture that is completed with the
2748     * result or exception of any of the given CompletableFutures when
2749     * one completes
2750 dl 1.35 * @throws NullPointerException if the array or any of its elements are
2751     * {@code null}
2752     */
2753 dl 1.77 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2754 dl 1.35 int len = cfs.length; // Same idea as allOf
2755     if (len > 1)
2756     return anyTree(cfs, 0, len - 1);
2757     else {
2758 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2759 dl 1.35 CompletableFuture<?> f;
2760     if (len == 0)
2761 dl 1.48 ; // skip
2762 dl 1.35 else if ((f = cfs[0]) == null)
2763     throw new NullPointerException();
2764     else {
2765 dl 1.77 ThenCopy<Object> d = null;
2766 dl 1.35 CompletionNode p = null;
2767     Object r;
2768     while ((r = f.result) == null) {
2769     if (d == null)
2770 dl 1.77 d = new ThenCopy<Object>(f, dst);
2771 dl 1.35 else if (p == null)
2772     p = new CompletionNode(d);
2773     else if (UNSAFE.compareAndSwapObject
2774     (f, COMPLETIONS, p.next = f.completions, p))
2775     break;
2776     }
2777     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2778     Throwable ex; Object t;
2779 dl 1.77 if (r instanceof AltResult) {
2780 dl 1.35 ex = ((AltResult)r).ex;
2781 dl 1.77 t = null;
2782     }
2783     else {
2784     ex = null;
2785     t = r;
2786     }
2787     dst.internalComplete(t, ex);
2788 dl 1.35 }
2789     f.helpPostComplete();
2790     }
2791     return dst;
2792     }
2793     }
2794    
2795     /**
2796 jsr166 1.44 * Recursively constructs an Or'ed tree of CompletableFutures.
2797 dl 1.35 */
2798 dl 1.77 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
2799 jsr166 1.79 int lo, int hi) {
2800 dl 1.35 CompletableFuture<?> fst, snd;
2801     int mid = (lo + hi) >>> 1;
2802     if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2803     (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2804     throw new NullPointerException();
2805 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2806 dl 1.35 OrCompletion d = null;
2807     CompletionNode p = null, q = null;
2808     Object r;
2809     while ((r = fst.result) == null && (r = snd.result) == null) {
2810     if (d == null)
2811     d = new OrCompletion(fst, snd, dst);
2812     else if (p == null)
2813     p = new CompletionNode(d);
2814     else if (q == null) {
2815     if (UNSAFE.compareAndSwapObject
2816     (fst, COMPLETIONS, p.next = fst.completions, p))
2817     q = new CompletionNode(d);
2818     }
2819     else if (UNSAFE.compareAndSwapObject
2820     (snd, COMPLETIONS, q.next = snd.completions, q))
2821     break;
2822     }
2823     if ((r != null || (r = fst.result) != null ||
2824     (r = snd.result) != null) &&
2825     (d == null || d.compareAndSet(0, 1))) {
2826 dl 1.77 Throwable ex; Object t;
2827 dl 1.35 if (r instanceof AltResult) {
2828     ex = ((AltResult)r).ex;
2829 dl 1.77 t = null;
2830 dl 1.35 }
2831 dl 1.77 else {
2832 dl 1.35 ex = null;
2833 dl 1.77 t = r;
2834     }
2835     dst.internalComplete(t, ex);
2836 dl 1.35 }
2837     fst.helpPostComplete();
2838     snd.helpPostComplete();
2839     return dst;
2840     }
2841    
2842     /* ------------- Control and status methods -------------- */
2843    
2844 dl 1.28 /**
2845 dl 1.37 * If not already completed, completes this CompletableFuture with
2846     * a {@link CancellationException}. Dependent CompletableFutures
2847     * that have not already completed will also complete
2848     * exceptionally, with a {@link CompletionException} caused by
2849     * this {@code CancellationException}.
2850 dl 1.28 *
2851     * @param mayInterruptIfRunning this value has no effect in this
2852     * implementation because interrupts are not used to control
2853     * processing.
2854     *
2855     * @return {@code true} if this task is now cancelled
2856     */
2857     public boolean cancel(boolean mayInterruptIfRunning) {
2858 dl 1.46 boolean cancelled = (result == null) &&
2859     UNSAFE.compareAndSwapObject
2860     (this, RESULT, null, new AltResult(new CancellationException()));
2861     postComplete();
2862 dl 1.48 return cancelled || isCancelled();
2863 dl 1.28 }
2864    
2865     /**
2866     * Returns {@code true} if this CompletableFuture was cancelled
2867     * before it completed normally.
2868     *
2869     * @return {@code true} if this CompletableFuture was cancelled
2870     * before it completed normally
2871     */
2872     public boolean isCancelled() {
2873     Object r;
2874 jsr166 1.43 return ((r = result) instanceof AltResult) &&
2875     (((AltResult)r).ex instanceof CancellationException);
2876 dl 1.28 }
2877    
2878     /**
2879 dl 1.88 * Returns {@code true} if this CompletableFuture completed
2880 dl 1.91 * exceptionally, in any way. Possible causes include
2881     * cancellation, explicit invocation of {@code
2882     * completeExceptionally}, and abrupt termination of a
2883     * CompletionStage action.
2884 dl 1.88 *
2885     * @return {@code true} if this CompletableFuture completed
2886     * exceptionally
2887     */
2888     public boolean isCompletedExceptionally() {
2889 dl 1.91 Object r;
2890     return ((r = result) instanceof AltResult) && r != NIL;
2891 dl 1.88 }
2892    
2893     /**
2894 dl 1.28 * Forcibly sets or resets the value subsequently returned by
2895 jsr166 1.42 * method {@link #get()} and related methods, whether or not
2896     * already completed. This method is designed for use only in
2897     * error recovery actions, and even in such situations may result
2898     * in ongoing dependent completions using established versus
2899 dl 1.30 * overwritten outcomes.
2900 dl 1.28 *
2901     * @param value the completion value
2902     */
2903     public void obtrudeValue(T value) {
2904     result = (value == null) ? NIL : value;
2905     postComplete();
2906     }
2907    
2908 dl 1.30 /**
2909 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
2910     * and related methods to throw the given exception, whether or
2911     * not already completed. This method is designed for use only in
2912 dl 1.30 * recovery actions, and even in such situations may result in
2913     * ongoing dependent completions using established versus
2914     * overwritten outcomes.
2915     *
2916     * @param ex the exception
2917     */
2918     public void obtrudeException(Throwable ex) {
2919     if (ex == null) throw new NullPointerException();
2920     result = new AltResult(ex);
2921     postComplete();
2922     }
2923    
2924 dl 1.35 /**
2925     * Returns the estimated number of CompletableFutures whose
2926     * completions are awaiting completion of this CompletableFuture.
2927     * This method is designed for use in monitoring system state, not
2928     * for synchronization control.
2929     *
2930     * @return the number of dependent CompletableFutures
2931     */
2932     public int getNumberOfDependents() {
2933     int count = 0;
2934     for (CompletionNode p = completions; p != null; p = p.next)
2935     ++count;
2936     return count;
2937     }
2938    
2939     /**
2940     * Returns a string identifying this CompletableFuture, as well as
2941 jsr166 1.40 * its completion state. The state, in brackets, contains the
2942 dl 1.35 * String {@code "Completed Normally"} or the String {@code
2943     * "Completed Exceptionally"}, or the String {@code "Not
2944     * completed"} followed by the number of CompletableFutures
2945     * dependent upon its completion, if any.
2946     *
2947     * @return a string identifying this CompletableFuture, as well as its state
2948     */
2949     public String toString() {
2950     Object r = result;
2951 jsr166 1.40 int count;
2952     return super.toString() +
2953     ((r == null) ?
2954     (((count = getNumberOfDependents()) == 0) ?
2955     "[Not completed]" :
2956     "[Not completed, " + count + " dependents]") :
2957     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2958     "[Completed exceptionally]" :
2959     "[Completed normally]"));
2960 dl 1.35 }
2961    
2962 dl 1.1 // Unsafe mechanics
2963     private static final sun.misc.Unsafe UNSAFE;
2964     private static final long RESULT;
2965     private static final long WAITERS;
2966     private static final long COMPLETIONS;
2967     static {
2968     try {
2969     UNSAFE = sun.misc.Unsafe.getUnsafe();
2970     Class<?> k = CompletableFuture.class;
2971     RESULT = UNSAFE.objectFieldOffset
2972     (k.getDeclaredField("result"));
2973     WAITERS = UNSAFE.objectFieldOffset
2974     (k.getDeclaredField("waiters"));
2975     COMPLETIONS = UNSAFE.objectFieldOffset
2976     (k.getDeclaredField("completions"));
2977     } catch (Exception e) {
2978     throw new Error(e);
2979     }
2980     }
2981     }