ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.29
Committed: Wed Jan 2 17:37:18 2013 UTC (11 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.28: +6 -6 lines
Log Message:
Consistent rechecks

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.7 import java.util.function.Block;
10     import java.util.function.BiBlock;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22     import java.util.concurrent.atomic.AtomicInteger;
23     import java.util.concurrent.locks.LockSupport;
24    
25     /**
26     * A {@link Future} that may be explicitly completed (setting its
27     * value and status), and may include dependent functions and actions
28 dl 1.28 * that trigger upon its completion. Methods are available for adding
29     * those based on Functions, Blocks, and Runnables, depending on
30     * whether they require arguments and/or produce results, as well as
31     * those triggered after either or both the current and another
32     * CompletableFuture complete. Functions and actions supplied for
33     * dependent completions (mainly using methods with prefix {@code
34     * then}) may be performed by the thread that completes the current
35 dl 1.1 * CompletableFuture, or by any other caller of these methods. There
36     * are no guarantees about the order of processing completions unless
37 dl 1.17 * constrained by these methods.
38 dl 1.1 *
39 jsr166 1.4 * <p>When two or more threads attempt to {@link #complete} or {@link
40 dl 1.17 * #completeExceptionally} a CompletableFuture, only one of them
41 dl 1.19 * succeeds.
42     *
43 jsr166 1.23 * <p>Upon exceptional completion, or when a completion entails
44 dl 1.5 * computation of a function or action, and it terminates abruptly
45 dl 1.19 * with an (unchecked) exception or error, then further completions
46     * act as {@code completeExceptionally} with a {@link
47     * CompletionException} holding that exception as its cause. If a
48     * CompletableFuture completes exceptionally, and is not followed by a
49     * {@link #exceptionally} or {@link #handle} completion, then all of
50     * its dependents (and their dependents) also complete exceptionally
51 dl 1.20 * with CompletionExceptions holding the ultimate cause. In case of a
52     * CompletionException, methods {@link #get()} and {@link #get(long,
53 dl 1.28 * TimeUnit)} throw an {@link ExecutionException} with the same cause
54 dl 1.20 * as would be held in the corresponding CompletionException. However,
55     * in these cases, methods {@link #join()} and {@link #getNow} throw
56     * the CompletionException, which simplifies usage especially within
57     * other completion functions.
58 dl 1.1 *
59     * <p>CompletableFutures themselves do not execute asynchronously.
60     * However, the {@code async} methods provide commonly useful ways to
61 jsr166 1.8 * commence asynchronous processing, using either a given {@link
62 dl 1.1 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
63     * function or action that will result in the completion of a new
64 dl 1.28 * CompletableFuture. To simplify monitoring, debugging, and tracking,
65     * all generated asynchronous tasks are instances of the tagging
66     * interface {@link AsynchronousCompletionTask}.
67 dl 1.1 *
68     * @author Doug Lea
69     * @since 1.8
70     */
71     public class CompletableFuture<T> implements Future<T> {
72 dl 1.28
73 dl 1.1 /*
74 dl 1.20 * Overview:
75 dl 1.1 *
76 dl 1.20 * 1. Non-nullness of field result (set via CAS) indicates
77     * done. An AltResult is used to box null as a result, as well as
78     * to hold exceptions. Using a single field makes completion fast
79     * and simple to detect and trigger, at the expense of a lot of
80     * encoding and decoding that infiltrates many methods. One minor
81     * simplification relies on the (static) NIL (to box null results)
82     * being the only AltResult with a null exception field, so we
83 dl 1.28 * don't usually need explicit comparisons with NIL. The CF
84     * exception propagation mechanics surrounding decoding rely on
85     * unchecked casts of decoded results really being unchecked,
86     * where user type errors are caught at point of use, as is
87     * currently the case in Java. These are highlighted by using
88     * SuppressWarnings-annotated temporaries.
89 dl 1.1 *
90     * 2. Waiters are held in a Treiber stack similar to the one used
91 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
92 dl 1.28 * internal documentation for algorithmic details.
93 dl 1.1 *
94     * 3. Completions are also kept in a list/stack, and pulled off
95 dl 1.28 * and run when completion is triggered. (We could even use the
96 jsr166 1.24 * same stack as for waiters, but would give up the potential
97 dl 1.20 * parallelism obtained because woken waiters help release/run
98 dl 1.28 * others -- see method postComplete). Because post-processing
99     * may race with direct calls, class Completion opportunistically
100     * extends AtomicInteger so callers can claim the action via
101     * compareAndSet(0, 1). The Completion.run methods are all
102     * written a boringly similar uniform way (that sometimes includes
103     * unnecessary-looking checks, kept to maintain uniformity). There
104     * are enough dimensions upon which they differ that factoring to
105     * use common code isn't worthwhile.
106 dl 1.20 *
107     * 4. The exported then/and/or methods do support a bit of
108 dl 1.28 * factoring (see doThenApply etc). They must cope with the
109     * intrinsic races surrounding addition of a dependent action
110     * versus performing the action directly because the task is
111     * already complete. For example, a CF may not be complete upon
112     * entry, so a dependent completion is added, but by the time it
113     * is added, the target CF is complete, so must be directly
114 dl 1.20 * executed. This is all done while avoiding unnecessary object
115     * construction in safe-bypass cases.
116 dl 1.1 */
117    
118 dl 1.28 // preliminaries
119 dl 1.20
120 dl 1.1 static final class AltResult {
121     final Throwable ex; // null only for NIL
122 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
123 dl 1.1 }
124    
125     static final AltResult NIL = new AltResult(null);
126    
127 dl 1.20 // Fields
128    
129     volatile Object result; // Either the result or boxed AltResult
130 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
131     volatile CompletionNode completions; // list (Treiber stack) of completions
132    
133 dl 1.20 // Basic utilities for triggering and processing completions
134    
135     /**
136     * Removes and signals all waiting threads and runs all completions.
137     */
138     final void postComplete() {
139     WaitNode q; Thread t;
140     while ((q = waiters) != null) {
141     if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
142     (t = q.thread) != null) {
143     q.thread = null;
144     LockSupport.unpark(t);
145     }
146     }
147    
148     CompletionNode h; Completion c;
149     while ((h = completions) != null) {
150     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
151     (c = h.completion) != null)
152     c.run();
153     }
154     }
155    
156     /**
157     * Triggers completion with the encoding of the given arguments:
158     * if the exception is non-null, encodes it as a wrapped
159     * CompletionException unless it is one already. Otherwise uses
160     * the given result, boxed as NIL if null.
161     */
162     final void internalComplete(Object v, Throwable ex) {
163     if (result == null)
164     UNSAFE.compareAndSwapObject
165     (this, RESULT, null,
166     (ex == null) ? (v == null) ? NIL : v :
167     new AltResult((ex instanceof CompletionException) ? ex :
168     new CompletionException(ex)));
169     postComplete(); // help out even if not triggered
170     }
171    
172     /**
173 jsr166 1.25 * If triggered, helps release and/or process completions.
174 dl 1.20 */
175     final void helpPostComplete() {
176     if (result != null)
177     postComplete();
178     }
179    
180 dl 1.28 /* ------------- waiting for completions -------------- */
181 dl 1.20
182 dl 1.1 /**
183 dl 1.28 * Heuristic spin value for waitingGet() before blocking on
184     * multiprocessors
185 dl 1.1 */
186 dl 1.28 static final int WAITING_GET_SPINS = 256;
187 dl 1.1
188     /**
189 dl 1.28 * Linked nodes to record waiting threads in a Treiber stack. See
190     * other classes such as Phaser and SynchronousQueue for more
191     * detailed explanation. This class implements ManagedBlocker to
192     * avoid starvation when blocking actions pile up in
193     * ForkJoinPools.
194     */
195     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
196     long nanos; // wait time if timed
197     final long deadline; // non-zero if timed
198     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
199     volatile Thread thread;
200     volatile WaitNode next;
201     WaitNode(boolean interruptible, long nanos, long deadline) {
202     this.thread = Thread.currentThread();
203     this.interruptControl = interruptible ? 1 : 0;
204     this.nanos = nanos;
205     this.deadline = deadline;
206     }
207     public boolean isReleasable() {
208     if (thread == null)
209     return true;
210     if (Thread.interrupted()) {
211     int i = interruptControl;
212     interruptControl = -1;
213     if (i > 0)
214     return true;
215     }
216     if (deadline != 0L &&
217     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
218     thread = null;
219     return true;
220     }
221     return false;
222     }
223     public boolean block() {
224     if (isReleasable())
225     return true;
226     else if (deadline == 0L)
227     LockSupport.park(this);
228     else if (nanos > 0L)
229     LockSupport.parkNanos(this, nanos);
230     return isReleasable();
231     }
232 dl 1.1 }
233    
234     /**
235 dl 1.28 * Returns raw result after waiting, or null if interruptible and
236     * interrupted.
237 dl 1.1 */
238 dl 1.28 private Object waitingGet(boolean interruptible) {
239     WaitNode q = null;
240     boolean queued = false;
241     int h = 0, spins = 0;
242     for (Object r;;) {
243     if ((r = result) != null) {
244     if (q != null) { // suppress unpark
245     q.thread = null;
246     if (q.interruptControl < 0) {
247     if (interruptible) {
248     removeWaiter(q);
249     return null;
250     }
251     Thread.currentThread().interrupt();
252     }
253     }
254     postComplete(); // help release others
255     return r;
256     }
257     else if (h == 0) {
258     h = ThreadLocalRandom.current().nextInt();
259     if (Runtime.getRuntime().availableProcessors() > 1)
260     spins = WAITING_GET_SPINS;
261     }
262     else if (spins > 0) {
263     h ^= h << 1; // xorshift
264     h ^= h >>> 3;
265     if ((h ^= h << 10) >= 0)
266     --spins;
267     }
268     else if (q == null)
269     q = new WaitNode(interruptible, 0L, 0L);
270     else if (!queued)
271     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
272     q.next = waiters, q);
273     else if (interruptible && q.interruptControl < 0) {
274     removeWaiter(q);
275     return null;
276     }
277     else if (q.thread != null && result == null) {
278     try {
279     ForkJoinPool.managedBlock(q);
280     } catch(InterruptedException ex){
281     q.interruptControl = -1;
282     }
283     }
284     }
285 dl 1.1 }
286    
287     /**
288 dl 1.28 * Awaits completion or aborts on interrupt or timeout.
289 dl 1.1 *
290 dl 1.28 * @param nanos time to wait
291     * @return raw result
292 dl 1.1 */
293 dl 1.28 private Object timedAwaitDone(long nanos)
294     throws InterruptedException, TimeoutException {
295     WaitNode q = null;
296     boolean queued = false;
297     for (Object r;;) {
298     if ((r = result) != null) {
299     if (q != null) {
300     q.thread = null;
301     if (q.interruptControl < 0) {
302     removeWaiter(q);
303     throw new InterruptedException();
304     }
305     }
306     postComplete();
307     return r;
308     }
309     else if (q == null) {
310     if (nanos <= 0L)
311     throw new TimeoutException();
312     long d = System.nanoTime() + nanos;
313     q = new WaitNode(true, nanos, d == 0L? 1L : d); // avoid 0
314     }
315     else if (!queued)
316     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
317     q.next = waiters, q);
318     else if (q.interruptControl < 0) {
319     removeWaiter(q);
320     throw new InterruptedException();
321     }
322     else if (q.nanos <= 0L) {
323     if (result == null) {
324     removeWaiter(q);
325     throw new TimeoutException();
326     }
327     }
328     else if (q.thread != null && result == null) {
329     try {
330     ForkJoinPool.managedBlock(q);
331     } catch(InterruptedException ex){
332     q.interruptControl = -1;
333     }
334     }
335     }
336 dl 1.1 }
337    
338     /**
339 dl 1.28 * Tries to unlink a timed-out or interrupted wait node to avoid
340     * accumulating garbage. Internal nodes are simply unspliced
341     * without CAS since it is harmless if they are traversed anyway
342     * by releasers. To avoid effects of unsplicing from already
343     * removed nodes, the list is retraversed in case of an apparent
344     * race. This is slow when there are a lot of nodes, but we don't
345     * expect lists to be long enough to outweigh higher-overhead
346     * schemes.
347 dl 1.1 */
348 dl 1.28 private void removeWaiter(WaitNode node) {
349     if (node != null) {
350     node.thread = null;
351     retry:
352     for (;;) { // restart on removeWaiter race
353     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
354     s = q.next;
355     if (q.thread != null)
356     pred = q;
357     else if (pred != null) {
358     pred.next = s;
359     if (pred.thread == null) // check for race
360     continue retry;
361     }
362     else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
363     continue retry;
364     }
365     break;
366     }
367     }
368 dl 1.1 }
369    
370 dl 1.28 /* ------------- Async tasks -------------- */
371    
372 dl 1.1 /**
373 dl 1.28 * A tagging interface identifying asynchronous tasks produced by
374     * {@code async} methods. This may be useful for monitoring,
375     * debugging, and tracking asynchronous activities.
376 dl 1.1 */
377 dl 1.28 public static interface AsynchronousCompletionTask {
378 dl 1.1 }
379    
380 dl 1.28 /** Base class can act as either FJ or plain Runnable */
381     static abstract class Async extends ForkJoinTask<Void>
382     implements Runnable, AsynchronousCompletionTask {
383     public final Void getRawResult() { return null; }
384     public final void setRawResult(Void v) { }
385     public final void run() { exec(); }
386 jsr166 1.26 }
387    
388 dl 1.28 static final class AsyncRun extends Async {
389     final Runnable fn;
390     final CompletableFuture<Void> dst;
391     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
392     this.fn = fn; this.dst = dst;
393     }
394     public final boolean exec() {
395     CompletableFuture<Void> d; Throwable ex;
396 dl 1.29 if ((d = this.dst) != null && d.result == null) {
397 dl 1.28 try {
398     fn.run();
399     ex = null;
400     } catch (Throwable rex) {
401     ex = rex;
402     }
403     d.internalComplete(null, ex);
404 dl 1.19 }
405 dl 1.28 return true;
406 dl 1.19 }
407 dl 1.28 private static final long serialVersionUID = 5232453952276885070L;
408 dl 1.19 }
409    
410 dl 1.28 static final class AsyncSupply<U> extends Async {
411     final Supplier<U> fn;
412     final CompletableFuture<U> dst;
413     AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
414     this.fn = fn; this.dst = dst;
415     }
416     public final boolean exec() {
417     CompletableFuture<U> d; U u; Throwable ex;
418 dl 1.29 if ((d = this.dst) != null && d.result == null) {
419 dl 1.28 try {
420     u = fn.get();
421     ex = null;
422     } catch (Throwable rex) {
423     ex = rex;
424     u = null;
425     }
426     d.internalComplete(u, ex);
427 dl 1.1 }
428     return true;
429     }
430     private static final long serialVersionUID = 5232453952276885070L;
431     }
432    
433 dl 1.28 static final class AsyncApply<T,U> extends Async {
434     final Function<? super T,? extends U> fn;
435     final T arg;
436 dl 1.1 final CompletableFuture<U> dst;
437 dl 1.28 AsyncApply(T arg, Function<? super T,? extends U> fn,
438 dl 1.1 CompletableFuture<U> dst) {
439     this.arg = arg; this.fn = fn; this.dst = dst;
440     }
441     public final boolean exec() {
442 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
443 dl 1.29 if ((d = this.dst) != null && d.result == null) {
444 dl 1.21 try {
445     u = fn.apply(arg);
446     ex = null;
447     } catch (Throwable rex) {
448     ex = rex;
449     u = null;
450     }
451     d.internalComplete(u, ex);
452 dl 1.1 }
453     return true;
454     }
455     private static final long serialVersionUID = 5232453952276885070L;
456     }
457    
458 dl 1.28 static final class AsyncBiApply<T,U,V> extends Async {
459 dl 1.1 final BiFunction<? super T,? super U,? extends V> fn;
460     final T arg1;
461     final U arg2;
462     final CompletableFuture<V> dst;
463 dl 1.28 AsyncBiApply(T arg1, U arg2,
464 dl 1.1 BiFunction<? super T,? super U,? extends V> fn,
465     CompletableFuture<V> dst) {
466     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
467     }
468     public final boolean exec() {
469 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
470 dl 1.29 if ((d = this.dst) != null && d.result == null) {
471 dl 1.21 try {
472     v = fn.apply(arg1, arg2);
473     ex = null;
474     } catch (Throwable rex) {
475     ex = rex;
476     v = null;
477     }
478     d.internalComplete(v, ex);
479 dl 1.1 }
480     return true;
481     }
482     private static final long serialVersionUID = 5232453952276885070L;
483     }
484    
485 dl 1.28 static final class AsyncAccept<T> extends Async {
486     final Block<? super T> fn;
487     final T arg;
488 dl 1.7 final CompletableFuture<Void> dst;
489 dl 1.28 AsyncAccept(T arg, Block<? super T> fn,
490 dl 1.7 CompletableFuture<Void> dst) {
491     this.arg = arg; this.fn = fn; this.dst = dst;
492     }
493     public final boolean exec() {
494 dl 1.21 CompletableFuture<Void> d; Throwable ex;
495 dl 1.29 if ((d = this.dst) != null && d.result == null) {
496 dl 1.21 try {
497     fn.accept(arg);
498     ex = null;
499     } catch (Throwable rex) {
500     ex = rex;
501     }
502     d.internalComplete(null, ex);
503 dl 1.7 }
504     return true;
505     }
506     private static final long serialVersionUID = 5232453952276885070L;
507     }
508    
509 dl 1.28 static final class AsyncBiAccept<T,U> extends Async {
510 dl 1.7 final BiBlock<? super T,? super U> fn;
511     final T arg1;
512     final U arg2;
513     final CompletableFuture<Void> dst;
514 dl 1.28 AsyncBiAccept(T arg1, U arg2,
515 dl 1.7 BiBlock<? super T,? super U> fn,
516     CompletableFuture<Void> dst) {
517     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
518     }
519     public final boolean exec() {
520 dl 1.21 CompletableFuture<Void> d; Throwable ex;
521 dl 1.29 if ((d = this.dst) != null && d.result == null) {
522 dl 1.21 try {
523     fn.accept(arg1, arg2);
524     ex = null;
525     } catch (Throwable rex) {
526     ex = rex;
527     }
528     d.internalComplete(null, ex);
529 dl 1.7 }
530     return true;
531     }
532     private static final long serialVersionUID = 5232453952276885070L;
533     }
534    
535 dl 1.1 /* ------------- Completions -------------- */
536    
537 dl 1.28 /**
538     * Simple linked list nodes to record completions, used in
539     * basically the same way as WaitNodes. (We separate nodes from
540     * the Completions themselves mainly because for the And and Or
541     * methods, the same Completion object resides in two lists.)
542     */
543     static final class CompletionNode {
544     final Completion completion;
545     volatile CompletionNode next;
546     CompletionNode(Completion completion) { this.completion = completion; }
547     }
548    
549 dl 1.1 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
550     static abstract class Completion extends AtomicInteger implements Runnable {
551     }
552    
553 dl 1.28 static final class ApplyCompletion<T,U> extends Completion {
554 jsr166 1.2 final CompletableFuture<? extends T> src;
555     final Function<? super T,? extends U> fn;
556     final CompletableFuture<U> dst;
557 dl 1.1 final Executor executor;
558 dl 1.28 ApplyCompletion(CompletableFuture<? extends T> src,
559     Function<? super T,? extends U> fn,
560     CompletableFuture<U> dst, Executor executor) {
561 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
562     this.executor = executor;
563     }
564 jsr166 1.26 public final void run() {
565 dl 1.28 final CompletableFuture<? extends T> a;
566     final Function<? super T,? extends U> fn;
567     final CompletableFuture<U> dst;
568 jsr166 1.2 Object r; T t; Throwable ex;
569     if ((dst = this.dst) != null &&
570 dl 1.1 (fn = this.fn) != null &&
571     (a = this.src) != null &&
572     (r = a.result) != null &&
573     compareAndSet(0, 1)) {
574 jsr166 1.2 if (r instanceof AltResult) {
575 dl 1.19 ex = ((AltResult)r).ex;
576 dl 1.1 t = null;
577     }
578 dl 1.17 else {
579     ex = null;
580 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
581     t = tr;
582 dl 1.1 }
583 dl 1.20 Executor e = executor;
584     U u = null;
585 dl 1.17 if (ex == null) {
586     try {
587 dl 1.20 if (e != null)
588 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
589 dl 1.17 else
590 dl 1.20 u = fn.apply(t);
591 dl 1.17 } catch (Throwable rex) {
592     ex = rex;
593     }
594     }
595 dl 1.20 if (e == null || ex != null)
596     dst.internalComplete(u, ex);
597 dl 1.1 }
598     }
599 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
600 dl 1.1 }
601    
602 dl 1.28 static final class AcceptCompletion<T> extends Completion {
603 dl 1.7 final CompletableFuture<? extends T> src;
604     final Block<? super T> fn;
605     final CompletableFuture<Void> dst;
606     final Executor executor;
607 dl 1.28 AcceptCompletion(CompletableFuture<? extends T> src,
608     Block<? super T> fn,
609     CompletableFuture<Void> dst, Executor executor) {
610 dl 1.7 this.src = src; this.fn = fn; this.dst = dst;
611     this.executor = executor;
612     }
613 jsr166 1.26 public final void run() {
614 dl 1.28 final CompletableFuture<? extends T> a;
615     final Block<? super T> fn;
616     final CompletableFuture<Void> dst;
617 dl 1.7 Object r; T t; Throwable ex;
618     if ((dst = this.dst) != null &&
619     (fn = this.fn) != null &&
620     (a = this.src) != null &&
621     (r = a.result) != null &&
622     compareAndSet(0, 1)) {
623     if (r instanceof AltResult) {
624 dl 1.19 ex = ((AltResult)r).ex;
625 dl 1.7 t = null;
626     }
627 dl 1.17 else {
628     ex = null;
629 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
630     t = tr;
631 dl 1.17 }
632 dl 1.20 Executor e = executor;
633 dl 1.17 if (ex == null) {
634     try {
635 dl 1.20 if (e != null)
636 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
637 dl 1.20 else
638 dl 1.17 fn.accept(t);
639     } catch (Throwable rex) {
640     ex = rex;
641 dl 1.7 }
642     }
643 dl 1.20 if (e == null || ex != null)
644     dst.internalComplete(null, ex);
645 dl 1.7 }
646     }
647 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
648 dl 1.7 }
649    
650 dl 1.28 static final class RunCompletion<T> extends Completion {
651 jsr166 1.2 final CompletableFuture<? extends T> src;
652     final Runnable fn;
653     final CompletableFuture<Void> dst;
654 dl 1.1 final Executor executor;
655 dl 1.28 RunCompletion(CompletableFuture<? extends T> src,
656     Runnable fn,
657     CompletableFuture<Void> dst,
658     Executor executor) {
659 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
660     this.executor = executor;
661     }
662 jsr166 1.26 public final void run() {
663 dl 1.28 final CompletableFuture<? extends T> a;
664     final Runnable fn;
665     final CompletableFuture<Void> dst;
666 jsr166 1.2 Object r; Throwable ex;
667     if ((dst = this.dst) != null &&
668 dl 1.1 (fn = this.fn) != null &&
669     (a = this.src) != null &&
670     (r = a.result) != null &&
671     compareAndSet(0, 1)) {
672 dl 1.19 if (r instanceof AltResult)
673     ex = ((AltResult)r).ex;
674 dl 1.17 else
675     ex = null;
676 dl 1.20 Executor e = executor;
677 dl 1.17 if (ex == null) {
678     try {
679 dl 1.20 if (e != null)
680 dl 1.28 e.execute(new AsyncRun(fn, dst));
681 dl 1.20 else
682 dl 1.17 fn.run();
683     } catch (Throwable rex) {
684     ex = rex;
685 dl 1.1 }
686     }
687 dl 1.20 if (e == null || ex != null)
688     dst.internalComplete(null, ex);
689 dl 1.1 }
690     }
691 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
692 dl 1.1 }
693    
694 dl 1.28 static final class BiApplyCompletion<T,U,V> extends Completion {
695 jsr166 1.2 final CompletableFuture<? extends T> src;
696     final CompletableFuture<? extends U> snd;
697     final BiFunction<? super T,? super U,? extends V> fn;
698     final CompletableFuture<V> dst;
699 dl 1.1 final Executor executor;
700 dl 1.28 BiApplyCompletion(CompletableFuture<? extends T> src,
701     CompletableFuture<? extends U> snd,
702     BiFunction<? super T,? super U,? extends V> fn,
703     CompletableFuture<V> dst, Executor executor) {
704 dl 1.1 this.src = src; this.snd = snd;
705     this.fn = fn; this.dst = dst;
706     this.executor = executor;
707     }
708 jsr166 1.26 public final void run() {
709 dl 1.28 final CompletableFuture<? extends T> a;
710     final CompletableFuture<? extends U> b;
711     final BiFunction<? super T,? super U,? extends V> fn;
712     final CompletableFuture<V> dst;
713 jsr166 1.2 Object r, s; T t; U u; Throwable ex;
714     if ((dst = this.dst) != null &&
715 dl 1.1 (fn = this.fn) != null &&
716     (a = this.src) != null &&
717     (r = a.result) != null &&
718     (b = this.snd) != null &&
719     (s = b.result) != null &&
720     compareAndSet(0, 1)) {
721 jsr166 1.2 if (r instanceof AltResult) {
722 dl 1.19 ex = ((AltResult)r).ex;
723 jsr166 1.2 t = null;
724     }
725 dl 1.19 else {
726     ex = null;
727 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
728     t = tr;
729 dl 1.19 }
730     if (ex != null)
731     u = null;
732     else if (s instanceof AltResult) {
733     ex = ((AltResult)s).ex;
734 jsr166 1.2 u = null;
735     }
736 dl 1.28 else {
737     @SuppressWarnings("unchecked") U us = (U) s;
738     u = us;
739     }
740 dl 1.20 Executor e = executor;
741     V v = null;
742 dl 1.19 if (ex == null) {
743     try {
744 dl 1.20 if (e != null)
745 dl 1.28 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
746 dl 1.19 else
747 dl 1.20 v = fn.apply(t, u);
748 dl 1.19 } catch (Throwable rex) {
749     ex = rex;
750     }
751 dl 1.1 }
752 dl 1.20 if (e == null || ex != null)
753     dst.internalComplete(v, ex);
754 jsr166 1.2 }
755     }
756 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
757 dl 1.1 }
758    
759 dl 1.28 static final class BiAcceptCompletion<T,U> extends Completion {
760 dl 1.7 final CompletableFuture<? extends T> src;
761     final CompletableFuture<? extends U> snd;
762     final BiBlock<? super T,? super U> fn;
763     final CompletableFuture<Void> dst;
764     final Executor executor;
765 dl 1.28 BiAcceptCompletion(CompletableFuture<? extends T> src,
766     CompletableFuture<? extends U> snd,
767     BiBlock<? super T,? super U> fn,
768     CompletableFuture<Void> dst, Executor executor) {
769 dl 1.7 this.src = src; this.snd = snd;
770     this.fn = fn; this.dst = dst;
771     this.executor = executor;
772     }
773 jsr166 1.26 public final void run() {
774 dl 1.28 final CompletableFuture<? extends T> a;
775     final CompletableFuture<? extends U> b;
776     final BiBlock<? super T,? super U> fn;
777     final CompletableFuture<Void> dst;
778 dl 1.7 Object r, s; T t; U u; Throwable ex;
779     if ((dst = this.dst) != null &&
780     (fn = this.fn) != null &&
781     (a = this.src) != null &&
782     (r = a.result) != null &&
783     (b = this.snd) != null &&
784     (s = b.result) != null &&
785     compareAndSet(0, 1)) {
786     if (r instanceof AltResult) {
787 dl 1.19 ex = ((AltResult)r).ex;
788 dl 1.7 t = null;
789     }
790 dl 1.19 else {
791     ex = null;
792 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
793     t = tr;
794 dl 1.19 }
795     if (ex != null)
796     u = null;
797     else if (s instanceof AltResult) {
798     ex = ((AltResult)s).ex;
799 dl 1.7 u = null;
800     }
801 dl 1.28 else {
802     @SuppressWarnings("unchecked") U us = (U) s;
803     u = us;
804     }
805 dl 1.20 Executor e = executor;
806 dl 1.19 if (ex == null) {
807     try {
808 dl 1.20 if (e != null)
809 dl 1.28 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
810 dl 1.20 else
811 dl 1.19 fn.accept(t, u);
812     } catch (Throwable rex) {
813     ex = rex;
814 dl 1.7 }
815     }
816 dl 1.20 if (e == null || ex != null)
817     dst.internalComplete(null, ex);
818 dl 1.7 }
819     }
820 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
821 dl 1.7 }
822    
823 dl 1.28 static final class BiRunCompletion<T> extends Completion {
824 jsr166 1.2 final CompletableFuture<? extends T> src;
825     final CompletableFuture<?> snd;
826     final Runnable fn;
827     final CompletableFuture<Void> dst;
828 dl 1.1 final Executor executor;
829 dl 1.28 BiRunCompletion(CompletableFuture<? extends T> src,
830     CompletableFuture<?> snd,
831     Runnable fn,
832     CompletableFuture<Void> dst, Executor executor) {
833 dl 1.1 this.src = src; this.snd = snd;
834     this.fn = fn; this.dst = dst;
835     this.executor = executor;
836     }
837 jsr166 1.26 public final void run() {
838 dl 1.1 final CompletableFuture<? extends T> a;
839     final CompletableFuture<?> b;
840     final Runnable fn;
841     final CompletableFuture<Void> dst;
842 dl 1.28 Object r, s; Throwable ex;
843 jsr166 1.2 if ((dst = this.dst) != null &&
844 dl 1.1 (fn = this.fn) != null &&
845     (a = this.src) != null &&
846     (r = a.result) != null &&
847     (b = this.snd) != null &&
848     (s = b.result) != null &&
849     compareAndSet(0, 1)) {
850 dl 1.19 if (r instanceof AltResult)
851     ex = ((AltResult)r).ex;
852     else
853     ex = null;
854     if (ex == null && (s instanceof AltResult))
855     ex = ((AltResult)s).ex;
856 dl 1.20 Executor e = executor;
857 dl 1.19 if (ex == null) {
858     try {
859 dl 1.20 if (e != null)
860 dl 1.28 e.execute(new AsyncRun(fn, dst));
861 dl 1.20 else
862 dl 1.19 fn.run();
863     } catch (Throwable rex) {
864     ex = rex;
865 dl 1.1 }
866 jsr166 1.2 }
867 dl 1.20 if (e == null || ex != null)
868     dst.internalComplete(null, ex);
869 jsr166 1.2 }
870     }
871 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
872 dl 1.1 }
873    
874 dl 1.28 static final class OrApplyCompletion<T,U> extends Completion {
875 jsr166 1.2 final CompletableFuture<? extends T> src;
876     final CompletableFuture<? extends T> snd;
877     final Function<? super T,? extends U> fn;
878     final CompletableFuture<U> dst;
879 dl 1.1 final Executor executor;
880 dl 1.28 OrApplyCompletion(CompletableFuture<? extends T> src,
881     CompletableFuture<? extends T> snd,
882     Function<? super T,? extends U> fn,
883     CompletableFuture<U> dst, Executor executor) {
884 dl 1.1 this.src = src; this.snd = snd;
885     this.fn = fn; this.dst = dst;
886     this.executor = executor;
887     }
888 jsr166 1.26 public final void run() {
889 dl 1.28 final CompletableFuture<? extends T> a;
890     final CompletableFuture<? extends T> b;
891     final Function<? super T,? extends U> fn;
892     final CompletableFuture<U> dst;
893 jsr166 1.2 Object r; T t; Throwable ex;
894     if ((dst = this.dst) != null &&
895 dl 1.1 (fn = this.fn) != null &&
896     (((a = this.src) != null && (r = a.result) != null) ||
897     ((b = this.snd) != null && (r = b.result) != null)) &&
898     compareAndSet(0, 1)) {
899 jsr166 1.2 if (r instanceof AltResult) {
900 dl 1.19 ex = ((AltResult)r).ex;
901 jsr166 1.2 t = null;
902     }
903 dl 1.19 else {
904     ex = null;
905 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
906     t = tr;
907 dl 1.1 }
908 dl 1.20 Executor e = executor;
909     U u = null;
910 dl 1.19 if (ex == null) {
911     try {
912 dl 1.20 if (e != null)
913 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
914 dl 1.19 else
915 dl 1.20 u = fn.apply(t);
916 dl 1.19 } catch (Throwable rex) {
917     ex = rex;
918     }
919     }
920 dl 1.20 if (e == null || ex != null)
921     dst.internalComplete(u, ex);
922 jsr166 1.2 }
923     }
924 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
925 dl 1.1 }
926    
927 dl 1.28 static final class OrAcceptCompletion<T> extends Completion {
928 dl 1.7 final CompletableFuture<? extends T> src;
929     final CompletableFuture<? extends T> snd;
930     final Block<? super T> fn;
931     final CompletableFuture<Void> dst;
932     final Executor executor;
933 dl 1.28 OrAcceptCompletion(CompletableFuture<? extends T> src,
934     CompletableFuture<? extends T> snd,
935     Block<? super T> fn,
936     CompletableFuture<Void> dst, Executor executor) {
937 dl 1.7 this.src = src; this.snd = snd;
938     this.fn = fn; this.dst = dst;
939     this.executor = executor;
940     }
941 jsr166 1.26 public final void run() {
942 dl 1.28 final CompletableFuture<? extends T> a;
943     final CompletableFuture<? extends T> b;
944     final Block<? super T> fn;
945     final CompletableFuture<Void> dst;
946 dl 1.7 Object r; T t; Throwable ex;
947     if ((dst = this.dst) != null &&
948     (fn = this.fn) != null &&
949     (((a = this.src) != null && (r = a.result) != null) ||
950     ((b = this.snd) != null && (r = b.result) != null)) &&
951     compareAndSet(0, 1)) {
952     if (r instanceof AltResult) {
953 dl 1.19 ex = ((AltResult)r).ex;
954 dl 1.7 t = null;
955     }
956 dl 1.19 else {
957     ex = null;
958 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
959     t = tr;
960 dl 1.19 }
961 dl 1.20 Executor e = executor;
962 dl 1.19 if (ex == null) {
963     try {
964 dl 1.20 if (e != null)
965 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
966 dl 1.20 else
967 dl 1.19 fn.accept(t);
968     } catch (Throwable rex) {
969     ex = rex;
970 dl 1.7 }
971     }
972 dl 1.20 if (e == null || ex != null)
973     dst.internalComplete(null, ex);
974 dl 1.7 }
975     }
976 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
977 dl 1.7 }
978    
979 dl 1.28 static final class OrRunCompletion<T> extends Completion {
980 jsr166 1.2 final CompletableFuture<? extends T> src;
981     final CompletableFuture<?> snd;
982     final Runnable fn;
983     final CompletableFuture<Void> dst;
984 dl 1.1 final Executor executor;
985 dl 1.28 OrRunCompletion(CompletableFuture<? extends T> src,
986     CompletableFuture<?> snd,
987     Runnable fn,
988     CompletableFuture<Void> dst, Executor executor) {
989 dl 1.1 this.src = src; this.snd = snd;
990     this.fn = fn; this.dst = dst;
991     this.executor = executor;
992     }
993 jsr166 1.26 public final void run() {
994 dl 1.28 final CompletableFuture<? extends T> a;
995 dl 1.1 final CompletableFuture<?> b;
996     final Runnable fn;
997     final CompletableFuture<Void> dst;
998 dl 1.28 Object r; Throwable ex;
999 jsr166 1.2 if ((dst = this.dst) != null &&
1000 dl 1.1 (fn = this.fn) != null &&
1001     (((a = this.src) != null && (r = a.result) != null) ||
1002     ((b = this.snd) != null && (r = b.result) != null)) &&
1003     compareAndSet(0, 1)) {
1004 dl 1.19 if (r instanceof AltResult)
1005     ex = ((AltResult)r).ex;
1006     else
1007     ex = null;
1008 dl 1.20 Executor e = executor;
1009 dl 1.19 if (ex == null) {
1010 dl 1.1 try {
1011 dl 1.20 if (e != null)
1012 dl 1.28 e.execute(new AsyncRun(fn, dst));
1013 dl 1.20 else
1014 dl 1.1 fn.run();
1015     } catch (Throwable rex) {
1016 dl 1.19 ex = rex;
1017 dl 1.1 }
1018     }
1019 dl 1.20 if (e == null || ex != null)
1020     dst.internalComplete(null, ex);
1021 jsr166 1.2 }
1022     }
1023 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1024 dl 1.1 }
1025    
1026 dl 1.28 static final class ExceptionCompletion<T> extends Completion {
1027 jsr166 1.2 final CompletableFuture<? extends T> src;
1028 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1029     final CompletableFuture<T> dst;
1030 dl 1.28 ExceptionCompletion(CompletableFuture<? extends T> src,
1031     Function<? super Throwable, ? extends T> fn,
1032     CompletableFuture<T> dst) {
1033 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1034     }
1035 jsr166 1.26 public final void run() {
1036 dl 1.28 final CompletableFuture<? extends T> a;
1037     final Function<? super Throwable, ? extends T> fn;
1038     final CompletableFuture<T> dst;
1039 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1040 jsr166 1.2 if ((dst = this.dst) != null &&
1041 dl 1.1 (fn = this.fn) != null &&
1042     (a = this.src) != null &&
1043     (r = a.result) != null &&
1044     compareAndSet(0, 1)) {
1045 dl 1.20 if ((r instanceof AltResult) &&
1046     (ex = ((AltResult)r).ex) != null) {
1047     try {
1048     t = fn.apply(ex);
1049     } catch (Throwable rex) {
1050     dx = rex;
1051 dl 1.1 }
1052     }
1053 dl 1.28 else {
1054     @SuppressWarnings("unchecked") T tr = (T) r;
1055     t = tr;
1056     }
1057 dl 1.20 dst.internalComplete(t, dx);
1058 dl 1.1 }
1059     }
1060 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1061 dl 1.1 }
1062    
1063 dl 1.17 static final class ThenCopy<T> extends Completion {
1064     final CompletableFuture<? extends T> src;
1065     final CompletableFuture<T> dst;
1066     ThenCopy(CompletableFuture<? extends T> src,
1067     CompletableFuture<T> dst) {
1068     this.src = src; this.dst = dst;
1069     }
1070 jsr166 1.26 public final void run() {
1071 dl 1.28 final CompletableFuture<? extends T> a;
1072     final CompletableFuture<T> dst;
1073 dl 1.20 Object r; Object t; Throwable ex;
1074 dl 1.17 if ((dst = this.dst) != null &&
1075     (a = this.src) != null &&
1076     (r = a.result) != null &&
1077     compareAndSet(0, 1)) {
1078 dl 1.20 if (r instanceof AltResult) {
1079     ex = ((AltResult)r).ex;
1080     t = null;
1081     }
1082     else {
1083     ex = null;
1084     t = r;
1085     }
1086     dst.internalComplete(t, ex);
1087 dl 1.17 }
1088     }
1089 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1090 dl 1.17 }
1091    
1092 dl 1.28 static final class HandleCompletion<T,U> extends Completion {
1093 dl 1.17 final CompletableFuture<? extends T> src;
1094     final BiFunction<? super T, Throwable, ? extends U> fn;
1095     final CompletableFuture<U> dst;
1096 dl 1.28 HandleCompletion(CompletableFuture<? extends T> src,
1097     BiFunction<? super T, Throwable, ? extends U> fn,
1098     final CompletableFuture<U> dst) {
1099 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1100     }
1101 jsr166 1.26 public final void run() {
1102 dl 1.28 final CompletableFuture<? extends T> a;
1103     final BiFunction<? super T, Throwable, ? extends U> fn;
1104     final CompletableFuture<U> dst;
1105 dl 1.17 Object r; T t; Throwable ex;
1106     if ((dst = this.dst) != null &&
1107     (fn = this.fn) != null &&
1108     (a = this.src) != null &&
1109     (r = a.result) != null &&
1110     compareAndSet(0, 1)) {
1111     if (r instanceof AltResult) {
1112     ex = ((AltResult)r).ex;
1113     t = null;
1114     }
1115     else {
1116     ex = null;
1117 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1118     t = tr;
1119 dl 1.17 }
1120 dl 1.20 U u = null; Throwable dx = null;
1121 dl 1.17 try {
1122 dl 1.20 u = fn.apply(t, ex);
1123 dl 1.17 } catch (Throwable rex) {
1124 dl 1.20 dx = rex;
1125 dl 1.17 }
1126 dl 1.20 dst.internalComplete(u, dx);
1127 dl 1.17 }
1128     }
1129 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1130 dl 1.17 }
1131    
1132 dl 1.28 static final class ComposeCompletion<T,U> extends Completion {
1133 dl 1.17 final CompletableFuture<? extends T> src;
1134     final Function<? super T, CompletableFuture<U>> fn;
1135     final CompletableFuture<U> dst;
1136 dl 1.28 ComposeCompletion(CompletableFuture<? extends T> src,
1137     Function<? super T, CompletableFuture<U>> fn,
1138     final CompletableFuture<U> dst) {
1139 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1140     }
1141 jsr166 1.26 public final void run() {
1142 dl 1.28 final CompletableFuture<? extends T> a;
1143     final Function<? super T, CompletableFuture<U>> fn;
1144     final CompletableFuture<U> dst;
1145 dl 1.17 Object r; T t; Throwable ex;
1146     if ((dst = this.dst) != null &&
1147     (fn = this.fn) != null &&
1148     (a = this.src) != null &&
1149     (r = a.result) != null &&
1150     compareAndSet(0, 1)) {
1151     if (r instanceof AltResult) {
1152     ex = ((AltResult)r).ex;
1153     t = null;
1154     }
1155     else {
1156     ex = null;
1157 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1158     t = tr;
1159 dl 1.17 }
1160     CompletableFuture<U> c = null;
1161 dl 1.20 U u = null;
1162     boolean complete = false;
1163 dl 1.17 if (ex == null) {
1164     try {
1165     c = fn.apply(t);
1166     } catch (Throwable rex) {
1167     ex = rex;
1168     }
1169     }
1170     if (ex != null || c == null) {
1171     if (ex == null)
1172     ex = new NullPointerException();
1173     }
1174 dl 1.18 else {
1175     ThenCopy<U> d = null;
1176     Object s;
1177     if ((s = c.result) == null) {
1178     CompletionNode p = new CompletionNode
1179     (d = new ThenCopy<U>(c, dst));
1180     while ((s = c.result) == null) {
1181     if (UNSAFE.compareAndSwapObject
1182     (c, COMPLETIONS, p.next = c.completions, p))
1183     break;
1184     }
1185 dl 1.17 }
1186 dl 1.18 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1187 dl 1.20 complete = true;
1188 dl 1.18 if (s instanceof AltResult) {
1189     ex = ((AltResult)s).ex; // no rewrap
1190     u = null;
1191 dl 1.28 }
1192     else {
1193     @SuppressWarnings("unchecked") U us = (U) s;
1194     u = us;
1195     }
1196     }
1197     }
1198     if (complete || ex != null)
1199     dst.internalComplete(u, ex);
1200     if (c != null)
1201     c.helpPostComplete();
1202     }
1203     }
1204     private static final long serialVersionUID = 5232453952276885070L;
1205     }
1206    
1207     // public methods
1208    
1209     /**
1210     * Creates a new incomplete CompletableFuture.
1211     */
1212     public CompletableFuture() {
1213     }
1214    
1215     /**
1216     * Asynchronously executes in the {@link
1217     * ForkJoinPool#commonPool()}, a task that completes the returned
1218     * CompletableFuture with the result of the given Supplier.
1219     *
1220     * @param supplier a function returning the value to be used
1221     * to complete the returned CompletableFuture
1222     * @return the CompletableFuture
1223     */
1224     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1225     if (supplier == null) throw new NullPointerException();
1226     CompletableFuture<U> f = new CompletableFuture<U>();
1227     ForkJoinPool.commonPool().
1228     execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1229     return f;
1230     }
1231    
1232     /**
1233     * Asynchronously executes using the given executor, a task that
1234     * completes the returned CompletableFuture with the result of the
1235     * given Supplier.
1236     *
1237     * @param supplier a function returning the value to be used
1238     * to complete the returned CompletableFuture
1239     * @param executor the executor to use for asynchronous execution
1240     * @return the CompletableFuture
1241     */
1242     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1243     Executor executor) {
1244     if (executor == null || supplier == null)
1245     throw new NullPointerException();
1246     CompletableFuture<U> f = new CompletableFuture<U>();
1247     executor.execute(new AsyncSupply<U>(supplier, f));
1248     return f;
1249     }
1250    
1251     /**
1252     * Asynchronously executes in the {@link
1253     * ForkJoinPool#commonPool()} a task that runs the given action,
1254     * and then completes the returned CompletableFuture.
1255     *
1256     * @param runnable the action to run before completing the
1257     * returned CompletableFuture
1258     * @return the CompletableFuture
1259     */
1260     public static CompletableFuture<Void> runAsync(Runnable runnable) {
1261     if (runnable == null) throw new NullPointerException();
1262     CompletableFuture<Void> f = new CompletableFuture<Void>();
1263     ForkJoinPool.commonPool().
1264     execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1265     return f;
1266     }
1267    
1268     /**
1269     * Asynchronously executes using the given executor, a task that
1270     * runs the given action, and then completes the returned
1271     * CompletableFuture.
1272     *
1273     * @param runnable the action to run before completing the
1274     * returned CompletableFuture
1275     * @param executor the executor to use for asynchronous execution
1276     * @return the CompletableFuture
1277     */
1278     public static CompletableFuture<Void> runAsync(Runnable runnable,
1279     Executor executor) {
1280     if (executor == null || runnable == null)
1281     throw new NullPointerException();
1282     CompletableFuture<Void> f = new CompletableFuture<Void>();
1283     executor.execute(new AsyncRun(runnable, f));
1284     return f;
1285     }
1286    
1287     /**
1288     * Returns {@code true} if completed in any fashion: normally,
1289     * exceptionally, or via cancellation.
1290     *
1291     * @return {@code true} if completed
1292     */
1293     public boolean isDone() {
1294     return result != null;
1295     }
1296    
1297     /**
1298     * Waits if necessary for the computation to complete, and then
1299     * retrieves its result.
1300     *
1301     * @return the computed result
1302     * @throws CancellationException if the computation was cancelled
1303     * @throws ExecutionException if the computation threw an
1304     * exception
1305     * @throws InterruptedException if the current thread was interrupted
1306     * while waiting
1307     */
1308     public T get() throws InterruptedException, ExecutionException {
1309     Object r; Throwable ex, cause;
1310     if ((r = result) == null && (r = waitingGet(true)) == null)
1311     throw new InterruptedException();
1312     if (r instanceof AltResult) {
1313     if ((ex = ((AltResult)r).ex) != null) {
1314     if (ex instanceof CancellationException)
1315     throw (CancellationException)ex;
1316     if ((ex instanceof CompletionException) &&
1317     (cause = ex.getCause()) != null)
1318     ex = cause;
1319     throw new ExecutionException(ex);
1320     }
1321     return null;
1322     }
1323     @SuppressWarnings("unchecked") T tr = (T) r;
1324     return tr;
1325     }
1326    
1327     /**
1328     * Waits if necessary for at most the given time for completion,
1329     * and then retrieves its result, if available.
1330     *
1331     * @param timeout the maximum time to wait
1332     * @param unit the time unit of the timeout argument
1333     * @return the computed result
1334     * @throws CancellationException if the computation was cancelled
1335     * @throws ExecutionException if the computation threw an
1336     * exception
1337     * @throws InterruptedException if the current thread was interrupted
1338     * while waiting
1339     * @throws TimeoutException if the wait timed out
1340     */
1341     public T get(long timeout, TimeUnit unit)
1342     throws InterruptedException, ExecutionException, TimeoutException {
1343     Object r; Throwable ex, cause;
1344     long nanos = unit.toNanos(timeout);
1345     if (Thread.interrupted())
1346     throw new InterruptedException();
1347     if ((r = result) == null)
1348     r = timedAwaitDone(nanos);
1349     if (r instanceof AltResult) {
1350     if ((ex = ((AltResult)r).ex) != null) {
1351     if (ex instanceof CancellationException)
1352     throw (CancellationException)ex;
1353     if ((ex instanceof CompletionException) &&
1354     (cause = ex.getCause()) != null)
1355     ex = cause;
1356     throw new ExecutionException(ex);
1357     }
1358     return null;
1359     }
1360     @SuppressWarnings("unchecked") T tr = (T) r;
1361     return tr;
1362     }
1363    
1364     /**
1365     * Returns the result value when complete, or throws an
1366     * (unchecked) exception if completed exceptionally. To better
1367     * conform with the use of common functional forms, if a
1368     * computation involved in the completion of this
1369     * CompletableFuture threw an exception, this method throws an
1370     * (unchecked) {@link CompletionException} with the underlying
1371     * exception as its cause.
1372     *
1373     * @return the result value
1374     * @throws CancellationException if the computation was cancelled
1375     * @throws CompletionException if a completion computation threw
1376     * an exception
1377     */
1378     public T join() {
1379     Object r; Throwable ex;
1380     if ((r = result) == null)
1381     r = waitingGet(false);
1382     if (r instanceof AltResult) {
1383     if ((ex = ((AltResult)r).ex) != null) {
1384     if (ex instanceof CancellationException)
1385     throw (CancellationException)ex;
1386     if (ex instanceof CompletionException)
1387     throw (CompletionException)ex;
1388     throw new CompletionException(ex);
1389     }
1390     return null;
1391     }
1392     @SuppressWarnings("unchecked") T tr = (T) r;
1393     return tr;
1394     }
1395    
1396     /**
1397     * Returns the result value (or throws any encountered exception)
1398     * if completed, else returns the given valueIfAbsent.
1399     *
1400     * @param valueIfAbsent the value to return if not completed
1401     * @return the result value, if completed, else the given valueIfAbsent
1402     * @throws CancellationException if the computation was cancelled
1403     * @throws CompletionException if a completion computation threw
1404     * an exception
1405     */
1406     public T getNow(T valueIfAbsent) {
1407     Object r; Throwable ex;
1408     if ((r = result) == null)
1409     return valueIfAbsent;
1410     if (r instanceof AltResult) {
1411     if ((ex = ((AltResult)r).ex) != null) {
1412     if (ex instanceof CancellationException)
1413     throw (CancellationException)ex;
1414     if (ex instanceof CompletionException)
1415     throw (CompletionException)ex;
1416     throw new CompletionException(ex);
1417 dl 1.17 }
1418 dl 1.28 return null;
1419 dl 1.17 }
1420 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1421     return tr;
1422     }
1423    
1424     /**
1425     * If not already completed, sets the value returned by {@link
1426     * #get()} and related methods to the given value.
1427     *
1428     * @param value the result value
1429     * @return {@code true} if this invocation caused this CompletableFuture
1430     * to transition to a completed state, else {@code false}
1431     */
1432     public boolean complete(T value) {
1433     boolean triggered = result == null &&
1434     UNSAFE.compareAndSwapObject(this, RESULT, null,
1435     value == null ? NIL : value);
1436     postComplete();
1437     return triggered;
1438     }
1439    
1440     /**
1441     * If not already completed, causes invocations of {@link #get()}
1442     * and related methods to throw the given exception.
1443     *
1444     * @param ex the exception
1445     * @return {@code true} if this invocation caused this CompletableFuture
1446     * to transition to a completed state, else {@code false}
1447     */
1448     public boolean completeExceptionally(Throwable ex) {
1449     if (ex == null) throw new NullPointerException();
1450     boolean triggered = result == null &&
1451     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1452     postComplete();
1453     return triggered;
1454     }
1455    
1456     /**
1457     * Creates and returns a CompletableFuture that is completed with
1458     * the result of the given function of this CompletableFuture.
1459     * If this CompletableFuture completes exceptionally,
1460     * then the returned CompletableFuture also does so,
1461     * with a CompletionException holding this exception as
1462     * its cause.
1463     *
1464     * @param fn the function to use to compute the value of
1465     * the returned CompletableFuture
1466     * @return the new CompletableFuture
1467     */
1468     public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1469     return doThenApply(fn, null);
1470     }
1471    
1472     /**
1473     * Creates and returns a CompletableFuture that is asynchronously
1474     * completed using the {@link ForkJoinPool#commonPool()} with the
1475     * result of the given function of this CompletableFuture. If
1476     * this CompletableFuture completes exceptionally, then the
1477     * returned CompletableFuture also does so, with a
1478     * CompletionException holding this exception as its cause.
1479     *
1480     * @param fn the function to use to compute the value of
1481     * the returned CompletableFuture
1482     * @return the new CompletableFuture
1483     */
1484     public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {
1485     return doThenApply(fn, ForkJoinPool.commonPool());
1486 dl 1.17 }
1487    
1488 dl 1.28 /**
1489     * Creates and returns a CompletableFuture that is asynchronously
1490     * completed using the given executor with the result of the given
1491     * function of this CompletableFuture. If this CompletableFuture
1492     * completes exceptionally, then the returned CompletableFuture
1493     * also does so, with a CompletionException holding this exception as
1494     * its cause.
1495     *
1496     * @param fn the function to use to compute the value of
1497     * the returned CompletableFuture
1498     * @param executor the executor to use for asynchronous execution
1499     * @return the new CompletableFuture
1500     */
1501     public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,
1502     Executor executor) {
1503     if (executor == null) throw new NullPointerException();
1504     return doThenApply(fn, executor);
1505     }
1506 dl 1.1
1507 dl 1.28 private <U> CompletableFuture<U> doThenApply(Function<? super T,? extends U> fn,
1508     Executor e) {
1509 dl 1.1 if (fn == null) throw new NullPointerException();
1510     CompletableFuture<U> dst = new CompletableFuture<U>();
1511 dl 1.28 ApplyCompletion<T,U> d = null;
1512 jsr166 1.2 Object r;
1513     if ((r = result) == null) {
1514 dl 1.1 CompletionNode p = new CompletionNode
1515 dl 1.28 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1516 dl 1.1 while ((r = result) == null) {
1517     if (UNSAFE.compareAndSwapObject
1518     (this, COMPLETIONS, p.next = completions, p))
1519     break;
1520     }
1521     }
1522 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1523 dl 1.19 T t; Throwable ex;
1524 jsr166 1.2 if (r instanceof AltResult) {
1525 dl 1.19 ex = ((AltResult)r).ex;
1526 jsr166 1.2 t = null;
1527 dl 1.1 }
1528 dl 1.19 else {
1529     ex = null;
1530 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1531     t = tr;
1532 dl 1.19 }
1533 dl 1.20 U u = null;
1534 jsr166 1.2 if (ex == null) {
1535 dl 1.1 try {
1536 dl 1.20 if (e != null)
1537 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
1538 dl 1.1 else
1539 dl 1.20 u = fn.apply(t);
1540 dl 1.1 } catch (Throwable rex) {
1541 dl 1.19 ex = rex;
1542 dl 1.1 }
1543     }
1544 dl 1.20 if (e == null || ex != null)
1545     dst.internalComplete(u, ex);
1546 dl 1.17 }
1547 dl 1.20 helpPostComplete();
1548 dl 1.1 return dst;
1549     }
1550    
1551 dl 1.28 /**
1552     * Creates and returns a CompletableFuture that is completed after
1553     * performing the given action with this CompletableFuture's
1554     * result when it completes. If this CompletableFuture
1555     * completes exceptionally, then the returned CompletableFuture
1556     * also does so, with a CompletionException holding this exception as
1557     * its cause.
1558     *
1559     * @param block the action to perform before completing the
1560     * returned CompletableFuture
1561     * @return the new CompletableFuture
1562     */
1563     public CompletableFuture<Void> thenAccept(Block<? super T> block) {
1564     return doThenAccept(block, null);
1565     }
1566    
1567     /**
1568     * Creates and returns a CompletableFuture that is asynchronously
1569     * completed using the {@link ForkJoinPool#commonPool()} with this
1570     * CompletableFuture's result when it completes. If this
1571     * CompletableFuture completes exceptionally, then the returned
1572     * CompletableFuture also does so, with a CompletionException holding
1573     * this exception as its cause.
1574     *
1575     * @param block the action to perform before completing the
1576     * returned CompletableFuture
1577     * @return the new CompletableFuture
1578     */
1579     public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block) {
1580     return doThenAccept(block, ForkJoinPool.commonPool());
1581     }
1582    
1583     /**
1584     * Creates and returns a CompletableFuture that is asynchronously
1585     * completed using the given executor with this
1586     * CompletableFuture's result when it completes. If this
1587     * CompletableFuture completes exceptionally, then the returned
1588     * CompletableFuture also does so, with a CompletionException holding
1589     * this exception as its cause.
1590     *
1591     * @param block the action to perform before completing the
1592     * returned CompletableFuture
1593     * @param executor the executor to use for asynchronous execution
1594     * @return the new CompletableFuture
1595     */
1596     public CompletableFuture<Void> thenAcceptAsync(Block<? super T> block,
1597     Executor executor) {
1598     if (executor == null) throw new NullPointerException();
1599     return doThenAccept(block, executor);
1600     }
1601    
1602     private CompletableFuture<Void> doThenAccept(Block<? super T> fn,
1603     Executor e) {
1604 dl 1.7 if (fn == null) throw new NullPointerException();
1605     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1606 dl 1.28 AcceptCompletion<T> d = null;
1607 dl 1.7 Object r;
1608     if ((r = result) == null) {
1609     CompletionNode p = new CompletionNode
1610 dl 1.28 (d = new AcceptCompletion<T>(this, fn, dst, e));
1611 dl 1.7 while ((r = result) == null) {
1612     if (UNSAFE.compareAndSwapObject
1613     (this, COMPLETIONS, p.next = completions, p))
1614     break;
1615     }
1616     }
1617     if (r != null && (d == null || d.compareAndSet(0, 1))) {
1618 dl 1.19 T t; Throwable ex;
1619 dl 1.7 if (r instanceof AltResult) {
1620 dl 1.19 ex = ((AltResult)r).ex;
1621 dl 1.7 t = null;
1622     }
1623 dl 1.19 else {
1624     ex = null;
1625 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1626     t = tr;
1627 dl 1.19 }
1628 dl 1.7 if (ex == null) {
1629     try {
1630 dl 1.20 if (e != null)
1631 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
1632 dl 1.20 else
1633 dl 1.7 fn.accept(t);
1634     } catch (Throwable rex) {
1635 dl 1.19 ex = rex;
1636 dl 1.7 }
1637     }
1638 dl 1.20 if (e == null || ex != null)
1639     dst.internalComplete(null, ex);
1640 dl 1.17 }
1641 dl 1.20 helpPostComplete();
1642 dl 1.7 return dst;
1643     }
1644    
1645 dl 1.28 /**
1646     * Creates and returns a CompletableFuture that is completed after
1647     * performing the given action when this CompletableFuture
1648     * completes. If this CompletableFuture completes exceptionally,
1649     * then the returned CompletableFuture also does so, with a
1650     * CompletionException holding this exception as its cause.
1651     *
1652     * @param action the action to perform before completing the
1653     * returned CompletableFuture
1654     * @return the new CompletableFuture
1655     */
1656     public CompletableFuture<Void> thenRun(Runnable action) {
1657     return doThenRun(action, null);
1658     }
1659    
1660     /**
1661     * Creates and returns a CompletableFuture that is asynchronously
1662     * completed using the {@link ForkJoinPool#commonPool()} after
1663     * performing the given action when this CompletableFuture
1664     * completes. If this CompletableFuture completes exceptionally,
1665     * then the returned CompletableFuture also does so, with a
1666     * CompletionException holding this exception as its cause.
1667     *
1668     * @param action the action to perform before completing the
1669     * returned CompletableFuture
1670     * @return the new CompletableFuture
1671     */
1672     public CompletableFuture<Void> thenRunAsync(Runnable action) {
1673     return doThenRun(action, ForkJoinPool.commonPool());
1674     }
1675    
1676     /**
1677     * Creates and returns a CompletableFuture that is asynchronously
1678     * completed using the given executor after performing the given
1679     * action when this CompletableFuture completes. If this
1680     * CompletableFuture completes exceptionally, then the returned
1681     * CompletableFuture also does so, with a CompletionException holding
1682     * this exception as its cause.
1683     *
1684     * @param action the action to perform before completing the
1685     * returned CompletableFuture
1686     * @param executor the executor to use for asynchronous execution
1687     * @return the new CompletableFuture
1688     */
1689     public CompletableFuture<Void> thenRunAsync(Runnable action,
1690     Executor executor) {
1691     if (executor == null) throw new NullPointerException();
1692     return doThenRun(action, executor);
1693     }
1694    
1695     private CompletableFuture<Void> doThenRun(Runnable action,
1696     Executor e) {
1697 dl 1.1 if (action == null) throw new NullPointerException();
1698     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1699 dl 1.28 RunCompletion<T> d = null;
1700 jsr166 1.2 Object r;
1701     if ((r = result) == null) {
1702 dl 1.1 CompletionNode p = new CompletionNode
1703 dl 1.28 (d = new RunCompletion<T>(this, action, dst, e));
1704 dl 1.1 while ((r = result) == null) {
1705     if (UNSAFE.compareAndSwapObject
1706     (this, COMPLETIONS, p.next = completions, p))
1707     break;
1708     }
1709     }
1710 jsr166 1.2 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1711 dl 1.19 Throwable ex;
1712     if (r instanceof AltResult)
1713 dl 1.28 ex = ((AltResult)r).ex;
1714     else
1715     ex = null;
1716     if (ex == null) {
1717     try {
1718     if (e != null)
1719     e.execute(new AsyncRun(action, dst));
1720     else
1721     action.run();
1722     } catch (Throwable rex) {
1723     ex = rex;
1724     }
1725     }
1726     if (e == null || ex != null)
1727     dst.internalComplete(null, ex);
1728     }
1729     helpPostComplete();
1730     return dst;
1731     }
1732    
1733     /**
1734     * Creates and returns a CompletableFuture that is completed with
1735     * the result of the given function of this and the other given
1736     * CompletableFuture's results when both complete. If this or
1737     * the other CompletableFuture complete exceptionally, then the
1738     * returned CompletableFuture also does so, with a
1739     * CompletionException holding the exception as its cause.
1740     *
1741     * @param other the other CompletableFuture
1742     * @param fn the function to use to compute the value of
1743     * the returned CompletableFuture
1744     * @return the new CompletableFuture
1745     */
1746     public <U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other,
1747     BiFunction<? super T,? super U,? extends V> fn) {
1748     return doThenBiApply(other, fn, null);
1749     }
1750    
1751     /**
1752     * Creates and returns a CompletableFuture that is asynchronously
1753     * completed using the {@link ForkJoinPool#commonPool()} with
1754     * the result of the given function of this and the other given
1755     * CompletableFuture's results when both complete. If this or
1756     * the other CompletableFuture complete exceptionally, then the
1757     * returned CompletableFuture also does so, with a
1758     * CompletionException holding the exception as its cause.
1759     *
1760     * @param other the other CompletableFuture
1761     * @param fn the function to use to compute the value of
1762     * the returned CompletableFuture
1763     * @return the new CompletableFuture
1764     */
1765     public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1766     BiFunction<? super T,? super U,? extends V> fn) {
1767     return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1768     }
1769    
1770     /**
1771     * Creates and returns a CompletableFuture that is
1772     * asynchronously completed using the given executor with the
1773     * result of the given function of this and the other given
1774     * CompletableFuture's results when both complete. If this or
1775     * the other CompletableFuture complete exceptionally, then the
1776     * returned CompletableFuture also does so, with a
1777     * CompletionException holding the exception as its cause.
1778     *
1779     * @param other the other CompletableFuture
1780     * @param fn the function to use to compute the value of
1781     * the returned CompletableFuture
1782     * @param executor the executor to use for asynchronous execution
1783     * @return the new CompletableFuture
1784     */
1785    
1786     public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,
1787     BiFunction<? super T,? super U,? extends V> fn,
1788     Executor executor) {
1789     if (executor == null) throw new NullPointerException();
1790     return doThenBiApply(other, fn, executor);
1791 dl 1.1 }
1792    
1793 dl 1.28 private <U,V> CompletableFuture<V> doThenBiApply(CompletableFuture<? extends U> other,
1794     BiFunction<? super T,? super U,? extends V> fn,
1795     Executor e) {
1796 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
1797 jsr166 1.2 CompletableFuture<V> dst = new CompletableFuture<V>();
1798 dl 1.28 BiApplyCompletion<T,U,V> d = null;
1799 jsr166 1.2 Object r, s = null;
1800     if ((r = result) == null || (s = other.result) == null) {
1801 dl 1.28 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1802 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
1803     while ((r == null && (r = result) == null) ||
1804     (s == null && (s = other.result) == null)) {
1805     if (q != null) {
1806     if (s != null ||
1807     UNSAFE.compareAndSwapObject
1808     (other, COMPLETIONS, q.next = other.completions, q))
1809     break;
1810     }
1811     else if (r != null ||
1812     UNSAFE.compareAndSwapObject
1813     (this, COMPLETIONS, p.next = completions, p)) {
1814     if (s != null)
1815     break;
1816     q = new CompletionNode(d);
1817     }
1818     }
1819     }
1820 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1821 dl 1.19 T t; U u; Throwable ex;
1822 jsr166 1.2 if (r instanceof AltResult) {
1823 dl 1.19 ex = ((AltResult)r).ex;
1824 jsr166 1.2 t = null;
1825 dl 1.1 }
1826 dl 1.19 else {
1827     ex = null;
1828 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1829     t = tr;
1830 dl 1.19 }
1831 dl 1.1 if (ex != null)
1832     u = null;
1833     else if (s instanceof AltResult) {
1834 dl 1.19 ex = ((AltResult)s).ex;
1835 dl 1.1 u = null;
1836     }
1837 dl 1.28 else {
1838     @SuppressWarnings("unchecked") U us = (U) s;
1839     u = us;
1840     }
1841 dl 1.20 V v = null;
1842 jsr166 1.2 if (ex == null) {
1843 dl 1.1 try {
1844 dl 1.20 if (e != null)
1845 dl 1.28 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1846 dl 1.1 else
1847 dl 1.20 v = fn.apply(t, u);
1848 dl 1.1 } catch (Throwable rex) {
1849 dl 1.19 ex = rex;
1850 dl 1.1 }
1851     }
1852 dl 1.20 if (e == null || ex != null)
1853     dst.internalComplete(v, ex);
1854 jsr166 1.2 }
1855 dl 1.20 helpPostComplete();
1856     other.helpPostComplete();
1857 jsr166 1.2 return dst;
1858 dl 1.1 }
1859    
1860 dl 1.28 /**
1861     * Creates and returns a CompletableFuture that is completed with
1862     * the results of this and the other given CompletableFuture if
1863     * both complete. If this and/or the other CompletableFuture
1864     * complete exceptionally, then the returned CompletableFuture
1865     * also does so, with a CompletionException holding one of these
1866     * exceptions as its cause.
1867     *
1868     * @param other the other CompletableFuture
1869     * @param block the action to perform before completing the
1870     * returned CompletableFuture
1871     * @return the new CompletableFuture
1872     */
1873     public <U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other,
1874     BiBlock<? super T, ? super U> block) {
1875     return doThenBiAccept(other, block, null);
1876     }
1877    
1878     /**
1879     * Creates and returns a CompletableFuture that is completed
1880     * asynchronously using the {@link ForkJoinPool#commonPool()} with
1881     * the results of this and the other given CompletableFuture when
1882     * both complete. If this and/or the other CompletableFuture
1883     * complete exceptionally, then the returned CompletableFuture
1884     * also does so, with a CompletionException holding one of these
1885     * exceptions as its cause.
1886     *
1887     * @param other the other CompletableFuture
1888     * @param block the action to perform before completing the
1889     * returned CompletableFuture
1890     * @return the new CompletableFuture
1891     */
1892     public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1893     BiBlock<? super T, ? super U> block) {
1894     return doThenBiAccept(other, block, ForkJoinPool.commonPool());
1895     }
1896    
1897     /**
1898     * Creates and returns a CompletableFuture that is completed
1899     * asynchronously using the given executor with the results of
1900     * this and the other given CompletableFuture when both complete.
1901     * If this and/or the other CompletableFuture complete
1902     * exceptionally, then the returned CompletableFuture also does
1903     * so, with a CompletionException holding one of these exceptions as
1904     * its cause.
1905     *
1906     * @param other the other CompletableFuture
1907     * @param block the action to perform before completing the
1908     * returned CompletableFuture
1909     * @param executor the executor to use for asynchronous execution
1910     * @return the new CompletableFuture
1911     */
1912     public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletableFuture<? extends U> other,
1913     BiBlock<? super T, ? super U> block,
1914     Executor executor) {
1915     if (executor == null) throw new NullPointerException();
1916     return doThenBiAccept(other, block, executor);
1917     }
1918    
1919     private <U> CompletableFuture<Void> doThenBiAccept(CompletableFuture<? extends U> other,
1920     BiBlock<? super T,? super U> fn,
1921     Executor e) {
1922 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
1923     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1924 dl 1.28 BiAcceptCompletion<T,U> d = null;
1925 dl 1.7 Object r, s = null;
1926     if ((r = result) == null || (s = other.result) == null) {
1927 dl 1.28 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
1928 dl 1.7 CompletionNode q = null, p = new CompletionNode(d);
1929     while ((r == null && (r = result) == null) ||
1930     (s == null && (s = other.result) == null)) {
1931     if (q != null) {
1932     if (s != null ||
1933     UNSAFE.compareAndSwapObject
1934     (other, COMPLETIONS, q.next = other.completions, q))
1935     break;
1936     }
1937     else if (r != null ||
1938     UNSAFE.compareAndSwapObject
1939     (this, COMPLETIONS, p.next = completions, p)) {
1940     if (s != null)
1941     break;
1942     q = new CompletionNode(d);
1943     }
1944     }
1945     }
1946     if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1947 dl 1.19 T t; U u; Throwable ex;
1948 dl 1.7 if (r instanceof AltResult) {
1949 dl 1.19 ex = ((AltResult)r).ex;
1950 dl 1.7 t = null;
1951     }
1952 dl 1.19 else {
1953     ex = null;
1954 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1955     t = tr;
1956 dl 1.19 }
1957 dl 1.7 if (ex != null)
1958     u = null;
1959     else if (s instanceof AltResult) {
1960 dl 1.19 ex = ((AltResult)s).ex;
1961 dl 1.7 u = null;
1962     }
1963 dl 1.28 else {
1964     @SuppressWarnings("unchecked") U us = (U) s;
1965     u = us;
1966     }
1967 dl 1.7 if (ex == null) {
1968     try {
1969 dl 1.20 if (e != null)
1970 dl 1.28 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
1971 dl 1.20 else
1972 dl 1.7 fn.accept(t, u);
1973     } catch (Throwable rex) {
1974 dl 1.19 ex = rex;
1975 dl 1.7 }
1976     }
1977 dl 1.20 if (e == null || ex != null)
1978     dst.internalComplete(null, ex);
1979 dl 1.7 }
1980 dl 1.20 helpPostComplete();
1981     other.helpPostComplete();
1982 dl 1.7 return dst;
1983     }
1984    
1985 dl 1.28 /**
1986     * Creates and returns a CompletableFuture that is completed
1987     * when this and the other given CompletableFuture both
1988     * complete. If this and/or the other CompletableFuture complete
1989     * exceptionally, then the returned CompletableFuture also does
1990     * so, with a CompletionException holding one of these exceptions as
1991     * its cause.
1992     *
1993     * @param other the other CompletableFuture
1994     * @param action the action to perform before completing the
1995     * returned CompletableFuture
1996     * @return the new CompletableFuture
1997     */
1998     public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
1999     Runnable action) {
2000     return doThenBiRun(other, action, null);
2001     }
2002    
2003     /**
2004     * Creates and returns a CompletableFuture that is completed
2005     * asynchronously using the {@link ForkJoinPool#commonPool()}
2006     * when this and the other given CompletableFuture both
2007     * complete. If this and/or the other CompletableFuture complete
2008     * exceptionally, then the returned CompletableFuture also does
2009     * so, with a CompletionException holding one of these exceptions as
2010     * its cause.
2011     *
2012     * @param other the other CompletableFuture
2013     * @param action the action to perform before completing the
2014     * returned CompletableFuture
2015     * @return the new CompletableFuture
2016     */
2017     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2018     Runnable action) {
2019     return doThenBiRun(other, action, ForkJoinPool.commonPool());
2020     }
2021    
2022     /**
2023     * Creates and returns a CompletableFuture that is completed
2024     * asynchronously using the given executor
2025     * when this and the other given CompletableFuture both
2026     * complete. If this and/or the other CompletableFuture complete
2027     * exceptionally, then the returned CompletableFuture also does
2028     * so, with a CompletionException holding one of these exceptions as
2029     * its cause.
2030     *
2031     * @param other the other CompletableFuture
2032     * @param action the action to perform before completing the
2033     * returned CompletableFuture
2034     * @param executor the executor to use for asynchronous execution
2035     * @return the new CompletableFuture
2036     */
2037     public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2038     Runnable action,
2039     Executor executor) {
2040     if (executor == null) throw new NullPointerException();
2041     return doThenBiRun(other, action, executor);
2042     }
2043    
2044     private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2045     Runnable action,
2046     Executor e) {
2047 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2048 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2049 dl 1.28 BiRunCompletion<T> d = null;
2050 jsr166 1.2 Object r, s = null;
2051     if ((r = result) == null || (s = other.result) == null) {
2052 dl 1.28 d = new BiRunCompletion<T>(this, other, action, dst, e);
2053 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2054     while ((r == null && (r = result) == null) ||
2055     (s == null && (s = other.result) == null)) {
2056     if (q != null) {
2057     if (s != null ||
2058     UNSAFE.compareAndSwapObject
2059     (other, COMPLETIONS, q.next = other.completions, q))
2060     break;
2061     }
2062     else if (r != null ||
2063     UNSAFE.compareAndSwapObject
2064     (this, COMPLETIONS, p.next = completions, p)) {
2065     if (s != null)
2066     break;
2067     q = new CompletionNode(d);
2068     }
2069     }
2070     }
2071 jsr166 1.2 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2072 dl 1.19 Throwable ex;
2073     if (r instanceof AltResult)
2074     ex = ((AltResult)r).ex;
2075     else
2076     ex = null;
2077     if (ex == null && (s instanceof AltResult))
2078     ex = ((AltResult)s).ex;
2079     if (ex == null) {
2080 dl 1.1 try {
2081 dl 1.20 if (e != null)
2082 dl 1.28 e.execute(new AsyncRun(action, dst));
2083 dl 1.20 else
2084 dl 1.1 action.run();
2085     } catch (Throwable rex) {
2086 dl 1.19 ex = rex;
2087 dl 1.1 }
2088     }
2089 dl 1.20 if (e == null || ex != null)
2090     dst.internalComplete(null, ex);
2091 jsr166 1.2 }
2092 dl 1.20 helpPostComplete();
2093     other.helpPostComplete();
2094 jsr166 1.2 return dst;
2095 dl 1.1 }
2096    
2097 dl 1.28 /**
2098     * Creates and returns a CompletableFuture that is completed with
2099     * the result of the given function of either this or the other
2100     * given CompletableFuture's results when either complete. If
2101     * this and/or the other CompletableFuture complete exceptionally,
2102     * then the returned CompletableFuture may also do so, with a
2103     * CompletionException holding one of these exceptions as its cause.
2104     * No guarantees are made about which result or exception is used
2105     * in the returned CompletableFuture.
2106     *
2107     * @param other the other CompletableFuture
2108     * @param fn the function to use to compute the value of
2109     * the returned CompletableFuture
2110     * @return the new CompletableFuture
2111     */
2112     public <U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other,
2113     Function<? super T, U> fn) {
2114     return doOrApply(other, fn, null);
2115     }
2116    
2117     /**
2118     * Creates and returns a CompletableFuture that is completed
2119     * asynchronously using the {@link ForkJoinPool#commonPool()} with
2120     * the result of the given function of either this or the other
2121     * given CompletableFuture's results when either complete. If
2122     * this and/or the other CompletableFuture complete exceptionally,
2123     * then the returned CompletableFuture may also do so, with a
2124     * CompletionException holding one of these exceptions as its cause.
2125     * No guarantees are made about which result or exception is used
2126     * in the returned CompletableFuture.
2127     *
2128     * @param other the other CompletableFuture
2129     * @param fn the function to use to compute the value of
2130     * the returned CompletableFuture
2131     * @return the new CompletableFuture
2132     */
2133     public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2134     Function<? super T, U> fn) {
2135     return doOrApply(other, fn, ForkJoinPool.commonPool());
2136     }
2137    
2138     /**
2139     * Creates and returns a CompletableFuture that is completed
2140     * asynchronously using the given executor with the result of the
2141     * given function of either this or the other given
2142     * CompletableFuture's results when either complete. If this
2143     * and/or the other CompletableFuture complete exceptionally, then
2144     * the returned CompletableFuture may also do so, with a
2145     * CompletionException holding one of these exceptions as its cause.
2146     * No guarantees are made about which result or exception is used
2147     * in the returned CompletableFuture.
2148     *
2149     * @param other the other CompletableFuture
2150     * @param fn the function to use to compute the value of
2151     * the returned CompletableFuture
2152     * @param executor the executor to use for asynchronous execution
2153     * @return the new CompletableFuture
2154     */
2155     public <U> CompletableFuture<U> applyToEitherAsync(CompletableFuture<? extends T> other,
2156     Function<? super T, U> fn,
2157     Executor executor) {
2158     if (executor == null) throw new NullPointerException();
2159     return doOrApply(other, fn, executor);
2160     }
2161    
2162     private <U> CompletableFuture<U> doOrApply(CompletableFuture<? extends T> other,
2163     Function<? super T, U> fn,
2164     Executor e) {
2165 dl 1.1 if (other == null || fn == null) throw new NullPointerException();
2166 jsr166 1.2 CompletableFuture<U> dst = new CompletableFuture<U>();
2167 dl 1.28 OrApplyCompletion<T,U> d = null;
2168 jsr166 1.2 Object r;
2169     if ((r = result) == null && (r = other.result) == null) {
2170 dl 1.28 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2171 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2172     while ((r = result) == null && (r = other.result) == null) {
2173     if (q != null) {
2174     if (UNSAFE.compareAndSwapObject
2175     (other, COMPLETIONS, q.next = other.completions, q))
2176     break;
2177     }
2178     else if (UNSAFE.compareAndSwapObject
2179     (this, COMPLETIONS, p.next = completions, p))
2180     q = new CompletionNode(d);
2181     }
2182 jsr166 1.2 }
2183     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2184 dl 1.19 T t; Throwable ex;
2185 jsr166 1.2 if (r instanceof AltResult) {
2186 dl 1.19 ex = ((AltResult)r).ex;
2187 jsr166 1.2 t = null;
2188 dl 1.1 }
2189 dl 1.19 else {
2190     ex = null;
2191 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2192     t = tr;
2193 dl 1.19 }
2194 dl 1.20 U u = null;
2195 jsr166 1.2 if (ex == null) {
2196 dl 1.1 try {
2197 dl 1.20 if (e != null)
2198 dl 1.28 e.execute(new AsyncApply<T,U>(t, fn, dst));
2199 dl 1.1 else
2200 dl 1.20 u = fn.apply(t);
2201 dl 1.1 } catch (Throwable rex) {
2202 dl 1.19 ex = rex;
2203 dl 1.1 }
2204     }
2205 dl 1.20 if (e == null || ex != null)
2206     dst.internalComplete(u, ex);
2207 jsr166 1.2 }
2208 dl 1.20 helpPostComplete();
2209     other.helpPostComplete();
2210 jsr166 1.2 return dst;
2211 dl 1.1 }
2212    
2213 dl 1.28 /**
2214     * Creates and returns a CompletableFuture that is completed after
2215     * performing the given action with the result of either this or the
2216     * other given CompletableFuture's result, when either complete.
2217     * If this and/or the other CompletableFuture complete
2218     * exceptionally, then the returned CompletableFuture may also do
2219     * so, with a CompletionException holding one of these exceptions as
2220     * its cause. No guarantees are made about which exception is
2221     * used in the returned CompletableFuture.
2222     *
2223     * @param other the other CompletableFuture
2224     * @param block the action to perform before completing the
2225     * returned CompletableFuture
2226     * @return the new CompletableFuture
2227     */
2228     public CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other,
2229     Block<? super T> block) {
2230     return doOrAccept(other, block, null);
2231     }
2232    
2233     /**
2234     * Creates and returns a CompletableFuture that is completed
2235     * asynchronously using the {@link ForkJoinPool#commonPool()},
2236     * performing the given action with the result of either this or
2237     * the other given CompletableFuture's result, when either
2238     * complete. If this and/or the other CompletableFuture complete
2239     * exceptionally, then the returned CompletableFuture may also do
2240     * so, with a CompletionException holding one of these exceptions as
2241     * its cause. No guarantees are made about which exception is
2242     * used in the returned CompletableFuture.
2243     *
2244     * @param other the other CompletableFuture
2245     * @param block the action to perform before completing the
2246     * returned CompletableFuture
2247     * @return the new CompletableFuture
2248     */
2249     public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2250     Block<? super T> block) {
2251     return doOrAccept(other, block, ForkJoinPool.commonPool());
2252     }
2253    
2254     /**
2255     * Creates and returns a CompletableFuture that is completed
2256     * asynchronously using the given executor,
2257     * performing the given action with the result of either this or
2258     * the other given CompletableFuture's result, when either
2259     * complete. If this and/or the other CompletableFuture complete
2260     * exceptionally, then the returned CompletableFuture may also do
2261     * so, with a CompletionException holding one of these exceptions as
2262     * its cause. No guarantees are made about which exception is
2263     * used in the returned CompletableFuture.
2264     *
2265     * @param other the other CompletableFuture
2266     * @param block the action to perform before completing the
2267     * returned CompletableFuture
2268     * @param executor the executor to use for asynchronous execution
2269     * @return the new CompletableFuture
2270     */
2271     public CompletableFuture<Void> acceptEitherAsync(CompletableFuture<? extends T> other,
2272     Block<? super T> block,
2273     Executor executor) {
2274     if (executor == null) throw new NullPointerException();
2275     return doOrAccept(other, block, executor);
2276     }
2277    
2278     private CompletableFuture<Void> doOrAccept(CompletableFuture<? extends T> other,
2279     Block<? super T> fn,
2280     Executor e) {
2281 dl 1.7 if (other == null || fn == null) throw new NullPointerException();
2282     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2283 dl 1.28 OrAcceptCompletion<T> d = null;
2284 dl 1.7 Object r;
2285     if ((r = result) == null && (r = other.result) == null) {
2286 dl 1.28 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2287 dl 1.7 CompletionNode q = null, p = new CompletionNode(d);
2288     while ((r = result) == null && (r = other.result) == null) {
2289     if (q != null) {
2290     if (UNSAFE.compareAndSwapObject
2291     (other, COMPLETIONS, q.next = other.completions, q))
2292     break;
2293     }
2294     else if (UNSAFE.compareAndSwapObject
2295     (this, COMPLETIONS, p.next = completions, p))
2296     q = new CompletionNode(d);
2297     }
2298     }
2299     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2300 dl 1.19 T t; Throwable ex;
2301 dl 1.7 if (r instanceof AltResult) {
2302 dl 1.19 ex = ((AltResult)r).ex;
2303 dl 1.7 t = null;
2304     }
2305 dl 1.19 else {
2306     ex = null;
2307 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
2308     t = tr;
2309 dl 1.19 }
2310 dl 1.7 if (ex == null) {
2311     try {
2312 dl 1.20 if (e != null)
2313 dl 1.28 e.execute(new AsyncAccept<T>(t, fn, dst));
2314 dl 1.20 else
2315 dl 1.7 fn.accept(t);
2316     } catch (Throwable rex) {
2317 dl 1.19 ex = rex;
2318 dl 1.7 }
2319     }
2320 dl 1.20 if (e == null || ex != null)
2321     dst.internalComplete(null, ex);
2322 dl 1.7 }
2323 dl 1.20 helpPostComplete();
2324     other.helpPostComplete();
2325 dl 1.7 return dst;
2326     }
2327    
2328 dl 1.28 /**
2329     * Creates and returns a CompletableFuture that is completed
2330     * after this or the other given CompletableFuture complete. If
2331     * this and/or the other CompletableFuture complete exceptionally,
2332     * then the returned CompletableFuture may also do so, with a
2333     * CompletionException holding one of these exceptions as its cause.
2334     * No guarantees are made about which exception is used in the
2335     * returned CompletableFuture.
2336     *
2337     * @param other the other CompletableFuture
2338     * @param action the action to perform before completing the
2339     * returned CompletableFuture
2340     * @return the new CompletableFuture
2341     */
2342     public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2343     Runnable action) {
2344     return doOrRun(other, action, null);
2345     }
2346    
2347     /**
2348     * Creates and returns a CompletableFuture that is completed
2349     * asynchronously using the {@link ForkJoinPool#commonPool()}
2350     * after this or the other given CompletableFuture complete. If
2351     * this and/or the other CompletableFuture complete exceptionally,
2352     * then the returned CompletableFuture may also do so, with a
2353     * CompletionException holding one of these exceptions as its cause.
2354     * No guarantees are made about which exception is used in the
2355     * returned CompletableFuture.
2356     *
2357     * @param other the other CompletableFuture
2358     * @param action the action to perform before completing the
2359     * returned CompletableFuture
2360     * @return the new CompletableFuture
2361     */
2362     public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2363     Runnable action) {
2364     return doOrRun(other, action, ForkJoinPool.commonPool());
2365     }
2366    
2367     /**
2368     * Creates and returns a CompletableFuture that is completed
2369     * asynchronously using the given executor after this or the other
2370     * given CompletableFuture complete. If this and/or the other
2371     * CompletableFuture complete exceptionally, then the returned
2372     * CompletableFuture may also do so, with a CompletionException
2373     * holding one of these exceptions as its cause. No guarantees are
2374     * made about which exception is used in the returned
2375     * CompletableFuture.
2376     *
2377     * @param other the other CompletableFuture
2378     * @param action the action to perform before completing the
2379     * returned CompletableFuture
2380     * @param executor the executor to use for asynchronous execution
2381     * @return the new CompletableFuture
2382     */
2383     public CompletableFuture<Void> runAfterEitherAsync(CompletableFuture<?> other,
2384     Runnable action,
2385     Executor executor) {
2386     if (executor == null) throw new NullPointerException();
2387     return doOrRun(other, action, executor);
2388     }
2389    
2390     private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2391     Runnable action,
2392     Executor e) {
2393 dl 1.1 if (other == null || action == null) throw new NullPointerException();
2394 jsr166 1.2 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2395 dl 1.28 OrRunCompletion<T> d = null;
2396 jsr166 1.2 Object r;
2397     if ((r = result) == null && (r = other.result) == null) {
2398 dl 1.28 d = new OrRunCompletion<T>(this, other, action, dst, e);
2399 dl 1.1 CompletionNode q = null, p = new CompletionNode(d);
2400     while ((r = result) == null && (r = other.result) == null) {
2401     if (q != null) {
2402     if (UNSAFE.compareAndSwapObject
2403     (other, COMPLETIONS, q.next = other.completions, q))
2404     break;
2405     }
2406     else if (UNSAFE.compareAndSwapObject
2407     (this, COMPLETIONS, p.next = completions, p))
2408     q = new CompletionNode(d);
2409     }
2410 jsr166 1.2 }
2411     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2412 dl 1.19 Throwable ex;
2413     if (r instanceof AltResult)
2414     ex = ((AltResult)r).ex;
2415     else
2416     ex = null;
2417     if (ex == null) {
2418 dl 1.1 try {
2419 dl 1.20 if (e != null)
2420 dl 1.28 e.execute(new AsyncRun(action, dst));
2421 dl 1.20 else
2422 dl 1.1 action.run();
2423     } catch (Throwable rex) {
2424 dl 1.19 ex = rex;
2425 dl 1.1 }
2426     }
2427 dl 1.20 if (e == null || ex != null)
2428     dst.internalComplete(null, ex);
2429 jsr166 1.2 }
2430 dl 1.20 helpPostComplete();
2431     other.helpPostComplete();
2432 jsr166 1.2 return dst;
2433 dl 1.1 }
2434    
2435 dl 1.28 /**
2436     * Returns a CompletableFuture (or an equivalent one) produced by
2437     * the given function of the result of this CompletableFuture when
2438     * completed. If this CompletableFuture completes exceptionally,
2439     * then the returned CompletableFuture also does so, with a
2440     * CompletionException holding this exception as its cause.
2441     *
2442     * @param fn the function returning a new CompletableFuture.
2443     * @return the CompletableFuture, that {@code isDone()} upon
2444     * return if completed by the given function, or an exception
2445     * occurs.
2446     */
2447     public <U> CompletableFuture<U> thenCompose(Function<? super T,
2448     CompletableFuture<U>> fn) {
2449     if (fn == null) throw new NullPointerException();
2450     CompletableFuture<U> dst = null;
2451     ComposeCompletion<T,U> d = null;
2452     Object r;
2453     if ((r = result) == null) {
2454     dst = new CompletableFuture<U>();
2455     CompletionNode p = new CompletionNode
2456     (d = new ComposeCompletion<T,U>(this, fn, dst));
2457     while ((r = result) == null) {
2458     if (UNSAFE.compareAndSwapObject
2459     (this, COMPLETIONS, p.next = completions, p))
2460     break;
2461     }
2462     }
2463     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2464     T t; Throwable ex;
2465     if (r instanceof AltResult) {
2466     ex = ((AltResult)r).ex;
2467     t = null;
2468     }
2469     else {
2470     ex = null;
2471     @SuppressWarnings("unchecked") T tr = (T) r;
2472     t = tr;
2473     }
2474     if (ex == null) {
2475     try {
2476     dst = fn.apply(t);
2477     } catch (Throwable rex) {
2478     ex = rex;
2479     }
2480     }
2481     if (dst == null) {
2482     dst = new CompletableFuture<U>();
2483     if (ex == null)
2484     ex = new NullPointerException();
2485     }
2486     if (ex != null)
2487     dst.internalComplete(null, ex);
2488     }
2489     helpPostComplete();
2490     dst.helpPostComplete();
2491     return dst;
2492     }
2493    
2494     /**
2495     * Creates and returns a CompletableFuture that is completed with
2496     * the result of the given function of the exception triggering
2497     * this CompletableFuture's completion when it completes
2498     * exceptionally; Otherwise, if this CompletableFuture completes
2499     * normally, then the returned CompletableFuture also completes
2500     * normally with the same value.
2501     *
2502     * @param fn the function to use to compute the value of the
2503     * returned CompletableFuture if this CompletableFuture completed
2504     * exceptionally
2505     * @return the new CompletableFuture
2506     */
2507     public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) {
2508     if (fn == null) throw new NullPointerException();
2509     CompletableFuture<T> dst = new CompletableFuture<T>();
2510     ExceptionCompletion<T> d = null;
2511     Object r;
2512     if ((r = result) == null) {
2513     CompletionNode p =
2514     new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2515     while ((r = result) == null) {
2516     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2517     p.next = completions, p))
2518     break;
2519     }
2520     }
2521     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2522     T t = null; Throwable ex, dx = null;
2523     if (r instanceof AltResult) {
2524     if ((ex = ((AltResult)r).ex) != null) {
2525     try {
2526     t = fn.apply(ex);
2527     } catch (Throwable rex) {
2528     dx = rex;
2529     }
2530     }
2531     }
2532     else {
2533     @SuppressWarnings("unchecked") T tr = (T) r;
2534     t = tr;
2535     }
2536     dst.internalComplete(t, dx);
2537     }
2538     helpPostComplete();
2539     return dst;
2540     }
2541    
2542     /**
2543     * Creates and returns a CompletableFuture that is completed with
2544     * the result of the given function of the result and exception of
2545     * this CompletableFuture's completion when it completes. The
2546     * given function is invoked with the result (or {@code null} if
2547     * none) and the exception (or {@code null} if none) of this
2548     * CompletableFuture when complete.
2549     *
2550     * @param fn the function to use to compute the value of the
2551     * returned CompletableFuture
2552    
2553     * @return the new CompletableFuture
2554     */
2555     public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) {
2556     if (fn == null) throw new NullPointerException();
2557     CompletableFuture<U> dst = new CompletableFuture<U>();
2558     HandleCompletion<T,U> d = null;
2559     Object r;
2560     if ((r = result) == null) {
2561     CompletionNode p =
2562     new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2563     while ((r = result) == null) {
2564     if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2565     p.next = completions, p))
2566     break;
2567     }
2568     }
2569     if (r != null && (d == null || d.compareAndSet(0, 1))) {
2570     T t; Throwable ex;
2571     if (r instanceof AltResult) {
2572     ex = ((AltResult)r).ex;
2573     t = null;
2574     }
2575     else {
2576     ex = null;
2577     @SuppressWarnings("unchecked") T tr = (T) r;
2578     t = tr;
2579     }
2580     U u; Throwable dx;
2581     try {
2582     u = fn.apply(t, ex);
2583     dx = null;
2584     } catch (Throwable rex) {
2585     dx = rex;
2586     u = null;
2587     }
2588     dst.internalComplete(u, dx);
2589     }
2590     helpPostComplete();
2591     return dst;
2592     }
2593    
2594     /**
2595     * Attempts to complete this CompletableFuture with
2596     * a {@link CancellationException}.
2597     *
2598     * @param mayInterruptIfRunning this value has no effect in this
2599     * implementation because interrupts are not used to control
2600     * processing.
2601     *
2602     * @return {@code true} if this task is now cancelled
2603     */
2604     public boolean cancel(boolean mayInterruptIfRunning) {
2605     Object r;
2606     while ((r = result) == null) {
2607     r = new AltResult(new CancellationException());
2608     if (UNSAFE.compareAndSwapObject(this, RESULT, null, r)) {
2609     postComplete();
2610     return true;
2611     }
2612     }
2613     return ((r instanceof AltResult) &&
2614     (((AltResult)r).ex instanceof CancellationException));
2615     }
2616    
2617     /**
2618     * Returns {@code true} if this CompletableFuture was cancelled
2619     * before it completed normally.
2620     *
2621     * @return {@code true} if this CompletableFuture was cancelled
2622     * before it completed normally
2623     */
2624     public boolean isCancelled() {
2625     Object r;
2626     return ((r = result) != null &&
2627     (r instanceof AltResult) &&
2628     (((AltResult)r).ex instanceof CancellationException));
2629     }
2630    
2631     /**
2632     * Forcibly sets or resets the value subsequently returned by
2633     * method get() and related methods, whether or not already
2634     * completed. This method is designed for use only in error
2635     * recovery actions, and even in such situations may result in
2636     * ongoing dependent completions using established versus
2637     * overwritten values.
2638     *
2639     * @param value the completion value
2640     */
2641     public void obtrudeValue(T value) {
2642     result = (value == null) ? NIL : value;
2643     postComplete();
2644     }
2645    
2646 dl 1.1 // Unsafe mechanics
2647     private static final sun.misc.Unsafe UNSAFE;
2648     private static final long RESULT;
2649     private static final long WAITERS;
2650     private static final long COMPLETIONS;
2651     static {
2652     try {
2653     UNSAFE = sun.misc.Unsafe.getUnsafe();
2654     Class<?> k = CompletableFuture.class;
2655     RESULT = UNSAFE.objectFieldOffset
2656     (k.getDeclaredField("result"));
2657     WAITERS = UNSAFE.objectFieldOffset
2658     (k.getDeclaredField("waiters"));
2659     COMPLETIONS = UNSAFE.objectFieldOffset
2660     (k.getDeclaredField("completions"));
2661     } catch (Exception e) {
2662     throw new Error(e);
2663     }
2664     }
2665     }