ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CompletableFuture.java
Revision: 1.6
Committed: Wed Jan 9 02:51:36 2013 UTC (11 years, 4 months ago) by jsr166
Branch: MAIN
Changes since 1.5: +17 -16 lines
Log Message:
more portable getUnsafe()

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package jsr166e;
8     import java.util.concurrent.Future;
9     import java.util.concurrent.TimeUnit;
10     import java.util.concurrent.Executor;
11     import java.util.concurrent.ThreadLocalRandom;
12     import java.util.concurrent.ExecutionException;
13     import java.util.concurrent.TimeoutException;
14     import java.util.concurrent.CancellationException;
15     import java.util.concurrent.atomic.AtomicInteger;
16     import java.util.concurrent.locks.LockSupport;
17    
18     /**
19     * A {@link Future} that may be explicitly completed (setting its
20     * value and status), and may include dependent functions and actions
21     * that trigger upon its completion. Methods are available for adding
22     * those based on Functions, Blocks, and Runnables, depending on
23     * whether they require arguments and/or produce results, as well as
24     * those triggered after either or both the current and another
25     * CompletableFuture complete. Functions and actions supplied for
26     * dependent completions (mainly using methods with prefix {@code
27     * then}) may be performed by the thread that completes the current
28     * CompletableFuture, or by any other caller of these methods. There
29     * are no guarantees about the order of processing completions unless
30     * constrained by these methods.
31     *
32     * <p>When two or more threads attempt to {@link #complete} or {@link
33     * #completeExceptionally} a CompletableFuture, only one of them
34     * succeeds.
35     *
36     * <p>Upon exceptional completion, or when a completion entails
37     * computation of a function or action, and it terminates abruptly
38     * with an (unchecked) exception or error, then further completions
39     * act as {@code completeExceptionally} with a {@link
40     * CompletionException} holding that exception as its cause. If a
41     * CompletableFuture completes exceptionally, and is not followed by a
42     * {@link #exceptionally} or {@link #handle} completion, then all of
43     * its dependents (and their dependents) also complete exceptionally
44     * with CompletionExceptions holding the ultimate cause. In case of a
45     * CompletionException, methods {@link #get()} and {@link #get(long,
46     * TimeUnit)} throw an {@link ExecutionException} with the same cause
47     * as would be held in the corresponding CompletionException. However,
48     * in these cases, methods {@link #join()} and {@link #getNow} throw
49     * the CompletionException, which simplifies usage especially within
50     * other completion functions.
51     *
52     * <p>CompletableFutures themselves do not execute asynchronously.
53     * However, the {@code async} methods provide commonly useful ways to
54     * commence asynchronous processing, using either a given {@link
55     * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
56     * function or action that will result in the completion of a new
57     * CompletableFuture. To simplify monitoring, debugging, and tracking,
58     * all generated asynchronous tasks are instances of the tagging
59     * interface {@link AsynchronousCompletionTask}.
60     *
61     * <p><em>jsr166e note: During transition, this class
62     * uses nested functional interfaces with different names but the
63     * same forms as those expected for JDK8.</em>
64     *
65     * @author Doug Lea
66     * @since 1.8
67     */
68     public class CompletableFuture<T> implements Future<T> {
69     // jsr166e nested interfaces
70    
/**
 * Interface describing a void action of one argument.
 *
 * @param <A> the type of the argument
 */
public interface Action<A> {
    /**
     * Performs the action on the given argument.
     *
     * @param a the argument
     */
    void accept(A a);
}
/**
 * Interface describing a void action of two arguments.
 *
 * @param <A> the type of the first argument
 * @param <B> the type of the second argument
 */
public interface BiAction<A,B> {
    /**
     * Performs the action on the given pair of arguments.
     *
     * @param a the first argument
     * @param b the second argument
     */
    void accept(A a, B b);
}
/**
 * Interface describing a function of one argument.
 *
 * @param <A> the type of the argument
 * @param <T> the type of the result
 */
public interface Fun<A,T> {
    /**
     * Applies the function to the given argument.
     *
     * @param a the argument
     * @return the function result
     */
    T apply(A a);
}
/**
 * Interface describing a function of two arguments.
 *
 * @param <A> the type of the first argument
 * @param <B> the type of the second argument
 * @param <T> the type of the result
 */
public interface BiFun<A,B,T> {
    /**
     * Applies the function to the given pair of arguments.
     *
     * @param a the first argument
     * @param b the second argument
     * @return the function result
     */
    T apply(A a, B b);
}
/**
 * Interface describing a function of no arguments.
 *
 * @param <T> the type of the result
 */
public interface Generator<T> {
    /**
     * Produces a result.
     *
     * @return the generated value
     */
    T get();
}
81    
82    
83     /*
84     * Overview:
85     *
86 jsr166 1.5 * 1. Non-nullness of field result (set via CAS) indicates done.
87     * An AltResult is used to box null as a result, as well as to
88     * hold exceptions. Using a single field makes completion fast
89 dl 1.1 * and simple to detect and trigger, at the expense of a lot of
90     * encoding and decoding that infiltrates many methods. One minor
91     * simplification relies on the (static) NIL (to box null results)
92     * being the only AltResult with a null exception field, so we
93     * don't usually need explicit comparisons with NIL. The CF
94     * exception propagation mechanics surrounding decoding rely on
95     * unchecked casts of decoded results really being unchecked,
96     * where user type errors are caught at point of use, as is
97     * currently the case in Java. These are highlighted by using
98     * SuppressWarnings-annotated temporaries.
99     *
100     * 2. Waiters are held in a Treiber stack similar to the one used
101     * in FutureTask, Phaser, and SynchronousQueue. See their
102     * internal documentation for algorithmic details.
103     *
104     * 3. Completions are also kept in a list/stack, and pulled off
105     * and run when completion is triggered. (We could even use the
106     * same stack as for waiters, but would give up the potential
107     * parallelism obtained because woken waiters help release/run
108     * others -- see method postComplete). Because post-processing
109     * may race with direct calls, class Completion opportunistically
110     * extends AtomicInteger so callers can claim the action via
111     * compareAndSet(0, 1). The Completion.run methods are all
112     * written a boringly similar uniform way (that sometimes includes
113     * unnecessary-looking checks, kept to maintain uniformity). There
114     * are enough dimensions upon which they differ that factoring to
115     * use common code isn't worthwhile.
116     *
117     * 4. The exported then/and/or methods do support a bit of
118     * factoring (see doThenApply etc). They must cope with the
119     * intrinsic races surrounding addition of a dependent action
120     * versus performing the action directly because the task is
121     * already complete. For example, a CF may not be complete upon
122     * entry, so a dependent completion is added, but by the time it
123     * is added, the target CF is complete, so must be directly
124     * executed. This is all done while avoiding unnecessary object
125     * construction in safe-bypass cases.
126     */
127    
128     // preliminaries
129    
130     static final class AltResult {
131     final Throwable ex; // null only for NIL
132     AltResult(Throwable ex) { this.ex = ex; }
133     }
134    
135     static final AltResult NIL = new AltResult(null);
136    
137     // Fields
138    
139     volatile Object result; // Either the result or boxed AltResult
140     volatile WaitNode waiters; // Treiber stack of threads blocked on get()
141     volatile CompletionNode completions; // list (Treiber stack) of completions
142    
143     // Basic utilities for triggering and processing completions
144    
145     /**
146     * Removes and signals all waiting threads and runs all completions.
147     */
148     final void postComplete() {
149     WaitNode q; Thread t;
150     while ((q = waiters) != null) {
151     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
152     (t = q.thread) != null) {
153     q.thread = null;
154     LockSupport.unpark(t);
155     }
156     }
157    
158     CompletionNode h; Completion c;
159     while ((h = completions) != null) {
160     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
161     (c = h.completion) != null)
162     c.run();
163     }
164     }
165    
166     /**
167     * Triggers completion with the encoding of the given arguments:
168     * if the exception is non-null, encodes it as a wrapped
169     * CompletionException unless it is one already. Otherwise uses
170     * the given result, boxed as NIL if null.
171     */
172     final void internalComplete(Object v, Throwable ex) {
173     if (result == null)
174     UNSAFE.compareAndSwapObject
175     (this, RESULT, null,
176     (ex == null) ? (v == null) ? NIL : v :
177     new AltResult((ex instanceof CompletionException) ? ex :
178     new CompletionException(ex)));
179     postComplete(); // help out even if not triggered
180     }
181    
182     /**
183     * If triggered, helps release and/or process completions.
184     */
185     final void helpPostComplete() {
186     if (result != null)
187     postComplete();
188     }
189    
190     /* ------------- waiting for completions -------------- */
191    
192     /**
193     * Heuristic spin value for waitingGet() before blocking on
194     * multiprocessors
195     */
196     static final int WAITING_GET_SPINS = 256;
197    
198     /**
199     * Linked nodes to record waiting threads in a Treiber stack. See
200     * other classes such as Phaser and SynchronousQueue for more
201     * detailed explanation. This class implements ManagedBlocker to
202     * avoid starvation when blocking actions pile up in
203     * ForkJoinPools.
204     */
205     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
206     long nanos; // wait time if timed
207     final long deadline; // non-zero if timed
208     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
209     volatile Thread thread;
210     volatile WaitNode next;
211     WaitNode(boolean interruptible, long nanos, long deadline) {
212     this.thread = Thread.currentThread();
213     this.interruptControl = interruptible ? 1 : 0;
214     this.nanos = nanos;
215     this.deadline = deadline;
216     }
217     public boolean isReleasable() {
218     if (thread == null)
219     return true;
220     if (Thread.interrupted()) {
221     int i = interruptControl;
222     interruptControl = -1;
223     if (i > 0)
224     return true;
225     }
226     if (deadline != 0L &&
227     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
228     thread = null;
229     return true;
230     }
231     return false;
232     }
233     public boolean block() {
234     if (isReleasable())
235     return true;
236     else if (deadline == 0L)
237     LockSupport.park(this);
238     else if (nanos > 0L)
239     LockSupport.parkNanos(this, nanos);
240     return isReleasable();
241     }
242     }
243    
244     /**
245     * Returns raw result after waiting, or null if interruptible and
246     * interrupted.
247     */
248     private Object waitingGet(boolean interruptible) {
249     WaitNode q = null;
250     boolean queued = false;
251     int h = 0, spins = 0;
252     for (Object r;;) {
253     if ((r = result) != null) {
254     if (q != null) { // suppress unpark
255     q.thread = null;
256     if (q.interruptControl < 0) {
257     if (interruptible) {
258     removeWaiter(q);
259     return null;
260     }
261     Thread.currentThread().interrupt();
262     }
263     }
264     postComplete(); // help release others
265     return r;
266     }
267     else if (h == 0) {
268     h = ThreadLocalRandom.current().nextInt();
269     if (Runtime.getRuntime().availableProcessors() > 1)
270     spins = WAITING_GET_SPINS;
271     }
272     else if (spins > 0) {
273     h ^= h << 1; // xorshift
274     h ^= h >>> 3;
275     if ((h ^= h << 10) >= 0)
276     --spins;
277     }
278     else if (q == null)
279     q = new WaitNode(interruptible, 0L, 0L);
280     else if (!queued)
281     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
282     q.next = waiters, q);
283     else if (interruptible && q.interruptControl < 0) {
284     removeWaiter(q);
285     return null;
286     }
287     else if (q.thread != null && result == null) {
288     try {
289     ForkJoinPool.managedBlock(q);
290 jsr166 1.4 } catch (InterruptedException ex) {
291 dl 1.1 q.interruptControl = -1;
292     }
293     }
294     }
295     }
296    
297     /**
298     * Awaits completion or aborts on interrupt or timeout.
299     *
300     * @param nanos time to wait
301     * @return raw result
302     */
303     private Object timedAwaitDone(long nanos)
304     throws InterruptedException, TimeoutException {
305     WaitNode q = null;
306     boolean queued = false;
307     for (Object r;;) {
308     if ((r = result) != null) {
309     if (q != null) {
310     q.thread = null;
311     if (q.interruptControl < 0) {
312     removeWaiter(q);
313     throw new InterruptedException();
314     }
315     }
316     postComplete();
317     return r;
318     }
319     else if (q == null) {
320     if (nanos <= 0L)
321     throw new TimeoutException();
322     long d = System.nanoTime() + nanos;
323 jsr166 1.4 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
324 dl 1.1 }
325     else if (!queued)
326     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
327     q.next = waiters, q);
328     else if (q.interruptControl < 0) {
329     removeWaiter(q);
330     throw new InterruptedException();
331     }
332     else if (q.nanos <= 0L) {
333     if (result == null) {
334     removeWaiter(q);
335     throw new TimeoutException();
336     }
337     }
338     else if (q.thread != null && result == null) {
339     try {
340     ForkJoinPool.managedBlock(q);
341 jsr166 1.4 } catch (InterruptedException ex) {
342 dl 1.1 q.interruptControl = -1;
343     }
344     }
345     }
346     }
347    
348     /**
349     * Tries to unlink a timed-out or interrupted wait node to avoid
350     * accumulating garbage. Internal nodes are simply unspliced
351     * without CAS since it is harmless if they are traversed anyway
352     * by releasers. To avoid effects of unsplicing from already
353     * removed nodes, the list is retraversed in case of an apparent
354     * race. This is slow when there are a lot of nodes, but we don't
355     * expect lists to be long enough to outweigh higher-overhead
356     * schemes.
357     */
358     private void removeWaiter(WaitNode node) {
359     if (node != null) {
360     node.thread = null;
361     retry:
362     for (;;) { // restart on removeWaiter race
363     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
364     s = q.next;
365     if (q.thread != null)
366     pred = q;
367     else if (pred != null) {
368     pred.next = s;
369     if (pred.thread == null) // check for race
370     continue retry;
371     }
372     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
373     continue retry;
374     }
375     break;
376     }
377     }
378     }
379    
380     /* ------------- Async tasks -------------- */
381    
/**
 * Marker interface, with no methods, identifying asynchronous tasks
 * produced by {@code async} methods. This may be useful for
 * monitoring, debugging, and tracking asynchronous activities.
 */
