ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CompletableFuture.java
Revision: 1.14
Committed: Sat Feb 16 20:50:29 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.13: +0 -1 lines
Log Message:
javadoc comment correctness

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.concurrent.Future;
9     import java.util.concurrent.TimeUnit;
10     import java.util.concurrent.Executor;
11     import java.util.concurrent.ThreadLocalRandom;
12     import java.util.concurrent.ExecutionException;
13     import java.util.concurrent.TimeoutException;
14     import java.util.concurrent.CancellationException;
15     import java.util.concurrent.atomic.AtomicInteger;
16     import java.util.concurrent.locks.LockSupport;
17    
18     /**
19     * A {@link Future} that may be explicitly completed (setting its
20     * value and status), and may include dependent functions and actions
21     * that trigger upon its completion. Methods are available for adding
22     * those based on Functions, Blocks, and Runnables, depending on
23     * whether they require arguments and/or produce results, as well as
24     * those triggered after either or both the current and another
25     * CompletableFuture complete. Functions and actions supplied for
26     * dependent completions (mainly using methods with prefix {@code
27     * then}) may be performed by the thread that completes the current
28     * CompletableFuture, or by any other caller of these methods. There
29     * are no guarantees about the order of processing completions unless
30     * constrained by these methods.
31     *
32     * <p>When two or more threads attempt to {@link #complete} or {@link
33     * #completeExceptionally} a CompletableFuture, only one of them
34     * succeeds.
35     *
36     * <p>Upon exceptional completion, or when a completion entails
37     * computation of a function or action, and it terminates abruptly
38     * with an (unchecked) exception or error, then further completions
39     * act as {@code completeExceptionally} with a {@link
40     * CompletionException} holding that exception as its cause. If a
41     * CompletableFuture completes exceptionally, and is not followed by a
42     * {@link #exceptionally} or {@link #handle} completion, then all of
43     * its dependents (and their dependents) also complete exceptionally
44     * with CompletionExceptions holding the ultimate cause. In case of a
45     * CompletionException, methods {@link #get()} and {@link #get(long,
46     * TimeUnit)} throw an {@link ExecutionException} with the same cause
47     * as would be held in the corresponding CompletionException. However,
48     * in these cases, methods {@link #join()} and {@link #getNow} throw
49     * the CompletionException, which simplifies usage especially within
50     * other completion functions.
51     *
52     * <p>CompletableFutures themselves do not execute asynchronously.
53     * However, the {@code async} methods provide commonly useful ways to
54     * commence asynchronous processing, using either a given {@link
55     * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
56     * function or action that will result in the completion of a new
57     * CompletableFuture. To simplify monitoring, debugging, and tracking,
58     * all generated asynchronous tasks are instances of the tagging
59     * interface {@link AsynchronousCompletionTask}.
60     *
61     * <p><em>jsr166e note: During transition, this class
62     * uses nested functional interfaces with different names but the
63     * same forms as those expected for JDK8.</em>
64     *
65     * @author Doug Lea
66     * @since 1.8
67     */
68     public class CompletableFuture<T> implements Future<T> {
69     // jsr166e nested interfaces
70    
/**
 * A void action that takes a single argument.
 *
 * @param <A> type of the argument
 */
public interface Action<A> {
    /** Performs the action on {@code a}. */
    void accept(A a);
}
/**
 * A void action that takes two arguments.
 *
 * @param <A> type of the first argument
 * @param <B> type of the second argument
 */
public interface BiAction<A,B> {
    /** Performs the action on {@code a} and {@code b}. */
    void accept(A a, B b);
}
/**
 * A function of one argument.
 *
 * @param <A> type of the argument
 * @param <T> type of the value produced
 */
public interface Fun<A,T> {
    /** Applies the function to {@code a} and returns its value. */
    T apply(A a);
}
/**
 * A function of two arguments.
 *
 * @param <A> type of the first argument
 * @param <B> type of the second argument
 * @param <T> type of the value produced
 */
public interface BiFun<A,B,T> {
    /** Applies the function to {@code a} and {@code b} and returns its value. */
    T apply(A a, B b);
}
/**
 * A function of no arguments.
 *
 * @param <T> type of the value produced
 */
public interface Generator<T> {
    /** Produces a value. */
    T get();
}
81    
82    
83     /*
84     * Overview:
85     *
86 jsr166 1.5 * 1. Non-nullness of field result (set via CAS) indicates done.
87     * An AltResult is used to box null as a result, as well as to
88     * hold exceptions. Using a single field makes completion fast
89 dl 1.1 * and simple to detect and trigger, at the expense of a lot of
90     * encoding and decoding that infiltrates many methods. One minor
91     * simplification relies on the (static) NIL (to box null results)
92     * being the only AltResult with a null exception field, so we
93     * don't usually need explicit comparisons with NIL. The CF
94     * exception propagation mechanics surrounding decoding rely on
95     * unchecked casts of decoded results really being unchecked,
96     * where user type errors are caught at point of use, as is
97     * currently the case in Java. These are highlighted by using
98     * SuppressWarnings-annotated temporaries.
99     *
100     * 2. Waiters are held in a Treiber stack similar to the one used
101     * in FutureTask, Phaser, and SynchronousQueue. See their
102     * internal documentation for algorithmic details.
103     *
104     * 3. Completions are also kept in a list/stack, and pulled off
105     * and run when completion is triggered. (We could even use the
106     * same stack as for waiters, but would give up the potential
107     * parallelism obtained because woken waiters help release/run
108     * others -- see method postComplete). Because post-processing
109     * may race with direct calls, class Completion opportunistically
110     * extends AtomicInteger so callers can claim the action via
111     * compareAndSet(0, 1). The Completion.run methods are all
112     * written in a boringly similar uniform way (that sometimes includes
113     * unnecessary-looking checks, kept to maintain uniformity). There
114     * are enough dimensions upon which they differ that factoring to
115     * use common code isn't worthwhile.
116     *
117     * 4. The exported then/and/or methods do support a bit of
118     * factoring (see doThenApply etc). They must cope with the
119     * intrinsic races surrounding addition of a dependent action
120     * versus performing the action directly because the task is
121     * already complete. For example, a CF may not be complete upon
122     * entry, so a dependent completion is added, but by the time it
123     * is added, the target CF is complete, so must be directly
124     * executed. This is all done while avoiding unnecessary object
125     * construction in safe-bypass cases.
126     */
127    
128     // preliminaries
129    
130     static final class AltResult {
131     final Throwable ex; // null only for NIL
132     AltResult(Throwable ex) { this.ex = ex; }
133     }
134    
135     static final AltResult NIL = new AltResult(null);
136    
137     // Fields
138    
139     volatile Object result; // Either the result or boxed AltResult
140     volatile WaitNode waiters; // Treiber stack of threads blocked on get()
141     volatile CompletionNode completions; // list (Treiber stack) of completions
142    
143     // Basic utilities for triggering and processing completions
144    
145     /**
146     * Removes and signals all waiting threads and runs all completions.
147     */
148     final void postComplete() {
149     WaitNode q; Thread t;
150     while ((q = waiters) != null) {
151     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
152     (t = q.thread) != null) {
153     q.thread = null;
154     LockSupport.unpark(t);
155     }
156     }
157    
158     CompletionNode h; Completion c;
159     while ((h = completions) != null) {
160     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
161     (c = h.completion) != null)
162     c.run();
163     }
164     }
165    
166     /**
167     * Triggers completion with the encoding of the given arguments:
168     * if the exception is non-null, encodes it as a wrapped
169     * CompletionException unless it is one already. Otherwise uses
170     * the given result, boxed as NIL if null.
171     */
172     final void internalComplete(Object v, Throwable ex) {
173     if (result == null)
174     UNSAFE.compareAndSwapObject
175     (this, RESULT, null,
176     (ex == null) ? (v == null) ? NIL : v :
177     new AltResult((ex instanceof CompletionException) ? ex :
178     new CompletionException(ex)));
179     postComplete(); // help out even if not triggered
180     }
181    
182     /**
183     * If triggered, helps release and/or process completions.
184     */
185     final void helpPostComplete() {
186     if (result != null)
187     postComplete();
188     }
189    
190     /* ------------- waiting for completions -------------- */
191    
192     /**
193     * Heuristic spin value for waitingGet() before blocking on
194     * multiprocessors
195     */
196     static final int WAITING_GET_SPINS = 256;
197    
198     /**
199     * Linked nodes to record waiting threads in a Treiber stack. See
200     * other classes such as Phaser and SynchronousQueue for more
201     * detailed explanation. This class implements ManagedBlocker to
202     * avoid starvation when blocking actions pile up in
203     * ForkJoinPools.
204     */
205     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
206     long nanos; // wait time if timed
207     final long deadline; // non-zero if timed
208     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
209     volatile Thread thread;
210     volatile WaitNode next;
211     WaitNode(boolean interruptible, long nanos, long deadline) {
212     this.thread = Thread.currentThread();
213     this.interruptControl = interruptible ? 1 : 0;
214     this.nanos = nanos;
215     this.deadline = deadline;
216     }
217     public boolean isReleasable() {
218     if (thread == null)
219     return true;
220     if (Thread.interrupted()) {
221     int i = interruptControl;
222     interruptControl = -1;
223     if (i > 0)
224     return true;
225     }
226     if (deadline != 0L &&
227     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
228     thread = null;
229     return true;
230     }
231     return false;
232     }
233     public boolean block() {
234     if (isReleasable())
235     return true;
236     else if (deadline == 0L)
237     LockSupport.park(this);
238     else if (nanos > 0L)
239     LockSupport.parkNanos(this, nanos);
240     return isReleasable();
241     }
242     }
243    
244     /**
245     * Returns raw result after waiting, or null if interruptible and
246     * interrupted.
247     */
248     private Object waitingGet(boolean interruptible) {
249     WaitNode q = null;
250     boolean queued = false;
251     int h = 0, spins = 0;
252     for (Object r;;) {
253     if ((r = result) != null) {
254     if (q != null) { // suppress unpark
255     q.thread = null;
256     if (q.interruptControl < 0) {
257     if (interruptible) {
258     removeWaiter(q);
259     return null;
260     }
261     Thread.currentThread().interrupt();
262     }
263     }
264     postComplete(); // help release others
265     return r;
266     }
267     else if (h == 0) {
268     h = ThreadLocalRandom.current().nextInt();
269     if (Runtime.getRuntime().availableProcessors() > 1)
270     spins = WAITING_GET_SPINS;
271     }
272     else if (spins > 0) {
273     h ^= h << 1; // xorshift
274     h ^= h >>> 3;
275     if ((h ^= h << 10) >= 0)
276     --spins;
277     }
278     else if (q == null)
279     q = new WaitNode(interruptible, 0L, 0L);
280     else if (!queued)
281     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
282     q.next = waiters, q);
283     else if (interruptible && q.interruptControl < 0) {
284     removeWaiter(q);
285     return null;
286     }
287     else if (q.thread != null && result == null) {
288     try {
289     ForkJoinPool.managedBlock(q);
290 jsr166 1.4 } catch (InterruptedException ex) {
291 dl 1.1 q.interruptControl = -1;
292     }
293     }
294     }
295     }
296    
297     /**
298     * Awaits completion or aborts on interrupt or timeout.
299     *
300     * @param nanos time to wait
301     * @return raw result
302     */
303     private Object timedAwaitDone(long nanos)
304     throws InterruptedException, TimeoutException {
305     WaitNode q = null;
306     boolean queued = false;
307     for (Object r;;) {
308     if ((r = result) != null) {
309     if (q != null) {
310     q.thread = null;
311     if (q.interruptControl < 0) {
312     removeWaiter(q);
313     throw new InterruptedException();
314     }
315     }
316     postComplete();
317     return r;
318     }
319     else if (q == null) {
320     if (nanos <= 0L)
321     throw new TimeoutException();
322     long d = System.nanoTime() + nanos;
323 jsr166 1.4 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
324 dl 1.1 }
325     else if (!queued)
326     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
327     q.next = waiters, q);
328     else if (q.interruptControl < 0) {
329     removeWaiter(q);
330     throw new InterruptedException();
331     }
332     else if (q.nanos <= 0L) {
333     if (result == null) {
334     removeWaiter(q);
335     throw new TimeoutException();
336     }
337     }
338     else if (q.thread != null && result == null) {
339     try {
340     ForkJoinPool.managedBlock(q);
341 jsr166 1.4 } catch (InterruptedException ex) {
342 dl 1.1 q.interruptControl = -1;
343     }
344     }
345     }
346     }
347    
348     /**
349     * Tries to unlink a timed-out or interrupted wait node to avoid
350     * accumulating garbage. Internal nodes are simply unspliced
351     * without CAS since it is harmless if they are traversed anyway
352     * by releasers. To avoid effects of unsplicing from already
353     * removed nodes, the list is retraversed in case of an apparent
354     * race. This is slow when there are a lot of nodes, but we don't
355     * expect lists to be long enough to outweigh higher-overhead
356     * schemes.
357     */
358     private void removeWaiter(WaitNode node) {
359     if (node != null) {
360     node.thread = null;
361     retry:
362     for (;;) { // restart on removeWaiter race
363     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
364     s = q.next;
365     if (q.thread != null)
366     pred = q;
367     else if (pred != null) {
368     pred.next = s;
369     if (pred.thread == null) // check for race
370     continue retry;
371     }
372     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
373     continue retry;
374     }
375     break;
376     }
377     }
378     }
379    
380     /* ------------- Async tasks -------------- */
381    
382     /**
383     * A tagging interface identifying asynchronous tasks produced by
384     * {@code async} methods. This may be useful for monitoring,
385     * debugging, and tracking asynchronous activities.
386     */
387     public static interface AsynchronousCompletionTask {
388     }
389    
390     /** Base class can act as either FJ or plain Runnable */
391 jsr166 1.7 abstract static class Async extends ForkJoinTask<Void>
392 dl 1.1 implements Runnable, AsynchronousCompletionTask {
393     public final Void getRawResult() { return null; }
394     public final void setRawResult(Void v) { }
395     public final void run() { exec(); }
396     }
397    
398     static final class AsyncRun extends Async {
399     final Runnable fn;
400     final CompletableFuture<Void> dst;
401     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
402     this.fn = fn; this.dst = dst;
403     }
404     public final boolean exec() {
405     CompletableFuture<Void> d; Throwable ex;
406 dl 1.2 if ((d = this.dst) != null && d.result == null) {
407 dl 1.1 try {
408     fn.run();
409     ex = null;
410     } catch (Throwable rex) {
411     ex = rex;
412     }
413     d.internalComplete(null, ex);
414     }
415     return true;
416     }
417     private static final long serialVersionUID = 5232453952276885070L;
418     }
419    
420     static final class AsyncSupply<U> extends Async {
421     final Generator<U> fn;
422     final CompletableFuture<U> dst;
423     AsyncSupply(Generator<U> fn, CompletableFuture<U> dst) {
424     this.fn = fn; this.dst = dst;
425     }
426     public final boolean exec() {
427     CompletableFuture<U> d; U u; Throwable ex;
428 dl 1.2 if ((d = this.dst) != null && d.result == null) {
429 dl 1.1 try {
430     u = fn.get();
431     ex = null;
432     } catch (Throwable rex) {
433     ex = rex;
434     u = null;
435     }
436     d.internalComplete(u, ex);
437     }
438     return true;
439     }
440     private static final long serialVersionUID = 5232453952276885070L;
441     }
442    
443     static final class AsyncApply<T,U> extends Async {
444     final Fun<? super T,? extends U> fn;
445     final T arg;
446     final CompletableFuture<U> dst;
447     AsyncApply(T arg, Fun<? super T,? extends U> fn,
448     CompletableFuture<U> dst) {
449     this.arg = arg; this.fn = fn; this.dst = dst;
450     }
451     public final boolean exec() {
452     CompletableFuture<U> d; U u; Throwable ex;
453 dl 1.2 if ((d = this.dst) != null && d.result == null) {
454 dl 1.1 try {
455     u = fn.apply(arg);
456     ex = null;
457     } catch (Throwable rex) {
458     ex = rex;
459     u = null;
460     }
461     d.internalComplete(u, ex);
462     }
463     return true;
464     }
465     private static final long serialVersionUID = 5232453952276885070L;
466     }
467    
468     static final class AsyncBiApply<T,U,V> extends Async {
469     final BiFun<? super T,? super U,? extends V> fn;
470     final T arg1;
471     final U arg2;
472     final CompletableFuture<V> dst;
473     AsyncBiApply(T arg1, U arg2,
474     BiFun<? super T,? super U,? extends V> fn,
475     CompletableFuture<V> dst) {
476     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
477     }
478     public final boolean exec() {
479     CompletableFuture<V> d; V v; Throwable ex;
480 dl 1.2 if ((d = this.dst) != null && d.result == null) {
481 dl 1.1 try {
482     v = fn.apply(arg1, arg2);
483     ex = null;
484     } catch (Throwable rex) {
485     ex = rex;
486     v = null;
487     }
488     d.internalComplete(v, ex);
489     }
490     return true;
491     }
492     private static final long serialVersionUID = 5232453952276885070L;
493     }
494    
495     static final class AsyncAccept<T> extends Async {
496     final Action<? super T> fn;
497     final T arg;
498     final CompletableFuture<Void> dst;
499     AsyncAccept(T arg, Action<? super T> fn,
500     CompletableFuture<Void> dst) {
501     this.arg = arg; this.fn = fn; this.dst = dst;
502     }
503     public final boolean exec() {
504     CompletableFuture<Void> d; Throwable ex;
505 dl 1.2 if ((d = this.dst) != null && d.result == null) {
506 dl 1.1 try {
507     fn.accept(arg);
508     ex = null;
509     } catch (Throwable rex) {
510     ex = rex;
511     }
512     d.internalComplete(null, ex);
513     }
514     return true;
515     }
516     private static final long serialVersionUID = 5232453952276885070L;
517     }
518    
519     static final class AsyncBiAccept<T,U> extends Async {
520     final BiAction<? super T,? super U> fn;
521     final T arg1;
522     final U arg2;
523     final CompletableFuture<Void> dst;
524     AsyncBiAccept(T arg1, U arg2,
525     BiAction<? super T,? super U> fn,
526     CompletableFuture<Void> dst) {
527     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
528     }
529     public final boolean exec() {
530     CompletableFuture<Void> d; Throwable ex;
531 dl 1.2 if ((d = this.dst) != null && d.result == null) {
532 dl 1.1 try {
533     fn.accept(arg1, arg2);
534     ex = null;
535     } catch (Throwable rex) {
536     ex = rex;
537     }
538     d.internalComplete(null, ex);
539     }
540     return true;
541     }
542     private static final long serialVersionUID = 5232453952276885070L;
543     }
544    
545     /* ------------- Completions -------------- */
546    
547     /**
548     * Simple linked list nodes to record completions, used in
549     * basically the same way as WaitNodes. (We separate nodes from
550     * the Completions themselves mainly because for the And and Or
551     * methods, the same Completion object resides in two lists.)
552     */
553     static final class CompletionNode {
554     final Completion completion;
555     volatile CompletionNode next;
556     CompletionNode(Completion completion) { this.completion = completion; }
557     }
558    
559     // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
560 jsr166 1.7 abstract static class Completion extends AtomicInteger implements Runnable {
561 dl 1.1 }
562    
563     static final class ApplyCompletion<T,U> extends Completion {
564     final CompletableFuture<? extends T> src;
565     final Fun<? super T,? extends U> fn;
566     final CompletableFuture<U> dst;
567     final Executor executor;
568     ApplyCompletion(CompletableFuture<? extends T> src,
569     Fun<? super T,? extends U> fn,
570     CompletableFuture<U> dst, Executor executor) {
571     this.src = src; this.fn = fn; this.dst = dst;
572     this.executor = executor;
573     }
574     public final void run() {
575     final CompletableFuture<? extends T> a;
576     final Fun<? super T,? extends U> fn;
577     final CompletableFuture<U> dst;
578     Object r; T t; Throwable ex;
579     if ((dst = this.dst) != null &&
580     (fn = this.fn) != null &&
581     (a = this.src) != null &&
582     (r = a.result) != null &&
583     compareAndSet(0, 1)) {
584     if (r instanceof AltResult) {
585     ex = ((AltResult)r).ex;
586     t = null;
587     }
588     else {
589     ex = null;
590     @SuppressWarnings("unchecked") T tr = (T) r;
591     t = tr;
592     }
593     Executor e = executor;
594     U u = null;
595     if (ex == null) {
596     try {
597     if (e != null)
598     e.execute(new AsyncApply<T,U>(t, fn, dst));
599     else
600     u = fn.apply(t);
601     } catch (Throwable rex) {
602     ex = rex;
603     }
604     }
605     if (e == null || ex != null)
606     dst.internalComplete(u, ex);
607     }
608     }
609     private static final long serialVersionUID = 5232453952276885070L;
610     }
611    
612     static final class AcceptCompletion<T> extends Completion {
613     final CompletableFuture<? extends T> src;
614     final Action<? super T> fn;
615     final CompletableFuture<Void> dst;
616     final Executor executor;
617     AcceptCompletion(CompletableFuture<? extends T> src,
618     Action<? super T> fn,
619     CompletableFuture<Void> dst, Executor executor) {
620     this.src = src; this.fn = fn; this.dst = dst;
621     this.executor = executor;
622     }
623     public final void run() {
624     final CompletableFuture<? extends T> a;
625     final Action<? super T> fn;
626     final CompletableFuture<Void> dst;
627     Object r; T t; Throwable ex;
628     if ((dst = this.dst) != null &&
629     (fn = this.fn) != null &&
630     (a = this.src) != null &&
631     (r = a.result) != null &&
632     compareAndSet(0, 1)) {
633     if (r instanceof AltResult) {
634     ex = ((AltResult)r).ex;
635     t = null;
636     }
637     else {
638     ex = null;
639     @SuppressWarnings("unchecked") T tr = (T) r;
640     t = tr;
641     }
642     Executor e = executor;
643     if (ex == null) {
644     try {
645     if (e != null)
646     e.execute(new AsyncAccept<T>(t, fn, dst));
647     else
648     fn.accept(t);
649     } catch (Throwable rex) {
650     ex = rex;
651     }
652     }
653     if (e == null || ex != null)
654     dst.internalComplete(null, ex);
655     }
656     }
657     private static final long serialVersionUID = 5232453952276885070L;
658     }
659    
660     static final class RunCompletion<T> extends Completion {
661     final CompletableFuture<? extends T> src;
662     final Runnable fn;
663     final CompletableFuture<Void> dst;
664     final Executor executor;
665     RunCompletion(CompletableFuture<? extends T> src,
666     Runnable fn,
667     CompletableFuture<Void> dst,
668     Executor executor) {
669     this.src = src; this.fn = fn; this.dst = dst;
670     this.executor = executor;
671     }
672     public final void run() {
673     final CompletableFuture<? extends T> a;
674     final Runnable fn;
675     final CompletableFuture<Void> dst;
676     Object r; Throwable ex;
677     if ((dst = this.dst) != null &&
678     (fn = this.fn) != null &&
679     (a = this.src) != null &&
680     (r = a.result) != null &&
681     compareAndSet(0, 1)) {
682     if (r instanceof AltResult)
683     ex = ((AltResult)r).ex;
684     else
685     ex = null;
686     Executor e = executor;
687     if (ex == null) {
688     try {
689     if (e != null)
690     e.execute(new AsyncRun(fn, dst));
691     else
692     fn.run();
693     } catch (Throwable rex) {
694     ex = rex;
695     }
696     }
697     if (e == null || ex != null)
698     dst.internalComplete(null, ex);
699     }
700     }
701     private static final long serialVersionUID = 5232453952276885070L;
702     }
703    
704     static final class BiApplyCompletion<T,U,V> extends Completion {
705     final CompletableFuture<? extends T> src;
706     final CompletableFuture<? extends U> snd;
707     final BiFun<? super T,? super U,? extends V> fn;
708     final CompletableFuture<V> dst;
709     final Executor executor;
710     BiApplyCompletion(CompletableFuture<? extends T> src,
711     CompletableFuture<? extends U> snd,
712     BiFun<? super T,? super U,? extends V> fn,
713     CompletableFuture<V> dst, Executor executor) {
714     this.src = src; this.snd = snd;
715     this.fn = fn; this.dst = dst;
716     this.executor = executor;
717     }
718     public final void run() {
719     final CompletableFuture<? extends T> a;
720     final CompletableFuture<? extends U> b;
721     final BiFun<? super T,? super U,? extends V> fn;
722     final CompletableFuture<V> dst;
723     Object r, s; T t; U u; Throwable ex;
724     if ((dst = this.dst) != null &&
725     (fn = this.fn) != null &&
726     (a = this.src) != null &&
727     (r = a.result) != null &&
728     (b = this.snd) != null &&
729     (s = b.result) != null &&
730     compareAndSet(0, 1)) {
731     if (r instanceof AltResult) {
732     ex = ((AltResult)r).ex;
733     t = null;
734     }
735     else {
736     ex = null;
737     @SuppressWarnings("unchecked") T tr = (T) r;
738     t = tr;
739     }
740     if (ex != null)
741     u = null;
742     else if (s instanceof AltResult) {
743     ex = ((AltResult)s).ex;
744     u = null;
745     }
746     else {
747     @SuppressWarnings("unchecked") U us = (U) s;
748     u = us;
749     }
750     Executor e = executor;
751     V v = null;
752     if (ex == null) {
753     try {
754     if (e != null)
755     e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
756     else
757     v = fn.apply(t, u);
758     } catch (Throwable rex) {
759     ex = rex;
760     }
761     }
762     if (e == null || ex != null)
763     dst.internalComplete(v, ex);
764     }
765     }
766     private static final long serialVersionUID = 5232453952276885070L;
767     }
768    
769     static final class BiAcceptCompletion<T,U> extends Completion {
770     final CompletableFuture<? extends T> src;
771     final CompletableFuture<? extends U> snd;
772     final BiAction<? super T,? super U> fn;
773     final CompletableFuture<Void> dst;
774     final Executor executor;
775     BiAcceptCompletion(CompletableFuture<? extends T> src,
776     CompletableFuture<? extends U> snd,
777     BiAction<? super T,? super U> fn,
778     CompletableFuture<Void> dst, Executor executor) {
779     this.src = src; this.snd = snd;
780     this.fn = fn; this.dst = dst;
781     this.executor = executor;
782     }
783     public final void run() {
784     final CompletableFuture<? extends T> a;
785     final CompletableFuture<? extends U> b;
786     final BiAction<? super T,? super U> fn;
787     final CompletableFuture<Void> dst;
788     Object r, s; T t; U u; Throwable ex;
789     if ((dst = this.dst) != null &&
790     (fn = this.fn) != null &&
791     (a = this.src) != null &&
792     (r = a.result) != null &&
793     (b = this.snd) != null &&
794     (s = b.result) != null &&
795     compareAndSet(0, 1)) {
796     if (r instanceof AltResult) {
797     ex = ((AltResult)r).ex;
798     t = null;
799     }
800     else {
801     ex = null;
802     @SuppressWarnings("unchecked") T tr = (T) r;
803     t = tr;
804     }
805     if (ex != null)
806     u = null;
807     else if (s instanceof AltResult) {
808     ex = ((AltResult)s).ex;
809     u = null;
810     }
811     else {
812     @SuppressWarnings("unchecked") U us = (U) s;
813     u = us;
814     }
815     Executor e = executor;
816     if (ex == null) {
817     try {
818     if (e != null)
819     e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
820     else
821     fn.accept(t, u);
822     } catch (Throwable rex) {
823     ex = rex;
824     }
825     }
826     if (e == null || ex != null)
827     dst.internalComplete(null, ex);
828     }
829     }
830     private static final long serialVersionUID = 5232453952276885070L;
831     }
832    
833     static final class BiRunCompletion<T> extends Completion {
834     final CompletableFuture<? extends T> src;
835     final CompletableFuture<?> snd;
836     final Runnable fn;
837     final CompletableFuture<Void> dst;
838     final Executor executor;
839     BiRunCompletion(CompletableFuture<? extends T> src,
840     CompletableFuture<?> snd,
841     Runnable fn,
842     CompletableFuture<Void> dst, Executor executor) {
843     this.src = src; this.snd = snd;
844     this.fn = fn; this.dst = dst;
845     this.executor = executor;
846     }
847     public final void run() {
848     final CompletableFuture<? extends T> a;
849     final CompletableFuture<?> b;
850     final Runnable fn;
851     final CompletableFuture<Void> dst;
852     Object r, s; Throwable ex;
853     if ((dst = this.dst) != null &&
854     (fn = this.fn) != null &&
855     (a = this.src) != null &&
856     (r = a.result) != null &&
857     (b = this.snd) != null &&
858     (s = b.result) != null &&
859     compareAndSet(0, 1)) {
860     if (r instanceof AltResult)
861     ex = ((AltResult)r).ex;
862     else
863     ex = null;
864     if (ex == null && (s instanceof AltResult))
865     ex = ((AltResult)s).ex;
866     Executor e = executor;
867     if (ex == null) {
868     try {
869     if (e != null)
870     e.execute(new AsyncRun(fn, dst));
871     else
872     fn.run();
873     } catch (Throwable rex) {
874     ex = rex;
875     }
876     }
877     if (e == null || ex != null)
878     dst.internalComplete(null, ex);
879     }
880     }
881     private static final long serialVersionUID = 5232453952276885070L;
882     }
883    
884     static final class OrApplyCompletion<T,U> extends Completion {
885     final CompletableFuture<? extends T> src;
886     final CompletableFuture<? extends T> snd;
887     final Fun<? super T,? extends U> fn;
888     final CompletableFuture<U> dst;
889     final Executor executor;
890     OrApplyCompletion(CompletableFuture<? extends T> src,
891     CompletableFuture<? extends T> snd,
892     Fun<? super T,? extends U> fn,
893     CompletableFuture<U> dst, Executor executor) {
894     this.src = src; this.snd = snd;
895     this.fn = fn; this.dst = dst;
896     this.executor = executor;
897     }
898     public final void run() {
899     final CompletableFuture<? extends T> a;
900     final CompletableFuture<? extends T> b;
901     final Fun<? super T,? extends U> fn;
902     final CompletableFuture<U> dst;
903     Object r; T t; Throwable ex;
904     if ((dst = this.dst) != null &&
905     (fn = this.fn) != null &&
906     (((a = this.src) != null && (r = a.result) != null) ||
907     ((b = this.snd) != null && (r = b.result) != null)) &&
908     compareAndSet(0, 1)) {
909     if (r instanceof AltResult) {
910     ex = ((AltResult)r).ex;
911     t = null;
912     }
913     else {
914     ex = null;
915     @SuppressWarnings("unchecked") T tr = (T) r;
916     t = tr;
917     }
918     Executor e = executor;
919     U u = null;
920     if (ex == null) {
921     try {
922     if (e != null)
923     e.execute(new AsyncApply<T,U>(t, fn, dst));
924     else
925     u = fn.apply(t);
926     } catch (Throwable rex) {
927     ex = rex;
928     }
929     }
930     if (e == null || ex != null)
931     dst.internalComplete(u, ex);
932     }
933     }
934     private static final long serialVersionUID = 5232453952276885070L;
935     }
936    
937     static final class OrAcceptCompletion<T> extends Completion {
938     final CompletableFuture<? extends T> src;
939     final CompletableFuture<? extends T> snd;
940     final Action<? super T> fn;
941     final CompletableFuture<Void> dst;
942     final Executor executor;
943     OrAcceptCompletion(CompletableFuture<? extends T> src,
944     CompletableFuture<? extends T> snd,
945     Action<? super T> fn,
946     CompletableFuture<Void> dst, Executor executor) {
947     this.src = src; this.snd = snd;
948     this.fn = fn; this.dst = dst;
949     this.executor = executor;
950     }
951     public final void run() {
952     final CompletableFuture<? extends T> a;
953     final CompletableFuture<? extends T> b;
954     final Action<? super T> fn;
955     final CompletableFuture<Void> dst;
956     Object r; T t; Throwable ex;
957     if ((dst = this.dst) != null &&
958     (fn = this.fn) != null &&
959     (((a = this.src) != null && (r = a.result) != null) ||
960     ((b = this.snd) != null && (r = b.result) != null)) &&
961     compareAndSet(0, 1)) {
962     if (r instanceof AltResult) {
963     ex = ((AltResult)r).ex;
964     t = null;
965     }
966     else {
967     ex = null;
968     @SuppressWarnings("unchecked") T tr = (T) r;
969     t = tr;
970     }
971     Executor e = executor;
972     if (ex == null) {
973     try {
974     if (e != null)
975     e.execute(new AsyncAccept<T>(t, fn, dst));
976     else
977     fn.accept(t);
978     } catch (Throwable rex) {
979     ex = rex;
980     }
981     }
982     if (e == null || ex != null)
983     dst.internalComplete(null, ex);
984     }
985     }
986     private static final long serialVersionUID = 5232453952276885070L;
987     }
988    
989     static final class OrRunCompletion<T> extends Completion {
990     final CompletableFuture<? extends T> src;
991     final CompletableFuture<?> snd;
992     final Runnable fn;
993     final CompletableFuture<Void> dst;
994     final Executor executor;
995     OrRunCompletion(CompletableFuture<? extends T> src,
996     CompletableFuture<?> snd,
997     Runnable fn,
998     CompletableFuture<Void> dst, Executor executor) {
999     this.src = src; this.snd = snd;
1000     this.fn = fn; this.dst = dst;
1001     this.executor = executor;
1002     }
1003     public final void run() {
1004     final CompletableFuture<? extends T> a;
1005     final CompletableFuture<?> b;
1006     final Runnable fn;
1007     final CompletableFuture<Void> dst;
1008     Object r; Throwable ex;
1009     if ((dst = this.dst) != null &&
1010     (fn = this.fn) != null &&
1011     (((a = this.src) != null && (r = a.result) != null) ||
1012     ((b = this.snd) != null && (r = b.result) != null)) &&
1013     compareAndSet(0, 1)) {
1014     if (r instanceof AltResult)
1015     ex = ((AltResult)r).ex;
1016     else
1017     ex = null;
1018     Executor e = executor;
1019     if (ex == null) {
1020     try {
1021     if (e != null)
1022     e.execute(new AsyncRun(fn, dst));
1023     else
1024     fn.run();
1025     } catch (Throwable rex) {
1026     ex = rex;
1027     }
1028     }
1029     if (e == null || ex != null)
1030     dst.internalComplete(null, ex);
1031     }
1032     }
1033     private static final long serialVersionUID = 5232453952276885070L;
1034     }
1035    
1036     static final class ExceptionCompletion<T> extends Completion {
1037     final CompletableFuture<? extends T> src;
1038     final Fun<? super Throwable, ? extends T> fn;
1039     final CompletableFuture<T> dst;
1040     ExceptionCompletion(CompletableFuture<? extends T> src,
1041     Fun<? super Throwable, ? extends T> fn,
1042     CompletableFuture<T> dst) {
1043     this.src = src; this.fn = fn; this.dst = dst;
1044     }
1045     public final void run() {
1046     final CompletableFuture<? extends T> a;
1047     final Fun<? super Throwable, ? extends T> fn;
1048     final CompletableFuture<T> dst;
1049     Object r; T t = null; Throwable ex, dx = null;
1050     if ((dst = this.dst) != null &&
1051     (fn = this.fn) != null &&
1052     (a = this.src) != null &&
1053     (r = a.result) != null &&
1054     compareAndSet(0, 1)) {
1055     if ((r instanceof AltResult) &&
1056     (ex = ((AltResult)r).ex) != null) {
1057     try {
1058     t = fn.apply(ex);
1059     } catch (Throwable rex) {
1060     dx = rex;
1061     }
1062     }
1063     else {
1064     @SuppressWarnings("unchecked") T tr = (T) r;
1065     t = tr;
1066     }
1067     dst.internalComplete(t, dx);
1068     }
1069     }
1070     private static final long serialVersionUID = 5232453952276885070L;
1071     }
1072    
1073     static final class ThenCopy<T> extends Completion {
1074     final CompletableFuture<? extends T> src;
1075     final CompletableFuture<T> dst;
1076     ThenCopy(CompletableFuture<? extends T> src,
1077     CompletableFuture<T> dst) {
1078     this.src = src; this.dst = dst;
1079     }
1080     public final void run() {
1081     final CompletableFuture<? extends T> a;
1082     final CompletableFuture<T> dst;
1083     Object r; Object t; Throwable ex;
1084     if ((dst = this.dst) != null &&
1085     (a = this.src) != null &&
1086     (r = a.result) != null &&
1087     compareAndSet(0, 1)) {
1088     if (r instanceof AltResult) {
1089     ex = ((AltResult)r).ex;
1090     t = null;
1091     }
1092     else {
1093     ex = null;
1094     t = r;
1095     }
1096     dst.internalComplete(t, ex);
1097     }
1098     }
1099     private static final long serialVersionUID = 5232453952276885070L;
1100     }
1101    
1102     static final class HandleCompletion<T,U> extends Completion {
1103     final CompletableFuture<? extends T> src;
1104     final BiFun<? super T, Throwable, ? extends U> fn;
1105     final CompletableFuture<U> dst;
1106     HandleCompletion(CompletableFuture<? extends T> src,
1107     BiFun<? super T, Throwable, ? extends U> fn,
1108     final CompletableFuture<U> dst) {
1109     this.src = src; this.fn = fn; this.dst = dst;
1110     }
1111     public final void run() {
1112     final CompletableFuture<? extends T> a;
1113     final BiFun<? super T, Throwable, ? extends U> fn;
1114     final CompletableFuture<U> dst;
1115     Object r; T t; Throwable ex;
1116     if ((dst = this.dst) != null &&
1117     (fn = this.fn) != null &&
1118     (a = this.src) != null &&
1119     (r = a.result) != null &&
1120     compareAndSet(0, 1)) {
1121     if (r instanceof AltResult) {
1122     ex = ((AltResult)r).ex;
1123     t = null;
1124     }
1125     else {
1126     ex = null;
1127     @SuppressWarnings("unchecked") T tr = (T) r;
1128     t = tr;
1129     }
1130     U u = null; Throwable dx = null;
1131     try {
1132     u = fn.apply(t, ex);
1133     } catch (Throwable rex) {
1134     dx = rex;
1135     }
1136     dst.internalComplete(u, dx);
1137     }
1138     }
1139     private static final long serialVersionUID = 5232453952276885070L;
1140     }
1141    
1142     static final class ComposeCompletion<T,U> extends Completion {
1143     final CompletableFuture<? extends T> src;
1144     final Fun<? super T, CompletableFuture<U>> fn;
1145     final CompletableFuture<U> dst;
1146     ComposeCompletion(CompletableFuture<? extends T> src,
1147     Fun<? super T, CompletableFuture<U>> fn,
1148     final CompletableFuture<U> dst) {
1149     this.src = src; this.fn = fn; this.dst = dst;
1150     }
1151     public final void run() {
1152     final CompletableFuture<? extends T> a;
1153     final Fun<? super T, CompletableFuture<U>> fn;
1154     final CompletableFuture<U> dst;
1155     Object r; T t; Throwable ex;
1156     if ((dst = this.dst) != null &&
1157     (fn = this.fn) != null &&
1158     (a = this.src) != null &&
1159     (r = a.result) != null &&
1160     compareAndSet(0, 1)) {
1161     if (r instanceof AltResult) {
1162     ex = ((AltResult)r).ex;
1163     t = null;
1164     }
1165     else {
1166     ex = null;
1167     @SuppressWarnings("unchecked") T tr = (T) r;
1168     t = tr;
1169     }
1170     CompletableFuture<U> c = null;
1171     U u = null;
1172     boolean complete = false;
1173     if (ex == null) {
1174     try {
1175     c = fn.apply(t);
1176     } catch (Throwable rex) {
1177     ex = rex;
1178     }
1179     }
1180     if (ex != null || c == null) {
1181     if (ex == null)
1182     ex = new NullPointerException();
1183     }
1184     else {
1185     ThenCopy<U> d = null;
1186     Object s;
1187     if ((s = c.result) == null) {
1188     CompletionNode p = new CompletionNode
1189     (d = new ThenCopy<U>(c, dst));
1190     while ((s = c.result) == null) {
1191     if (UNSAFE.compareAndSwapObject
1192     (c, COMPLETIONS, p.next = c.completions, p))
1193     break;
1194     }
1195     }
1196     if (s != null && (d == null || d.compareAndSet(0, 1))) {
1197     complete = true;
1198     if (s instanceof AltResult) {
1199     ex = ((AltResult)s).ex; // no rewrap
1200     u = null;
1201     }
1202     else {
1203     @SuppressWarnings("unchecked") U us = (U) s;
1204     u = us;
1205     }
1206     }
1207     }
1208     if (complete || ex != null)
1209     dst.internalComplete(u, ex);
1210     if (c != null)
1211     c.helpPostComplete();
1212     }
1213     }
1214     private static final long serialVersionUID = 5232453952276885070L;
1215     }
1216    
1217     // public methods
1218    
1219     /**
1220     * Creates a new incomplete CompletableFuture.
1221     */
1222     public CompletableFuture() {
1223     }
1224    
1225     /**
1226     * Asynchronously executes in the {@link
1227     * ForkJoinPool#commonPool()}, a task that completes the returned
1228     * CompletableFuture with the result of the given Supplier.
1229     *
1230     * @param supplier a function returning the value to be used
1231     * to complete the returned CompletableFuture
1232     * @return the CompletableFuture
1233     */
1234     public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier) {
1235     if (supplier == null) throw new NullPointerException();
1236     CompletableFuture<U> f = new CompletableFuture<U>();
1237     ForkJoinPool.commonPool().
1238     execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1239     return f;
1240     }
1241    
1242     /**
1243     * Asynchronously executes using the given executor, a task that
1244     * completes the returned CompletableFuture with the result of the
1245     * given Supplier.
1246     *
1247     * @param supplier a function returning the value to be used
1248     * to complete the returned CompletableFuture
1249     * @param executor the executor to use for asynchronous execution
1250     * @return the CompletableFuture
1251     */
1252     public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier,
1253     Executor executor) {
1254     if (executor == null || supplier == null)
1255     throw new NullPointerException();
1256     CompletableFuture<U> f = new CompletableFuture<U>();
1257     executor.execute(new AsyncSupply<U>(supplier, f));
1258     return f;
1259     }
1260    
1261     /**
1262     * Asynchronously executes in the {@link
1263     * ForkJoinPool#commonPool()} a task that runs the given action,
1264     * and then completes the returned CompletableFuture.
1265     *
1266     * @param runnable the action to run before completing the
1267     * returned CompletableFuture
1268     * @return the CompletableFuture
1269     */
1270     public static CompletableFuture<Void> runAsync(Runnable runnable) {
1271     if (runnable == null) throw new NullPointerException();
1272     CompletableFuture<Void> f = new CompletableFuture<Void>();
1273     ForkJoinPool.commonPool().
1274     execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1275     return f;
1276     }
1277    
1278     /**
1279     * Asynchronously executes using the given executor, a task that
1280     * runs the given action, and then completes the returned
1281     * CompletableFuture.
1282     *
1283     * @param runnable the action to run before completing the
1284     * returned CompletableFuture
1285     * @param executor the executor to use for asynchronous execution
1286     * @return the CompletableFuture
1287     */
1288     public static CompletableFuture<Void> runAsync(Runnable runnable,
1289     Executor executor) {
1290     if (executor == null || runnable == null)
1291     throw new NullPointerException();
1292     CompletableFuture<Void> f = new CompletableFuture<Void>();
1293     executor.execute(new AsyncRun(runnable, f));
1294     return f;
1295     }
1296    
1297     /**
1298     * Returns {@code true} if completed in any fashion: normally,
1299     * exceptionally, or via cancellation.
1300     *
1301     * @return {@code true} if completed
1302     */
1303     public boolean isDone() {
1304     return result != null;
1305     }
1306    
1307     /**
1308     * Waits if necessary for the computation to complete, and then
1309     * retrieves its result.
1310     *
1311     * @return the computed result
1312     * @throws CancellationException if the computation was cancelled
1313     * @throws ExecutionException if the computation threw an
1314     * exception
1315     * @throws InterruptedException if the current thread was interrupted
1316     * while waiting
1317     */
1318     public T get() throws InterruptedException, ExecutionException {
1319     Object r; Throwable ex, cause;
1320     if ((r = result) == null && (r = waitingGet(true)) == null)
1321     throw new InterruptedException();
1322 jsr166 1.12 if (!(r instanceof AltResult)) {
1323     @SuppressWarnings("unchecked") T tr = (T) r;
1324     return tr;
1325     }
1326     if ((ex = ((AltResult)r).ex) == null)
1327 dl 1.1 return null;
1328 jsr166 1.12 if (ex instanceof CancellationException)
1329     throw (CancellationException)ex;
1330     if ((ex instanceof CompletionException) &&
1331     (cause = ex.getCause()) != null)
1332     ex = cause;
1333     throw new ExecutionException(ex);
1334 dl 1.1 }
1335    
1336     /**
1337     * Waits if necessary for at most the given time for completion,
1338     * and then retrieves its result, if available.
1339     *
1340     * @param timeout the maximum time to wait
1341     * @param unit the time unit of the timeout argument
1342     * @return the computed result
1343     * @throws CancellationException if the computation was cancelled
1344     * @throws ExecutionException if the computation threw an
1345     * exception
1346     * @throws InterruptedException if the current thread was interrupted
1347     * while waiting
1348     * @throws TimeoutException if the wait timed out
1349     */
1350     public T get(long timeout, TimeUnit unit)
1351     throws InterruptedException, ExecutionException, TimeoutException {
1352     Object r; Throwable ex, cause;
1353     long nanos = unit.toNanos(timeout);
1354     if (Thread.interrupted())
1355     throw new InterruptedException();
1356     if ((r = result) == null)
1357     r = timedAwaitDone(nanos);
1358 jsr166 1.12 if (!(r instanceof AltResult)) {
1359     @SuppressWarnings("unchecked") T tr = (T) r;
1360     return tr;
1361     }
1362     if ((ex = ((AltResult)r).ex) == null)
1363 dl 1.1 return null;
1364 jsr166 1.12 if (ex instanceof CancellationException)
1365     throw (CancellationException)ex;
1366     if ((ex instanceof CompletionException) &&
1367     (cause = ex.getCause()) != null)
1368     ex = cause;
1369     throw new ExecutionException(ex);
1370 dl 1.1 }
1371    
1372     /**
1373     * Returns the result value when complete, or throws an
1374     * (unchecked) exception if completed exceptionally. To better
1375     * conform with the use of common functional forms, if a
1376     * computation involved in the completion of this
1377     * CompletableFuture threw an exception, this method throws an
1378     * (unchecked) {@link CompletionException} with the underlying
1379     * exception as its cause.
1380     *
1381     * @return the result value
1382     * @throws CancellationException if the computation was cancelled
1383     * @throws CompletionException if a completion computation threw
1384     * an exception
1385     */
1386     public T join() {
1387     Object r; Throwable ex;
1388     if ((r = result) == null)
1389     r = waitingGet(false);
1390 jsr166 1.12 if (!(r instanceof AltResult)) {
1391     @SuppressWarnings("unchecked") T tr = (T) r;
1392     return tr;
1393     }
1394     if ((ex = ((AltResult)r).ex) == null)
1395 dl 1.1 return null;
1396 jsr166 1.12 if (ex instanceof CancellationException)
1397     throw (CancellationException)ex;
1398     if (ex instanceof CompletionException)
1399     throw (CompletionException)ex;
1400     throw new CompletionException(ex);
1401 dl 1.1 }
1402    
1403     /**
1404     * Returns the result value (or throws any encountered exception)
1405     * if completed, else returns the given valueIfAbsent.
1406     *
1407     * @param valueIfAbsent the value to return if not completed
1408     * @return the result value, if completed, else the given valueIfAbsent
1409     * @throws CancellationException if the computation was cancelled
1410     * @throws CompletionException if a completion computation threw
1411     * an exception
1412     */
1413     public T getNow(T valueIfAbsent) {
1414     Object r; Throwable ex;
1415     if ((r = result) == null)
1416     return valueIfAbsent;
1417 jsr166 1.12 if (!(r instanceof AltResult)) {
1418     @SuppressWarnings("unchecked") T tr = (T) r;
1419     return tr;
1420     }
1421     if ((ex = ((AltResult)r).ex) == null)
1422 dl 1.1 return null;
1423 jsr166 1.12 if (ex instanceof CancellationException)
1424     throw (CancellationException)ex;
1425     if (ex instanceof CompletionException)
1426     throw (CompletionException)ex;
1427     throw new CompletionException(ex);
1428 dl 1.1 }
1429    
1430     /**
1431     * If not already completed, sets the value returned by {@link
1432     * #get()} and related methods to the given value.
1433     *
1434     * @param value the result value
1435     * @return {@code true} if this invocation caused this CompletableFuture
1436     * to transition to a completed state, else {@code false}
1437     */
1438     public boolean complete(T value) {
1439     boolean triggered = result == null &&
1440     UNSAFE.compareAndSwapObject(this, RESULT, null,
1441     value == null ? NIL : value);
1442     postComplete();
1443     return triggered;
1444     }
1445    
1446     /**
1447     * If not already completed, causes invocations of {@link #get()}
1448     * and related methods to throw the given exception.
1449     *
1450     * @param ex the exception
1451     * @return {@code true} if this invocation caused this CompletableFuture
1452     * to transition to a completed state, else {@code false}
1453     */
1454     public boolean completeExceptionally(Throwable ex) {
1455     if (ex == null) throw new NullPointerException();
1456     boolean triggered = result == null &&
1457     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1458     postComplete();
1459     return triggered;
1460     }
1461    
1462     /**
1463     * Creates and returns a CompletableFuture that is completed with
1464     * the result of the given function of this CompletableFuture.
1465     * If this CompletableFuture completes exceptionally,
1466     * then the returned CompletableFuture also does so,
1467     * with a CompletionException holding this exception as
1468     * its cause.
1469     *
1470     * @param fn the function to use to compute the value of
1471     * the returned CompletableFuture
1472     * @return the new CompletableFuture
1473     */
1474     public <U> CompletableFuture<U> thenApply(Fun<? super T,? extends U> fn) {
1475     return doThenApply(fn, null);
1476     }
1477    
1478     /**
1479     * Creates and returns a CompletableFuture that is asynchronously
1480     * completed using the {@link ForkJoinPool#commonPool()} with the
1481     * result of the given function of this CompletableFuture. If
1482     * this CompletableFuture completes exceptionally, then the
1483     * returned CompletableFuture also does so, with a
1484     * CompletionException holding this exception as its cause.
1485     *
1486     * @param fn the function to use to compute the value of
1487     * the returned CompletableFuture
1488     * @return the new CompletableFuture
1489     */
1490     public <U> CompletableFuture<U> thenApplyAsync(Fun<? super T,? extends U> fn) {
1491     return doThenApply(fn, ForkJoinPool.commonPool());
1492     }
1493    
1494     /**
1495     * Creates and returns a CompletableFuture that is asynchronously
1496     * completed using the given executor with the result of the given
1497     * function of this CompletableFuture. If this CompletableFuture
1498     * completes exceptionally, then the returned CompletableFuture
1499     * also does so, with a CompletionException holding this exception as
1500     * its cause.
1501     *
1502     * @param fn the function to use to compute the value of
1503     * the returned CompletableFuture
1504     * @param executor the executor to use for asynchronous execution
1505     * @return the new CompletableFuture
1506     */
1507     public <U> CompletableFuture<U> thenApplyAsync(Fun<? super T,? extends U> fn,
1508     Executor executor) {
1509     if (executor == null) throw new NullPointerException();
1510     return doThenApply(fn, executor);
1511     }
1512    
1513     private <U> CompletableFuture<U> doThenApply(Fun<? super T,? extends U> fn,
1514     Executor e) {
1515     if (fn == null) throw new NullPointerException();
1516     CompletableFuture<U> dst = new CompletableFuture<U>();
1517     ApplyCompletion<T,U> d = null;
1518     Object r;
1519     if ((r = result) == null) {
1520     CompletionNode p = new CompletionNode
1521     (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1522     while ((r = result) == null) {
1523     if (UNSAFE.compareAndSwapObject
1524     (this, COMPLETIONS, p.next = completions, p))
1525     break;
1526     }
1527     }
1528     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1529     T t; Throwable ex;
1530     if (r instanceof AltResult) {
1531     ex = ((AltResult)r).ex;
1532     t = null;
1533     }
1534     else {
1535     ex = null;
1536     @SuppressWarnings("unchecked") T tr = (T) r;
1537     t = tr;
1538     }
1539     U u = null;
1540     if (ex == null) {
1541     try {
1542     if (e != null)
1543     e.execute(new AsyncApply<T,U>(t, fn, dst));
1544     else
1545     u = fn.apply(t);
1546     } catch (Throwable rex) {
1547     ex = rex;
1548     }
1549     }
1550     if (e == null || ex != null)
1551     dst.internalComplete(u, ex);
1552     }
1553     helpPostComplete();
1554     return dst;
1555     }
1556    
1557     /**
1558     * Creates and returns a CompletableFuture that is completed after
1559     * performing the given action with this CompletableFuture's
1560     * result when it completes. If this CompletableFuture
1561     * completes exceptionally, then the returned CompletableFuture
1562     * also does so, with a CompletionException holding this exception as
1563     * its cause.
1564     *
1565     * @param block the action to perform before completing the
1566     * returned CompletableFuture
1567     * @return the new CompletableFuture
1568     */
1569     public CompletableFuture<Void> thenAccept(Action<? super T> block) {
1570     return doThenAccept(block, null);
1571     }
1572    
1573     /**
1574     * Creates and returns a CompletableFuture that is asynchronously
1575     * completed using the {@link ForkJoinPool#commonPool()} with this
1576     * CompletableFuture's result when it completes. If this
1577     * CompletableFuture completes exceptionally, then the returned
1578     * CompletableFuture also does so, with a CompletionException holding
1579     * this exception as its cause.
1580     *
1581     * @param block the action to perform before completing the
1582     * returned CompletableFuture
1583     * @return the new CompletableFuture
1584     */
1585     public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block) {
1586     return doThenAccept(block, ForkJoinPool.commonPool());
1587     }
1588    
1589     /**
1590     * Creates and returns a CompletableFuture that is asynchronously
1591     * completed using the given executor with this
1592     * CompletableFuture's result when it completes. If this
1593     * CompletableFuture completes exceptionally, then the returned
1594     * CompletableFuture also does so, with a CompletionException holding
1595     * this exception as its cause.
1596     *
1597     * @param block the action to perform before completing the
1598     * returned CompletableFuture
1599     * @param executor the executor to use for asynchronous execution
1600     * @return the new CompletableFuture
1601     */
1602     public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block,
1603     Executor executor) {
1604     if (executor == null) throw new NullPointerException();
1605     return doThenAccept(block, executor);
1606     }
1607    
1608     private CompletableFuture<Void> doThenAccept(Action<? super T> fn,
1609     Executor e) {
1610     if (fn == null) throw new NullPointerException();
1611     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1612     AcceptCompletion<T> d = null;
1613     Object r;
1614     if ((r = result) == null) {
1615     CompletionNode p = new CompletionNode
1616     (d = new AcceptCompletion<T>(this, fn, dst, e));
1617     while ((r = result) == null) {
1618     if (UNSAFE.compareAndSwapObject
1619     (this, COMPLETIONS, p.next = completions, p))
1620     break;
1621     }
1622     }
1623     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1624     T t; Throwable ex;
1625     if (r instanceof AltResult) {
1626     ex = ((AltResult)r).ex;
1627     t = null;
1628     }
1629     else {
1630     ex = null;
1631     @SuppressWarnings("unchecked") T tr = (T) r;
1632     t = tr;
1633     }
1634     if (ex == null) {
1635     try {
1636     if (e != null)
1637     e.execute(new AsyncAccept<T>(t, fn, dst));
1638     else
1639     fn.accept(t);
1640     } catch (Throwable rex) {
1641     ex = rex;
1642     }
1643     }
1644     if (e == null || ex != null)
1645     dst.internalComplete(null, ex);
1646     }
1647     helpPostComplete();
1648     return dst;
1649     }
1650    
1651     /**
1652     * Creates and returns a CompletableFuture that is completed after
1653     * performing the given action when this CompletableFuture
1654     * completes. If this CompletableFuture completes exceptionally,
1655     * then the returned CompletableFuture also does so, with a
1656     * CompletionException holding this exception as its cause.
1657     *
1658     * @param action the action to perform before completing the
1659     * returned CompletableFuture
1660     * @return the new CompletableFuture
1661     */
1662     public CompletableFuture<Void> thenRun(Runnable action) {
1663     return doThenRun(action, null);
1664     }
1665    
1666     /**
1667     * Creates and returns a CompletableFuture that is asynchronously
1668     * completed using the {@link ForkJoinPool#commonPool()} after
1669     * performing the given action when this CompletableFuture
1670     * completes. If this CompletableFuture completes exceptionally,
1671     * then the returned CompletableFuture also does so, with a
1672     * CompletionException holding this exception as its cause.
1673     *
1674     * @param action the action to perform before completing the
1675     * returned CompletableFuture
1676     * @return the new CompletableFuture
1677     */
1678     public CompletableFuture<Void> thenRunAsync(Runnable action) {
1679     return doThenRun(action, ForkJoinPool.commonPool());
1680     }
1681    
1682     /**
1683     * Creates and returns a CompletableFuture that is asynchronously
1684     * completed using the given executor after performing the given
1685     * action when this CompletableFuture completes. If this
1686     * CompletableFuture completes exceptionally, then the returned
1687     * CompletableFuture also does so, with a CompletionException holding
1688     * this exception as its cause.
1689     *
1690     * @param action the action to perform before completing the
1691     * returned CompletableFuture
1692     * @param executor the executor to use for asynchronous execution
1693     * @return the new CompletableFuture
1694     */
1695     public CompletableFuture<Void> thenRunAsync(Runnable action,
1696     Executor executor) {
1697     if (executor == null) throw new NullPointerException();
1698     return doThenRun(action, executor);
1699     }
1700    
1701     private CompletableFuture<Void> doThenRun(Runnable action,
1702     Executor e) {
1703     if (action == null) throw new NullPointerException();
1704     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1705     RunCompletion<T> d = null;
1706     Object r;
1707     if ((r = result) == null) {
1708     CompletionNode p = new CompletionNode
1709     (d = new RunCompletion<T>(this, action, dst, e));
1710     while ((r = result) == null) {
1711     if (UNSAFE.compareAndSwapObject
1712     (this, COMPLETIONS, p.next = completions, p))
1713     break;
1714     }
1715     }
1716     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1717     Throwable ex;
1718     if (r instanceof AltResult)
1719     ex = ((AltResult)r).ex;
1720     else
1721     ex = null;
1722     if (ex == null) {
1723     try {
1724     if (e != null)
1725     e.execute(new AsyncRun(action, dst));
1726     else
1727     action.run();
1728     } catch (Throwable rex) {
1729     ex = rex;
1730     }
1731     }
1732     if (e == null || ex != null)
1733     dst.internalComplete(null, ex);
1734     }
1735     helpPostComplete();
1736     return dst;
1737     }
1738    
1739     /**
1740     * Creates and returns a CompletableFuture that is completed with
1741     * the result of the given function of this and the other given
1742     * CompletableFuture's results when both complete. If this or
1743     * the other CompletableFuture complete exceptionally, then the
1744     * returned CompletableFuture also does so, with a
1745     * CompletionException holding the exception as its cause.
1746     *
1747     * @param other the other CompletableFuture
1748     * @param fn the function to use to compute the value of
1749     * the returned CompletableFuture
1750     * @return the new CompletableFuture
1751     */
1752     public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
1753     BiFun<? super T,? super U,? extends V> fn) {
1754     return doThenBiApply(other, fn, null);
1755     }
1756    
1757     /**
1758     * Creates and returns a CompletableFuture that is asynchronously
1759     * completed using the {@link ForkJoinPool#commonPool()} with
1760     * the result of the given function of this and the other given
1761     * CompletableFuture's results when both complete. If this or
1762     * the other CompletableFuture complete exceptionally, then the
1763     * returned CompletableFuture also does so, with a
1764     * CompletionException holding the exception as its cause.
1765     *
1766     * @param other the other CompletableFuture
1767     * @param fn the function to use to compute the value of
1768     * the returned CompletableFuture
1769     * @return the new CompletableFuture
1770     */
1771     public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1772     BiFun<? super T,? super U,? extends V> fn) {
1773     return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1774     }
1775    
1776     /**
1777     * Creates and returns a CompletableFuture that is
1778     * asynchronously completed using the given executor with the
1779     * result of the given function of this and the other given
1780     * CompletableFuture's results when both complete. If this or
1781     * the other CompletableFuture complete exceptionally, then the
1782     * returned CompletableFuture also does so, with a
1783     * CompletionException holding the exception as its cause.
1784     *
1785     * @param other the other CompletableFuture
1786     * @param fn the function to use to compute the value of
1787     * the returned CompletableFuture
1788     * @param executor the executor to use for asynchronous execution
1789     * @return the new CompletableFuture
1790     */
1791     public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1792     BiFun<? super T,? super U,? extends V> fn,
1793     Executor executor) {
1794     if (executor == null) throw new NullPointerException();
1795     return doThenBiApply(other, fn, executor);
1796     }
1797    
1798     private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
1799     BiFun<? super T,? super U,? extends V> fn,
1800     Executor e) {
1801     if (other == null || fn == null) throw new NullPointerException();
1802     CompletableFuture<V> dst = new CompletableFuture<V>();
1803     BiApplyCompletion<T,U,V> d = null;
1804     Object r, s = null;
1805     if ((r = result) == null || (s = other.result) == null) {
1806     d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1807     CompletionNode q = null, p = new CompletionNode(d);
1808     while ((r == null && (r = result) == null) ||
1809     (s == null && (s = other.result) == null)) {
1810     if (q != null) {
1811     if (s != null ||
1812     UNSAFE.compareAndSwapObject
1813     (other, COMPLETIONS, q.next = other.completions, q))
1814     break;
1815     }
1816     else if (r != null ||
1817     UNSAFE.compareAndSwapObject
1818     (this, COMPLETIONS, p.next = completions, p)) {
1819     if (s != null)
1820     break;
1821     q = new CompletionNode(d);
1822     }
1823     }
1824     }
1825     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1826     T t; U u; Throwable ex;
1827     if (r instanceof AltResult) {
1828     ex = ((AltResult)r).ex;
1829     t = null;
1830     }
1831     else {
1832     ex = null;
1833     @SuppressWarnings("unchecked") T tr = (T) r;
1834     t = tr;
1835     }
1836     if (ex != null)
1837     u = null;
1838     else if (s instanceof AltResult) {
1839     ex = ((AltResult)s).ex;
1840     u = null;
1841     }
1842     else {
1843     @SuppressWarnings("unchecked") U us = (U) s;
1844     u = us;
1845     }
1846     V v = null;
1847     if (ex == null) {
1848     try {
1849     if (e != null)
1850     e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1851     else
1852     v = fn.apply(t, u);
1853     } catch (Throwable rex) {
1854     ex = rex;
1855     }
1856     }
1857     if (e == null || ex != null)
1858     dst.internalComplete(v, ex);
1859     }
1860     helpPostComplete();
1861     other.helpPostComplete();
1862     return dst;
1863     }
1864    
1865     /**
1866     * Creates and returns a CompletableFuture that is completed with
1867     * the results of this and the other given CompletableFuture if
1868     * both complete. If this and/or the other CompletableFuture
1869     * complete exceptionally, then the returned CompletableFuture
1870     * also does so, with a CompletionException holding one of these
1871     * exceptions as its cause.
1872     *
1873     * @param other the other CompletableFuture
1874     * @param block the action to perform before completing the
1875     * returned CompletableFuture
1876     * @return the new CompletableFuture
1877     */
1878     public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1879     BiAction<? super T, ? super U> block) {
1880     return doThenBiAccept(other, block, null);
1881     }
1882    
1883     /**
1884     * Creates and returns a CompletableFuture that is completed
1885     * asynchronously using the {@link ForkJoinPool#commonPool()} with
1886     * the results of this and the other given CompletableFuture when
1887     * both complete. If this and/or the other CompletableFuture
1888     * complete exceptionally, then the returned CompletableFuture
1889     * also does so, with a CompletionException holding one of these
1890     * exceptions as its cause.
1891     *
1892     * @param other the other CompletableFuture
1893     * @param block the action to perform before completing the
1894     * returned CompletableFuture
1895     * @return the new CompletableFuture
1896     */
1897     public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1898     BiAction<? super T, ? super U> block) {
1899     return doThenBiAccept(other, block, ForkJoinPool.commonPool());
1900     }
1901    
1902     /**
1903     * Creates and returns a CompletableFuture that is completed
1904     * asynchronously using the given executor with the results of
1905     * this and the other given CompletableFuture when both complete.
1906     * If this and/or the other CompletableFuture complete
1907     * exceptionally, then the returned CompletableFuture also does
1908     * so, with a CompletionException holding one of these exceptions as
1909     * its cause.
1910     *
1911     * @param other the other CompletableFuture
1912     * @param block the action to perform before completing the
1913     * returned CompletableFuture
1914     * @param executor the executor to use for asynchronous execution
1915     * @return the new CompletableFuture
1916     */
1917     public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1918     BiAction<? super T, ? super U> block,
1919     Executor executor) {
1920     if (executor == null) throw new NullPointerException();
1921     return doThenBiAccept(other, block, executor);
1922     }
1923    
1924     private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other,
1925     BiAction<? super T,? super U> fn,
1926     Executor e) {
1927     if (other == null || fn == null) throw new NullPointerException();
1928     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1929     BiAcceptCompletion<T,U> d = null;
1930     Object r, s = null;
1931     if ((r = result) == null || (s = other.result) == null) {
1932     d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
1933     CompletionNode q = null, p = new CompletionNode(d);
1934     while ((r == null && (r = result) == null) ||
1935     (s == null && (s = other.result) == null)) {
1936     if (q != null) {
1937     if (s != null ||
1938     UNSAFE.compareAndSwapObject
1939     (other, COMPLETIONS, q.next = other.completions, q))
1940     break;
1941     }
1942     else if (r != null ||
1943     UNSAFE.compareAndSwapObject
1944     (this, COMPLETIONS, p.next = completions, p)) {
1945     if (s != null)
1946     break;
1947     q = new CompletionNode(d);
1948     }
1949     }
1950     }
1951     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1952     T t; U u; Throwable ex;
1953     if (r instanceof AltResult) {
1954     ex = ((AltResult)r).ex;
1955     t = null;
1956     }
1957     else {
1958     ex = null;
1959     @SuppressWarnings("unchecked") T tr = (T) r;
1960     t = tr;
1961     }
1962     if (ex != null)
1963     u = null;
1964     else if (s instanceof AltResult) {
1965     ex = ((AltResult)s).ex;
1966     u = null;
1967     }
1968     else {
1969     @SuppressWarnings("unchecked") U us = (U) s;
1970     u = us;
1971     }
1972     if (ex == null) {
1973     try {
1974     if (e != null)
1975     e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
1976     else
1977     fn.accept(t, u);
1978     } catch (Throwable rex) {
1979     ex = rex;
1980     }
1981     }
1982     if (e == null || ex != null)
1983     dst.internalComplete(null, ex);
1984     }
1985     helpPostComplete();
1986     other.helpPostComplete();
1987     return dst;
1988     }
1989    
1990     /**
1991     * Creates and returns a CompletableFuture that is completed
1992     * when this and the other given CompletableFuture both
1993     * complete. If this and/or the other CompletableFuture complete
1994     * exceptionally, then the returned CompletableFuture also does
1995     * so, with a CompletionException holding one of these exceptions as
1996     * its cause.
1997     *
1998     * @param other the other CompletableFuture
1999     * @param action the action to perform before completing the
2000     * returned CompletableFuture
2001     * @return the new CompletableFuture
2002     */
2003     public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2004     Runnable action) {
2005     return doThenBiRun(other, action, null);
2006     }
2007    
2008     /**
2009     * Creates and returns a CompletableFuture that is completed
2010     * asynchronously using the {@link ForkJoinPool#commonPool()}
2011     * when this and the other given CompletableFuture both
2012     * complete. If this and/or the other CompletableFuture complete
2013     * exceptionally, then the returned CompletableFuture also does
2014     * so, with a CompletionException holding one of these exceptions as
2015     * its cause.
2016     *
2017     * @param other the other CompletableFuture
2018     * @param action the action to perform before completing the
2019     * returned CompletableFuture
2020     * @return the new CompletableFuture
2021     */
2022     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2023     Runnable action) {
2024     return doThenBiRun(other, action, ForkJoinPool.commonPool());
2025     }
2026    
2027     /**
2028     * Creates and returns a CompletableFuture that is completed
2029     * asynchronously using the given executor
2030     * when this and the other given CompletableFuture both
2031     * complete. If this and/or the other CompletableFuture complete
2032     * exceptionally, then the returned CompletableFuture also does
2033     * so, with a CompletionException holding one of these exceptions as
2034     * its cause.
2035     *
2036     * @param other the other CompletableFuture
2037     * @param action the action to perform before completing the
2038     * returned CompletableFuture
2039     * @param executor the executor to use for asynchronous execution
2040     * @return the new CompletableFuture
2041     */
2042     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2043     Runnable action,
2044     Executor executor) {
2045     if (executor == null) throw new NullPointerException();
2046     return doThenBiRun(other, action, executor);
2047     }
2048    
2049     private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2050     Runnable action,
2051     Executor e) {
2052     if (other == null || action == null) throw new NullPointerException();
2053     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2054     BiRunCompletion<T> d = null;
2055     Object r, s = null;
2056     if ((r = result) == null || (s = other.result) == null) {
2057     d = new BiRunCompletion<T>(this, other, action, dst, e);
2058     CompletionNode q = null, p = new CompletionNode(d);
2059     while ((r == null && (r = result) == null) ||
2060     (s == null && (s = other.result) == null)) {
2061     if (q != null) {
2062     if (s != null ||
2063     UNSAFE.compareAndSwapObject
2064     (other, COMPLETIONS, q.next = other.completions, q))
2065     break;
2066     }
2067     else if (r != null ||
2068     UNSAFE.compareAndSwapObject
2069     (this, COMPLETIONS, p.next = completions, p)) {
2070     if (s != null)
2071     break;
2072     q = new CompletionNode(d);
2073     }
2074     }
2075     }
2076     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2077     Throwable ex;
2078     if (r instanceof AltResult)
2079     ex = ((AltResult)r).ex;
2080     else
2081     ex = null;
2082     if (ex == null && (s instanceof AltResult))
2083     ex = ((AltResult)s).ex;
2084     if (ex == null) {
2085     try {
2086     if (e != null)
2087     e.execute(new AsyncRun(action, dst));
2088     else
2089     action.run();
2090     } catch (Throwable rex) {
2091     ex = rex;
2092     }
2093     }
2094     if (e == null || ex != null)
2095     dst.internalComplete(null, ex);
2096     }
2097     helpPostComplete();
2098     other.helpPostComplete();
2099     return dst;
2100     }
2101    
2102     /**
2103     * Creates and returns a CompletableFuture that is completed with
2104     * the result of the given function of either this or the other
2105     * given CompletableFuture's results when either complete. If
2106     * this and/or the other CompletableFuture complete exceptionally,
2107     * then the returned CompletableFuture may also do so, with a
2108     * CompletionException holding one of these exceptions as its cause.
2109     * No guarantees are made about which result or exception is used
2110     * in the returned CompletableFuture.
2111     *
2112     * @param other the other CompletableFuture
2113     * @param fn the function to use to compute the value of
2114     * the returned CompletableFuture
2115     * @return the new CompletableFuture
2116     */
2117     public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2118     Fun<? super T, U> fn) {
2119     return doOrApply(other, fn, null);
2120     }
2121    
2122     /**
2123     * Creates and returns a CompletableFuture that is completed
2124     * asynchronously using the {@link ForkJoinPool#commonPool()} with
2125     * the result of the given function of either this or the other
2126     * given CompletableFuture's results when either complete. If
2127     * this and/or the other CompletableFuture complete exceptionally,
2128     * then the returned CompletableFuture may also do so, with a
2129     * CompletionException holding one of these exceptions as its cause.
2130     * No guarantees are made about which result or exception is used
2131     * in the returned CompletableFuture.
2132     *
2133     * @param other the other CompletableFuture
2134     * @param fn the function to use to compute the value of
2135     * the returned CompletableFuture
2136     * @return the new CompletableFuture
2137     */
2138     public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2139     Fun<? super T, U> fn) {
2140     return doOrApply(other, fn, ForkJoinPool.commonPool());
2141     }
2142    
2143     /**
2144     * Creates and returns a CompletableFuture that is completed
2145     * asynchronously using the given executor with the result of the
2146     * given function of either this or the other given
2147     * CompletableFuture's results when either complete. If this
2148     * and/or the other CompletableFuture complete exceptionally, then
2149     * the returned CompletableFuture may also do so, with a
2150     * CompletionException holding one of these exceptions as its cause.
2151     * No guarantees are made about which result or exception is used
2152     * in the returned CompletableFuture.
2153     *
2154     * @param other the other CompletableFuture
2155     * @param fn the function to use to compute the value of
2156     * the returned CompletableFuture
2157     * @param executor the executor to use for asynchronous execution
2158     * @return the new CompletableFuture
2159     */
2160     public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2161     Fun<? super T, U> fn,
2162     Executor executor) {
2163     if (executor == null) throw new NullPointerException();
2164     return doOrApply(other, fn, executor);
2165     }
2166    
2167     private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other,
2168     Fun<? super T, U> fn,
2169     Executor e) {
2170     if (other == null || fn == null) throw new NullPointerException();
2171     CompletableFuture<U> dst = new CompletableFuture<U>();
2172     OrApplyCompletion<T,U> d = null;
2173     Object r;
2174     if ((r = result) == null && (r = other.result) == null) {
2175     d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2176     CompletionNode q = null, p = new CompletionNode(d);
2177     while ((r = result) == null && (r = other.result) == null) {
2178     if (q != null) {
2179     if (UNSAFE.compareAndSwapObject
2180     (other, COMPLETIONS, q.next = other.completions, q))
2181     break;
2182     }
2183     else if (UNSAFE.compareAndSwapObject
2184     (this, COMPLETIONS, p.next = completions, p))
2185     q = new CompletionNode(d);
2186     }
2187     }
2188     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2189     T t; Throwable ex;
2190     if (r instanceof AltResult) {
2191     ex = ((AltResult)r).ex;
2192     t = null;
2193     }
2194     else {
2195     ex = null;
2196     @SuppressWarnings("unchecked") T tr = (T) r;
2197     t = tr;
2198     }
2199     U u = null;
2200     if (ex == null) {
2201     try {
2202     if (e != null)
2203     e.execute(new AsyncApply<T,U>(t, fn, dst));
2204     else
2205     u = fn.apply(t);
2206     } catch (Throwable rex) {
2207     ex = rex;
2208     }
2209     }
2210     if (e == null || ex != null)
2211     dst.internalComplete(u, ex);
2212     }
2213     helpPostComplete();
2214     other.helpPostComplete();
2215     return dst;
2216     }
2217    
2218     /**
2219     * Creates and returns a CompletableFuture that is completed after
2220     * performing the given action with the result of either this or the
2221     * other given CompletableFuture's result, when either complete.
2222     * If this and/or the other CompletableFuture complete
2223     * exceptionally, then the returned CompletableFuture may also do
2224     * so, with a CompletionException holding one of these exceptions as
2225     * its cause. No guarantees are made about which exception is
2226     * used in the returned CompletableFuture.
2227     *
2228     * @param other the other CompletableFuture
2229     * @param block the action to perform before completing the
2230     * returned CompletableFuture
2231     * @return the new CompletableFuture
2232     */
2233     public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2234     Action<? super T> block) {
2235     return doOrAccept(other, block, null);
2236     }
2237    
2238     /**
2239     * Creates and returns a CompletableFuture that is completed
2240     * asynchronously using the {@link ForkJoinPool#commonPool()},
2241     * performing the given action with the result of either this or
2242     * the other given CompletableFuture's result, when either
2243     * complete. If this and/or the other CompletableFuture complete
2244     * exceptionally, then the returned CompletableFuture may also do
2245     * so, with a CompletionException holding one of these exceptions as
2246     * its cause. No guarantees are made about which exception is
2247     * used in the returned CompletableFuture.
2248     *
2249     * @param other the other CompletableFuture
2250     * @param block the action to perform before completing the
2251     * returned CompletableFuture
2252     * @return the new CompletableFuture
2253     */
2254     public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2255     Action<? super T> block) {
2256     return doOrAccept(other, block, ForkJoinPool.commonPool());
2257     }
2258    
2259     /**
2260     * Creates and returns a CompletableFuture that is completed
2261     * asynchronously using the given executor,
2262     * performing the given action with the result of either this or
2263     * the other given CompletableFuture's result, when either
2264     * complete. If this and/or the other CompletableFuture complete
2265     * exceptionally, then the returned CompletableFuture may also do
2266     * so, with a CompletionException holding one of these exceptions as
2267     * its cause. No guarantees are made about which exception is
2268     * used in the returned CompletableFuture.
2269     *
2270     * @param other the other CompletableFuture
2271     * @param block the action to perform before completing the
2272     * returned CompletableFuture
2273     * @param executor the executor to use for asynchronous execution
2274     * @return the new CompletableFuture
2275     */
2276     public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2277     Action<? super T> block,
2278     Executor executor) {
2279     if (executor == null) throw new NullPointerException();
2280     return doOrAccept(other, block, executor);
2281     }
2282    
2283     private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other,
2284     Action<? super T> fn,
2285     Executor e) {
2286     if (other == null || fn == null) throw new NullPointerException();
2287     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2288     OrAcceptCompletion<T> d = null;
2289     Object r;
2290     if ((r = result) == null && (r = other.result) == null) {
2291     d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2292     CompletionNode q = null, p = new CompletionNode(d);
2293     while ((r = result) == null && (r = other.result) == null) {
2294     if (q != null) {
2295     if (UNSAFE.compareAndSwapObject
2296     (other, COMPLETIONS, q.next = other.completions, q))
2297     break;
2298     }
2299     else if (UNSAFE.compareAndSwapObject
2300     (this, COMPLETIONS, p.next = completions, p))
2301     q = new CompletionNode(d);
2302     }
2303     }
2304     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2305     T t; Throwable ex;
2306     if (r instanceof AltResult) {
2307     ex = ((AltResult)r).ex;
2308     t = null;
2309     }
2310     else {
2311     ex = null;
2312     @SuppressWarnings("unchecked") T tr = (T) r;
2313     t = tr;
2314     }
2315     if (ex == null) {
2316     try {
2317     if (e != null)
2318     e.execute(new AsyncAccept<T>(t, fn, dst));
2319     else
2320     fn.accept(t);
2321     } catch (Throwable rex) {
2322     ex = rex;
2323     }
2324     }
2325     if (e == null || ex != null)
2326     dst.internalComplete(null, ex);
2327     }
2328     helpPostComplete();
2329     other.helpPostComplete();
2330     return dst;
2331     }
2332    
2333     /**
2334     * Creates and returns a CompletableFuture that is completed
2335     * after this or the other given CompletableFuture complete. If
2336     * this and/or the other CompletableFuture complete exceptionally,
2337     * then the returned CompletableFuture may also do so, with a
2338     * CompletionException holding one of these exceptions as its cause.
2339     * No guarantees are made about which exception is used in the
2340     * returned CompletableFuture.
2341     *
2342     * @param other the other CompletableFuture
2343     * @param action the action to perform before completing the
2344     * returned CompletableFuture
2345     * @return the new CompletableFuture
2346     */
2347     public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2348     Runnable action) {
2349     return doOrRun(other, action, null);
2350     }
2351    
2352     /**
2353     * Creates and returns a CompletableFuture that is completed
2354     * asynchronously using the {@link ForkJoinPool#commonPool()}
2355     * after this or the other given CompletableFuture complete. If
2356     * this and/or the other CompletableFuture complete exceptionally,
2357     * then the returned CompletableFuture may also do so, with a
2358     * CompletionException holding one of these exceptions as its cause.
2359     * No guarantees are made about which exception is used in the
2360     * returned CompletableFuture.
2361     *
2362     * @param other the other CompletableFuture
2363     * @param action the action to perform before completing the
2364     * returned CompletableFuture
2365     * @return the new CompletableFuture
2366     */
2367     public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2368     Runnable action) {
2369     return doOrRun(other, action, ForkJoinPool.commonPool());
2370     }
2371    
2372     /**
2373     * Creates and returns a CompletableFuture that is completed
2374     * asynchronously using the given executor after this or the other
2375     * given CompletableFuture complete. If this and/or the other
2376     * CompletableFuture complete exceptionally, then the returned
2377     * CompletableFuture may also do so, with a CompletionException
2378     * holding one of these exceptions as its cause. No guarantees are
2379     * made about which exception is used in the returned
2380     * CompletableFuture.
2381     *
2382     * @param other the other CompletableFuture
2383     * @param action the action to perform before completing the
2384     * returned CompletableFuture
2385     * @param executor the executor to use for asynchronous execution
2386     * @return the new CompletableFuture
2387     */
2388     public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2389     Runnable action,
2390     Executor executor) {
2391     if (executor == null) throw new NullPointerException();
2392     return doOrRun(other, action, executor);
2393     }
2394    
2395     private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2396     Runnable action,
2397     Executor e) {
2398     if (other == null || action == null) throw new NullPointerException();
2399     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2400     OrRunCompletion<T> d = null;
2401     Object r;
2402     if ((r = result) == null && (r = other.result) == null) {
2403     d = new OrRunCompletion<T>(this, other, action, dst, e);
2404     CompletionNode q = null, p = new CompletionNode(d);
2405     while ((r = result) == null && (r = other.result) == null) {
2406     if (q != null) {
2407     if (UNSAFE.compareAndSwapObject
2408     (other, COMPLETIONS, q.next = other.completions, q))
2409     break;
2410     }
2411     else if (UNSAFE.compareAndSwapObject
2412     (this, COMPLETIONS, p.next = completions, p))
2413     q = new CompletionNode(d);
2414     }
2415     }
2416     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2417     Throwable ex;
2418     if (r instanceof AltResult)
2419     ex = ((AltResult)r).ex;
2420     else
2421     ex = null;
2422     if (ex == null) {
2423     try {
2424     if (e != null)
2425     e.execute(new AsyncRun(action, dst));
2426     else
2427     action.run();
2428     } catch (Throwable rex) {
2429     ex = rex;
2430     }
2431     }
2432     if (e == null || ex != null)
2433     dst.internalComplete(null, ex);
2434     }
2435     helpPostComplete();
2436     other.helpPostComplete();
2437     return dst;
2438     }
2439    
2440     /**
2441     * Returns a CompletableFuture (or an equivalent one) produced by
2442     * the given function of the result of this CompletableFuture when
2443     * completed. If this CompletableFuture completes exceptionally,
2444     * then the returned CompletableFuture also does so, with a
2445     * CompletionException holding this exception as its cause.
2446     *
2447 jsr166 1.8 * @param fn the function returning a new CompletableFuture
2448 dl 1.1 * @return the CompletableFuture, that {@code isDone()} upon
2449     * return if completed by the given function, or an exception
2450 jsr166 1.8 * occurs
2451 dl 1.1 */
2452     public <U> CompletableFuture<U> thenCompose(Fun<? super T,
2453     CompletableFuture<U>> fn) {
2454     if (fn == null) throw new NullPointerException();
2455     CompletableFuture<U> dst = null;
2456     ComposeCompletion<T,U> d = null;
2457     Object r;
2458     if ((r = result) == null) {
2459     dst = new CompletableFuture<U>();
2460     CompletionNode p = new CompletionNode
2461     (d = new ComposeCompletion<T,U>(this, fn, dst));
2462     while ((r = result) == null) {
2463     if (UNSAFE.compareAndSwapObject
2464     (this, COMPLETIONS, p.next = completions, p))
2465     break;
2466     }
2467     }
2468     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2469     T t; Throwable ex;
2470     if (r instanceof AltResult) {
2471     ex = ((AltResult)r).ex;
2472     t = null;
2473     }
2474     else {
2475     ex = null;
2476     @SuppressWarnings("unchecked") T tr = (T) r;
2477     t = tr;
2478     }
2479     if (ex == null) {
2480     try {
2481     dst = fn.apply(t);
2482     } catch (Throwable rex) {
2483     ex = rex;
2484     }
2485     }
2486     if (dst == null) {
2487     dst = new CompletableFuture<U>();
2488     if (ex == null)
2489     ex = new NullPointerException();
2490     }
2491     if (ex != null)
2492     dst.internalComplete(null, ex);
2493     }
2494     helpPostComplete();
2495     dst.helpPostComplete();
2496     return dst;
2497     }
2498    
2499     /**
2500     * Creates and returns a CompletableFuture that is completed with
2501     * the result of the given function of the exception triggering
2502     * this CompletableFuture's completion when it completes
2503     * exceptionally; Otherwise, if this CompletableFuture completes
2504     * normally, then the returned CompletableFuture also completes
2505     * normally with the same value.
2506     *
2507     * @param fn the function to use to compute the value of the
2508     * returned CompletableFuture if this CompletableFuture completed
2509     * exceptionally
2510     * @return the new CompletableFuture
2511     */
2512     public CompletableFuture<T> exceptionally(Fun<Throwable, ? extends T> fn) {
2513     if (fn == null) throw new NullPointerException();
2514     CompletableFuture<T> dst = new CompletableFuture<T>();
2515     ExceptionCompletion<T> d = null;
2516     Object r;
2517     if ((r = result) == null) {
2518     CompletionNode p =
2519     new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2520     while ((r = result) == null) {
2521     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2522     p.next = completions, p))
2523     break;
2524     }
2525     }
2526     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2527     T t = null; Throwable ex, dx = null;
2528     if (r instanceof AltResult) {
2529     if ((ex = ((AltResult)r).ex) != null) {
2530     try {
2531     t = fn.apply(ex);
2532     } catch (Throwable rex) {
2533     dx = rex;
2534     }
2535     }
2536     }
2537     else {
2538     @SuppressWarnings("unchecked") T tr = (T) r;
2539     t = tr;
2540     }
2541     dst.internalComplete(t, dx);
2542     }
2543     helpPostComplete();
2544     return dst;
2545     }
2546    
2547     /**
2548     * Creates and returns a CompletableFuture that is completed with
2549     * the result of the given function of the result and exception of
2550     * this CompletableFuture's completion when it completes. The
2551     * given function is invoked with the result (or {@code null} if
2552     * none) and the exception (or {@code null} if none) of this
2553     * CompletableFuture when complete.
2554     *
2555     * @param fn the function to use to compute the value of the
2556     * returned CompletableFuture
2557     * @return the new CompletableFuture
2558     */
2559     public <U> CompletableFuture<U> handle(BiFun<? super T, Throwable, ? extends U> fn) {
2560     if (fn == null) throw new NullPointerException();
2561     CompletableFuture<U> dst = new CompletableFuture<U>();
2562     HandleCompletion<T,U> d = null;
2563     Object r;
2564     if ((r = result) == null) {
2565     CompletionNode p =
2566     new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2567     while ((r = result) == null) {
2568     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2569     p.next = completions, p))
2570     break;
2571     }
2572     }
2573     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2574     T t; Throwable ex;
2575     if (r instanceof AltResult) {
2576     ex = ((AltResult)r).ex;
2577     t = null;
2578     }
2579     else {
2580     ex = null;
2581     @SuppressWarnings("unchecked") T tr = (T) r;
2582     t = tr;
2583     }
2584     U u; Throwable dx;
2585     try {
2586     u = fn.apply(t, ex);
2587     dx = null;
2588     } catch (Throwable rex) {
2589     dx = rex;
2590     u = null;
2591     }
2592     dst.internalComplete(u, dx);
2593     }
2594     helpPostComplete();
2595     return dst;
2596     }
2597    
2598     /**
2599     * Attempts to complete this CompletableFuture with
2600     * a {@link CancellationException}.
2601     *
2602     * @param mayInterruptIfRunning this value has no effect in this
2603     * implementation because interrupts are not used to control
2604     * processing.
2605     *
2606     * @return {@code true} if this task is now cancelled
2607     */
2608     public boolean cancel(boolean mayInterruptIfRunning) {
2609     Object r;
2610     while ((r = result) == null) {
2611     r = new AltResult(new CancellationException());
2612     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
2613     postComplete();
2614     return true;
2615     }
2616     }
2617     return ((r instanceof AltResult) &&
2618     (((AltResult)r).ex instanceof CancellationException));
2619     }
2620    
2621     /**
2622     * Returns {@code true} if this CompletableFuture was cancelled
2623     * before it completed normally.
2624     *
2625     * @return {@code true} if this CompletableFuture was cancelled
2626     * before it completed normally
2627     */
2628     public boolean isCancelled() {
2629     Object r;
2630 jsr166 1.11 return ((r = result) instanceof AltResult) &&
2631     (((AltResult)r).ex instanceof CancellationException);
2632 dl 1.1 }
2633    
2634     /**
2635     * Forcibly sets or resets the value subsequently returned by
2636 jsr166 1.10 * method {@link #get()} and related methods, whether or not
2637     * already completed. This method is designed for use only in
2638     * error recovery actions, and even in such situations may result
2639     * in ongoing dependent completions using established versus
2640 dl 1.3 * overwritten outcomes.
2641 dl 1.1 *
2642     * @param value the completion value
2643     */
2644     public void obtrudeValue(T value) {
2645     result = (value == null) ? NIL : value;
2646     postComplete();
2647     }
2648    
2649 dl 1.3 /**
2650 jsr166 1.9 * Forcibly causes subsequent invocations of method {@link #get()}
2651     * and related methods to throw the given exception, whether or
2652     * not already completed. This method is designed for use only in
2653 dl 1.3 * recovery actions, and even in such situations may result in
2654     * ongoing dependent completions using established versus
2655     * overwritten outcomes.
2656     *
2657     * @param ex the exception
2658     */
2659     public void obtrudeException(Throwable ex) {
2660     if (ex == null) throw new NullPointerException();
2661     result = new AltResult(ex);
2662     postComplete();
2663     }
2664    
// Unsafe mechanics: the Unsafe instance plus the raw offsets of the
// "result", "waiters" and "completions" fields of this class, as
// needed by calls such as UNSAFE.compareAndSwapObject.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        UNSAFE = getUnsafe();
        Class<?> self = CompletableFuture.class;
        java.lang.reflect.Field field;

        field = self.getDeclaredField("result");
        RESULT = UNSAFE.objectFieldOffset(field);

        field = self.getDeclaredField("waiters");
        WAITERS = UNSAFE.objectFieldOffset(field);

        field = self.getDeclaredField("completions");
        COMPLETIONS = UNSAFE.objectFieldOffset(field);
    } catch (Exception e) {
        // Any failure here is wrapped in an Error thrown from the
        // static initializer.
        throw new Error(e);
    }
}
2684    
2685    
2686     /**
2687     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
2688     * Replace with a simple call to Unsafe.getUnsafe when integrating
2689     * into a jdk.
2690     *
2691     * @return a sun.misc.Unsafe
2692     */
2693     private static sun.misc.Unsafe getUnsafe() {
2694     try {
2695     return sun.misc.Unsafe.getUnsafe();
2696 jsr166 1.6 } catch (SecurityException tryReflectionInstead) {}
2697     try {
2698     return java.security.AccessController.doPrivileged
2699     (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
2700     public sun.misc.Unsafe run() throws Exception {
2701     Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
2702     for (java.lang.reflect.Field f : k.getDeclaredFields()) {
2703     f.setAccessible(true);
2704     Object x = f.get(null);
2705     if (k.isInstance(x))
2706     return k.cast(x);
2707     }
2708     throw new NoSuchFieldError("the Unsafe");
2709     }});
2710     } catch (java.security.PrivilegedActionException e) {
2711     throw new RuntimeException("Could not initialize intrinsics",
2712     e.getCause());
2713 dl 1.1 }
2714     }
2715     }