public interface AsynchronousCompletionTask {
}
389    
390     /** Base class can act as either FJ or plain Runnable */
391     static abstract class Async extends ForkJoinTask<Void>
392     implements Runnable, AsynchronousCompletionTask {
393     public final Void getRawResult() { return null; }
394     public final void setRawResult(Void v) { }
395     public final void run() { exec(); }
396     }
397    
398     static final class AsyncRun extends Async {
399     final Runnable fn;
400     final CompletableFuture<Void> dst;
401     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
402     this.fn = fn; this.dst = dst;
403     }
404     public final boolean exec() {
405     CompletableFuture<Void> d; Throwable ex;
406 dl 1.2 if ((d = this.dst) != null && d.result == null) {
407 dl 1.1 try {
408     fn.run();
409     ex = null;
410     } catch (Throwable rex) {
411     ex = rex;
412     }
413     d.internalComplete(null, ex);
414     }
415     return true;
416     }
417     private static final long serialVersionUID = 5232453952276885070L;
418     }
419    
420     static final class AsyncSupply<U> extends Async {
421     final Generator<U> fn;
422     final CompletableFuture<U> dst;
423     AsyncSupply(Generator<U> fn, CompletableFuture<U> dst) {
424     this.fn = fn; this.dst = dst;
425     }
426     public final boolean exec() {
427     CompletableFuture<U> d; U u; Throwable ex;
428 dl 1.2 if ((d = this.dst) != null && d.result == null) {
429 dl 1.1 try {
430     u = fn.get();
431     ex = null;
432     } catch (Throwable rex) {
433     ex = rex;
434     u = null;
435     }
436     d.internalComplete(u, ex);
437     }
438     return true;
439     }
440     private static final long serialVersionUID = 5232453952276885070L;
441     }
442    
443     static final class AsyncApply<T,U> extends Async {
444     final Fun<? super T,? extends U> fn;
445     final T arg;
446     final CompletableFuture<U> dst;
447     AsyncApply(T arg, Fun<? super T,? extends U> fn,
448     CompletableFuture<U> dst) {
449     this.arg = arg; this.fn = fn; this.dst = dst;
450     }
451     public final boolean exec() {
452     CompletableFuture<U> d; U u; Throwable ex;
453 dl 1.2 if ((d = this.dst) != null && d.result == null) {
454 dl 1.1 try {
455     u = fn.apply(arg);
456     ex = null;
457     } catch (Throwable rex) {
458     ex = rex;
459     u = null;
460     }
461     d.internalComplete(u, ex);
462     }
463     return true;
464     }
465     private static final long serialVersionUID = 5232453952276885070L;
466     }
467    
468     static final class AsyncBiApply<T,U,V> extends Async {
469     final BiFun<? super T,? super U,? extends V> fn;
470     final T arg1;
471     final U arg2;
472     final CompletableFuture<V> dst;
473     AsyncBiApply(T arg1, U arg2,
474     BiFun<? super T,? super U,? extends V> fn,
475     CompletableFuture<V> dst) {
476     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
477     }
478     public final boolean exec() {
479     CompletableFuture<V> d; V v; Throwable ex;
480 dl 1.2 if ((d = this.dst) != null && d.result == null) {
481 dl 1.1 try {
482     v = fn.apply(arg1, arg2);
483     ex = null;
484     } catch (Throwable rex) {
485     ex = rex;
486     v = null;
487     }
488     d.internalComplete(v, ex);
489     }
490     return true;
491     }
492     private static final long serialVersionUID = 5232453952276885070L;
493     }
494    
495     static final class AsyncAccept<T> extends Async {
496     final Action<? super T> fn;
497     final T arg;
498     final CompletableFuture<Void> dst;
499     AsyncAccept(T arg, Action<? super T> fn,
500     CompletableFuture<Void> dst) {
501     this.arg = arg; this.fn = fn; this.dst = dst;
502     }
503     public final boolean exec() {
504     CompletableFuture<Void> d; Throwable ex;
505 dl 1.2 if ((d = this.dst) != null && d.result == null) {
506 dl 1.1 try {
507     fn.accept(arg);
508     ex = null;
509     } catch (Throwable rex) {
510     ex = rex;
511     }
512     d.internalComplete(null, ex);
513     }
514     return true;
515     }
516     private static final long serialVersionUID = 5232453952276885070L;
517     }
518    
519     static final class AsyncBiAccept<T,U> extends Async {
520     final BiAction<? super T,? super U> fn;
521     final T arg1;
522     final U arg2;
523     final CompletableFuture<Void> dst;
524     AsyncBiAccept(T arg1, U arg2,
525     BiAction<? super T,? super U> fn,
526     CompletableFuture<Void> dst) {
527     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
528     }
529     public final boolean exec() {
530     CompletableFuture<Void> d; Throwable ex;
531 dl 1.2 if ((d = this.dst) != null && d.result == null) {
532 dl 1.1 try {
533     fn.accept(arg1, arg2);
534     ex = null;
535     } catch (Throwable rex) {
536     ex = rex;
537     }
538     d.internalComplete(null, ex);
539     }
540     return true;
541     }
542     private static final long serialVersionUID = 5232453952276885070L;
543     }
544    
545     /* ------------- Completions -------------- */
546    
547     /**
548     * Simple linked list nodes to record completions, used in
549     * basically the same way as WaitNodes. (We separate nodes from
550     * the Completions themselves mainly because for the And and Or
551     * methods, the same Completion object resides in two lists.)
552     */
553     static final class CompletionNode {
554     final Completion completion;
555     volatile CompletionNode next;
556     CompletionNode(Completion completion) { this.completion = completion; }
557     }
558    
559     // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
560     static abstract class Completion extends AtomicInteger implements Runnable {
561     }
562    
563     static final class ApplyCompletion<T,U> extends Completion {
564     final CompletableFuture<? extends T> src;
565     final Fun<? super T,? extends U> fn;
566     final CompletableFuture<U> dst;
567     final Executor executor;
568     ApplyCompletion(CompletableFuture<? extends T> src,
569     Fun<? super T,? extends U> fn,
570     CompletableFuture<U> dst, Executor executor) {
571     this.src = src; this.fn = fn; this.dst = dst;
572     this.executor = executor;
573     }
574     public final void run() {
575     final CompletableFuture<? extends T> a;
576     final Fun<? super T,? extends U> fn;
577     final CompletableFuture<U> dst;
578     Object r; T t; Throwable ex;
579     if ((dst = this.dst) != null &&
580     (fn = this.fn) != null &&
581     (a = this.src) != null &&
582     (r = a.result) != null &&
583     compareAndSet(0, 1)) {
584     if (r instanceof AltResult) {
585     ex = ((AltResult)r).ex;
586     t = null;
587     }
588     else {
589     ex = null;
590     @SuppressWarnings("unchecked") T tr = (T) r;
591     t = tr;
592     }
593     Executor e = executor;
594     U u = null;
595     if (ex == null) {
596     try {
597     if (e != null)
598     e.execute(new AsyncApply<T,U>(t, fn, dst));
599     else
600     u = fn.apply(t);
601     } catch (Throwable rex) {
602     ex = rex;
603     }
604     }
605     if (e == null || ex != null)
606     dst.internalComplete(u, ex);
607     }
608     }
609     private static final long serialVersionUID = 5232453952276885070L;
610     }
611    
612     static final class AcceptCompletion<T> extends Completion {
613     final CompletableFuture<? extends T> src;
614     final Action<? super T> fn;
615     final CompletableFuture<Void> dst;
616     final Executor executor;
617     AcceptCompletion(CompletableFuture<? extends T> src,
618     Action<? super T> fn,
619     CompletableFuture<Void> dst, Executor executor) {
620     this.src = src; this.fn = fn; this.dst = dst;
621     this.executor = executor;
622     }
623     public final void run() {
624     final CompletableFuture<? extends T> a;
625     final Action<? super T> fn;
626     final CompletableFuture<Void> dst;
627     Object r; T t; Throwable ex;
628     if ((dst = this.dst) != null &&
629     (fn = this.fn) != null &&
630     (a = this.src) != null &&
631     (r = a.result) != null &&
632     compareAndSet(0, 1)) {
633     if (r instanceof AltResult) {
634     ex = ((AltResult)r).ex;
635     t = null;
636     }
637     else {
638     ex = null;
639     @SuppressWarnings("unchecked") T tr = (T) r;
640     t = tr;
641     }
642     Executor e = executor;
643     if (ex == null) {
644     try {
645     if (e != null)
646     e.execute(new AsyncAccept<T>(t, fn, dst));
647     else
648     fn.accept(t);
649     } catch (Throwable rex) {
650     ex = rex;
651     }
652     }
653     if (e == null || ex != null)
654     dst.internalComplete(null, ex);
655     }
656     }
657     private static final long serialVersionUID = 5232453952276885070L;
658     }
659    
660     static final class RunCompletion<T> extends Completion {
661     final CompletableFuture<? extends T> src;
662     final Runnable fn;
663     final CompletableFuture<Void> dst;
664     final Executor executor;
665     RunCompletion(CompletableFuture<? extends T> src,
666     Runnable fn,
667     CompletableFuture<Void> dst,
668     Executor executor) {
669     this.src = src; this.fn = fn; this.dst = dst;
670     this.executor = executor;
671     }
672     public final void run() {
673     final CompletableFuture<? extends T> a;
674     final Runnable fn;
675     final CompletableFuture<Void> dst;
676     Object r; Throwable ex;
677     if ((dst = this.dst) != null &&
678     (fn = this.fn) != null &&
679     (a = this.src) != null &&
680     (r = a.result) != null &&
681     compareAndSet(0, 1)) {
682     if (r instanceof AltResult)
683     ex = ((AltResult)r).ex;
684     else
685     ex = null;
686     Executor e = executor;
687     if (ex == null) {
688     try {
689     if (e != null)
690     e.execute(new AsyncRun(fn, dst));
691     else
692     fn.run();
693     } catch (Throwable rex) {
694     ex = rex;
695     }
696     }
697     if (e == null || ex != null)
698     dst.internalComplete(null, ex);
699     }
700     }
701     private static final long serialVersionUID = 5232453952276885070L;
702     }
703    
704     static final class BiApplyCompletion<T,U,V> extends Completion {
705     final CompletableFuture<? extends T> src;
706     final CompletableFuture<? extends U> snd;
707     final BiFun<? super T,? super U,? extends V> fn;
708     final CompletableFuture<V> dst;
709     final Executor executor;
710     BiApplyCompletion(CompletableFuture<? extends T> src,
711     CompletableFuture<? extends U> snd,
712     BiFun<? super T,? super U,? extends V> fn,
713     CompletableFuture<V> dst, Executor executor) {
714     this.src = src; this.snd = snd;
715     this.fn = fn; this.dst = dst;
716     this.executor = executor;
717     }
718     public final void run() {
719     final CompletableFuture<? extends T> a;
720     final CompletableFuture<? extends U> b;
721     final BiFun<? super T,? super U,? extends V> fn;
722     final CompletableFuture<V> dst;
723     Object r, s; T t; U u; Throwable ex;
724     if ((dst = this.dst) != null &&
725     (fn = this.fn) != null &&
726     (a = this.src) != null &&
727     (r = a.result) != null &&
728     (b = this.snd) != null &&
729     (s = b.result) != null &&
730     compareAndSet(0, 1)) {
731     if (r instanceof AltResult) {
732     ex = ((AltResult)r).ex;
733     t = null;
734     }
735     else {
736     ex = null;
737     @SuppressWarnings("unchecked") T tr = (T) r;
738     t = tr;
739     }
740     if (ex != null)
741     u = null;
742     else if (s instanceof AltResult) {
743     ex = ((AltResult)s).ex;
744     u = null;
745     }
746     else {
747     @SuppressWarnings("unchecked") U us = (U) s;
748     u = us;
749     }
750     Executor e = executor;
751     V v = null;
752     if (ex == null) {
753     try {
754     if (e != null)
755     e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
756     else
757     v = fn.apply(t, u);
758     } catch (Throwable rex) {
759     ex = rex;
760     }
761     }
762     if (e == null || ex != null)
763     dst.internalComplete(v, ex);
764     }
765     }
766     private static final long serialVersionUID = 5232453952276885070L;
767     }
768    
769     static final class BiAcceptCompletion<T,U> extends Completion {
770     final CompletableFuture<? extends T> src;
771     final CompletableFuture<? extends U> snd;
772     final BiAction<? super T,? super U> fn;
773     final CompletableFuture<Void> dst;
774     final Executor executor;
775     BiAcceptCompletion(CompletableFuture<? extends T> src,
776     CompletableFuture<? extends U> snd,
777     BiAction<? super T,? super U> fn,
778     CompletableFuture<Void> dst, Executor executor) {
779     this.src = src; this.snd = snd;
780     this.fn = fn; this.dst = dst;
781     this.executor = executor;
782     }
783     public final void run() {
784     final CompletableFuture<? extends T> a;
785     final CompletableFuture<? extends U> b;
786     final BiAction<? super T,? super U> fn;
787     final CompletableFuture<Void> dst;
788     Object r, s; T t; U u; Throwable ex;
789     if ((dst = this.dst) != null &&
790     (fn = this.fn) != null &&
791     (a = this.src) != null &&
792     (r = a.result) != null &&
793     (b = this.snd) != null &&
794     (s = b.result) != null &&
795     compareAndSet(0, 1)) {
796     if (r instanceof AltResult) {
797     ex = ((AltResult)r).ex;
798     t = null;
799     }
800     else {
801     ex = null;
802     @SuppressWarnings("unchecked") T tr = (T) r;
803     t = tr;
804     }
805     if (ex != null)
806     u = null;
807     else if (s instanceof AltResult) {
808     ex = ((AltResult)s).ex;
809     u = null;
810     }
811     else {
812     @SuppressWarnings("unchecked") U us = (U) s;
813     u = us;
814     }
815     Executor e = executor;
816     if (ex == null) {
817     try {
818     if (e != null)
819     e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
820     else
821     fn.accept(t, u);
822     } catch (Throwable rex) {
823     ex = rex;
824     }
825     }
826     if (e == null || ex != null)
827     dst.internalComplete(null, ex);
828     }
829     }
830     private static final long serialVersionUID = 5232453952276885070L;
831     }
832    
833     static final class BiRunCompletion<T> extends Completion {
834     final CompletableFuture<? extends T> src;
835     final CompletableFuture<?> snd;
836     final Runnable fn;
837     final CompletableFuture<Void> dst;
838     final Executor executor;
839     BiRunCompletion(CompletableFuture<? extends T> src,
840     CompletableFuture<?> snd,
841     Runnable fn,
842     CompletableFuture<Void> dst, Executor executor) {
843     this.src = src; this.snd = snd;
844     this.fn = fn; this.dst = dst;
845     this.executor = executor;
846     }
847     public final void run() {
848     final CompletableFuture<? extends T> a;
849     final CompletableFuture<?> b;
850     final Runnable fn;
851     final CompletableFuture<Void> dst;
852     Object r, s; Throwable ex;
853     if ((dst = this.dst) != null &&
854     (fn = this.fn) != null &&
855     (a = this.src) != null &&
856     (r = a.result) != null &&
857     (b = this.snd) != null &&
858     (s = b.result) != null &&
859     compareAndSet(0, 1)) {
860     if (r instanceof AltResult)
861     ex = ((AltResult)r).ex;
862     else
863     ex = null;
864     if (ex == null && (s instanceof AltResult))
865     ex = ((AltResult)s).ex;
866     Executor e = executor;
867     if (ex == null) {
868     try {
869     if (e != null)
870     e.execute(new AsyncRun(fn, dst));
871     else
872     fn.run();
873     } catch (Throwable rex) {
874     ex = rex;
875     }
876     }
877     if (e == null || ex != null)
878     dst.internalComplete(null, ex);
879     }
880     }
881     private static final long serialVersionUID = 5232453952276885070L;
882     }
883    
884     static final class OrApplyCompletion<T,U> extends Completion {
885     final CompletableFuture<? extends T> src;
886     final CompletableFuture<? extends T> snd;
887     final Fun<? super T,? extends U> fn;
888     final CompletableFuture<U> dst;
889     final Executor executor;
890     OrApplyCompletion(CompletableFuture<? extends T> src,
891     CompletableFuture<? extends T> snd,
892     Fun<? super T,? extends U> fn,
893     CompletableFuture<U> dst, Executor executor) {
894     this.src = src; this.snd = snd;
895     this.fn = fn; this.dst = dst;
896     this.executor = executor;
897     }
898     public final void run() {
899     final CompletableFuture<? extends T> a;
900     final CompletableFuture<? extends T> b;
901     final Fun<? super T,? extends U> fn;
902     final CompletableFuture<U> dst;
903     Object r; T t; Throwable ex;
904     if ((dst = this.dst) != null &&
905     (fn = this.fn) != null &&
906     (((a = this.src) != null && (r = a.result) != null) ||
907     ((b = this.snd) != null && (r = b.result) != null)) &&
908     compareAndSet(0, 1)) {
909     if (r instanceof AltResult) {
910     ex = ((AltResult)r).ex;
911     t = null;
912     }
913     else {
914     ex = null;
915     @SuppressWarnings("unchecked") T tr = (T) r;
916     t = tr;
917     }
918     Executor e = executor;
919     U u = null;
920     if (ex == null) {
921     try {
922     if (e != null)
923     e.execute(new AsyncApply<T,U>(t, fn, dst));
924     else
925     u = fn.apply(t);
926     } catch (Throwable rex) {
927     ex = rex;
928     }
929     }
930     if (e == null || ex != null)
931     dst.internalComplete(u, ex);
932     }
933     }
934     private static final long serialVersionUID = 5232453952276885070L;
935     }
936    
937     static final class OrAcceptCompletion<T> extends Completion {
938     final CompletableFuture<? extends T> src;
939     final CompletableFuture<? extends T> snd;
940     final Action<? super T> fn;
941     final CompletableFuture<Void> dst;
942     final Executor executor;
943     OrAcceptCompletion(CompletableFuture<? extends T> src,
944     CompletableFuture<? extends T> snd,
945     Action<? super T> fn,
946     CompletableFuture<Void> dst, Executor executor) {
947     this.src = src; this.snd = snd;
948     this.fn = fn; this.dst = dst;
949     this.executor = executor;
950     }
951     public final void run() {
952     final CompletableFuture<? extends T> a;
953     final CompletableFuture<? extends T> b;
954     final Action<? super T> fn;
955     final CompletableFuture<Void> dst;
956     Object r; T t; Throwable ex;
957     if ((dst = this.dst) != null &&
958     (fn = this.fn) != null &&
959     (((a = this.src) != null && (r = a.result) != null) ||
960     ((b = this.snd) != null && (r = b.result) != null)) &&
961     compareAndSet(0, 1)) {
962     if (r instanceof AltResult) {
963     ex = ((AltResult)r).ex;
964     t = null;
965     }
966     else {
967     ex = null;
968     @SuppressWarnings("unchecked") T tr = (T) r;
969     t = tr;
970     }
971     Executor e = executor;
972     if (ex == null) {
973     try {
974     if (e != null)
975     e.execute(new AsyncAccept<T>(t, fn, dst));
976     else
977     fn.accept(t);
978     } catch (Throwable rex) {
979     ex = rex;
980     }
981     }
982     if (e == null || ex != null)
983     dst.internalComplete(null, ex);
984     }
985     }
986     private static final long serialVersionUID = 5232453952276885070L;
987     }
988    
989     static final class OrRunCompletion<T> extends Completion {
990     final CompletableFuture<? extends T> src;
991     final CompletableFuture<?> snd;
992     final Runnable fn;
993     final CompletableFuture<Void> dst;
994     final Executor executor;
995     OrRunCompletion(CompletableFuture<? extends T> src,
996     CompletableFuture<?> snd,
997     Runnable fn,
998     CompletableFuture<Void> dst, Executor executor) {
999     this.src = src; this.snd = snd;
1000     this.fn = fn; this.dst = dst;
1001     this.executor = executor;
1002     }
1003     public final void run() {
1004     final CompletableFuture<? extends T> a;
1005     final CompletableFuture<?> b;
1006     final Runnable fn;
1007     final CompletableFuture<Void> dst;
1008     Object r; Throwable ex;
1009     if ((dst = this.dst) != null &&
1010     (fn = this.fn) != null &&
1011     (((a = this.src) != null && (r = a.result) != null) ||
1012     ((b = this.snd) != null && (r = b.result) != null)) &&
1013     compareAndSet(0, 1)) {
1014     if (r instanceof AltResult)
1015     ex = ((AltResult)r).ex;
1016     else
1017     ex = null;
1018     Executor e = executor;
1019     if (ex == null) {
1020     try {
1021     if (e != null)
1022     e.execute(new AsyncRun(fn, dst));
1023     else
1024     fn.run();
1025     } catch (Throwable rex) {
1026     ex = rex;
1027     }
1028     }
1029     if (e == null || ex != null)
1030     dst.internalComplete(null, ex);
1031     }
1032     }
1033     private static final long serialVersionUID = 5232453952276885070L;
1034     }
1035    
1036     static final class ExceptionCompletion<T> extends Completion {
1037     final CompletableFuture<? extends T> src;
1038     final Fun<? super Throwable, ? extends T> fn;
1039     final CompletableFuture<T> dst;
1040     ExceptionCompletion(CompletableFuture<? extends T> src,
1041     Fun<? super Throwable, ? extends T> fn,
1042     CompletableFuture<T> dst) {
1043     this.src = src; this.fn = fn; this.dst = dst;
1044     }
1045     public final void run() {
1046     final CompletableFuture<? extends T> a;
1047     final Fun<? super Throwable, ? extends T> fn;
1048     final CompletableFuture<T> dst;
1049     Object r; T t = null; Throwable ex, dx = null;
1050     if ((dst = this.dst) != null &&
1051     (fn = this.fn) != null &&
1052     (a = this.src) != null &&
1053     (r = a.result) != null &&
1054     compareAndSet(0, 1)) {
1055     if ((r instanceof AltResult) &&
1056     (ex = ((AltResult)r).ex) != null) {
1057     try {
1058     t = fn.apply(ex);
1059     } catch (Throwable rex) {
1060     dx = rex;
1061     }
1062     }
1063     else {
1064     @SuppressWarnings("unchecked") T tr = (T) r;
1065     t = tr;
1066     }
1067     dst.internalComplete(t, dx);
1068     }
1069     }
1070     private static final long serialVersionUID = 5232453952276885070L;
1071     }
1072    
1073     static final class ThenCopy<T> extends Completion {
1074     final CompletableFuture<? extends T> src;
1075     final CompletableFuture<T> dst;
1076     ThenCopy(CompletableFuture<? extends T> src,
1077     CompletableFuture<T> dst) {
1078     this.src = src; this.dst = dst;
1079     }
1080     public final void run() {
1081     final CompletableFuture<? extends T> a;
1082     final CompletableFuture<T> dst;
1083     Object r; Object t; Throwable ex;
1084     if ((dst = this.dst) != null &&
1085     (a = this.src) != null &&
1086     (r = a.result) != null &&
1087     compareAndSet(0, 1)) {
1088     if (r instanceof AltResult) {
1089     ex = ((AltResult)r).ex;
1090     t = null;
1091     }
1092     else {
1093     ex = null;
1094     t = r;
1095     }
1096     dst.internalComplete(t, ex);
1097     }
1098     }
1099     private static final long serialVersionUID = 5232453952276885070L;
1100     }
1101    
1102     static final class HandleCompletion<T,U> extends Completion {
1103     final CompletableFuture<? extends T> src;
1104     final BiFun<? super T, Throwable, ? extends U> fn;
1105     final CompletableFuture<U> dst;
1106     HandleCompletion(CompletableFuture<? extends T> src,
1107     BiFun<? super T, Throwable, ? extends U> fn,
1108     final CompletableFuture<U> dst) {
1109     this.src = src; this.fn = fn; this.dst = dst;
1110     }
1111     public final void run() {
1112     final CompletableFuture<? extends T> a;
1113     final BiFun<? super T, Throwable, ? extends U> fn;
1114     final CompletableFuture<U> dst;
1115     Object r; T t; Throwable ex;
1116     if ((dst = this.dst) != null &&
1117     (fn = this.fn) != null &&
1118     (a = this.src) != null &&
1119     (r = a.result) != null &&
1120     compareAndSet(0, 1)) {
1121     if (r instanceof AltResult) {
1122     ex = ((AltResult)r).ex;
1123     t = null;
1124     }
1125     else {
1126     ex = null;
1127     @SuppressWarnings("unchecked") T tr = (T) r;
1128     t = tr;
1129     }
1130     U u = null; Throwable dx = null;
1131     try {
1132     u = fn.apply(t, ex);
1133     } catch (Throwable rex) {
1134     dx = rex;
1135     }
1136     dst.internalComplete(u, dx);
1137     }
1138     }
1139     private static final long serialVersionUID = 5232453952276885070L;
1140     }
1141    
1142     static final class ComposeCompletion<T,U> extends Completion {
1143     final CompletableFuture<? extends T> src;
1144     final Fun<? super T, CompletableFuture<U>> fn;
1145     final CompletableFuture<U> dst;
1146     ComposeCompletion(CompletableFuture<? extends T> src,
1147     Fun<? super T, CompletableFuture<U>> fn,
1148     final CompletableFuture<U> dst) {
1149     this.src = src; this.fn = fn; this.dst = dst;
1150     }
1151     public final void run() {
1152     final CompletableFuture<? extends T> a;
1153     final Fun<? super T, CompletableFuture<U>> fn;
1154     final CompletableFuture<U> dst;
1155     Object r; T t; Throwable ex;
1156     if ((dst = this.dst) != null &&
1157     (fn = this.fn) != null &&
1158     (a = this.src) != null &&
1159     (r = a.result) != null &&
1160     compareAndSet(0, 1)) {
1161     if (r instanceof AltResult) {
1162     ex = ((AltResult)r).ex;
1163     t = null;
1164     }
1165     else {
1166     ex = null;
1167     @SuppressWarnings("unchecked") T tr = (T) r;
1168     t = tr;
1169     }
1170     CompletableFuture<U> c = null;
1171     U u = null;
1172     boolean complete = false;
1173     if (ex == null) {
1174     try {
1175     c = fn.apply(t);
1176     } catch (Throwable rex) {
1177     ex = rex;
1178     }
1179     }
1180     if (ex != null || c == null) {
1181     if (ex == null)
1182     ex = new NullPointerException();
1183     }
1184     else {
1185     ThenCopy<U> d = null;
1186     Object s;
1187     if ((s = c.result) == null) {
1188     CompletionNode p = new CompletionNode
1189     (d = new ThenCopy<U>(c, dst));
1190     while ((s = c.result) == null) {
1191     if (UNSAFE.compareAndSwapObject
1192     (c, COMPLETIONS, p.next = c.completions, p))
1193     break;
1194     }
1195     }
1196     if (s != null && (d == null || d.compareAndSet(0, 1))) {
1197     complete = true;
1198     if (s instanceof AltResult) {
1199     ex = ((AltResult)s).ex; // no rewrap
1200     u = null;
1201     }
1202     else {
1203     @SuppressWarnings("unchecked") U us = (U) s;
1204     u = us;
1205     }
1206     }
1207     }
1208     if (complete || ex != null)
1209     dst.internalComplete(u, ex);
1210     if (c != null)
1211     c.helpPostComplete();
1212     }
1213     }
1214     private static final long serialVersionUID = 5232453952276885070L;
1215     }
1216    
1217     // public methods
1218    
1219     /**
1220     * Creates a new incomplete CompletableFuture.
1221     */
1222     public CompletableFuture() {
1223     }
1224    
1225     /**
1226     * Asynchronously executes in the {@link
1227     * ForkJoinPool#commonPool()}, a task that completes the returned
1228     * CompletableFuture with the result of the given Generator.
1229     *
1230     * @param supplier a function returning the value to be used
1231     * to complete the returned CompletableFuture
1232     * @return the CompletableFuture
1233     */
1234     public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier) {
1235     if (supplier == null) throw new NullPointerException();
1236     CompletableFuture<U> f = new CompletableFuture<U>();
1237     ForkJoinPool.commonPool().
1238     execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1239     return f;
1240     }
1241    
1242     /**
1243     * Asynchronously executes using the given executor, a task that
1244     * completes the returned CompletableFuture with the result of the
1245     * given Generator.
1246     *
1247     * @param supplier a function returning the value to be used
1248     * to complete the returned CompletableFuture
1249     * @param executor the executor to use for asynchronous execution
1250     * @return the CompletableFuture
1251     */
1252     public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier,
1253     Executor executor) {
1254     if (executor == null || supplier == null)
1255     throw new NullPointerException();
1256     CompletableFuture<U> f = new CompletableFuture<U>();
1257     executor.execute(new AsyncSupply<U>(supplier, f));
1258     return f;
1259     }
1260    
1261     /**
1262     * Asynchronously executes in the {@link
1263     * ForkJoinPool#commonPool()} a task that runs the given action,
1264     * and then completes the returned CompletableFuture.
1265     *
1266     * @param runnable the action to run before completing the
1267     * returned CompletableFuture
1268     * @return the CompletableFuture
1269     */
1270     public static CompletableFuture<Void> runAsync(Runnable runnable) {
1271     if (runnable == null) throw new NullPointerException();
1272     CompletableFuture<Void> f = new CompletableFuture<Void>();
1273     ForkJoinPool.commonPool().
1274     execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1275     return f;
1276     }
1277    
1278     /**
1279     * Asynchronously executes using the given executor, a task that
1280     * runs the given action, and then completes the returned
1281     * CompletableFuture.
1282     *
1283     * @param runnable the action to run before completing the
1284     * returned CompletableFuture
1285     * @param executor the executor to use for asynchronous execution
1286     * @return the CompletableFuture
1287     */
1288     public static CompletableFuture<Void> runAsync(Runnable runnable,
1289     Executor executor) {
1290     if (executor == null || runnable == null)
1291     throw new NullPointerException();
1292     CompletableFuture<Void> f = new CompletableFuture<Void>();
1293     executor.execute(new AsyncRun(runnable, f));
1294     return f;
1295     }
1296    
1297     /**
1298     * Returns {@code true} if completed in any fashion: normally,
1299     * exceptionally, or via cancellation.
1300     *
1301     * @return {@code true} if completed
1302     */
1303     public boolean isDone() {
1304     return result != null;
1305     }
1306    
1307     /**
1308     * Waits if necessary for the computation to complete, and then
1309     * retrieves its result.
1310     *
1311     * @return the computed result
1312     * @throws CancellationException if the computation was cancelled
1313     * @throws ExecutionException if the computation threw an
1314     * exception
1315     * @throws InterruptedException if the current thread was interrupted
1316     * while waiting
1317     */
1318     public T get() throws InterruptedException, ExecutionException {
1319     Object r; Throwable ex, cause;
1320     if ((r = result) == null && (r = waitingGet(true)) == null)
1321     throw new InterruptedException();
1322     if (r instanceof AltResult) {
1323     if ((ex = ((AltResult)r).ex) != null) {
1324     if (ex instanceof CancellationException)
1325     throw (CancellationException)ex;
1326     if ((ex instanceof CompletionException) &&
1327     (cause = ex.getCause()) != null)
1328     ex = cause;
1329     throw new ExecutionException(ex);
1330     }
1331     return null;
1332     }
1333     @SuppressWarnings("unchecked") T tr = (T) r;
1334     return tr;
1335     }
1336    
1337     /**
1338     * Waits if necessary for at most the given time for completion,
1339     * and then retrieves its result, if available.
1340     *
1341     * @param timeout the maximum time to wait
1342     * @param unit the time unit of the timeout argument
1343     * @return the computed result
1344     * @throws CancellationException if the computation was cancelled
1345     * @throws ExecutionException if the computation threw an
1346     * exception
1347     * @throws InterruptedException if the current thread was interrupted
1348     * while waiting
1349     * @throws TimeoutException if the wait timed out
1350     */
1351     public T get(long timeout, TimeUnit unit)
1352     throws InterruptedException, ExecutionException, TimeoutException {
1353     Object r; Throwable ex, cause;
1354     long nanos = unit.toNanos(timeout);
1355     if (Thread.interrupted())
1356     throw new InterruptedException();
1357     if ((r = result) == null)
1358     r = timedAwaitDone(nanos);
1359     if (r instanceof AltResult) {
1360     if ((ex = ((AltResult)r).ex) != null) {
1361     if (ex instanceof CancellationException)
1362     throw (CancellationException)ex;
1363     if ((ex instanceof CompletionException) &&
1364     (cause = ex.getCause()) != null)
1365     ex = cause;
1366     throw new ExecutionException(ex);
1367     }
1368     return null;
1369     }
1370     @SuppressWarnings("unchecked") T tr = (T) r;
1371     return tr;
1372     }
1373    
1374     /**
1375     * Returns the result value when complete, or throws an
1376     * (unchecked) exception if completed exceptionally. To better
1377     * conform with the use of common functional forms, if a
1378     * computation involved in the completion of this
1379     * CompletableFuture threw an exception, this method throws an
1380     * (unchecked) {@link CompletionException} with the underlying
1381     * exception as its cause.
1382     *
1383     * @return the result value
1384     * @throws CancellationException if the computation was cancelled
1385     * @throws CompletionException if a completion computation threw
1386     * an exception
1387     */
1388     public T join() {
1389     Object r; Throwable ex;
1390     if ((r = result) == null)
1391     r = waitingGet(false);
1392     if (r instanceof AltResult) {
1393     if ((ex = ((AltResult)r).ex) != null) {
1394     if (ex instanceof CancellationException)
1395     throw (CancellationException)ex;
1396     if (ex instanceof CompletionException)
1397     throw (CompletionException)ex;
1398     throw new CompletionException(ex);
1399     }
1400     return null;
1401     }
1402     @SuppressWarnings("unchecked") T tr = (T) r;
1403     return tr;
1404     }
1405    
1406     /**
1407     * Returns the result value (or throws any encountered exception)
1408     * if completed, else returns the given valueIfAbsent.
1409     *
1410     * @param valueIfAbsent the value to return if not completed
1411     * @return the result value, if completed, else the given valueIfAbsent
1412     * @throws CancellationException if the computation was cancelled
1413     * @throws CompletionException if a completion computation threw
1414     * an exception
1415     */
1416     public T getNow(T valueIfAbsent) {
1417     Object r; Throwable ex;
1418     if ((r = result) == null)
1419     return valueIfAbsent;
1420     if (r instanceof AltResult) {
1421     if ((ex = ((AltResult)r).ex) != null) {
1422     if (ex instanceof CancellationException)
1423     throw (CancellationException)ex;
1424     if (ex instanceof CompletionException)
1425     throw (CompletionException)ex;
1426     throw new CompletionException(ex);
1427     }
1428     return null;
1429     }
1430     @SuppressWarnings("unchecked") T tr = (T) r;
1431     return tr;
1432     }
1433    
1434     /**
1435     * If not already completed, sets the value returned by {@link
1436     * #get()} and related methods to the given value.
1437     *
1438     * @param value the result value
1439     * @return {@code true} if this invocation caused this CompletableFuture
1440     * to transition to a completed state, else {@code false}
1441     */
1442     public boolean complete(T value) {
1443     boolean triggered = result == null &&
1444     UNSAFE.compareAndSwapObject(this, RESULT, null,
1445     value == null ? NIL : value);
1446     postComplete();
1447     return triggered;
1448     }
1449    
1450     /**
1451     * If not already completed, causes invocations of {@link #get()}
1452     * and related methods to throw the given exception.
1453     *
1454     * @param ex the exception
1455     * @return {@code true} if this invocation caused this CompletableFuture
1456     * to transition to a completed state, else {@code false}
1457     */
1458     public boolean completeExceptionally(Throwable ex) {
1459     if (ex == null) throw new NullPointerException();
1460     boolean triggered = result == null &&
1461     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1462     postComplete();
1463     return triggered;
1464     }
1465    
1466     /**
1467     * Creates and returns a CompletableFuture that is completed with
1468     * the result of the given function of this CompletableFuture.
1469     * If this CompletableFuture completes exceptionally,
1470     * then the returned CompletableFuture also does so,
1471     * with a CompletionException holding this exception as
1472     * its cause.
1473     *
1474     * @param fn the function to use to compute the value of
1475     * the returned CompletableFuture
1476     * @return the new CompletableFuture
1477     */
1478     public <U> CompletableFuture<U> thenApply(Fun<? super T,? extends U> fn) {
1479     return doThenApply(fn, null);
1480     }
1481    
1482     /**
1483     * Creates and returns a CompletableFuture that is asynchronously
1484     * completed using the {@link ForkJoinPool#commonPool()} with the
1485     * result of the given function of this CompletableFuture. If
1486     * this CompletableFuture completes exceptionally, then the
1487     * returned CompletableFuture also does so, with a
1488     * CompletionException holding this exception as its cause.
1489     *
1490     * @param fn the function to use to compute the value of
1491     * the returned CompletableFuture
1492     * @return the new CompletableFuture
1493     */
1494     public <U> CompletableFuture<U> thenApplyAsync(Fun<? super T,? extends U> fn) {
1495     return doThenApply(fn, ForkJoinPool.commonPool());
1496     }
1497    
1498     /**
1499     * Creates and returns a CompletableFuture that is asynchronously
1500     * completed using the given executor with the result of the given
1501     * function of this CompletableFuture. If this CompletableFuture
1502     * completes exceptionally, then the returned CompletableFuture
1503     * also does so, with a CompletionException holding this exception as
1504     * its cause.
1505     *
1506     * @param fn the function to use to compute the value of
1507     * the returned CompletableFuture
1508     * @param executor the executor to use for asynchronous execution
1509     * @return the new CompletableFuture
1510     */
1511     public <U> CompletableFuture<U> thenApplyAsync(Fun<? super T,? extends U> fn,
1512     Executor executor) {
1513     if (executor == null) throw new NullPointerException();
1514     return doThenApply(fn, executor);
1515     }
1516    
1517     private <U> CompletableFuture<U> doThenApply(Fun<? super T,? extends U> fn,
1518     Executor e) {
1519     if (fn == null) throw new NullPointerException();
1520     CompletableFuture<U> dst = new CompletableFuture<U>();
1521     ApplyCompletion<T,U> d = null;
1522     Object r;
1523     if ((r = result) == null) {
1524     CompletionNode p = new CompletionNode
1525     (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1526     while ((r = result) == null) {
1527     if (UNSAFE.compareAndSwapObject
1528     (this, COMPLETIONS, p.next = completions, p))
1529     break;
1530     }
1531     }
1532     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1533     T t; Throwable ex;
1534     if (r instanceof AltResult) {
1535     ex = ((AltResult)r).ex;
1536     t = null;
1537     }
1538     else {
1539     ex = null;
1540     @SuppressWarnings("unchecked") T tr = (T) r;
1541     t = tr;
1542     }
1543     U u = null;
1544     if (ex == null) {
1545     try {
1546     if (e != null)
1547     e.execute(new AsyncApply<T,U>(t, fn, dst));
1548     else
1549     u = fn.apply(t);
1550     } catch (Throwable rex) {
1551     ex = rex;
1552     }
1553     }
1554     if (e == null || ex != null)
1555     dst.internalComplete(u, ex);
1556     }
1557     helpPostComplete();
1558     return dst;
1559     }
1560    
1561     /**
1562     * Creates and returns a CompletableFuture that is completed after
1563     * performing the given action with this CompletableFuture's
1564     * result when it completes. If this CompletableFuture
1565     * completes exceptionally, then the returned CompletableFuture
1566     * also does so, with a CompletionException holding this exception as
1567     * its cause.
1568     *
1569     * @param block the action to perform before completing the
1570     * returned CompletableFuture
1571     * @return the new CompletableFuture
1572     */
1573     public CompletableFuture<Void> thenAccept(Action<? super T> block) {
1574     return doThenAccept(block, null);
1575     }
1576    
1577     /**
1578     * Creates and returns a CompletableFuture that is asynchronously
1579     * completed using the {@link ForkJoinPool#commonPool()} after
1580     * performing the given action with this CompletableFuture's
1581     * result when it completes. If this CompletableFuture completes
1582     * exceptionally, then the returned CompletableFuture also does so,
1583     * with a CompletionException holding this exception as its cause.
1584     *
1585     * @param block the action to perform before completing the
1586     * returned CompletableFuture
1587     * @return the new CompletableFuture
1588     */
1589     public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block) {
1590     return doThenAccept(block, ForkJoinPool.commonPool());
1591     }
1592    
1593     /**
1594     * Creates and returns a CompletableFuture that is asynchronously
1595     * completed using the given executor after performing the given
1596     * action with this CompletableFuture's result when it completes.
1597     * If this CompletableFuture completes exceptionally, then the
1598     * returned CompletableFuture also does so, with a
1599     * CompletionException holding this exception as its cause.
1600     *
1601     * @param block the action to perform before completing the
1602     * returned CompletableFuture
1603     * @param executor the executor to use for asynchronous execution
1604     * @return the new CompletableFuture
1605     */
1606     public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block,
1607     Executor executor) {
1608     if (executor == null) throw new NullPointerException();
1609     return doThenAccept(block, executor);
1610     }
1611    
1612     private CompletableFuture<Void> doThenAccept(Action<? super T> fn,
1613     Executor e) {
1614     if (fn == null) throw new NullPointerException();
1615     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1616     AcceptCompletion<T> d = null;
1617     Object r;
1618     if ((r = result) == null) {
1619     CompletionNode p = new CompletionNode
1620     (d = new AcceptCompletion<T>(this, fn, dst, e));
1621     while ((r = result) == null) {
1622     if (UNSAFE.compareAndSwapObject
1623     (this, COMPLETIONS, p.next = completions, p))
1624     break;
1625     }
1626     }
1627     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1628     T t; Throwable ex;
1629     if (r instanceof AltResult) {
1630     ex = ((AltResult)r).ex;
1631     t = null;
1632     }
1633     else {
1634     ex = null;
1635     @SuppressWarnings("unchecked") T tr = (T) r;
1636     t = tr;
1637     }
1638     if (ex == null) {
1639     try {
1640     if (e != null)
1641     e.execute(new AsyncAccept<T>(t, fn, dst));
1642     else
1643     fn.accept(t);
1644     } catch (Throwable rex) {
1645     ex = rex;
1646     }
1647     }
1648     if (e == null || ex != null)
1649     dst.internalComplete(null, ex);
1650     }
1651     helpPostComplete();
1652     return dst;
1653     }
1654    
1655     /**
1656     * Creates and returns a CompletableFuture that is completed after
1657     * performing the given action when this CompletableFuture
1658     * completes. If this CompletableFuture completes exceptionally,
1659     * then the returned CompletableFuture also does so, with a
1660     * CompletionException holding this exception as its cause.
1661     *
1662     * @param action the action to perform before completing the
1663     * returned CompletableFuture
1664     * @return the new CompletableFuture
1665     */
1666     public CompletableFuture<Void> thenRun(Runnable action) {
1667     return doThenRun(action, null);
1668     }
1669    
1670     /**
1671     * Creates and returns a CompletableFuture that is asynchronously
1672     * completed using the {@link ForkJoinPool#commonPool()} after
1673     * performing the given action when this CompletableFuture
1674     * completes. If this CompletableFuture completes exceptionally,
1675     * then the returned CompletableFuture also does so, with a
1676     * CompletionException holding this exception as its cause.
1677     *
1678     * @param action the action to perform before completing the
1679     * returned CompletableFuture
1680     * @return the new CompletableFuture
1681     */
1682     public CompletableFuture<Void> thenRunAsync(Runnable action) {
1683     return doThenRun(action, ForkJoinPool.commonPool());
1684     }
1685    
1686     /**
1687     * Creates and returns a CompletableFuture that is asynchronously
1688     * completed using the given executor after performing the given
1689     * action when this CompletableFuture completes. If this
1690     * CompletableFuture completes exceptionally, then the returned
1691     * CompletableFuture also does so, with a CompletionException holding
1692     * this exception as its cause.
1693     *
1694     * @param action the action to perform before completing the
1695     * returned CompletableFuture
1696     * @param executor the executor to use for asynchronous execution
1697     * @return the new CompletableFuture
1698     */
1699     public CompletableFuture<Void> thenRunAsync(Runnable action,
1700     Executor executor) {
1701     if (executor == null) throw new NullPointerException();
1702     return doThenRun(action, executor);
1703     }
1704    
1705     private CompletableFuture<Void> doThenRun(Runnable action,
1706     Executor e) {
1707     if (action == null) throw new NullPointerException();
1708     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1709     RunCompletion<T> d = null;
1710     Object r;
1711     if ((r = result) == null) {
1712     CompletionNode p = new CompletionNode
1713     (d = new RunCompletion<T>(this, action, dst, e));
1714     while ((r = result) == null) {
1715     if (UNSAFE.compareAndSwapObject
1716     (this, COMPLETIONS, p.next = completions, p))
1717     break;
1718     }
1719     }
1720     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1721     Throwable ex;
1722     if (r instanceof AltResult)
1723     ex = ((AltResult)r).ex;
1724     else
1725     ex = null;
1726     if (ex == null) {
1727     try {
1728     if (e != null)
1729     e.execute(new AsyncRun(action, dst));
1730     else
1731     action.run();
1732     } catch (Throwable rex) {
1733     ex = rex;
1734     }
1735     }
1736     if (e == null || ex != null)
1737     dst.internalComplete(null, ex);
1738     }
1739     helpPostComplete();
1740     return dst;
1741     }
1742    
1743     /**
1744     * Creates and returns a CompletableFuture that is completed with
1745     * the result of the given function of this and the other given
1746     * CompletableFuture's results when both complete. If this or
1747     * the other CompletableFuture complete exceptionally, then the
1748     * returned CompletableFuture also does so, with a
1749     * CompletionException holding the exception as its cause.
1750     *
1751     * @param other the other CompletableFuture
1752     * @param fn the function to use to compute the value of
1753     * the returned CompletableFuture
1754     * @return the new CompletableFuture
1755     */
1756     public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
1757     BiFun<? super T,? super U,? extends V> fn) {
1758     return doThenBiApply(other, fn, null);
1759     }
1760    
1761     /**
1762     * Creates and returns a CompletableFuture that is asynchronously
1763     * completed using the {@link ForkJoinPool#commonPool()} with
1764     * the result of the given function of this and the other given
1765     * CompletableFuture's results when both complete. If this or
1766     * the other CompletableFuture complete exceptionally, then the
1767     * returned CompletableFuture also does so, with a
1768     * CompletionException holding the exception as its cause.
1769     *
1770     * @param other the other CompletableFuture
1771     * @param fn the function to use to compute the value of
1772     * the returned CompletableFuture
1773     * @return the new CompletableFuture
1774     */
1775     public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1776     BiFun<? super T,? super U,? extends V> fn) {
1777     return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1778     }
1779    
1780     /**
1781     * Creates and returns a CompletableFuture that is
1782     * asynchronously completed using the given executor with the
1783     * result of the given function of this and the other given
1784     * CompletableFuture's results when both complete. If this or
1785     * the other CompletableFuture complete exceptionally, then the
1786     * returned CompletableFuture also does so, with a
1787     * CompletionException holding the exception as its cause.
1788     *
1789     * @param other the other CompletableFuture
1790     * @param fn the function to use to compute the value of
1791     * the returned CompletableFuture
1792     * @param executor the executor to use for asynchronous execution
1793     * @return the new CompletableFuture
1794     */
1795    
1796     public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1797     BiFun<? super T,? super U,? extends V> fn,
1798     Executor executor) {
1799     if (executor == null) throw new NullPointerException();
1800     return doThenBiApply(other, fn, executor);
1801     }
1802    
1803     private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
1804     BiFun<? super T,? super U,? extends V> fn,
1805     Executor e) {
1806     if (other == null || fn == null) throw new NullPointerException();
1807     CompletableFuture<V> dst = new CompletableFuture<V>();
1808     BiApplyCompletion<T,U,V> d = null;
1809     Object r, s = null;
1810     if ((r = result) == null || (s = other.result) == null) {
1811     d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1812     CompletionNode q = null, p = new CompletionNode(d);
1813     while ((r == null && (r = result) == null) ||
1814     (s == null && (s = other.result) == null)) {
1815     if (q != null) {
1816     if (s != null ||
1817     UNSAFE.compareAndSwapObject
1818     (other, COMPLETIONS, q.next = other.completions, q))
1819     break;
1820     }
1821     else if (r != null ||
1822     UNSAFE.compareAndSwapObject
1823     (this, COMPLETIONS, p.next = completions, p)) {
1824     if (s != null)
1825     break;
1826     q = new CompletionNode(d);
1827     }
1828     }
1829     }
1830     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1831     T t; U u; Throwable ex;
1832     if (r instanceof AltResult) {
1833     ex = ((AltResult)r).ex;
1834     t = null;
1835     }
1836     else {
1837     ex = null;
1838     @SuppressWarnings("unchecked") T tr = (T) r;
1839     t = tr;
1840     }
1841     if (ex != null)
1842     u = null;
1843     else if (s instanceof AltResult) {
1844     ex = ((AltResult)s).ex;
1845     u = null;
1846     }
1847     else {
1848     @SuppressWarnings("unchecked") U us = (U) s;
1849     u = us;
1850     }
1851     V v = null;
1852     if (ex == null) {
1853     try {
1854     if (e != null)
1855     e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1856     else
1857     v = fn.apply(t, u);
1858     } catch (Throwable rex) {
1859     ex = rex;
1860     }
1861     }
1862     if (e == null || ex != null)
1863     dst.internalComplete(v, ex);
1864     }
1865     helpPostComplete();
1866     other.helpPostComplete();
1867     return dst;
1868     }
1869    
1870     /**
1871     * Creates and returns a CompletableFuture that is completed with
1872     * the results of this and the other given CompletableFuture if
1873     * both complete. If this and/or the other CompletableFuture
1874     * complete exceptionally, then the returned CompletableFuture
1875     * also does so, with a CompletionException holding one of these
1876     * exceptions as its cause.
1877     *
1878     * @param other the other CompletableFuture
1879     * @param block the action to perform before completing the
1880     * returned CompletableFuture
1881     * @return the new CompletableFuture
1882     */
1883     public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1884     BiAction<? super T, ? super U> block) {
1885     return doThenBiAccept(other, block, null);
1886     }
1887    
1888     /**
1889     * Creates and returns a CompletableFuture that is completed
1890     * asynchronously using the {@link ForkJoinPool#commonPool()} with
1891     * the results of this and the other given CompletableFuture when
1892     * both complete. If this and/or the other CompletableFuture
1893     * complete exceptionally, then the returned CompletableFuture
1894     * also does so, with a CompletionException holding one of these
1895     * exceptions as its cause.
1896     *
1897     * @param other the other CompletableFuture
1898     * @param block the action to perform before completing the
1899     * returned CompletableFuture
1900     * @return the new CompletableFuture
1901     */
1902     public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1903     BiAction<? super T, ? super U> block) {
1904     return doThenBiAccept(other, block, ForkJoinPool.commonPool());
1905     }
1906    
1907     /**
1908     * Creates and returns a CompletableFuture that is completed
1909     * asynchronously using the given executor with the results of
1910     * this and the other given CompletableFuture when both complete.
1911     * If this and/or the other CompletableFuture complete
1912     * exceptionally, then the returned CompletableFuture also does
1913     * so, with a CompletionException holding one of these exceptions as
1914     * its cause.
1915     *
1916     * @param other the other CompletableFuture
1917     * @param block the action to perform before completing the
1918     * returned CompletableFuture
1919     * @param executor the executor to use for asynchronous execution
1920     * @return the new CompletableFuture
1921     */
1922     public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1923     BiAction<? super T, ? super U> block,
1924     Executor executor) {
1925     if (executor == null) throw new NullPointerException();
1926     return doThenBiAccept(other, block, executor);
1927     }
1928    
1929     private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other,
1930     BiAction<? super T,? super U> fn,
1931     Executor e) {
1932     if (other == null || fn == null) throw new NullPointerException();
1933     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1934     BiAcceptCompletion<T,U> d = null;
1935     Object r, s = null;
1936     if ((r = result) == null || (s = other.result) == null) {
1937     d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
1938     CompletionNode q = null, p = new CompletionNode(d);
1939     while ((r == null && (r = result) == null) ||
1940     (s == null && (s = other.result) == null)) {
1941     if (q != null) {
1942     if (s != null ||
1943     UNSAFE.compareAndSwapObject
1944     (other, COMPLETIONS, q.next = other.completions, q))
1945     break;
1946     }
1947     else if (r != null ||
1948     UNSAFE.compareAndSwapObject
1949     (this, COMPLETIONS, p.next = completions, p)) {
1950     if (s != null)
1951     break;
1952     q = new CompletionNode(d);
1953     }
1954     }
1955     }
1956     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1957     T t; U u; Throwable ex;
1958     if (r instanceof AltResult) {
1959     ex = ((AltResult)r).ex;
1960     t = null;
1961     }
1962     else {
1963     ex = null;
1964     @SuppressWarnings("unchecked") T tr = (T) r;
1965     t = tr;
1966     }
1967     if (ex != null)
1968     u = null;
1969     else if (s instanceof AltResult) {
1970     ex = ((AltResult)s).ex;
1971     u = null;
1972     }
1973     else {
1974     @SuppressWarnings("unchecked") U us = (U) s;
1975     u = us;
1976     }
1977     if (ex == null) {
1978     try {
1979     if (e != null)
1980     e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
1981     else
1982     fn.accept(t, u);
1983     } catch (Throwable rex) {
1984     ex = rex;
1985     }
1986     }
1987     if (e == null || ex != null)
1988     dst.internalComplete(null, ex);
1989     }
1990     helpPostComplete();
1991     other.helpPostComplete();
1992     return dst;
1993     }
1994    
1995     /**
1996     * Creates and returns a CompletableFuture that is completed
1997     * when this and the other given CompletableFuture both
1998     * complete. If this and/or the other CompletableFuture complete
1999     * exceptionally, then the returned CompletableFuture also does
2000     * so, with a CompletionException holding one of these exceptions as
2001     * its cause.
2002     *
2003     * @param other the other CompletableFuture
2004     * @param action the action to perform before completing the
2005     * returned CompletableFuture
2006     * @return the new CompletableFuture
2007     */
2008     public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2009     Runnable action) {
2010     return doThenBiRun(other, action, null);
2011     }
2012    
2013     /**
2014     * Creates and returns a CompletableFuture that is completed
2015     * asynchronously using the {@link ForkJoinPool#commonPool()}
2016     * when this and the other given CompletableFuture both
2017     * complete. If this and/or the other CompletableFuture complete
2018     * exceptionally, then the returned CompletableFuture also does
2019     * so, with a CompletionException holding one of these exceptions as
2020     * its cause.
2021     *
2022     * @param other the other CompletableFuture
2023     * @param action the action to perform before completing the
2024     * returned CompletableFuture
2025     * @return the new CompletableFuture
2026     */
2027     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2028     Runnable action) {
2029     return doThenBiRun(other, action, ForkJoinPool.commonPool());
2030     }
2031    
2032     /**
2033     * Creates and returns a CompletableFuture that is completed
2034     * asynchronously using the given executor
2035     * when this and the other given CompletableFuture both
2036     * complete. If this and/or the other CompletableFuture complete
2037     * exceptionally, then the returned CompletableFuture also does
2038     * so, with a CompletionException holding one of these exceptions as
2039     * its cause.
2040     *
2041     * @param other the other CompletableFuture
2042     * @param action the action to perform before completing the
2043     * returned CompletableFuture
2044     * @param executor the executor to use for asynchronous execution
2045     * @return the new CompletableFuture
2046     */
2047     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2048     Runnable action,
2049     Executor executor) {
2050     if (executor == null) throw new NullPointerException();
2051     return doThenBiRun(other, action, executor);
2052     }
2053    
2054     private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2055     Runnable action,
2056     Executor e) {
2057     if (other == null || action == null) throw new NullPointerException();
2058     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2059     BiRunCompletion<T> d = null;
2060     Object r, s = null;
2061     if ((r = result) == null || (s = other.result) == null) {
2062     d = new BiRunCompletion<T>(this, other, action, dst, e);
2063     CompletionNode q = null, p = new CompletionNode(d);
2064     while ((r == null && (r = result) == null) ||
2065     (s == null && (s = other.result) == null)) {
2066     if (q != null) {
2067     if (s != null ||
2068     UNSAFE.compareAndSwapObject
2069     (other, COMPLETIONS, q.next = other.completions, q))
2070     break;
2071     }
2072     else if (r != null ||
2073     UNSAFE.compareAndSwapObject
2074     (this, COMPLETIONS, p.next = completions, p)) {
2075     if (s != null)
2076     break;
2077     q = new CompletionNode(d);
2078     }
2079     }
2080     }
2081     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2082     Throwable ex;
2083     if (r instanceof AltResult)
2084     ex = ((AltResult)r).ex;
2085     else
2086     ex = null;
2087     if (ex == null && (s instanceof AltResult))
2088     ex = ((AltResult)s).ex;
2089     if (ex == null) {
2090     try {
2091     if (e != null)
2092     e.execute(new AsyncRun(action, dst));
2093     else
2094     action.run();
2095     } catch (Throwable rex) {
2096     ex = rex;
2097     }
2098     }
2099     if (e == null || ex != null)
2100     dst.internalComplete(null, ex);
2101     }
2102     helpPostComplete();
2103     other.helpPostComplete();
2104     return dst;
2105     }
2106    
2107     /**
2108     * Creates and returns a CompletableFuture that is completed with
2109     * the result of the given function of either this or the other
2110     * given CompletableFuture's results when either complete. If
2111     * this and/or the other CompletableFuture complete exceptionally,
2112     * then the returned CompletableFuture may also do so, with a
2113     * CompletionException holding one of these exceptions as its cause.
2114     * No guarantees are made about which result or exception is used
2115     * in the returned CompletableFuture.
2116     *
2117     * @param other the other CompletableFuture
2118     * @param fn the function to use to compute the value of
2119     * the returned CompletableFuture
2120     * @return the new CompletableFuture
2121     */
2122     public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2123     Fun<? super T, U> fn) {
2124     return doOrApply(other, fn, null);
2125     }
2126    
2127     /**
2128     * Creates and returns a CompletableFuture that is completed
2129     * asynchronously using the {@link ForkJoinPool#commonPool()} with
2130     * the result of the given function of either this or the other
2131     * given CompletableFuture's results when either complete. If
2132     * this and/or the other CompletableFuture complete exceptionally,
2133     * then the returned CompletableFuture may also do so, with a
2134     * CompletionException holding one of these exceptions as its cause.
2135     * No guarantees are made about which result or exception is used
2136     * in the returned CompletableFuture.
2137     *
2138     * @param other the other CompletableFuture
2139     * @param fn the function to use to compute the value of
2140     * the returned CompletableFuture
2141     * @return the new CompletableFuture
2142     */
2143     public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2144     Fun<? super T, U> fn) {
2145     return doOrApply(other, fn, ForkJoinPool.commonPool());
2146     }
2147    
2148     /**
2149     * Creates and returns a CompletableFuture that is completed
2150     * asynchronously using the given executor with the result of the
2151     * given function of either this or the other given
2152     * CompletableFuture's results when either complete. If this
2153     * and/or the other CompletableFuture complete exceptionally, then
2154     * the returned CompletableFuture may also do so, with a
2155     * CompletionException holding one of these exceptions as its cause.
2156     * No guarantees are made about which result or exception is used
2157     * in the returned CompletableFuture.
2158     *
2159     * @param other the other CompletableFuture
2160     * @param fn the function to use to compute the value of
2161     * the returned CompletableFuture
2162     * @param executor the executor to use for asynchronous execution
2163     * @return the new CompletableFuture
2164     */
2165     public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2166     Fun<? super T, U> fn,
2167     Executor executor) {
2168     if (executor == null) throw new NullPointerException();
2169     return doOrApply(other, fn, executor);
2170     }
2171    
2172     private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other,
2173     Fun<? super T, U> fn,
2174     Executor e) {
2175     if (other == null || fn == null) throw new NullPointerException();
2176     CompletableFuture<U> dst = new CompletableFuture<U>();
2177     OrApplyCompletion<T,U> d = null;
2178     Object r;
2179     if ((r = result) == null && (r = other.result) == null) {
2180     d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2181     CompletionNode q = null, p = new CompletionNode(d);
2182     while ((r = result) == null && (r = other.result) == null) {
2183     if (q != null) {
2184     if (UNSAFE.compareAndSwapObject
2185     (other, COMPLETIONS, q.next = other.completions, q))
2186     break;
2187     }
2188     else if (UNSAFE.compareAndSwapObject
2189     (this, COMPLETIONS, p.next = completions, p))
2190     q = new CompletionNode(d);
2191     }
2192     }
2193     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2194     T t; Throwable ex;
2195     if (r instanceof AltResult) {
2196     ex = ((AltResult)r).ex;
2197     t = null;
2198     }
2199     else {
2200     ex = null;
2201     @SuppressWarnings("unchecked") T tr = (T) r;
2202     t = tr;
2203     }
2204     U u = null;
2205     if (ex == null) {
2206     try {
2207     if (e != null)
2208     e.execute(new AsyncApply<T,U>(t, fn, dst));
2209     else
2210     u = fn.apply(t);
2211     } catch (Throwable rex) {
2212     ex = rex;
2213     }
2214     }
2215     if (e == null || ex != null)
2216     dst.internalComplete(u, ex);
2217     }
2218     helpPostComplete();
2219     other.helpPostComplete();
2220     return dst;
2221     }
2222    
2223     /**
2224     * Creates and returns a CompletableFuture that is completed after
2225     * performing the given action with the result of either this or the
2226     * other given CompletableFuture's result, when either complete.
2227     * If this and/or the other CompletableFuture complete
2228     * exceptionally, then the returned CompletableFuture may also do
2229     * so, with a CompletionException holding one of these exceptions as
2230     * its cause. No guarantees are made about which exception is
2231     * used in the returned CompletableFuture.
2232     *
2233     * @param other the other CompletableFuture
2234     * @param block the action to perform before completing the
2235     * returned CompletableFuture
2236     * @return the new CompletableFuture
2237     */
2238     public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2239     Action<? super T> block) {
2240     return doOrAccept(other, block, null);
2241     }
2242    
2243     /**
2244     * Creates and returns a CompletableFuture that is completed
2245     * asynchronously using the {@link ForkJoinPool#commonPool()},
2246     * performing the given action with the result of either this or
2247     * the other given CompletableFuture's result, when either
2248     * complete. If this and/or the other CompletableFuture complete
2249     * exceptionally, then the returned CompletableFuture may also do
2250     * so, with a CompletionException holding one of these exceptions as
2251     * its cause. No guarantees are made about which exception is
2252     * used in the returned CompletableFuture.
2253     *
2254     * @param other the other CompletableFuture
2255     * @param block the action to perform before completing the
2256     * returned CompletableFuture
2257     * @return the new CompletableFuture
2258     */
2259     public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2260     Action<? super T> block) {
2261     return doOrAccept(other, block, ForkJoinPool.commonPool());
2262     }
2263    
2264     /**
2265     * Creates and returns a CompletableFuture that is completed
2266     * asynchronously using the given executor,
2267     * performing the given action with the result of either this or
2268     * the other given CompletableFuture's result, when either
2269     * complete. If this and/or the other CompletableFuture complete
2270     * exceptionally, then the returned CompletableFuture may also do
2271     * so, with a CompletionException holding one of these exceptions as
2272     * its cause. No guarantees are made about which exception is
2273     * used in the returned CompletableFuture.
2274     *
2275     * @param other the other CompletableFuture
2276     * @param block the action to perform before completing the
2277     * returned CompletableFuture
2278     * @param executor the executor to use for asynchronous execution
2279     * @return the new CompletableFuture
2280     */
2281     public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2282     Action<? super T> block,
2283     Executor executor) {
2284     if (executor == null) throw new NullPointerException();
2285     return doOrAccept(other, block, executor);
2286     }
2287    
2288     private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other,
2289     Action<? super T> fn,
2290     Executor e) {
2291     if (other == null || fn == null) throw new NullPointerException();
2292     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2293     OrAcceptCompletion<T> d = null;
2294     Object r;
2295     if ((r = result) == null && (r = other.result) == null) {
2296     d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2297     CompletionNode q = null, p = new CompletionNode(d);
2298     while ((r = result) == null && (r = other.result) == null) {
2299     if (q != null) {
2300     if (UNSAFE.compareAndSwapObject
2301     (other, COMPLETIONS, q.next = other.completions, q))
2302     break;
2303     }
2304     else if (UNSAFE.compareAndSwapObject
2305     (this, COMPLETIONS, p.next = completions, p))
2306     q = new CompletionNode(d);
2307     }
2308     }
2309     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2310     T t; Throwable ex;
2311     if (r instanceof AltResult) {
2312     ex = ((AltResult)r).ex;
2313     t = null;
2314     }
2315     else {
2316     ex = null;
2317     @SuppressWarnings("unchecked") T tr = (T) r;
2318     t = tr;
2319     }
2320     if (ex == null) {
2321     try {
2322     if (e != null)
2323     e.execute(new AsyncAccept<T>(t, fn, dst));
2324     else
2325     fn.accept(t);
2326     } catch (Throwable rex) {
2327     ex = rex;
2328     }
2329     }
2330     if (e == null || ex != null)
2331     dst.internalComplete(null, ex);
2332     }
2333     helpPostComplete();
2334     other.helpPostComplete();
2335     return dst;
2336     }
2337    
2338     /**
2339     * Creates and returns a CompletableFuture that is completed
2340     * after this or the other given CompletableFuture complete. If
2341     * this and/or the other CompletableFuture complete exceptionally,
2342     * then the returned CompletableFuture may also do so, with a
2343     * CompletionException holding one of these exceptions as its cause.
2344     * No guarantees are made about which exception is used in the
2345     * returned CompletableFuture.
2346     *
2347     * @param other the other CompletableFuture
2348     * @param action the action to perform before completing the
2349     * returned CompletableFuture
2350     * @return the new CompletableFuture
2351     */
2352     public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2353     Runnable action) {
2354     return doOrRun(other, action, null);
2355     }
2356    
2357     /**
2358     * Creates and returns a CompletableFuture that is completed
2359     * asynchronously using the {@link ForkJoinPool#commonPool()}
2360     * after this or the other given CompletableFuture complete. If
2361     * this and/or the other CompletableFuture complete exceptionally,
2362     * then the returned CompletableFuture may also do so, with a
2363     * CompletionException holding one of these exceptions as its cause.
2364     * No guarantees are made about which exception is used in the
2365     * returned CompletableFuture.
2366     *
2367     * @param other the other CompletableFuture
2368     * @param action the action to perform before completing the
2369     * returned CompletableFuture
2370     * @return the new CompletableFuture
2371     */
2372     public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2373     Runnable action) {
2374     return doOrRun(other, action, ForkJoinPool.commonPool());
2375     }
2376    
2377     /**
2378     * Creates and returns a CompletableFuture that is completed
2379     * asynchronously using the given executor after this or the other
2380     * given CompletableFuture complete. If this and/or the other
2381     * CompletableFuture complete exceptionally, then the returned
2382     * CompletableFuture may also do so, with a CompletionException
2383     * holding one of these exceptions as its cause. No guarantees are
2384     * made about which exception is used in the returned
2385     * CompletableFuture.
2386     *
2387     * @param other the other CompletableFuture
2388     * @param action the action to perform before completing the
2389     * returned CompletableFuture
2390     * @param executor the executor to use for asynchronous execution
2391     * @return the new CompletableFuture
2392     */
2393     public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2394     Runnable action,
2395     Executor executor) {
2396     if (executor == null) throw new NullPointerException();
2397     return doOrRun(other, action, executor);
2398     }
2399    
2400     private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2401     Runnable action,
2402     Executor e) {
2403     if (other == null || action == null) throw new NullPointerException();
2404     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2405     OrRunCompletion<T> d = null;
2406     Object r;
2407     if ((r = result) == null && (r = other.result) == null) {
2408     d = new OrRunCompletion<T>(this, other, action, dst, e);
2409     CompletionNode q = null, p = new CompletionNode(d);
2410     while ((r = result) == null && (r = other.result) == null) {
2411     if (q != null) {
2412     if (UNSAFE.compareAndSwapObject
2413     (other, COMPLETIONS, q.next = other.completions, q))
2414     break;
2415     }
2416     else if (UNSAFE.compareAndSwapObject
2417     (this, COMPLETIONS, p.next = completions, p))
2418     q = new CompletionNode(d);
2419     }
2420     }
2421     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2422     Throwable ex;
2423     if (r instanceof AltResult)
2424     ex = ((AltResult)r).ex;
2425     else
2426     ex = null;
2427     if (ex == null) {
2428     try {
2429     if (e != null)
2430     e.execute(new AsyncRun(action, dst));
2431     else
2432     action.run();
2433     } catch (Throwable rex) {
2434     ex = rex;
2435     }
2436     }
2437     if (e == null || ex != null)
2438     dst.internalComplete(null, ex);
2439     }
2440     helpPostComplete();
2441     other.helpPostComplete();
2442     return dst;
2443     }
2444    
2445     /**
2446     * Returns a CompletableFuture (or an equivalent one) produced by
2447     * the given function of the result of this CompletableFuture when
2448     * completed. If this CompletableFuture completes exceptionally,
2449     * then the returned CompletableFuture also does so, with a
2450     * CompletionException holding this exception as its cause.
2451     *
2452     * @param fn the function returning a new CompletableFuture.
2453     * @return the CompletableFuture, that {@code isDone()} upon
2454     * return if completed by the given function, or an exception
2455     * occurs.
2456     */
2457     public <U> CompletableFuture<U> thenCompose(Fun<? super T,
2458     CompletableFuture<U>> fn) {
2459     if (fn == null) throw new NullPointerException();
2460     CompletableFuture<U> dst = null;
2461     ComposeCompletion<T,U> d = null;
2462     Object r;
2463     if ((r = result) == null) {
2464     dst = new CompletableFuture<U>();
2465     CompletionNode p = new CompletionNode
2466     (d = new ComposeCompletion<T,U>(this, fn, dst));
2467     while ((r = result) == null) {
2468     if (UNSAFE.compareAndSwapObject
2469     (this, COMPLETIONS, p.next = completions, p))
2470     break;
2471     }
2472     }
2473     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2474     T t; Throwable ex;
2475     if (r instanceof AltResult) {
2476     ex = ((AltResult)r).ex;
2477     t = null;
2478     }
2479     else {
2480     ex = null;
2481     @SuppressWarnings("unchecked") T tr = (T) r;
2482     t = tr;
2483     }
2484     if (ex == null) {
2485     try {
2486     dst = fn.apply(t);
2487     } catch (Throwable rex) {
2488     ex = rex;
2489     }
2490     }
2491     if (dst == null) {
2492     dst = new CompletableFuture<U>();
2493     if (ex == null)
2494     ex = new NullPointerException();
2495     }
2496     if (ex != null)
2497     dst.internalComplete(null, ex);
2498     }
2499     helpPostComplete();
2500     dst.helpPostComplete();
2501     return dst;
2502     }
2503    
2504     /**
2505     * Creates and returns a CompletableFuture that is completed with
2506     * the result of the given function of the exception triggering
2507     * this CompletableFuture's completion when it completes
2508     * exceptionally; otherwise, if this CompletableFuture completes
2509     * normally, then the returned CompletableFuture also completes
2510     * normally with the same value. If the function itself throws,
         * the thrown Throwable is handed to the returned
         * CompletableFuture as its exception instead of a value.
2511     *
2512     * @param fn the function to use to compute the value of the
2513     * returned CompletableFuture if this CompletableFuture completed
2514     * exceptionally
2515     * @return the new CompletableFuture
         * @throws NullPointerException if {@code fn} is null
2516     */
2517     public CompletableFuture<T> exceptionally(Fun<Throwable, ? extends T> fn) {
2518     if (fn == null) throw new NullPointerException();
2519     CompletableFuture<T> dst = new CompletableFuture<T>();
2520     ExceptionCompletion<T> d = null;
2521     Object r;
         // No result yet: wrap an ExceptionCompletion in a CompletionNode and
         // try to link it into the completions list. (Presumably it is run by
         // whoever later completes this future -- see ExceptionCompletion.)
2522     if ((r = result) == null) {
2523     CompletionNode p =
2524     new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
         // Push p as the new head of completions. The loop re-reads result
         // on every pass and stops as soon as either the CAS succeeds or a
         // result has appeared in the meantime.
2525     while ((r = result) == null) {
2526     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2527     p.next = completions, p))
2528     break;
2529     }
2530     }
         // A result is available. Run the function inline if nothing was
         // registered (d == null) or if the 0 -> 1 compareAndSet on d succeeds.
         // NOTE(review): that CAS presumably claims the action so it is not
         // also run through the registered node -- confirm against
         // ExceptionCompletion.
2531     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2532     T t = null; Throwable ex, dx = null;
2533     if (r instanceof AltResult) {
         // Only an AltResult carrying a non-null exception triggers fn; an
         // AltResult with a null ex leaves both t and dx null.
2534     if ((ex = ((AltResult)r).ex) != null) {
2535     try {
2536     t = fn.apply(ex);
2537     } catch (Throwable rex) {
         // fn failed: t stays null and the failure is passed on as dx.
2538     dx = rex;
2539     }
2540     }
2541     }
2542     else {
         // Any non-AltResult result is treated as the value and passed through.
2543     @SuppressWarnings("unchecked") T tr = (T) r;
2544     t = tr;
2545     }
2546     dst.internalComplete(t, dx);
2547     }
2548     helpPostComplete();
2549     return dst;
2550     }
2551    
2552     /**
2553     * Creates and returns a CompletableFuture that is completed with
2554     * the result of the given function of the result and exception of
2555     * this CompletableFuture's completion when it completes. The
2556     * given function is invoked with the result (or {@code null} if
2557     * none) and the exception (or {@code null} if none) of this
2558     * CompletableFuture when complete. If the function itself throws,
         * the thrown Throwable is handed to the returned
         * CompletableFuture as its exception instead of a value.
2559     *
2560     * @param fn the function to use to compute the value of the
2561     * returned CompletableFuture
2562     *
2563     * @return the new CompletableFuture
         * @throws NullPointerException if {@code fn} is null
2564     */
2565     public <U> CompletableFuture<U> handle(BiFun<? super T, Throwable, ? extends U> fn) {
2566     if (fn == null) throw new NullPointerException();
2567     CompletableFuture<U> dst = new CompletableFuture<U>();
2568     HandleCompletion<T,U> d = null;
2569     Object r;
         // No result yet: wrap a HandleCompletion in a CompletionNode and try
         // to link it into the completions list. (Presumably it is run by
         // whoever later completes this future -- see HandleCompletion.)
2570     if ((r = result) == null) {
2571     CompletionNode p =
2572     new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
         // Push p as the new head of completions. The loop re-reads result
         // on every pass and stops as soon as either the CAS succeeds or a
         // result has appeared in the meantime.
2573     while ((r = result) == null) {
2574     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2575     p.next = completions, p))
2576     break;
2577     }
2578     }
         // A result is available. Run the function inline if nothing was
         // registered (d == null) or if the 0 -> 1 compareAndSet on d succeeds.
         // NOTE(review): that CAS presumably claims the action so it is not
         // also run through the registered node -- confirm against
         // HandleCompletion.
2579     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2580     T t; Throwable ex;
2581     if (r instanceof AltResult) {
         // AltResult: fn sees a null value and whatever ex the AltResult
         // holds (which may itself be null).
2582     ex = ((AltResult)r).ex;
2583     t = null;
2584     }
2585     else {
         // Any non-AltResult result is the value; there is no exception.
2586     ex = null;
2587     @SuppressWarnings("unchecked") T tr = (T) r;
2588     t = tr;
2589     }
2590     U u; Throwable dx;
2591     try {
2592     u = fn.apply(t, ex);
2593     dx = null;
2594     } catch (Throwable rex) {
         // fn failed: pass the failure on as dx with a null value.
2595     dx = rex;
2596     u = null;
2597     }
2598     dst.internalComplete(u, dx);
2599     }
2600     helpPostComplete();
2601     return dst;
2602     }
2603    
2604     /**
2605     * Attempts to complete this CompletableFuture with
2606     * a {@link CancellationException}.
2607     *
2608     * @param mayInterruptIfRunning this value has no effect in this
2609     * implementation because interrupts are not used to control
2610     * processing.
2611     *
2612     * @return {@code true} if this task is now cancelled, including
         * the case where its result already held a
         * {@link CancellationException} before this call
2613     */
2614     public boolean cancel(boolean mayInterruptIfRunning) {
2615     Object r;
         // While no result is set, try to install a cancellation result.
2616     while ((r = result) == null) {
2617     r = new AltResult(new CancellationException());
         // The swap succeeds only if result is still null at this instant.
2618     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
2619     postComplete();
2620     return true;
2621     }
2622     }
         // Reached only when the loop saw a non-null result: report true
         // only if that existing result wraps a CancellationException.
2623     return ((r instanceof AltResult) &&
2624     (((AltResult)r).ex instanceof CancellationException));
2625     }
2626    
2627     /**
2628     * Returns {@code true} if this CompletableFuture was cancelled
2629     * before it completed normally.
2630     *
         * <p>Implementation note: the check is purely on the current
         * result, which must be an {@code AltResult} whose exception is
         * a {@link CancellationException}. It therefore also returns
         * {@code true} when such an exception was installed through
         * {@link #obtrudeException}.
         *
2631     * @return {@code true} if this CompletableFuture was cancelled
2632     * before it completed normally
2633     */
2634     public boolean isCancelled() {
2635     Object r;
2636     return ((r = result) != null &&
2637     (r instanceof AltResult) &&
2638     (((AltResult)r).ex instanceof CancellationException));
2639     }
2640    
2641     /**
2642     * Forcibly sets or resets the value subsequently returned by
2643     * method get() and related methods, whether or not already
2644     * completed. This method is designed for use only in error
2645     * recovery actions, and even in such situations may result in
2646     * ongoing dependent completions using established versus
2647 dl 1.3 * overwritten outcomes.
2648 dl 1.1 *
2649     * @param value the completion value; may be {@code null}
2650     */
2651     public void obtrudeValue(T value) {
         // Plain overwrite rather than a CAS, so any existing result is
         // replaced. A null value is stored as NIL because a null result
         // is what the other methods test for as "no result yet".
2652     result = (value == null) ? NIL : value;
2653     postComplete();
2654     }
2655    
2656 dl 1.3 /**
2657     * Forcibly causes subsequent invocations of method get() and
2658     * related methods to throw the given exception, whether or not
2659     * already completed. This method is designed for use only in
2660     * error recovery actions, and even in such situations may result in
2661     * ongoing dependent completions using established versus
2662     * overwritten outcomes.
2663     *
2664     * @param ex the exception
         * @throws NullPointerException if {@code ex} is null
2665     */
2666     public void obtrudeException(Throwable ex) {
2667     if (ex == null) throw new NullPointerException();
         // Plain overwrite rather than a CAS, so any existing result is
         // replaced by an AltResult wrapping ex.
2668     result = new AltResult(ex);
2669     postComplete();
2670     }
2671    
2672 dl 1.1 // Unsafe mechanics
         // UNSAFE plus the field offsets of result, waiters and completions,
         // which are what the compareAndSwapObject calls in this class take
         // as their second argument.
2673     private static final sun.misc.Unsafe UNSAFE;
2674     private static final long RESULT;
2675     private static final long WAITERS;
2676     private static final long COMPLETIONS;
2677     static {
2678     try {
2679     UNSAFE = getUnsafe();
2680     Class<?> k = CompletableFuture.class;
         // Fields are looked up by name, so these string literals must stay
         // in sync with the field declarations of this class.
2681     RESULT = UNSAFE.objectFieldOffset
2682     (k.getDeclaredField("result"));
2683     WAITERS = UNSAFE.objectFieldOffset
2684     (k.getDeclaredField("waiters"));
2685     COMPLETIONS = UNSAFE.objectFieldOffset
2686     (k.getDeclaredField("completions"));
2687     } catch (Exception e) {
         // Any failure here (including a missing field) is rethrown as an
         // Error, so class initialization fails.
2688     throw new Error(e);
2689     }
2690     }
2691    
2692    
2693     /**
2694     * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
2695     * Replace with a simple call to Unsafe.getUnsafe when integrating
2696     * into a jdk.
2697     *
         * <p>First tries {@code Unsafe.getUnsafe()} directly; if that is
         * refused with a SecurityException, falls back to scanning the
         * declared fields of {@code sun.misc.Unsafe} reflectively, inside
         * a privileged action, for one that holds an Unsafe instance.
         *
2698     * @return a sun.misc.Unsafe
         * @throws RuntimeException if the reflective lookup fails with a
         * checked exception (the original exception is kept as the cause)
         * @throws NoSuchFieldError if no declared field holds an Unsafe
2699     */
2700     private static sun.misc.Unsafe getUnsafe() {
2701     try {
2702     return sun.misc.Unsafe.getUnsafe();
         // Deliberately ignored: a SecurityException just means the direct
         // route is not permitted, so fall through to reflection.
2703 jsr166 1.6 } catch (SecurityException tryReflectionInstead) {}
2704     try {
2705     return java.security.AccessController.doPrivileged
2706     (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
2707     public sun.misc.Unsafe run() throws Exception {
2708     Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
         // Searches by type rather than by field name: the first declared
         // field whose value is an Unsafe instance wins.
2709     for (java.lang.reflect.Field f : k.getDeclaredFields()) {
2710     f.setAccessible(true);
         // NOTE(review): get(null) reads the field as a static. It would
         // throw NullPointerException on an instance field, so this assumes
         // every declared field of Unsafe is static -- confirm for the
         // target JVMs.
2711     Object x = f.get(null);
2712     if (k.isInstance(x))
2713     return k.cast(x);
2714     }
2715     throw new NoSuchFieldError("the Unsafe");
2716     }});
2717     } catch (java.security.PrivilegedActionException e) {
         // Unwrap the checked exception raised inside run() and rethrow it
         // unchecked.
2718     throw new RuntimeException("Could not initialize intrinsics",
2719     e.getCause());
2720 dl 1.1 }
2721     }
2722     }