ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.101
Committed: Sun Apr 13 22:25:37 2014 UTC (10 years, 1 month ago) by dl
Branch: MAIN
Changes since 1.100: +238 -265 lines
Log Message:
Avoid StackOverflowError for branching completions

File Contents

# User Rev Content
1 dl 1.1 /*
2     * Written by Doug Lea with assistance from members of JCP JSR-166
3     * Expert Group and released to the public domain, as explained at
4     * http://creativecommons.org/publicdomain/zero/1.0/
5     */
6    
7     package java.util.concurrent;
8     import java.util.function.Supplier;
9 dl 1.34 import java.util.function.Consumer;
10     import java.util.function.BiConsumer;
11 dl 1.1 import java.util.function.Function;
12     import java.util.function.BiFunction;
13     import java.util.concurrent.Future;
14     import java.util.concurrent.TimeUnit;
15     import java.util.concurrent.ForkJoinPool;
16     import java.util.concurrent.ForkJoinTask;
17     import java.util.concurrent.Executor;
18 dl 1.5 import java.util.concurrent.ThreadLocalRandom;
19 dl 1.1 import java.util.concurrent.ExecutionException;
20     import java.util.concurrent.TimeoutException;
21     import java.util.concurrent.CancellationException;
22 dl 1.88 import java.util.concurrent.CompletionException;
23     import java.util.concurrent.CompletionStage;
24 dl 1.1 import java.util.concurrent.atomic.AtomicInteger;
25     import java.util.concurrent.locks.LockSupport;
26    
27     /**
28     * A {@link Future} that may be explicitly completed (setting its
29 dl 1.88 * value and status), and may be used as a {@link CompletionStage},
30     * supporting dependent functions and actions that trigger upon its
31     * completion.
32 dl 1.1 *
33 jsr166 1.50 * <p>When two or more threads attempt to
34     * {@link #complete complete},
35 jsr166 1.52 * {@link #completeExceptionally completeExceptionally}, or
36 jsr166 1.50 * {@link #cancel cancel}
37     * a CompletableFuture, only one of them succeeds.
38 dl 1.19 *
39 dl 1.91 * <p>In addition to these and related methods for directly
40     * manipulating status and results, CompletableFuture implements
41     * interface {@link CompletionStage} with the following policies: <ul>
42 dl 1.35 *
43 dl 1.88 * <li>Actions supplied for dependent completions of
44     * <em>non-async</em> methods may be performed by the thread that
45     * completes the current CompletableFuture, or by any other caller of
46 dl 1.96 * a completion method.</li>
47 jsr166 1.65 *
48 dl 1.88 * <li>All <em>async</em> methods without an explicit Executor
49 dl 1.96 * argument are performed using the {@link ForkJoinPool#commonPool()}
50     * (unless it does not support a parallelism level of at least two, in
51     * which case, a new Thread is used). To simplify monitoring,
52     * debugging, and tracking, all generated asynchronous tasks are
53     * instances of the marker interface {@link
54     * AsynchronousCompletionTask}. </li>
55 dl 1.35 *
56 dl 1.88 * <li>All CompletionStage methods are implemented independently of
57     * other public methods, so the behavior of one method is not impacted
58     * by overrides of others in subclasses. </li> </ul>
59     *
60 dl 1.91 * <p>CompletableFuture also implements {@link Future} with the following
61 dl 1.88 * policies: <ul>
62     *
63     * <li>Since (unlike {@link FutureTask}) this class has no direct
64 jsr166 1.55 * control over the computation that causes it to be completed,
65 dl 1.88 * cancellation is treated as just another form of exceptional
66     * completion. Method {@link #cancel cancel} has the same effect as
67     * {@code completeExceptionally(new CancellationException())}. Method
68     * {@link #isCompletedExceptionally} can be used to determine if a
69     * CompletableFuture completed in any exceptional fashion.</li>
70 jsr166 1.55 *
71 dl 1.88 * <li>In case of exceptional completion with a CompletionException,
72 jsr166 1.55 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
73     * {@link ExecutionException} with the same cause as held in the
74 dl 1.88 * corresponding CompletionException. To simplify usage in most
75     * contexts, this class also defines methods {@link #join()} and
76     * {@link #getNow} that instead throw the CompletionException directly
77     * in these cases.</li> </ul>
78 jsr166 1.80 *
79 dl 1.1 * @author Doug Lea
80     * @since 1.8
81     */
82 dl 1.88 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
83 dl 1.28
84 dl 1.1 /*
85 dl 1.20 * Overview:
86 dl 1.1 *
87 jsr166 1.32 * 1. Non-nullness of field result (set via CAS) indicates done.
88     * An AltResult is used to box null as a result, as well as to
89     * hold exceptions. Using a single field makes completion fast
90 dl 1.20 * and simple to detect and trigger, at the expense of a lot of
91     * encoding and decoding that infiltrates many methods. One minor
92     * simplification relies on the (static) NIL (to box null results)
93     * being the only AltResult with a null exception field, so we
94 dl 1.28 * don't usually need explicit comparisons with NIL. The CF
95     * exception propagation mechanics surrounding decoding rely on
96     * unchecked casts of decoded results really being unchecked,
97     * where user type errors are caught at point of use, as is
98     * currently the case in Java. These are highlighted by using
99     * SuppressWarnings-annotated temporaries.
100 dl 1.1 *
101     * 2. Waiters are held in a Treiber stack similar to the one used
102 dl 1.20 * in FutureTask, Phaser, and SynchronousQueue. See their
103 dl 1.28 * internal documentation for algorithmic details.
104 dl 1.1 *
105     * 3. Completions are also kept in a list/stack, and pulled off
106 dl 1.101 * and run when completion of an observable CF is triggered. (We
107     * could even use the same stack as for waiters, but would give up
108     * the potential parallelism obtained because woken waiters help
109     * release/run others -- see method postComplete). Because
110     * post-processing may race with direct calls, class Completion
111     * opportunistically extends AtomicInteger so callers can claim
112     * the action via compareAndSet(0, 1). The Completion.tryComplete
113     * methods are all written a boringly similar uniform way (that
114     * sometimes includes unnecessary-looking checks, kept to maintain
115     * uniformity). There are enough dimensions upon which they
116     * differ that attempts to factor commonalities while maintaining
117     * efficiency require more lines of code than they would save.
118 dl 1.20 *
119     * 4. The exported then/and/or methods do support a bit of
120 dl 1.28 * factoring (see doThenApply etc). They must cope with the
121     * intrinsic races surrounding addition of a dependent action
122     * versus performing the action directly because the task is
123     * already complete. For example, a CF may not be complete upon
124     * entry, so a dependent completion is added, but by the time it
125     * is added, the target CF is complete, so must be directly
126 dl 1.20 * executed. This is all done while avoiding unnecessary object
127     * construction in safe-bypass cases.
128 dl 1.1 */
129    
130 dl 1.28 // preliminaries
131 dl 1.20
132 dl 1.1 static final class AltResult {
133     final Throwable ex; // null only for NIL
134 jsr166 1.2 AltResult(Throwable ex) { this.ex = ex; }
135 dl 1.1 }
136    
137     static final AltResult NIL = new AltResult(null);
138    
139 dl 1.20 // Fields
140    
141     volatile Object result; // Either the result or boxed AltResult
142 dl 1.1 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
143     volatile CompletionNode completions; // list (Treiber stack) of completions
144    
145 dl 1.20 // Basic utilities for triggering and processing completions
146    
147     /**
148 dl 1.99 * Triggers completion with the encoding of the given arguments:
149     * if the exception is non-null, encodes it as a wrapped
150     * CompletionException unless it is one already. Otherwise uses
151     * the given result, boxed as NIL if null.
152     */
153 dl 1.101 final void internalComplete(T v, Throwable ex) {
154 dl 1.99 if (result == null)
155     UNSAFE.compareAndSwapObject
156     (this, RESULT, null,
157     (ex == null) ? (v == null) ? NIL : v :
158     new AltResult((ex instanceof CompletionException) ? ex :
159     new CompletionException(ex)));
160     }
161    
162     /**
163 dl 1.101 * Signals waiters and triggers all enabled dependent completions
164     * reachable from src.
165 dl 1.99 *
166 dl 1.101 * @param src if non-null a completed CompletableFuture
167 dl 1.99 */
168 dl 1.101 static final void postComplete(CompletableFuture<?> src) {
169     /*
170     * CF "src" is always the base of a possible chain of
171     * completions that may need further processing. To avoid
172     * potential StackOverflowErrors, we extend along only one
173     * path ("dep") at a time, holding others by pushing them on
174     * src's completion list, which advances tail-recursion style
175     * when possible. On each step, "f" is dep if non-null, else
176     * src.
177     */
178     for (CompletableFuture<?> f = src, dep = null; f != null;) {
179     WaitNode q; CompletionNode h; Thread w;
180     if ((q = f.waiters) != null) {
181     if (UNSAFE.compareAndSwapObject(f, WAITERS, q, q.next) &&
182     (w = q.thread) != null) {
183     q.thread = null;
184     LockSupport.unpark(w);
185 dl 1.99 }
186     }
187 dl 1.101 else if ((h = f.completions) == null) {
188     if (dep == null)
189     break;
190     dep = null;
191     f = src;
192     }
193     else if (UNSAFE.compareAndSwapObject(f, COMPLETIONS, h, h.next)) {
194     Completion c; CompletableFuture<?> d;
195     if (dep != null && dep.completions != null) { // push to src
196     do {} while (!UNSAFE.compareAndSwapObject(
197     src, COMPLETIONS,
198     h.next = src.completions, h));
199     }
200     else if ((c = h.completion) == null ||
201     (d = c.tryComplete()) == null ||
202     d.result == null) {
203     dep = null;
204     f = src;
205     }
206     else if (src.completions == null) {
207     dep = null;
208     f = src = d;
209     }
210     else
211     f = dep = d;
212     }
213 dl 1.99 }
214 dl 1.20 }
215    
216 dl 1.28 /* ------------- waiting for completions -------------- */
217 dl 1.20
218 dl 1.35 /** Number of processors, for spin control */
219     static final int NCPU = Runtime.getRuntime().availableProcessors();
220    
221 dl 1.1 /**
222 dl 1.28 * Heuristic spin value for waitingGet() before blocking on
223     * multiprocessors
224 dl 1.1 */
225 dl 1.35 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
226 dl 1.1
227     /**
228 dl 1.28 * Linked nodes to record waiting threads in a Treiber stack. See
229     * other classes such as Phaser and SynchronousQueue for more
230     * detailed explanation. This class implements ManagedBlocker to
231     * avoid starvation when blocking actions pile up in
232     * ForkJoinPools.
233     */
234     static final class WaitNode implements ForkJoinPool.ManagedBlocker {
235     long nanos; // wait time if timed
236     final long deadline; // non-zero if timed
237     volatile int interruptControl; // > 0: interruptible, < 0: interrupted
238     volatile Thread thread;
239     volatile WaitNode next;
240     WaitNode(boolean interruptible, long nanos, long deadline) {
241     this.thread = Thread.currentThread();
242     this.interruptControl = interruptible ? 1 : 0;
243     this.nanos = nanos;
244     this.deadline = deadline;
245     }
246     public boolean isReleasable() {
247     if (thread == null)
248     return true;
249     if (Thread.interrupted()) {
250     int i = interruptControl;
251     interruptControl = -1;
252     if (i > 0)
253     return true;
254     }
255     if (deadline != 0L &&
256     (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
257     thread = null;
258     return true;
259     }
260     return false;
261     }
262     public boolean block() {
263     if (isReleasable())
264     return true;
265     else if (deadline == 0L)
266     LockSupport.park(this);
267     else if (nanos > 0L)
268     LockSupport.parkNanos(this, nanos);
269     return isReleasable();
270     }
271 dl 1.1 }
272    
273     /**
274 dl 1.28 * Returns raw result after waiting, or null if interruptible and
275     * interrupted.
276 dl 1.1 */
277 dl 1.28 private Object waitingGet(boolean interruptible) {
278     WaitNode q = null;
279     boolean queued = false;
280 dl 1.35 int spins = SPINS;
281 dl 1.28 for (Object r;;) {
282     if ((r = result) != null) {
283     if (q != null) { // suppress unpark
284     q.thread = null;
285     if (q.interruptControl < 0) {
286     if (interruptible) {
287 dl 1.101 removeCancelledWaiters();
288 dl 1.28 return null;
289     }
290     Thread.currentThread().interrupt();
291     }
292     }
293 dl 1.101 postComplete(this); // help release others
294 dl 1.28 return r;
295     }
296     else if (spins > 0) {
297 dl 1.35 int rnd = ThreadLocalRandom.nextSecondarySeed();
298     if (rnd == 0)
299     rnd = ThreadLocalRandom.current().nextInt();
300     if (rnd >= 0)
301 dl 1.28 --spins;
302     }
303     else if (q == null)
304     q = new WaitNode(interruptible, 0L, 0L);
305     else if (!queued)
306     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
307     q.next = waiters, q);
308     else if (interruptible && q.interruptControl < 0) {
309 dl 1.101 q.thread = null;
310     removeCancelledWaiters();
311 dl 1.28 return null;
312     }
313     else if (q.thread != null && result == null) {
314     try {
315     ForkJoinPool.managedBlock(q);
316 jsr166 1.31 } catch (InterruptedException ex) {
317 dl 1.28 q.interruptControl = -1;
318     }
319     }
320     }
321 dl 1.1 }
322    
323     /**
324 dl 1.28 * Awaits completion or aborts on interrupt or timeout.
325 dl 1.1 *
326 dl 1.28 * @param nanos time to wait
327     * @return raw result
328 dl 1.1 */
329 dl 1.28 private Object timedAwaitDone(long nanos)
330     throws InterruptedException, TimeoutException {
331     WaitNode q = null;
332     boolean queued = false;
333     for (Object r;;) {
334     if ((r = result) != null) {
335     if (q != null) {
336     q.thread = null;
337     if (q.interruptControl < 0) {
338 dl 1.101 removeCancelledWaiters();
339 dl 1.28 throw new InterruptedException();
340     }
341     }
342 dl 1.101 postComplete(this);
343 dl 1.28 return r;
344     }
345     else if (q == null) {
346     if (nanos <= 0L)
347     throw new TimeoutException();
348     long d = System.nanoTime() + nanos;
349 jsr166 1.31 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
350 dl 1.28 }
351     else if (!queued)
352     queued = UNSAFE.compareAndSwapObject(this, WAITERS,
353     q.next = waiters, q);
354     else if (q.interruptControl < 0) {
355 dl 1.101 q.thread = null;
356     removeCancelledWaiters();
357 dl 1.28 throw new InterruptedException();
358     }
359     else if (q.nanos <= 0L) {
360     if (result == null) {
361 dl 1.101 q.thread = null;
362     removeCancelledWaiters();
363 dl 1.28 throw new TimeoutException();
364     }
365     }
366     else if (q.thread != null && result == null) {
367     try {
368     ForkJoinPool.managedBlock(q);
369 jsr166 1.31 } catch (InterruptedException ex) {
370 dl 1.28 q.interruptControl = -1;
371     }
372     }
373     }
374 dl 1.1 }
375    
376     /**
377 dl 1.101 * Tries to unlink timed-out or interrupted wait nodes to avoid
378 dl 1.28 * accumulating garbage. Internal nodes are simply unspliced
379     * without CAS since it is harmless if they are traversed anyway
380     * by releasers. To avoid effects of unsplicing from already
381     * removed nodes, the list is retraversed in case of an apparent
382     * race. This is slow when there are a lot of nodes, but we don't
383     * expect lists to be long enough to outweigh higher-overhead
384     * schemes.
385 dl 1.1 */
386 dl 1.101 private void removeCancelledWaiters() {
387     retry: for (;;) { // restart on removeWaiter race
388     for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
389     s = q.next;
390     if (q.thread != null)
391     pred = q;
392     else if (pred != null) {
393     pred.next = s;
394     if (pred.thread == null) // check for race
395 dl 1.28 continue retry;
396     }
397 dl 1.101 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
398     continue retry;
399 dl 1.28 }
400 dl 1.101 break;
401 dl 1.28 }
402 dl 1.1 }
403    
404 dl 1.28 /* ------------- Async tasks -------------- */
405    
406 dl 1.1 /**
407 jsr166 1.56 * A marker interface identifying asynchronous tasks produced by
408 dl 1.28 * {@code async} methods. This may be useful for monitoring,
409     * debugging, and tracking asynchronous activities.
410 jsr166 1.57 *
411     * @since 1.8
412 dl 1.1 */
413 dl 1.28 public static interface AsynchronousCompletionTask {
414 dl 1.1 }
415    
416 dl 1.28 /** Base class can act as either FJ or plain Runnable */
417 dl 1.97 @SuppressWarnings("serial")
418 jsr166 1.33 abstract static class Async extends ForkJoinTask<Void>
419 dl 1.28 implements Runnable, AsynchronousCompletionTask {
420     public final Void getRawResult() { return null; }
421     public final void setRawResult(Void v) { }
422     public final void run() { exec(); }
423 jsr166 1.26 }
424    
425 dl 1.96 /**
426     * Starts the given async task using the given executor, unless
427     * the executor is ForkJoinPool.commonPool and it has been
428     * disabled, in which case starts a new thread.
429     */
430     static void execAsync(Executor e, Async r) {
431     if (e == ForkJoinPool.commonPool() &&
432     ForkJoinPool.getCommonPoolParallelism() <= 1)
433     new Thread(r).start();
434     else
435     e.execute(r);
436     }
437    
438 dl 1.28 static final class AsyncRun extends Async {
439     final Runnable fn;
440     final CompletableFuture<Void> dst;
441     AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
442     this.fn = fn; this.dst = dst;
443     }
444     public final boolean exec() {
445     CompletableFuture<Void> d; Throwable ex;
446 dl 1.29 if ((d = this.dst) != null && d.result == null) {
447 dl 1.28 try {
448     fn.run();
449     ex = null;
450     } catch (Throwable rex) {
451     ex = rex;
452     }
453     d.internalComplete(null, ex);
454 dl 1.19 }
455 dl 1.101 postComplete(d);
456 dl 1.28 return true;
457 dl 1.19 }
458 dl 1.28 private static final long serialVersionUID = 5232453952276885070L;
459 dl 1.19 }
460    
461 dl 1.28 static final class AsyncSupply<U> extends Async {
462     final Supplier<U> fn;
463     final CompletableFuture<U> dst;
464     AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
465     this.fn = fn; this.dst = dst;
466     }
467     public final boolean exec() {
468     CompletableFuture<U> d; U u; Throwable ex;
469 dl 1.29 if ((d = this.dst) != null && d.result == null) {
470 dl 1.28 try {
471     u = fn.get();
472     ex = null;
473     } catch (Throwable rex) {
474     ex = rex;
475     u = null;
476     }
477     d.internalComplete(u, ex);
478 dl 1.1 }
479 dl 1.101 postComplete(d);
480 dl 1.1 return true;
481     }
482     private static final long serialVersionUID = 5232453952276885070L;
483     }
484    
485 dl 1.28 static final class AsyncApply<T,U> extends Async {
486 jsr166 1.63 final T arg;
487 dl 1.28 final Function<? super T,? extends U> fn;
488 dl 1.1 final CompletableFuture<U> dst;
489 dl 1.28 AsyncApply(T arg, Function<? super T,? extends U> fn,
490 dl 1.35 CompletableFuture<U> dst) {
491 dl 1.1 this.arg = arg; this.fn = fn; this.dst = dst;
492     }
493     public final boolean exec() {
494 dl 1.21 CompletableFuture<U> d; U u; Throwable ex;
495 dl 1.29 if ((d = this.dst) != null && d.result == null) {
496 dl 1.21 try {
497     u = fn.apply(arg);
498     ex = null;
499     } catch (Throwable rex) {
500     ex = rex;
501     u = null;
502     }
503     d.internalComplete(u, ex);
504 dl 1.1 }
505 dl 1.101 postComplete(d);
506 dl 1.1 return true;
507     }
508     private static final long serialVersionUID = 5232453952276885070L;
509     }
510    
511 jsr166 1.81 static final class AsyncCombine<T,U,V> extends Async {
512 dl 1.1 final T arg1;
513     final U arg2;
514 jsr166 1.63 final BiFunction<? super T,? super U,? extends V> fn;
515 dl 1.1 final CompletableFuture<V> dst;
516 jsr166 1.81 AsyncCombine(T arg1, U arg2,
517 dl 1.35 BiFunction<? super T,? super U,? extends V> fn,
518     CompletableFuture<V> dst) {
519 dl 1.1 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
520     }
521     public final boolean exec() {
522 dl 1.21 CompletableFuture<V> d; V v; Throwable ex;
523 dl 1.29 if ((d = this.dst) != null && d.result == null) {
524 dl 1.21 try {
525     v = fn.apply(arg1, arg2);
526     ex = null;
527     } catch (Throwable rex) {
528     ex = rex;
529     v = null;
530     }
531     d.internalComplete(v, ex);
532 dl 1.1 }
533 dl 1.101 postComplete(d);
534 dl 1.1 return true;
535     }
536     private static final long serialVersionUID = 5232453952276885070L;
537     }
538    
539 dl 1.28 static final class AsyncAccept<T> extends Async {
540 jsr166 1.63 final T arg;
541 dl 1.34 final Consumer<? super T> fn;
542 dl 1.88 final CompletableFuture<?> dst;
543 dl 1.34 AsyncAccept(T arg, Consumer<? super T> fn,
544 dl 1.88 CompletableFuture<?> dst) {
545 dl 1.7 this.arg = arg; this.fn = fn; this.dst = dst;
546     }
547     public final boolean exec() {
548 dl 1.88 CompletableFuture<?> d; Throwable ex;
549 dl 1.29 if ((d = this.dst) != null && d.result == null) {
550 dl 1.21 try {
551     fn.accept(arg);
552     ex = null;
553     } catch (Throwable rex) {
554     ex = rex;
555     }
556     d.internalComplete(null, ex);
557 dl 1.7 }
558 dl 1.101 postComplete(d);
559 dl 1.7 return true;
560     }
561     private static final long serialVersionUID = 5232453952276885070L;
562     }
563    
564 jsr166 1.81 static final class AsyncAcceptBoth<T,U> extends Async {
565 dl 1.7 final T arg1;
566     final U arg2;
567 jsr166 1.63 final BiConsumer<? super T,? super U> fn;
568 dl 1.88 final CompletableFuture<?> dst;
569 jsr166 1.81 AsyncAcceptBoth(T arg1, U arg2,
570     BiConsumer<? super T,? super U> fn,
571 dl 1.88 CompletableFuture<?> dst) {
572 dl 1.7 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
573     }
574     public final boolean exec() {
575 dl 1.88 CompletableFuture<?> d; Throwable ex;
576 dl 1.29 if ((d = this.dst) != null && d.result == null) {
577 dl 1.21 try {
578     fn.accept(arg1, arg2);
579     ex = null;
580     } catch (Throwable rex) {
581     ex = rex;
582     }
583     d.internalComplete(null, ex);
584 dl 1.7 }
585 dl 1.101 postComplete(d);
586 dl 1.7 return true;
587     }
588     private static final long serialVersionUID = 5232453952276885070L;
589     }
590    
591 dl 1.37 static final class AsyncCompose<T,U> extends Async {
592 jsr166 1.63 final T arg;
593 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
594 dl 1.37 final CompletableFuture<U> dst;
595     AsyncCompose(T arg,
596 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
597 dl 1.37 CompletableFuture<U> dst) {
598     this.arg = arg; this.fn = fn; this.dst = dst;
599     }
600     public final boolean exec() {
601     CompletableFuture<U> d, fr; U u; Throwable ex;
602     if ((d = this.dst) != null && d.result == null) {
603     try {
604 dl 1.88 CompletionStage<U> cs = fn.apply(arg);
605     fr = (cs == null) ? null : cs.toCompletableFuture();
606 jsr166 1.61 ex = (fr == null) ? new NullPointerException() : null;
607 dl 1.37 } catch (Throwable rex) {
608     ex = rex;
609     fr = null;
610     }
611     if (ex != null)
612     u = null;
613     else {
614     Object r = fr.result;
615 dl 1.67 if (r == null)
616     r = fr.waitingGet(false);
617 dl 1.37 if (r instanceof AltResult) {
618     ex = ((AltResult)r).ex;
619     u = null;
620     }
621     else {
622     @SuppressWarnings("unchecked") U ur = (U) r;
623     u = ur;
624     }
625     }
626     d.internalComplete(u, ex);
627     }
628 dl 1.101 postComplete(d);
629 dl 1.37 return true;
630     }
631     private static final long serialVersionUID = 5232453952276885070L;
632     }
633    
634 dl 1.91 static final class AsyncWhenComplete<T> extends Async {
635     final T arg1;
636     final Throwable arg2;
637     final BiConsumer<? super T,? super Throwable> fn;
638     final CompletableFuture<T> dst;
639     AsyncWhenComplete(T arg1, Throwable arg2,
640     BiConsumer<? super T,? super Throwable> fn,
641     CompletableFuture<T> dst) {
642     this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
643     }
644     public final boolean exec() {
645 jsr166 1.92 CompletableFuture<T> d;
646 dl 1.91 if ((d = this.dst) != null && d.result == null) {
647     Throwable ex = arg2;
648     try {
649     fn.accept(arg1, ex);
650     } catch (Throwable rex) {
651     if (ex == null)
652     ex = rex;
653     }
654     d.internalComplete(arg1, ex);
655     }
656 dl 1.101 postComplete(d);
657 dl 1.91 return true;
658     }
659     private static final long serialVersionUID = 5232453952276885070L;
660     }
661    
662 dl 1.1 /* ------------- Completions -------------- */
663    
664 dl 1.28 /**
665     * Simple linked list nodes to record completions, used in
666     * basically the same way as WaitNodes. (We separate nodes from
667     * the Completions themselves mainly because for the And and Or
668     * methods, the same Completion object resides in two lists.)
669     */
670     static final class CompletionNode {
671     final Completion completion;
672     volatile CompletionNode next;
673     CompletionNode(Completion completion) { this.completion = completion; }
674     }
675    
676 dl 1.1 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
677 dl 1.97 @SuppressWarnings("serial")
678 dl 1.99 abstract static class Completion extends AtomicInteger {
679     /**
680 dl 1.101 * Completes a dependent Completablefuture if enabled
681 dl 1.99 * @return the dependent Completablefuture
682     */
683 dl 1.101 public abstract CompletableFuture<?> tryComplete();
684 dl 1.1 }
685    
686 jsr166 1.81 static final class ThenApply<T,U> extends Completion {
687 jsr166 1.2 final CompletableFuture<? extends T> src;
688     final Function<? super T,? extends U> fn;
689     final CompletableFuture<U> dst;
690 dl 1.1 final Executor executor;
691 jsr166 1.81 ThenApply(CompletableFuture<? extends T> src,
692     Function<? super T,? extends U> fn,
693     CompletableFuture<U> dst,
694     Executor executor) {
695 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
696     this.executor = executor;
697     }
698 dl 1.101 public final CompletableFuture<?> tryComplete() {
699 dl 1.28 final CompletableFuture<? extends T> a;
700     final Function<? super T,? extends U> fn;
701     final CompletableFuture<U> dst;
702 jsr166 1.2 Object r; T t; Throwable ex;
703     if ((dst = this.dst) != null &&
704 dl 1.1 (fn = this.fn) != null &&
705     (a = this.src) != null &&
706     (r = a.result) != null &&
707 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
708 jsr166 1.2 if (r instanceof AltResult) {
709 dl 1.19 ex = ((AltResult)r).ex;
710 dl 1.1 t = null;
711     }
712 dl 1.17 else {
713     ex = null;
714 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
715     t = tr;
716 dl 1.1 }
717 dl 1.20 Executor e = executor;
718     U u = null;
719 dl 1.17 if (ex == null) {
720     try {
721 dl 1.20 if (e != null)
722 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
723 dl 1.17 else
724 dl 1.20 u = fn.apply(t);
725 dl 1.17 } catch (Throwable rex) {
726     ex = rex;
727     }
728     }
729 dl 1.20 if (e == null || ex != null)
730 dl 1.101 dst.internalComplete(u, ex);
731 dl 1.1 }
732 dl 1.99 return dst;
733 dl 1.1 }
734 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
735 dl 1.1 }
736    
737 jsr166 1.81 static final class ThenAccept<T> extends Completion {
738 dl 1.7 final CompletableFuture<? extends T> src;
739 dl 1.34 final Consumer<? super T> fn;
740 dl 1.88 final CompletableFuture<?> dst;
741 dl 1.7 final Executor executor;
742 jsr166 1.81 ThenAccept(CompletableFuture<? extends T> src,
743     Consumer<? super T> fn,
744 dl 1.88 CompletableFuture<?> dst,
745 jsr166 1.81 Executor executor) {
746 dl 1.7 this.src = src; this.fn = fn; this.dst = dst;
747     this.executor = executor;
748     }
749 dl 1.101 public final CompletableFuture<?> tryComplete() {
750 dl 1.28 final CompletableFuture<? extends T> a;
751 dl 1.34 final Consumer<? super T> fn;
752 dl 1.88 final CompletableFuture<?> dst;
753 dl 1.7 Object r; T t; Throwable ex;
754     if ((dst = this.dst) != null &&
755     (fn = this.fn) != null &&
756     (a = this.src) != null &&
757     (r = a.result) != null &&
758 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
759 dl 1.7 if (r instanceof AltResult) {
760 dl 1.19 ex = ((AltResult)r).ex;
761 dl 1.7 t = null;
762     }
763 dl 1.17 else {
764     ex = null;
765 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
766     t = tr;
767 dl 1.17 }
768 dl 1.20 Executor e = executor;
769 dl 1.17 if (ex == null) {
770     try {
771 dl 1.20 if (e != null)
772 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
773 dl 1.20 else
774 dl 1.17 fn.accept(t);
775     } catch (Throwable rex) {
776     ex = rex;
777 dl 1.7 }
778     }
779 dl 1.20 if (e == null || ex != null)
780 dl 1.101 dst.internalComplete(null, ex);
781 dl 1.7 }
782 dl 1.99 return dst;
783 dl 1.7 }
784 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
785 dl 1.7 }
786    
787 jsr166 1.82 static final class ThenRun extends Completion {
788     final CompletableFuture<?> src;
789 jsr166 1.2 final Runnable fn;
790     final CompletableFuture<Void> dst;
791 dl 1.1 final Executor executor;
792 jsr166 1.82 ThenRun(CompletableFuture<?> src,
793 jsr166 1.81 Runnable fn,
794     CompletableFuture<Void> dst,
795     Executor executor) {
796 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
797     this.executor = executor;
798     }
799 dl 1.101 public final CompletableFuture<?> tryComplete() {
800 jsr166 1.82 final CompletableFuture<?> a;
801 dl 1.28 final Runnable fn;
802     final CompletableFuture<Void> dst;
803 jsr166 1.2 Object r; Throwable ex;
804     if ((dst = this.dst) != null &&
805 dl 1.1 (fn = this.fn) != null &&
806     (a = this.src) != null &&
807     (r = a.result) != null &&
808 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
809 dl 1.19 if (r instanceof AltResult)
810     ex = ((AltResult)r).ex;
811 dl 1.17 else
812     ex = null;
813 dl 1.20 Executor e = executor;
814 dl 1.17 if (ex == null) {
815     try {
816 dl 1.20 if (e != null)
817 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
818 dl 1.20 else
819 dl 1.17 fn.run();
820     } catch (Throwable rex) {
821     ex = rex;
822 dl 1.1 }
823     }
824 dl 1.20 if (e == null || ex != null)
825 dl 1.101 dst.internalComplete(null, ex);
826 dl 1.1 }
827 dl 1.99 return dst;
828 dl 1.1 }
829 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
830 dl 1.1 }
831    
832 jsr166 1.81 static final class ThenCombine<T,U,V> extends Completion {
833 jsr166 1.2 final CompletableFuture<? extends T> src;
834     final CompletableFuture<? extends U> snd;
835     final BiFunction<? super T,? super U,? extends V> fn;
836     final CompletableFuture<V> dst;
837 dl 1.1 final Executor executor;
838 jsr166 1.81 ThenCombine(CompletableFuture<? extends T> src,
839     CompletableFuture<? extends U> snd,
840     BiFunction<? super T,? super U,? extends V> fn,
841     CompletableFuture<V> dst,
842     Executor executor) {
843 dl 1.1 this.src = src; this.snd = snd;
844     this.fn = fn; this.dst = dst;
845     this.executor = executor;
846     }
847 dl 1.101 public final CompletableFuture<?> tryComplete() {
848 dl 1.28 final CompletableFuture<? extends T> a;
849     final CompletableFuture<? extends U> b;
850     final BiFunction<? super T,? super U,? extends V> fn;
851     final CompletableFuture<V> dst;
852 dl 1.85 Object r, s; T t; U u; Throwable ex;
853     if ((dst = this.dst) != null &&
854     (fn = this.fn) != null &&
855     (a = this.src) != null &&
856     (r = a.result) != null &&
857     (b = this.snd) != null &&
858     (s = b.result) != null &&
859 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
860 dl 1.85 if (r instanceof AltResult) {
861     ex = ((AltResult)r).ex;
862     t = null;
863     }
864     else {
865     ex = null;
866     @SuppressWarnings("unchecked") T tr = (T) r;
867     t = tr;
868     }
869     if (ex != null)
870     u = null;
871     else if (s instanceof AltResult) {
872     ex = ((AltResult)s).ex;
873     u = null;
874     }
875     else {
876     @SuppressWarnings("unchecked") U us = (U) s;
877     u = us;
878     }
879 dl 1.20 Executor e = executor;
880     V v = null;
881 dl 1.19 if (ex == null) {
882     try {
883 dl 1.20 if (e != null)
884 dl 1.96 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
885 dl 1.19 else
886 dl 1.20 v = fn.apply(t, u);
887 dl 1.19 } catch (Throwable rex) {
888     ex = rex;
889     }
890 dl 1.1 }
891 dl 1.20 if (e == null || ex != null)
892 dl 1.101 dst.internalComplete(v, ex);
893 jsr166 1.2 }
894 dl 1.99 return dst;
895 jsr166 1.2 }
896 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
897 dl 1.1 }
898    
899 jsr166 1.81 static final class ThenAcceptBoth<T,U> extends Completion {
900 dl 1.7 final CompletableFuture<? extends T> src;
901     final CompletableFuture<? extends U> snd;
902 dl 1.34 final BiConsumer<? super T,? super U> fn;
903 dl 1.7 final CompletableFuture<Void> dst;
904     final Executor executor;
905 jsr166 1.81 ThenAcceptBoth(CompletableFuture<? extends T> src,
906     CompletableFuture<? extends U> snd,
907     BiConsumer<? super T,? super U> fn,
908     CompletableFuture<Void> dst,
909     Executor executor) {
910 dl 1.7 this.src = src; this.snd = snd;
911     this.fn = fn; this.dst = dst;
912     this.executor = executor;
913     }
914 dl 1.101 public final CompletableFuture<?> tryComplete() {
915 dl 1.28 final CompletableFuture<? extends T> a;
916     final CompletableFuture<? extends U> b;
917 dl 1.34 final BiConsumer<? super T,? super U> fn;
918 dl 1.28 final CompletableFuture<Void> dst;
919 dl 1.85 Object r, s; T t; U u; Throwable ex;
920     if ((dst = this.dst) != null &&
921     (fn = this.fn) != null &&
922     (a = this.src) != null &&
923     (r = a.result) != null &&
924     (b = this.snd) != null &&
925     (s = b.result) != null &&
926 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
927 dl 1.85 if (r instanceof AltResult) {
928     ex = ((AltResult)r).ex;
929     t = null;
930     }
931     else {
932     ex = null;
933     @SuppressWarnings("unchecked") T tr = (T) r;
934     t = tr;
935     }
936     if (ex != null)
937     u = null;
938     else if (s instanceof AltResult) {
939     ex = ((AltResult)s).ex;
940     u = null;
941     }
942     else {
943     @SuppressWarnings("unchecked") U us = (U) s;
944     u = us;
945     }
946 dl 1.20 Executor e = executor;
947 dl 1.19 if (ex == null) {
948     try {
949 dl 1.20 if (e != null)
950 dl 1.96 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
951 dl 1.20 else
952 dl 1.19 fn.accept(t, u);
953     } catch (Throwable rex) {
954     ex = rex;
955 dl 1.7 }
956     }
957 dl 1.20 if (e == null || ex != null)
958 dl 1.101 dst.internalComplete(null, ex);
959 dl 1.7 }
960 dl 1.99 return dst;
961 dl 1.7 }
962 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
963 dl 1.7 }
964    
965 jsr166 1.82 static final class RunAfterBoth extends Completion {
966     final CompletableFuture<?> src;
967 jsr166 1.2 final CompletableFuture<?> snd;
968     final Runnable fn;
969     final CompletableFuture<Void> dst;
970 dl 1.1 final Executor executor;
971 jsr166 1.82 RunAfterBoth(CompletableFuture<?> src,
972 jsr166 1.81 CompletableFuture<?> snd,
973     Runnable fn,
974     CompletableFuture<Void> dst,
975     Executor executor) {
976 dl 1.1 this.src = src; this.snd = snd;
977     this.fn = fn; this.dst = dst;
978     this.executor = executor;
979     }
980 dl 1.101 public final CompletableFuture<?> tryComplete() {
981 jsr166 1.82 final CompletableFuture<?> a;
982 dl 1.1 final CompletableFuture<?> b;
983     final Runnable fn;
984     final CompletableFuture<Void> dst;
985 dl 1.85 Object r, s; Throwable ex;
986     if ((dst = this.dst) != null &&
987     (fn = this.fn) != null &&
988     (a = this.src) != null &&
989     (r = a.result) != null &&
990     (b = this.snd) != null &&
991     (s = b.result) != null &&
992 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
993 dl 1.85 if (r instanceof AltResult)
994     ex = ((AltResult)r).ex;
995     else
996     ex = null;
997     if (ex == null && (s instanceof AltResult))
998     ex = ((AltResult)s).ex;
999 dl 1.20 Executor e = executor;
1000 dl 1.19 if (ex == null) {
1001     try {
1002 dl 1.20 if (e != null)
1003 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
1004 dl 1.20 else
1005 dl 1.19 fn.run();
1006     } catch (Throwable rex) {
1007     ex = rex;
1008 dl 1.1 }
1009 jsr166 1.2 }
1010 dl 1.20 if (e == null || ex != null)
1011 dl 1.101 dst.internalComplete(null, ex);
1012 jsr166 1.2 }
1013 dl 1.99 return dst;
1014 jsr166 1.2 }
1015 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1016 dl 1.1 }
1017    
1018 dl 1.35 static final class AndCompletion extends Completion {
1019     final CompletableFuture<?> src;
1020     final CompletableFuture<?> snd;
1021     final CompletableFuture<Void> dst;
1022     AndCompletion(CompletableFuture<?> src,
1023     CompletableFuture<?> snd,
1024     CompletableFuture<Void> dst) {
1025     this.src = src; this.snd = snd; this.dst = dst;
1026     }
1027 dl 1.101 public final CompletableFuture<?> tryComplete() {
1028 dl 1.35 final CompletableFuture<?> a;
1029     final CompletableFuture<?> b;
1030     final CompletableFuture<Void> dst;
1031     Object r, s; Throwable ex;
1032     if ((dst = this.dst) != null &&
1033     (a = this.src) != null &&
1034     (r = a.result) != null &&
1035     (b = this.snd) != null &&
1036     (s = b.result) != null &&
1037 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1038 dl 1.35 if (r instanceof AltResult)
1039     ex = ((AltResult)r).ex;
1040     else
1041     ex = null;
1042     if (ex == null && (s instanceof AltResult))
1043     ex = ((AltResult)s).ex;
1044 dl 1.101 dst.internalComplete(null, ex);
1045 dl 1.35 }
1046 dl 1.99 return dst;
1047 dl 1.35 }
1048     private static final long serialVersionUID = 5232453952276885070L;
1049     }
1050    
1051 jsr166 1.81 static final class ApplyToEither<T,U> extends Completion {
1052 jsr166 1.2 final CompletableFuture<? extends T> src;
1053     final CompletableFuture<? extends T> snd;
1054     final Function<? super T,? extends U> fn;
1055     final CompletableFuture<U> dst;
1056 dl 1.1 final Executor executor;
1057 jsr166 1.81 ApplyToEither(CompletableFuture<? extends T> src,
1058     CompletableFuture<? extends T> snd,
1059     Function<? super T,? extends U> fn,
1060     CompletableFuture<U> dst,
1061     Executor executor) {
1062 dl 1.1 this.src = src; this.snd = snd;
1063     this.fn = fn; this.dst = dst;
1064     this.executor = executor;
1065     }
1066 dl 1.101 public final CompletableFuture<?> tryComplete() {
1067 dl 1.28 final CompletableFuture<? extends T> a;
1068     final CompletableFuture<? extends T> b;
1069     final Function<? super T,? extends U> fn;
1070     final CompletableFuture<U> dst;
1071 jsr166 1.2 Object r; T t; Throwable ex;
1072     if ((dst = this.dst) != null &&
1073 dl 1.1 (fn = this.fn) != null &&
1074     (((a = this.src) != null && (r = a.result) != null) ||
1075     ((b = this.snd) != null && (r = b.result) != null)) &&
1076 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1077 jsr166 1.2 if (r instanceof AltResult) {
1078 dl 1.19 ex = ((AltResult)r).ex;
1079 jsr166 1.2 t = null;
1080     }
1081 dl 1.19 else {
1082     ex = null;
1083 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1084     t = tr;
1085 dl 1.1 }
1086 dl 1.20 Executor e = executor;
1087     U u = null;
1088 dl 1.19 if (ex == null) {
1089     try {
1090 dl 1.20 if (e != null)
1091 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1092 dl 1.19 else
1093 dl 1.20 u = fn.apply(t);
1094 dl 1.19 } catch (Throwable rex) {
1095     ex = rex;
1096     }
1097     }
1098 dl 1.20 if (e == null || ex != null)
1099 dl 1.101 dst.internalComplete(u, ex);
1100 jsr166 1.2 }
1101 dl 1.99 return dst;
1102 jsr166 1.2 }
1103 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1104 dl 1.1 }
1105    
1106 jsr166 1.81 static final class AcceptEither<T> extends Completion {
1107 dl 1.7 final CompletableFuture<? extends T> src;
1108     final CompletableFuture<? extends T> snd;
1109 dl 1.34 final Consumer<? super T> fn;
1110 dl 1.7 final CompletableFuture<Void> dst;
1111     final Executor executor;
1112 jsr166 1.81 AcceptEither(CompletableFuture<? extends T> src,
1113     CompletableFuture<? extends T> snd,
1114     Consumer<? super T> fn,
1115     CompletableFuture<Void> dst,
1116     Executor executor) {
1117 dl 1.7 this.src = src; this.snd = snd;
1118     this.fn = fn; this.dst = dst;
1119     this.executor = executor;
1120     }
1121 dl 1.101 public final CompletableFuture<?> tryComplete() {
1122 dl 1.28 final CompletableFuture<? extends T> a;
1123     final CompletableFuture<? extends T> b;
1124 dl 1.34 final Consumer<? super T> fn;
1125 dl 1.28 final CompletableFuture<Void> dst;
1126 dl 1.7 Object r; T t; Throwable ex;
1127     if ((dst = this.dst) != null &&
1128     (fn = this.fn) != null &&
1129     (((a = this.src) != null && (r = a.result) != null) ||
1130     ((b = this.snd) != null && (r = b.result) != null)) &&
1131 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1132 dl 1.7 if (r instanceof AltResult) {
1133 dl 1.19 ex = ((AltResult)r).ex;
1134 dl 1.7 t = null;
1135     }
1136 dl 1.19 else {
1137     ex = null;
1138 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1139     t = tr;
1140 dl 1.19 }
1141 dl 1.20 Executor e = executor;
1142 dl 1.19 if (ex == null) {
1143     try {
1144 dl 1.20 if (e != null)
1145 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1146 dl 1.20 else
1147 dl 1.19 fn.accept(t);
1148     } catch (Throwable rex) {
1149     ex = rex;
1150 dl 1.7 }
1151     }
1152 dl 1.20 if (e == null || ex != null)
1153 dl 1.101 dst.internalComplete(null, ex);
1154 dl 1.7 }
1155 dl 1.99 return dst;
1156 dl 1.7 }
1157 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1158 dl 1.7 }
1159    
1160 jsr166 1.82 static final class RunAfterEither extends Completion {
1161     final CompletableFuture<?> src;
1162 jsr166 1.2 final CompletableFuture<?> snd;
1163     final Runnable fn;
1164     final CompletableFuture<Void> dst;
1165 dl 1.1 final Executor executor;
1166 jsr166 1.82 RunAfterEither(CompletableFuture<?> src,
1167 jsr166 1.81 CompletableFuture<?> snd,
1168     Runnable fn,
1169     CompletableFuture<Void> dst,
1170     Executor executor) {
1171 dl 1.1 this.src = src; this.snd = snd;
1172     this.fn = fn; this.dst = dst;
1173     this.executor = executor;
1174     }
1175 dl 1.101 public final CompletableFuture<?> tryComplete() {
1176 jsr166 1.82 final CompletableFuture<?> a;
1177 dl 1.1 final CompletableFuture<?> b;
1178     final Runnable fn;
1179     final CompletableFuture<Void> dst;
1180 dl 1.28 Object r; Throwable ex;
1181 jsr166 1.2 if ((dst = this.dst) != null &&
1182 dl 1.1 (fn = this.fn) != null &&
1183     (((a = this.src) != null && (r = a.result) != null) ||
1184     ((b = this.snd) != null && (r = b.result) != null)) &&
1185 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1186 dl 1.19 if (r instanceof AltResult)
1187     ex = ((AltResult)r).ex;
1188     else
1189     ex = null;
1190 dl 1.20 Executor e = executor;
1191 dl 1.19 if (ex == null) {
1192 dl 1.1 try {
1193 dl 1.20 if (e != null)
1194 dl 1.96 execAsync(e, new AsyncRun(fn, dst));
1195 dl 1.20 else
1196 dl 1.1 fn.run();
1197     } catch (Throwable rex) {
1198 dl 1.19 ex = rex;
1199 dl 1.1 }
1200     }
1201 dl 1.20 if (e == null || ex != null)
1202 dl 1.101 dst.internalComplete(null, ex);
1203 jsr166 1.2 }
1204 dl 1.99 return dst;
1205 jsr166 1.2 }
1206 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1207 dl 1.1 }
1208    
1209 dl 1.35 static final class OrCompletion extends Completion {
1210     final CompletableFuture<?> src;
1211     final CompletableFuture<?> snd;
1212 dl 1.77 final CompletableFuture<Object> dst;
1213 dl 1.35 OrCompletion(CompletableFuture<?> src,
1214     CompletableFuture<?> snd,
1215 dl 1.77 CompletableFuture<Object> dst) {
1216 dl 1.35 this.src = src; this.snd = snd; this.dst = dst;
1217     }
1218 dl 1.101 public final CompletableFuture<?> tryComplete() {
1219 dl 1.35 final CompletableFuture<?> a;
1220     final CompletableFuture<?> b;
1221 dl 1.77 final CompletableFuture<Object> dst;
1222     Object r, t; Throwable ex;
1223 dl 1.35 if ((dst = this.dst) != null &&
1224     (((a = this.src) != null && (r = a.result) != null) ||
1225     ((b = this.snd) != null && (r = b.result) != null)) &&
1226 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1227 dl 1.77 if (r instanceof AltResult) {
1228 dl 1.35 ex = ((AltResult)r).ex;
1229 dl 1.77 t = null;
1230     }
1231     else {
1232 dl 1.35 ex = null;
1233 dl 1.77 t = r;
1234     }
1235 dl 1.101 dst.internalComplete(t, ex);
1236 dl 1.35 }
1237 dl 1.99 return dst;
1238 dl 1.35 }
1239     private static final long serialVersionUID = 5232453952276885070L;
1240     }
1241    
1242 dl 1.28 static final class ExceptionCompletion<T> extends Completion {
1243 jsr166 1.2 final CompletableFuture<? extends T> src;
1244 dl 1.6 final Function<? super Throwable, ? extends T> fn;
1245     final CompletableFuture<T> dst;
1246 dl 1.28 ExceptionCompletion(CompletableFuture<? extends T> src,
1247     Function<? super Throwable, ? extends T> fn,
1248     CompletableFuture<T> dst) {
1249 dl 1.1 this.src = src; this.fn = fn; this.dst = dst;
1250     }
1251 dl 1.101 public final CompletableFuture<?> tryComplete() {
1252 dl 1.28 final CompletableFuture<? extends T> a;
1253     final Function<? super Throwable, ? extends T> fn;
1254     final CompletableFuture<T> dst;
1255 dl 1.20 Object r; T t = null; Throwable ex, dx = null;
1256 jsr166 1.2 if ((dst = this.dst) != null &&
1257 dl 1.1 (fn = this.fn) != null &&
1258     (a = this.src) != null &&
1259     (r = a.result) != null &&
1260 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1261 dl 1.20 if ((r instanceof AltResult) &&
1262 jsr166 1.87 (ex = ((AltResult)r).ex) != null) {
1263 dl 1.20 try {
1264     t = fn.apply(ex);
1265     } catch (Throwable rex) {
1266     dx = rex;
1267 dl 1.1 }
1268     }
1269 dl 1.28 else {
1270     @SuppressWarnings("unchecked") T tr = (T) r;
1271     t = tr;
1272     }
1273 dl 1.101 dst.internalComplete(t, dx);
1274 dl 1.1 }
1275 dl 1.99 return dst;
1276 dl 1.1 }
1277 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1278 dl 1.1 }
1279    
1280 dl 1.88 static final class WhenCompleteCompletion<T> extends Completion {
1281     final CompletableFuture<? extends T> src;
1282     final BiConsumer<? super T, ? super Throwable> fn;
1283     final CompletableFuture<T> dst;
1284     final Executor executor;
1285     WhenCompleteCompletion(CompletableFuture<? extends T> src,
1286 dl 1.101 BiConsumer<? super T, ? super Throwable> fn,
1287     CompletableFuture<T> dst,
1288     Executor executor) {
1289 dl 1.88 this.src = src; this.fn = fn; this.dst = dst;
1290     this.executor = executor;
1291     }
1292 dl 1.101 public final CompletableFuture<?> tryComplete() {
1293 dl 1.88 final CompletableFuture<? extends T> a;
1294     final BiConsumer<? super T, ? super Throwable> fn;
1295     final CompletableFuture<T> dst;
1296     Object r; T t; Throwable ex;
1297     if ((dst = this.dst) != null &&
1298     (fn = this.fn) != null &&
1299     (a = this.src) != null &&
1300     (r = a.result) != null &&
1301 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1302 dl 1.88 if (r instanceof AltResult) {
1303     ex = ((AltResult)r).ex;
1304     t = null;
1305     }
1306     else {
1307     ex = null;
1308     @SuppressWarnings("unchecked") T tr = (T) r;
1309     t = tr;
1310     }
1311     Executor e = executor;
1312     Throwable dx = null;
1313     try {
1314     if (e != null)
1315 dl 1.96 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
1316 dl 1.88 else
1317     fn.accept(t, ex);
1318     } catch (Throwable rex) {
1319     dx = rex;
1320     }
1321     if (e == null || dx != null)
1322 dl 1.101 dst.internalComplete(t, ex != null ? ex : dx);
1323 dl 1.88 }
1324 dl 1.99 return dst;
1325 dl 1.88 }
1326     private static final long serialVersionUID = 5232453952276885070L;
1327     }
1328    
1329 dl 1.75 static final class ThenCopy<T> extends Completion {
1330 dl 1.77 final CompletableFuture<?> src;
1331 dl 1.75 final CompletableFuture<T> dst;
1332 dl 1.77 ThenCopy(CompletableFuture<?> src,
1333 dl 1.75 CompletableFuture<T> dst) {
1334 dl 1.17 this.src = src; this.dst = dst;
1335     }
1336 dl 1.101 public final CompletableFuture<?> tryComplete() {
1337 dl 1.77 final CompletableFuture<?> a;
1338 dl 1.75 final CompletableFuture<T> dst;
1339     Object r; T t; Throwable ex;
1340 dl 1.17 if ((dst = this.dst) != null &&
1341     (a = this.src) != null &&
1342     (r = a.result) != null &&
1343 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1344 dl 1.20 if (r instanceof AltResult) {
1345     ex = ((AltResult)r).ex;
1346     t = null;
1347     }
1348     else {
1349     ex = null;
1350 dl 1.75 @SuppressWarnings("unchecked") T tr = (T) r;
1351     t = tr;
1352 dl 1.20 }
1353 dl 1.101 dst.internalComplete(t, ex);
1354 dl 1.17 }
1355 dl 1.99 return dst;
1356 dl 1.17 }
1357 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1358 dl 1.17 }
1359    
1360 dl 1.75 // version of ThenCopy for CompletableFuture<Void> dst
1361     static final class ThenPropagate extends Completion {
1362     final CompletableFuture<?> src;
1363     final CompletableFuture<Void> dst;
1364     ThenPropagate(CompletableFuture<?> src,
1365     CompletableFuture<Void> dst) {
1366     this.src = src; this.dst = dst;
1367     }
1368 dl 1.101 public final CompletableFuture<?> tryComplete() {
1369 dl 1.75 final CompletableFuture<?> a;
1370     final CompletableFuture<Void> dst;
1371     Object r; Throwable ex;
1372     if ((dst = this.dst) != null &&
1373     (a = this.src) != null &&
1374     (r = a.result) != null &&
1375 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1376 dl 1.75 if (r instanceof AltResult)
1377     ex = ((AltResult)r).ex;
1378     else
1379     ex = null;
1380 dl 1.101 dst.internalComplete(null, ex);
1381 dl 1.75 }
1382 dl 1.99 return dst;
1383 dl 1.75 }
1384     private static final long serialVersionUID = 5232453952276885070L;
1385     }
1386    
1387 dl 1.28 static final class HandleCompletion<T,U> extends Completion {
1388 dl 1.17 final CompletableFuture<? extends T> src;
1389     final BiFunction<? super T, Throwable, ? extends U> fn;
1390     final CompletableFuture<U> dst;
1391 dl 1.88 final Executor executor;
1392 dl 1.28 HandleCompletion(CompletableFuture<? extends T> src,
1393     BiFunction<? super T, Throwable, ? extends U> fn,
1394 dl 1.88 CompletableFuture<U> dst,
1395 dl 1.101 Executor executor) {
1396 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1397 dl 1.88 this.executor = executor;
1398 dl 1.17 }
1399 dl 1.101 public final CompletableFuture<?> tryComplete() {
1400 dl 1.28 final CompletableFuture<? extends T> a;
1401     final BiFunction<? super T, Throwable, ? extends U> fn;
1402     final CompletableFuture<U> dst;
1403 dl 1.17 Object r; T t; Throwable ex;
1404     if ((dst = this.dst) != null &&
1405     (fn = this.fn) != null &&
1406     (a = this.src) != null &&
1407     (r = a.result) != null &&
1408 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1409 dl 1.17 if (r instanceof AltResult) {
1410     ex = ((AltResult)r).ex;
1411     t = null;
1412     }
1413     else {
1414     ex = null;
1415 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1416     t = tr;
1417 dl 1.17 }
1418 dl 1.88 Executor e = executor;
1419     U u = null;
1420     Throwable dx = null;
1421 dl 1.17 try {
1422 dl 1.88 if (e != null)
1423 dl 1.96 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1424 dl 1.88 else
1425     u = fn.apply(t, ex);
1426 dl 1.17 } catch (Throwable rex) {
1427 dl 1.20 dx = rex;
1428 dl 1.17 }
1429 dl 1.88 if (e == null || dx != null)
1430 dl 1.101 dst.internalComplete(u, dx);
1431 dl 1.17 }
1432 dl 1.99 return dst;
1433 dl 1.17 }
1434 dl 1.20 private static final long serialVersionUID = 5232453952276885070L;
1435 dl 1.17 }
1436    
1437 jsr166 1.81 static final class ThenCompose<T,U> extends Completion {
1438 dl 1.17 final CompletableFuture<? extends T> src;
1439 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1440 dl 1.17 final CompletableFuture<U> dst;
1441 dl 1.37 final Executor executor;
1442 jsr166 1.81 ThenCompose(CompletableFuture<? extends T> src,
1443 dl 1.88 Function<? super T, ? extends CompletionStage<U>> fn,
1444 jsr166 1.81 CompletableFuture<U> dst,
1445     Executor executor) {
1446 dl 1.17 this.src = src; this.fn = fn; this.dst = dst;
1447 dl 1.37 this.executor = executor;
1448 dl 1.17 }
1449 dl 1.101 public final CompletableFuture<?> tryComplete() {
1450 dl 1.28 final CompletableFuture<? extends T> a;
1451 dl 1.88 final Function<? super T, ? extends CompletionStage<U>> fn;
1452 dl 1.28 final CompletableFuture<U> dst;
1453 dl 1.37 Object r; T t; Throwable ex; Executor e;
1454 dl 1.17 if ((dst = this.dst) != null &&
1455     (fn = this.fn) != null &&
1456     (a = this.src) != null &&
1457     (r = a.result) != null &&
1458 dl 1.101 get() == 0 && compareAndSet(0, 1)) {
1459 dl 1.17 if (r instanceof AltResult) {
1460     ex = ((AltResult)r).ex;
1461     t = null;
1462     }
1463     else {
1464     ex = null;
1465 dl 1.28 @SuppressWarnings("unchecked") T tr = (T) r;
1466     t = tr;
1467 dl 1.17 }
1468     CompletableFuture<U> c = null;
1469 dl 1.20 U u = null;
1470     boolean complete = false;
1471 dl 1.17 if (ex == null) {
1472 dl 1.37 if ((e = executor) != null)
1473 dl 1.96 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1474 dl 1.37 else {
1475     try {
1476 dl 1.88 CompletionStage<U> cs = fn.apply(t);
1477     c = (cs == null) ? null : cs.toCompletableFuture();
1478     if (c == null)
1479 dl 1.37 ex = new NullPointerException();
1480     } catch (Throwable rex) {
1481     ex = rex;
1482     }
1483 dl 1.17 }
1484     }
1485 dl 1.37 if (c != null) {
1486 dl 1.18 Object s;
1487     if ((s = c.result) == null) {
1488 dl 1.101 ThenCopy<U> d = new ThenCopy<U>(c, dst);
1489     CompletionNode p = new CompletionNode(d);
1490 dl 1.18 while ((s = c.result) == null) {
1491     if (UNSAFE.compareAndSwapObject
1492     (c, COMPLETIONS, p.next = c.completions, p))
1493     break;
1494     }
1495 dl 1.101 d.tryComplete();
1496 dl 1.17 }
1497 dl 1.101 else {
1498 dl 1.20 complete = true;
1499 dl 1.18 if (s instanceof AltResult) {
1500     ex = ((AltResult)s).ex; // no rewrap
1501     u = null;
1502 dl 1.28 }
1503     else {
1504     @SuppressWarnings("unchecked") U us = (U) s;
1505     u = us;
1506     }
1507     }
1508     }
1509     if (complete || ex != null)
1510 dl 1.101 dst.internalComplete(u, ex);
1511 dl 1.28 }
1512 dl 1.99 return dst;
1513 dl 1.28 }
1514     private static final long serialVersionUID = 5232453952276885070L;
1515     }
1516    
1517 dl 1.88 // Implementations of stage methods with (plain, async, Executor) forms
1518 dl 1.28
1519 dl 1.88 private <U> CompletableFuture<U> doThenApply
1520     (Function<? super T,? extends U> fn,
1521     Executor e) {
1522     if (fn == null) throw new NullPointerException();
1523     CompletableFuture<U> dst = new CompletableFuture<U>();
1524     Object r;
1525     if ((r = result) == null) {
1526 dl 1.101 ThenApply<T,U> d = new ThenApply<T,U>(this, fn, dst, e);
1527     CompletionNode p = new CompletionNode(d);
1528     while (result == null) {
1529 dl 1.88 if (UNSAFE.compareAndSwapObject
1530     (this, COMPLETIONS, p.next = completions, p))
1531     break;
1532     }
1533 dl 1.101 d.tryComplete();
1534 dl 1.88 }
1535 dl 1.101 else {
1536 dl 1.88 T t; Throwable ex;
1537     if (r instanceof AltResult) {
1538     ex = ((AltResult)r).ex;
1539     t = null;
1540     }
1541     else {
1542     ex = null;
1543     @SuppressWarnings("unchecked") T tr = (T) r;
1544     t = tr;
1545     }
1546     U u = null;
1547     if (ex == null) {
1548     try {
1549     if (e != null)
1550 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1551 dl 1.88 else
1552     u = fn.apply(t);
1553     } catch (Throwable rex) {
1554     ex = rex;
1555     }
1556     }
1557     if (e == null || ex != null)
1558     dst.internalComplete(u, ex);
1559     }
1560     return dst;
1561 dl 1.28 }
1562    
1563 dl 1.88 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1564     Executor e) {
1565     if (fn == null) throw new NullPointerException();
1566     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1567     Object r;
1568     if ((r = result) == null) {
1569 dl 1.101 ThenAccept<T> d = new ThenAccept<T>(this, fn, dst, e);
1570     CompletionNode p = new CompletionNode(d);
1571     while (result == null) {
1572 dl 1.88 if (UNSAFE.compareAndSwapObject
1573     (this, COMPLETIONS, p.next = completions, p))
1574     break;
1575     }
1576 dl 1.101 d.tryComplete();
1577 dl 1.88 }
1578 dl 1.101 else {
1579 dl 1.88 T t; Throwable ex;
1580     if (r instanceof AltResult) {
1581     ex = ((AltResult)r).ex;
1582     t = null;
1583     }
1584     else {
1585     ex = null;
1586     @SuppressWarnings("unchecked") T tr = (T) r;
1587     t = tr;
1588     }
1589     if (ex == null) {
1590     try {
1591     if (e != null)
1592 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1593 dl 1.88 else
1594     fn.accept(t);
1595     } catch (Throwable rex) {
1596     ex = rex;
1597     }
1598     }
1599     if (e == null || ex != null)
1600     dst.internalComplete(null, ex);
1601     }
1602     return dst;
1603 dl 1.28 }
1604    
1605 dl 1.88 private CompletableFuture<Void> doThenRun(Runnable action,
1606     Executor e) {
1607     if (action == null) throw new NullPointerException();
1608     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1609     Object r;
1610     if ((r = result) == null) {
1611 dl 1.101 ThenRun d = new ThenRun(this, action, dst, e);
1612     CompletionNode p = new CompletionNode(d);
1613     while (result == null) {
1614 dl 1.88 if (UNSAFE.compareAndSwapObject
1615     (this, COMPLETIONS, p.next = completions, p))
1616     break;
1617     }
1618 dl 1.101 d.tryComplete();
1619 dl 1.88 }
1620 dl 1.101 else {
1621 dl 1.88 Throwable ex;
1622     if (r instanceof AltResult)
1623     ex = ((AltResult)r).ex;
1624     else
1625     ex = null;
1626     if (ex == null) {
1627     try {
1628     if (e != null)
1629 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1630 dl 1.88 else
1631     action.run();
1632     } catch (Throwable rex) {
1633     ex = rex;
1634     }
1635     }
1636     if (e == null || ex != null)
1637     dst.internalComplete(null, ex);
1638     }
1639     return dst;
1640     }
1641    
1642     private <U,V> CompletableFuture<V> doThenCombine
1643     (CompletableFuture<? extends U> other,
1644     BiFunction<? super T,? super U,? extends V> fn,
1645     Executor e) {
1646     if (other == null || fn == null) throw new NullPointerException();
1647     CompletableFuture<V> dst = new CompletableFuture<V>();
1648 dl 1.101 Object r = result, s = other.result;
1649     if (r == null || s == null) {
1650     ThenCombine<T,U,V> d =
1651     new ThenCombine<T,U,V>(this, other, fn, dst, e);
1652 dl 1.88 CompletionNode q = null, p = new CompletionNode(d);
1653     while ((r == null && (r = result) == null) ||
1654     (s == null && (s = other.result) == null)) {
1655     if (q != null) {
1656     if (s != null ||
1657     UNSAFE.compareAndSwapObject
1658     (other, COMPLETIONS, q.next = other.completions, q))
1659     break;
1660     }
1661     else if (r != null ||
1662     UNSAFE.compareAndSwapObject
1663     (this, COMPLETIONS, p.next = completions, p)) {
1664     if (s != null)
1665     break;
1666     q = new CompletionNode(d);
1667     }
1668     }
1669 dl 1.101 d.tryComplete();
1670 dl 1.88 }
1671 dl 1.101 else {
1672 dl 1.88 T t; U u; Throwable ex;
1673     if (r instanceof AltResult) {
1674     ex = ((AltResult)r).ex;
1675     t = null;
1676     }
1677     else {
1678     ex = null;
1679     @SuppressWarnings("unchecked") T tr = (T) r;
1680     t = tr;
1681     }
1682     if (ex != null)
1683     u = null;
1684     else if (s instanceof AltResult) {
1685     ex = ((AltResult)s).ex;
1686     u = null;
1687     }
1688     else {
1689     @SuppressWarnings("unchecked") U us = (U) s;
1690     u = us;
1691     }
1692     V v = null;
1693     if (ex == null) {
1694     try {
1695     if (e != null)
1696 dl 1.96 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
1697 dl 1.88 else
1698     v = fn.apply(t, u);
1699     } catch (Throwable rex) {
1700     ex = rex;
1701     }
1702     }
1703     if (e == null || ex != null)
1704     dst.internalComplete(v, ex);
1705     }
1706     return dst;
1707     }
1708    
1709     private <U> CompletableFuture<Void> doThenAcceptBoth
1710     (CompletableFuture<? extends U> other,
1711     BiConsumer<? super T,? super U> fn,
1712     Executor e) {
1713     if (other == null || fn == null) throw new NullPointerException();
1714     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1715 dl 1.101 Object r = result, s = other.result;
1716     if (r == null || s == null) {
1717     ThenAcceptBoth<T,U> d =
1718     new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1719 dl 1.88 CompletionNode q = null, p = new CompletionNode(d);
1720     while ((r == null && (r = result) == null) ||
1721     (s == null && (s = other.result) == null)) {
1722     if (q != null) {
1723     if (s != null ||
1724     UNSAFE.compareAndSwapObject
1725     (other, COMPLETIONS, q.next = other.completions, q))
1726     break;
1727     }
1728     else if (r != null ||
1729     UNSAFE.compareAndSwapObject
1730     (this, COMPLETIONS, p.next = completions, p)) {
1731     if (s != null)
1732     break;
1733     q = new CompletionNode(d);
1734     }
1735     }
1736 dl 1.101 d.tryComplete();
1737 dl 1.88 }
1738 dl 1.101 else {
1739 dl 1.88 T t; U u; Throwable ex;
1740     if (r instanceof AltResult) {
1741     ex = ((AltResult)r).ex;
1742     t = null;
1743     }
1744     else {
1745     ex = null;
1746     @SuppressWarnings("unchecked") T tr = (T) r;
1747     t = tr;
1748     }
1749     if (ex != null)
1750     u = null;
1751     else if (s instanceof AltResult) {
1752     ex = ((AltResult)s).ex;
1753     u = null;
1754     }
1755     else {
1756     @SuppressWarnings("unchecked") U us = (U) s;
1757     u = us;
1758     }
1759     if (ex == null) {
1760     try {
1761     if (e != null)
1762 dl 1.96 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1763 dl 1.88 else
1764     fn.accept(t, u);
1765     } catch (Throwable rex) {
1766     ex = rex;
1767     }
1768     }
1769     if (e == null || ex != null)
1770     dst.internalComplete(null, ex);
1771     }
1772     return dst;
1773     }
1774    
1775     private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1776     Runnable action,
1777     Executor e) {
1778     if (other == null || action == null) throw new NullPointerException();
1779     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1780 dl 1.101 Object r = result, s = other.result;
1781     if (r == null || s == null) {
1782     RunAfterBoth d = new RunAfterBoth(this, other, action, dst, e);
1783 dl 1.88 CompletionNode q = null, p = new CompletionNode(d);
1784     while ((r == null && (r = result) == null) ||
1785     (s == null && (s = other.result) == null)) {
1786     if (q != null) {
1787     if (s != null ||
1788     UNSAFE.compareAndSwapObject
1789     (other, COMPLETIONS, q.next = other.completions, q))
1790     break;
1791     }
1792     else if (r != null ||
1793     UNSAFE.compareAndSwapObject
1794     (this, COMPLETIONS, p.next = completions, p)) {
1795     if (s != null)
1796     break;
1797     q = new CompletionNode(d);
1798     }
1799     }
1800 dl 1.101 d.tryComplete();
1801 dl 1.88 }
1802 dl 1.101 else {
1803 dl 1.88 Throwable ex;
1804     if (r instanceof AltResult)
1805     ex = ((AltResult)r).ex;
1806     else
1807     ex = null;
1808     if (ex == null && (s instanceof AltResult))
1809     ex = ((AltResult)s).ex;
1810     if (ex == null) {
1811     try {
1812     if (e != null)
1813 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1814 dl 1.88 else
1815     action.run();
1816     } catch (Throwable rex) {
1817     ex = rex;
1818     }
1819     }
1820     if (e == null || ex != null)
1821     dst.internalComplete(null, ex);
1822     }
1823     return dst;
1824     }
1825    
1826     private <U> CompletableFuture<U> doApplyToEither
1827     (CompletableFuture<? extends T> other,
1828     Function<? super T, U> fn,
1829     Executor e) {
1830     if (other == null || fn == null) throw new NullPointerException();
1831     CompletableFuture<U> dst = new CompletableFuture<U>();
1832     Object r;
1833     if ((r = result) == null && (r = other.result) == null) {
1834 dl 1.101 ApplyToEither<T,U> d =
1835     new ApplyToEither<T,U>(this, other, fn, dst, e);
1836 dl 1.88 CompletionNode q = null, p = new CompletionNode(d);
1837 dl 1.101 while (result == null && other.result == null) {
1838 dl 1.88 if (q != null) {
1839     if (UNSAFE.compareAndSwapObject
1840     (other, COMPLETIONS, q.next = other.completions, q))
1841     break;
1842     }
1843     else if (UNSAFE.compareAndSwapObject
1844     (this, COMPLETIONS, p.next = completions, p))
1845     q = new CompletionNode(d);
1846     }
1847 dl 1.101 d.tryComplete();
1848 dl 1.88 }
1849 dl 1.101 else {
1850 dl 1.88 T t; Throwable ex;
1851     if (r instanceof AltResult) {
1852     ex = ((AltResult)r).ex;
1853     t = null;
1854     }
1855     else {
1856     ex = null;
1857     @SuppressWarnings("unchecked") T tr = (T) r;
1858     t = tr;
1859     }
1860     U u = null;
1861     if (ex == null) {
1862     try {
1863     if (e != null)
1864 dl 1.96 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1865 dl 1.88 else
1866     u = fn.apply(t);
1867     } catch (Throwable rex) {
1868     ex = rex;
1869     }
1870     }
1871     if (e == null || ex != null)
1872     dst.internalComplete(u, ex);
1873     }
1874     return dst;
1875     }
1876    
1877     private CompletableFuture<Void> doAcceptEither
1878     (CompletableFuture<? extends T> other,
1879     Consumer<? super T> fn,
1880     Executor e) {
1881     if (other == null || fn == null) throw new NullPointerException();
1882     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1883     Object r;
1884     if ((r = result) == null && (r = other.result) == null) {
1885 dl 1.101 AcceptEither<T> d =
1886     new AcceptEither<T>(this, other, fn, dst, e);
1887 dl 1.88 CompletionNode q = null, p = new CompletionNode(d);
1888 dl 1.101 while (result == null && other.result == null) {
1889 dl 1.88 if (q != null) {
1890     if (UNSAFE.compareAndSwapObject
1891     (other, COMPLETIONS, q.next = other.completions, q))
1892     break;
1893     }
1894     else if (UNSAFE.compareAndSwapObject
1895     (this, COMPLETIONS, p.next = completions, p))
1896     q = new CompletionNode(d);
1897     }
1898 dl 1.101 d.tryComplete();
1899 dl 1.88 }
1900 dl 1.101 else {
1901 dl 1.88 T t; Throwable ex;
1902     if (r instanceof AltResult) {
1903     ex = ((AltResult)r).ex;
1904     t = null;
1905     }
1906     else {
1907     ex = null;
1908     @SuppressWarnings("unchecked") T tr = (T) r;
1909     t = tr;
1910     }
1911     if (ex == null) {
1912     try {
1913     if (e != null)
1914 dl 1.96 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1915 dl 1.88 else
1916     fn.accept(t);
1917     } catch (Throwable rex) {
1918     ex = rex;
1919     }
1920     }
1921     if (e == null || ex != null)
1922     dst.internalComplete(null, ex);
1923     }
1924     return dst;
1925     }
1926    
1927     private CompletableFuture<Void> doRunAfterEither
1928     (CompletableFuture<?> other,
1929     Runnable action,
1930     Executor e) {
1931     if (other == null || action == null) throw new NullPointerException();
1932     CompletableFuture<Void> dst = new CompletableFuture<Void>();
1933     Object r;
1934     if ((r = result) == null && (r = other.result) == null) {
1935 dl 1.101 RunAfterEither d =
1936     new RunAfterEither(this, other, action, dst, e);
1937 dl 1.88 CompletionNode q = null, p = new CompletionNode(d);
1938 dl 1.101 while (result == null && other.result == null) {
1939 dl 1.88 if (q != null) {
1940     if (UNSAFE.compareAndSwapObject
1941     (other, COMPLETIONS, q.next = other.completions, q))
1942     break;
1943     }
1944     else if (UNSAFE.compareAndSwapObject
1945     (this, COMPLETIONS, p.next = completions, p))
1946     q = new CompletionNode(d);
1947     }
1948 dl 1.101 d.tryComplete();
1949 dl 1.88 }
1950 dl 1.101 else {
1951 dl 1.88 Throwable ex;
1952     if (r instanceof AltResult)
1953     ex = ((AltResult)r).ex;
1954     else
1955     ex = null;
1956     if (ex == null) {
1957     try {
1958     if (e != null)
1959 dl 1.96 execAsync(e, new AsyncRun(action, dst));
1960 dl 1.88 else
1961     action.run();
1962     } catch (Throwable rex) {
1963     ex = rex;
1964     }
1965     }
1966     if (e == null || ex != null)
1967     dst.internalComplete(null, ex);
1968     }
1969     return dst;
1970     }
1971    
1972     private <U> CompletableFuture<U> doThenCompose
1973     (Function<? super T, ? extends CompletionStage<U>> fn,
1974     Executor e) {
1975     if (fn == null) throw new NullPointerException();
1976     CompletableFuture<U> dst = null;
1977     Object r;
1978     if ((r = result) == null) {
1979     dst = new CompletableFuture<U>();
1980 dl 1.101 ThenCompose<T,U> d = new ThenCompose<T,U>(this, fn, dst, e);
1981     CompletionNode p = new CompletionNode(d);
1982     while (result == null) {
1983 dl 1.88 if (UNSAFE.compareAndSwapObject
1984     (this, COMPLETIONS, p.next = completions, p))
1985     break;
1986     }
1987 dl 1.101 d.tryComplete();
1988 dl 1.88 }
1989 dl 1.101 else {
1990 dl 1.88 T t; Throwable ex;
1991     if (r instanceof AltResult) {
1992     ex = ((AltResult)r).ex;
1993     t = null;
1994     }
1995     else {
1996     ex = null;
1997     @SuppressWarnings("unchecked") T tr = (T) r;
1998     t = tr;
1999     }
2000     if (ex == null) {
2001     if (e != null) {
2002     if (dst == null)
2003     dst = new CompletableFuture<U>();
2004 dl 1.96 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
2005 dl 1.88 }
2006     else {
2007     try {
2008     CompletionStage<U> cs = fn.apply(t);
2009     if (cs == null ||
2010     (dst = cs.toCompletableFuture()) == null)
2011     ex = new NullPointerException();
2012     } catch (Throwable rex) {
2013     ex = rex;
2014     }
2015     }
2016     }
2017     if (dst == null)
2018     dst = new CompletableFuture<U>();
2019 dl 1.98 if (ex != null)
2020 dl 1.88 dst.internalComplete(null, ex);
2021     }
2022     return dst;
2023     }
2024    
2025     private CompletableFuture<T> doWhenComplete
2026     (BiConsumer<? super T, ? super Throwable> fn,
2027     Executor e) {
2028     if (fn == null) throw new NullPointerException();
2029     CompletableFuture<T> dst = new CompletableFuture<T>();
2030     Object r;
2031     if ((r = result) == null) {
2032 dl 1.101 WhenCompleteCompletion<T> d =
2033     new WhenCompleteCompletion<T>(this, fn, dst, e);
2034     CompletionNode p = new CompletionNode(d);
2035     while (result == null) {
2036 dl 1.88 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2037     p.next = completions, p))
2038     break;
2039     }
2040 dl 1.101 d.tryComplete();
2041 dl 1.88 }
2042 dl 1.101 else {
2043 dl 1.88 T t; Throwable ex;
2044     if (r instanceof AltResult) {
2045     ex = ((AltResult)r).ex;
2046     t = null;
2047     }
2048     else {
2049     ex = null;
2050     @SuppressWarnings("unchecked") T tr = (T) r;
2051     t = tr;
2052     }
2053     Throwable dx = null;
2054     try {
2055     if (e != null)
2056 dl 1.96 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
2057 dl 1.88 else
2058     fn.accept(t, ex);
2059     } catch (Throwable rex) {
2060     dx = rex;
2061     }
2062     if (e == null || dx != null)
2063     dst.internalComplete(t, ex != null ? ex : dx);
2064     }
2065     return dst;
2066     }
2067    
2068     private <U> CompletableFuture<U> doHandle
2069     (BiFunction<? super T, Throwable, ? extends U> fn,
2070     Executor e) {
2071     if (fn == null) throw new NullPointerException();
2072     CompletableFuture<U> dst = new CompletableFuture<U>();
2073     Object r;
2074     if ((r = result) == null) {
2075 dl 1.101 HandleCompletion<T,U> d =
2076     new HandleCompletion<T,U>(this, fn, dst, e);
2077     CompletionNode p = new CompletionNode(d);
2078     while (result == null) {
2079 dl 1.88 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2080     p.next = completions, p))
2081     break;
2082     }
2083 dl 1.101 d.tryComplete();
2084 dl 1.88 }
2085 dl 1.101 else {
2086 dl 1.88 T t; Throwable ex;
2087     if (r instanceof AltResult) {
2088     ex = ((AltResult)r).ex;
2089     t = null;
2090     }
2091     else {
2092     ex = null;
2093     @SuppressWarnings("unchecked") T tr = (T) r;
2094     t = tr;
2095     }
2096 jsr166 1.90 U u = null;
2097 dl 1.88 Throwable dx = null;
2098     try {
2099     if (e != null)
2100 dl 1.96 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2101 dl 1.88 else {
2102     u = fn.apply(t, ex);
2103     dx = null;
2104     }
2105     } catch (Throwable rex) {
2106     dx = rex;
2107     u = null;
2108     }
2109     if (e == null || dx != null)
2110     dst.internalComplete(u, dx);
2111     }
2112     return dst;
2113     }
2114    
2115     // public methods
2116    
2117     /**
2118     * Creates a new incomplete CompletableFuture.
2119     */
2120     public CompletableFuture() {
2121     }
2122    
2123     /**
2124     * Returns a new CompletableFuture that is asynchronously completed
2125     * by a task running in the {@link ForkJoinPool#commonPool()} with
2126     * the value obtained by calling the given Supplier.
2127     *
2128     * @param supplier a function returning the value to be used
2129     * to complete the returned CompletableFuture
2130 jsr166 1.95 * @param <U> the function's return type
2131 dl 1.88 * @return the new CompletableFuture
2132     */
2133     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2134     if (supplier == null) throw new NullPointerException();
2135     CompletableFuture<U> f = new CompletableFuture<U>();
2136 dl 1.96 execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
2137 dl 1.88 return f;
2138     }
2139    
2140     /**
2141     * Returns a new CompletableFuture that is asynchronously completed
2142     * by a task running in the given executor with the value obtained
2143     * by calling the given Supplier.
2144     *
2145     * @param supplier a function returning the value to be used
2146     * to complete the returned CompletableFuture
2147     * @param executor the executor to use for asynchronous execution
2148 jsr166 1.95 * @param <U> the function's return type
2149 dl 1.88 * @return the new CompletableFuture
2150     */
2151     public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2152     Executor executor) {
2153     if (executor == null || supplier == null)
2154     throw new NullPointerException();
2155     CompletableFuture<U> f = new CompletableFuture<U>();
2156 dl 1.96 execAsync(executor, new AsyncSupply<U>(supplier, f));
2157 dl 1.88 return f;
2158 dl 1.28 }
2159    
2160     /**
2161 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2162     * by a task running in the {@link ForkJoinPool#commonPool()} after
2163     * it runs the given action.
2164 dl 1.28 *
2165     * @param runnable the action to run before completing the
2166     * returned CompletableFuture
2167 jsr166 1.58 * @return the new CompletableFuture
2168 dl 1.28 */
2169     public static CompletableFuture<Void> runAsync(Runnable runnable) {
2170     if (runnable == null) throw new NullPointerException();
2171     CompletableFuture<Void> f = new CompletableFuture<Void>();
2172 dl 1.96 execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f));
2173 dl 1.28 return f;
2174     }
2175    
2176     /**
2177 jsr166 1.66 * Returns a new CompletableFuture that is asynchronously completed
2178     * by a task running in the given executor after it runs the given
2179     * action.
2180 dl 1.28 *
2181     * @param runnable the action to run before completing the
2182     * returned CompletableFuture
2183     * @param executor the executor to use for asynchronous execution
2184 jsr166 1.58 * @return the new CompletableFuture
2185 dl 1.28 */
2186     public static CompletableFuture<Void> runAsync(Runnable runnable,
2187     Executor executor) {
2188     if (executor == null || runnable == null)
2189     throw new NullPointerException();
2190     CompletableFuture<Void> f = new CompletableFuture<Void>();
2191 dl 1.96 execAsync(executor, new AsyncRun(runnable, f));
2192 dl 1.28 return f;
2193     }
2194    
2195     /**
2196 dl 1.77 * Returns a new CompletableFuture that is already completed with
2197     * the given value.
2198     *
2199     * @param value the value
2200 jsr166 1.95 * @param <U> the type of the value
2201 dl 1.77 * @return the completed CompletableFuture
2202     */
2203     public static <U> CompletableFuture<U> completedFuture(U value) {
2204     CompletableFuture<U> f = new CompletableFuture<U>();
2205 jsr166 1.78 f.result = (value == null) ? NIL : value;
2206 dl 1.77 return f;
2207     }
2208    
2209     /**
2210 dl 1.28 * Returns {@code true} if completed in any fashion: normally,
2211     * exceptionally, or via cancellation.
2212     *
2213     * @return {@code true} if completed
2214     */
2215     public boolean isDone() {
2216     return result != null;
2217     }
2218    
2219     /**
2220 dl 1.49 * Waits if necessary for this future to complete, and then
2221 dl 1.48 * returns its result.
2222 dl 1.28 *
2223 dl 1.48 * @return the result value
2224     * @throws CancellationException if this future was cancelled
2225     * @throws ExecutionException if this future completed exceptionally
2226 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2227     * while waiting
2228     */
2229     public T get() throws InterruptedException, ExecutionException {
2230     Object r; Throwable ex, cause;
2231     if ((r = result) == null && (r = waitingGet(true)) == null)
2232     throw new InterruptedException();
2233 jsr166 1.45 if (!(r instanceof AltResult)) {
2234     @SuppressWarnings("unchecked") T tr = (T) r;
2235     return tr;
2236     }
2237     if ((ex = ((AltResult)r).ex) == null)
2238 dl 1.28 return null;
2239 jsr166 1.45 if (ex instanceof CancellationException)
2240     throw (CancellationException)ex;
2241     if ((ex instanceof CompletionException) &&
2242     (cause = ex.getCause()) != null)
2243     ex = cause;
2244     throw new ExecutionException(ex);
2245 dl 1.28 }
2246    
2247     /**
2248 dl 1.49 * Waits if necessary for at most the given time for this future
2249     * to complete, and then returns its result, if available.
2250 dl 1.28 *
2251     * @param timeout the maximum time to wait
2252     * @param unit the time unit of the timeout argument
2253 dl 1.48 * @return the result value
2254     * @throws CancellationException if this future was cancelled
2255     * @throws ExecutionException if this future completed exceptionally
2256 dl 1.28 * @throws InterruptedException if the current thread was interrupted
2257     * while waiting
2258     * @throws TimeoutException if the wait timed out
2259     */
2260     public T get(long timeout, TimeUnit unit)
2261     throws InterruptedException, ExecutionException, TimeoutException {
2262     Object r; Throwable ex, cause;
2263     long nanos = unit.toNanos(timeout);
2264     if (Thread.interrupted())
2265     throw new InterruptedException();
2266     if ((r = result) == null)
2267     r = timedAwaitDone(nanos);
2268 jsr166 1.45 if (!(r instanceof AltResult)) {
2269     @SuppressWarnings("unchecked") T tr = (T) r;
2270     return tr;
2271     }
2272     if ((ex = ((AltResult)r).ex) == null)
2273 dl 1.28 return null;
2274 jsr166 1.45 if (ex instanceof CancellationException)
2275     throw (CancellationException)ex;
2276     if ((ex instanceof CompletionException) &&
2277     (cause = ex.getCause()) != null)
2278     ex = cause;
2279     throw new ExecutionException(ex);
2280 dl 1.28 }
2281    
2282     /**
2283     * Returns the result value when complete, or throws an
2284     * (unchecked) exception if completed exceptionally. To better
2285     * conform with the use of common functional forms, if a
2286     * computation involved in the completion of this
2287     * CompletableFuture threw an exception, this method throws an
2288     * (unchecked) {@link CompletionException} with the underlying
2289     * exception as its cause.
2290     *
2291     * @return the result value
2292     * @throws CancellationException if the computation was cancelled
2293 jsr166 1.55 * @throws CompletionException if this future completed
2294     * exceptionally or a completion computation threw an exception
2295 dl 1.28 */
2296     public T join() {
2297     Object r; Throwable ex;
2298     if ((r = result) == null)
2299     r = waitingGet(false);
2300 jsr166 1.45 if (!(r instanceof AltResult)) {
2301     @SuppressWarnings("unchecked") T tr = (T) r;
2302     return tr;
2303     }
2304     if ((ex = ((AltResult)r).ex) == null)
2305 dl 1.28 return null;
2306 jsr166 1.45 if (ex instanceof CancellationException)
2307     throw (CancellationException)ex;
2308     if (ex instanceof CompletionException)
2309     throw (CompletionException)ex;
2310     throw new CompletionException(ex);
2311 dl 1.28 }
2312    
2313     /**
2314     * Returns the result value (or throws any encountered exception)
2315     * if completed, else returns the given valueIfAbsent.
2316     *
2317     * @param valueIfAbsent the value to return if not completed
2318     * @return the result value, if completed, else the given valueIfAbsent
2319     * @throws CancellationException if the computation was cancelled
2320 jsr166 1.55 * @throws CompletionException if this future completed
2321     * exceptionally or a completion computation threw an exception
2322 dl 1.28 */
2323     public T getNow(T valueIfAbsent) {
2324     Object r; Throwable ex;
2325     if ((r = result) == null)
2326     return valueIfAbsent;
2327 jsr166 1.45 if (!(r instanceof AltResult)) {
2328     @SuppressWarnings("unchecked") T tr = (T) r;
2329     return tr;
2330     }
2331     if ((ex = ((AltResult)r).ex) == null)
2332 dl 1.28 return null;
2333 jsr166 1.45 if (ex instanceof CancellationException)
2334     throw (CancellationException)ex;
2335     if (ex instanceof CompletionException)
2336     throw (CompletionException)ex;
2337     throw new CompletionException(ex);
2338 dl 1.28 }
2339    
2340     /**
2341     * If not already completed, sets the value returned by {@link
2342     * #get()} and related methods to the given value.
2343     *
2344     * @param value the result value
2345     * @return {@code true} if this invocation caused this CompletableFuture
2346     * to transition to a completed state, else {@code false}
2347     */
2348     public boolean complete(T value) {
2349     boolean triggered = result == null &&
2350     UNSAFE.compareAndSwapObject(this, RESULT, null,
2351     value == null ? NIL : value);
2352 dl 1.101 postComplete(this);
2353 dl 1.28 return triggered;
2354     }
2355    
2356     /**
2357     * If not already completed, causes invocations of {@link #get()}
2358     * and related methods to throw the given exception.
2359     *
2360     * @param ex the exception
2361     * @return {@code true} if this invocation caused this CompletableFuture
2362     * to transition to a completed state, else {@code false}
2363     */
2364     public boolean completeExceptionally(Throwable ex) {
2365     if (ex == null) throw new NullPointerException();
2366     boolean triggered = result == null &&
2367     UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2368 dl 1.101 postComplete(this);
2369 dl 1.28 return triggered;
2370     }
2371    
2372 dl 1.88 // CompletionStage methods
2373    
2374     public <U> CompletableFuture<U> thenApply
2375     (Function<? super T,? extends U> fn) {
2376 dl 1.28 return doThenApply(fn, null);
2377     }
2378    
2379 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2380     (Function<? super T,? extends U> fn) {
2381 dl 1.28 return doThenApply(fn, ForkJoinPool.commonPool());
2382 dl 1.17 }
2383    
2384 dl 1.48 public <U> CompletableFuture<U> thenApplyAsync
2385     (Function<? super T,? extends U> fn,
2386     Executor executor) {
2387 dl 1.28 if (executor == null) throw new NullPointerException();
2388     return doThenApply(fn, executor);
2389     }
2390 dl 1.1
2391 dl 1.88 public CompletableFuture<Void> thenAccept
2392     (Consumer<? super T> action) {
2393     return doThenAccept(action, null);
2394 dl 1.28 }
2395    
2396 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2397     (Consumer<? super T> action) {
2398     return doThenAccept(action, ForkJoinPool.commonPool());
2399 dl 1.28 }
2400    
2401 dl 1.88 public CompletableFuture<Void> thenAcceptAsync
2402     (Consumer<? super T> action,
2403     Executor executor) {
2404 dl 1.28 if (executor == null) throw new NullPointerException();
2405 dl 1.88 return doThenAccept(action, executor);
2406 dl 1.7 }
2407    
2408 dl 1.88 public CompletableFuture<Void> thenRun
2409     (Runnable action) {
2410 dl 1.28 return doThenRun(action, null);
2411     }
2412    
2413 dl 1.88 public CompletableFuture<Void> thenRunAsync
2414     (Runnable action) {
2415 dl 1.28 return doThenRun(action, ForkJoinPool.commonPool());
2416     }
2417    
2418 dl 1.88 public CompletableFuture<Void> thenRunAsync
2419     (Runnable action,
2420     Executor executor) {
2421 dl 1.28 if (executor == null) throw new NullPointerException();
2422     return doThenRun(action, executor);
2423     }
2424    
2425 dl 1.48 public <U,V> CompletableFuture<V> thenCombine
2426 dl 1.88 (CompletionStage<? extends U> other,
2427 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2428 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, null);
2429 dl 1.28 }
2430    
2431 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2432 dl 1.88 (CompletionStage<? extends U> other,
2433 dl 1.48 BiFunction<? super T,? super U,? extends V> fn) {
2434 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn,
2435     ForkJoinPool.commonPool());
2436 dl 1.28 }
2437    
2438 dl 1.48 public <U,V> CompletableFuture<V> thenCombineAsync
2439 dl 1.88 (CompletionStage<? extends U> other,
2440 dl 1.48 BiFunction<? super T,? super U,? extends V> fn,
2441     Executor executor) {
2442 dl 1.28 if (executor == null) throw new NullPointerException();
2443 dl 1.88 return doThenCombine(other.toCompletableFuture(), fn, executor);
2444 dl 1.1 }
2445    
2446 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBoth
2447 dl 1.88 (CompletionStage<? extends U> other,
2448     BiConsumer<? super T, ? super U> action) {
2449     return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2450 dl 1.28 }
2451    
2452 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2453 dl 1.88 (CompletionStage<? extends U> other,
2454     BiConsumer<? super T, ? super U> action) {
2455     return doThenAcceptBoth(other.toCompletableFuture(), action,
2456     ForkJoinPool.commonPool());
2457 dl 1.28 }
2458    
2459 dl 1.48 public <U> CompletableFuture<Void> thenAcceptBothAsync
2460 dl 1.88 (CompletionStage<? extends U> other,
2461     BiConsumer<? super T, ? super U> action,
2462 dl 1.48 Executor executor) {
2463 dl 1.28 if (executor == null) throw new NullPointerException();
2464 dl 1.88 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2465 dl 1.28 }
2466    
2467 dl 1.88 public CompletableFuture<Void> runAfterBoth
2468     (CompletionStage<?> other,
2469     Runnable action) {
2470     return doRunAfterBoth(other.toCompletableFuture(), action, null);
2471 dl 1.7 }
2472    
2473 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2474     (CompletionStage<?> other,
2475     Runnable action) {
2476     return doRunAfterBoth(other.toCompletableFuture(), action,
2477     ForkJoinPool.commonPool());
2478 dl 1.28 }
2479    
2480 dl 1.88 public CompletableFuture<Void> runAfterBothAsync
2481     (CompletionStage<?> other,
2482     Runnable action,
2483     Executor executor) {
2484 dl 1.28 if (executor == null) throw new NullPointerException();
2485 dl 1.88 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2486 dl 1.28 }
2487    
2488 dl 1.1
2489 dl 1.48 public <U> CompletableFuture<U> applyToEither
2490 dl 1.88 (CompletionStage<? extends T> other,
2491 dl 1.48 Function<? super T, U> fn) {
2492 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, null);
2493 dl 1.28 }
2494    
2495 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2496 dl 1.88 (CompletionStage<? extends T> other,
2497 dl 1.48 Function<? super T, U> fn) {
2498 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn,
2499     ForkJoinPool.commonPool());
2500 dl 1.28 }
2501    
2502 dl 1.48 public <U> CompletableFuture<U> applyToEitherAsync
2503 dl 1.88 (CompletionStage<? extends T> other,
2504 dl 1.48 Function<? super T, U> fn,
2505     Executor executor) {
2506 dl 1.28 if (executor == null) throw new NullPointerException();
2507 dl 1.88 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2508 dl 1.1 }
2509    
2510 dl 1.48 public CompletableFuture<Void> acceptEither
2511 dl 1.88 (CompletionStage<? extends T> other,
2512     Consumer<? super T> action) {
2513     return doAcceptEither(other.toCompletableFuture(), action, null);
2514 dl 1.28 }
2515    
2516 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2517 dl 1.88 (CompletionStage<? extends T> other,
2518     Consumer<? super T> action) {
2519     return doAcceptEither(other.toCompletableFuture(), action,
2520     ForkJoinPool.commonPool());
2521 dl 1.28 }
2522    
2523 dl 1.48 public CompletableFuture<Void> acceptEitherAsync
2524 dl 1.88 (CompletionStage<? extends T> other,
2525     Consumer<? super T> action,
2526 dl 1.48 Executor executor) {
2527 dl 1.28 if (executor == null) throw new NullPointerException();
2528 dl 1.88 return doAcceptEither(other.toCompletableFuture(), action, executor);
2529 dl 1.7 }
2530    
2531 dl 1.88 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2532 dl 1.28 Runnable action) {
2533 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, null);
2534 dl 1.28 }
2535    
2536 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2537 dl 1.88 (CompletionStage<?> other,
2538 dl 1.48 Runnable action) {
2539 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action,
2540     ForkJoinPool.commonPool());
2541 dl 1.28 }
2542    
2543 dl 1.48 public CompletableFuture<Void> runAfterEitherAsync
2544 dl 1.88 (CompletionStage<?> other,
2545 dl 1.48 Runnable action,
2546     Executor executor) {
2547 dl 1.28 if (executor == null) throw new NullPointerException();
2548 dl 1.88 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2549 dl 1.1 }
2550    
2551 dl 1.48 public <U> CompletableFuture<U> thenCompose
2552 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2553 jsr166 1.81 return doThenCompose(fn, null);
2554 dl 1.37 }
2555    
2556 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2557 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn) {
2558 jsr166 1.81 return doThenCompose(fn, ForkJoinPool.commonPool());
2559 dl 1.37 }
2560    
2561 dl 1.48 public <U> CompletableFuture<U> thenComposeAsync
2562 dl 1.88 (Function<? super T, ? extends CompletionStage<U>> fn,
2563 dl 1.48 Executor executor) {
2564 dl 1.37 if (executor == null) throw new NullPointerException();
2565 jsr166 1.81 return doThenCompose(fn, executor);
2566 dl 1.37 }
2567    
2568 dl 1.88 public CompletableFuture<T> whenComplete
2569     (BiConsumer<? super T, ? super Throwable> action) {
2570     return doWhenComplete(action, null);
2571     }
2572    
2573     public CompletableFuture<T> whenCompleteAsync
2574     (BiConsumer<? super T, ? super Throwable> action) {
2575     return doWhenComplete(action, ForkJoinPool.commonPool());
2576     }
2577    
2578     public CompletableFuture<T> whenCompleteAsync
2579     (BiConsumer<? super T, ? super Throwable> action,
2580     Executor executor) {
2581     if (executor == null) throw new NullPointerException();
2582     return doWhenComplete(action, executor);
2583     }
2584    
2585     public <U> CompletableFuture<U> handle
2586     (BiFunction<? super T, Throwable, ? extends U> fn) {
2587     return doHandle(fn, null);
2588     }
2589    
2590     public <U> CompletableFuture<U> handleAsync
2591     (BiFunction<? super T, Throwable, ? extends U> fn) {
2592     return doHandle(fn, ForkJoinPool.commonPool());
2593     }
2594    
2595     public <U> CompletableFuture<U> handleAsync
2596     (BiFunction<? super T, Throwable, ? extends U> fn,
2597     Executor executor) {
2598     if (executor == null) throw new NullPointerException();
2599     return doHandle(fn, executor);
2600     }
2601    
2602     /**
2603     * Returns this CompletableFuture
2604     *
2605     * @return this CompletableFuture
2606     */
2607     public CompletableFuture<T> toCompletableFuture() {
2608     return this;
2609 dl 1.28 }
2610    
2611 dl 1.88 // not in interface CompletionStage
2612    
2613 dl 1.28 /**
2614 jsr166 1.66 * Returns a new CompletableFuture that is completed when this
2615     * CompletableFuture completes, with the result of the given
2616     * function of the exception triggering this CompletableFuture's
2617     * completion when it completes exceptionally; otherwise, if this
2618     * CompletableFuture completes normally, then the returned
2619     * CompletableFuture also completes normally with the same value.
2620 dl 1.88 * Note: More flexible versions of this functionality are
2621     * available using methods {@code whenComplete} and {@code handle}.
2622 dl 1.28 *
2623     * @param fn the function to use to compute the value of the
2624     * returned CompletableFuture if this CompletableFuture completed
2625     * exceptionally
2626     * @return the new CompletableFuture
2627     */
2628 dl 1.48 public CompletableFuture<T> exceptionally
2629     (Function<Throwable, ? extends T> fn) {
2630 dl 1.28 if (fn == null) throw new NullPointerException();
2631     CompletableFuture<T> dst = new CompletableFuture<T>();
2632     Object r;
2633     if ((r = result) == null) {
2634 dl 1.101 ExceptionCompletion<T> d =
2635     new ExceptionCompletion<T>(this, fn, dst);
2636     CompletionNode p = new CompletionNode(d);
2637     while (result == null) {
2638 dl 1.28 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2639     p.next = completions, p))
2640     break;
2641     }
2642 dl 1.101 d.tryComplete();
2643 dl 1.28 }
2644 dl 1.101 else {
2645 dl 1.28 T t = null; Throwable ex, dx = null;
2646     if (r instanceof AltResult) {
2647 jsr166 1.87 if ((ex = ((AltResult)r).ex) != null) {
2648 dl 1.28 try {
2649     t = fn.apply(ex);
2650     } catch (Throwable rex) {
2651     dx = rex;
2652     }
2653     }
2654     }
2655     else {
2656     @SuppressWarnings("unchecked") T tr = (T) r;
2657     t = tr;
2658     }
2659     dst.internalComplete(t, dx);
2660     }
2661     return dst;
2662     }
2663    
2664 dl 1.35 /* ------------- Arbitrary-arity constructions -------------- */
2665    
2666     /*
2667     * The basic plan of attack is to recursively form binary
2668     * completion trees of elements. This can be overkill for small
2669     * sets, but scales nicely. The And/All vs Or/Any forms use the
2670     * same idea, but details differ.
2671     */
2672    
2673     /**
2674     * Returns a new CompletableFuture that is completed when all of
2675 jsr166 1.66 * the given CompletableFutures complete. If any of the given
2676 jsr166 1.69 * CompletableFutures complete exceptionally, then the returned
2677     * CompletableFuture also does so, with a CompletionException
2678     * holding this exception as its cause. Otherwise, the results,
2679     * if any, of the given CompletableFutures are not reflected in
2680     * the returned CompletableFuture, but may be obtained by
2681     * inspecting them individually. If no CompletableFutures are
2682     * provided, returns a CompletableFuture completed with the value
2683     * {@code null}.
2684 dl 1.35 *
2685     * <p>Among the applications of this method is to await completion
2686     * of a set of independent CompletableFutures before continuing a
2687     * program, as in: {@code CompletableFuture.allOf(c1, c2,
2688     * c3).join();}.
2689     *
2690     * @param cfs the CompletableFutures
2691 jsr166 1.59 * @return a new CompletableFuture that is completed when all of the
2692 dl 1.35 * given CompletableFutures complete
2693     * @throws NullPointerException if the array or any of its elements are
2694     * {@code null}
2695     */
2696     public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2697     int len = cfs.length; // Directly handle empty and singleton cases
2698     if (len > 1)
2699     return allTree(cfs, 0, len - 1);
2700     else {
2701     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2702     CompletableFuture<?> f;
2703     if (len == 0)
2704     dst.result = NIL;
2705     else if ((f = cfs[0]) == null)
2706     throw new NullPointerException();
2707     else {
2708 dl 1.75 ThenPropagate d = null;
2709 dl 1.35 CompletionNode p = null;
2710     Object r;
2711     while ((r = f.result) == null) {
2712     if (d == null)
2713 dl 1.75 d = new ThenPropagate(f, dst);
2714 dl 1.35 else if (p == null)
2715     p = new CompletionNode(d);
2716     else if (UNSAFE.compareAndSwapObject
2717     (f, COMPLETIONS, p.next = f.completions, p))
2718     break;
2719     }
2720 dl 1.101 if (d != null)
2721     d.tryComplete();
2722     else
2723 dl 1.35 dst.internalComplete(null, (r instanceof AltResult) ?
2724     ((AltResult)r).ex : null);
2725     }
2726     return dst;
2727     }
2728     }
2729    
2730     /**
2731     * Recursively constructs an And'ed tree of CompletableFutures.
2732     * Called only when array known to have at least two elements.
2733     */
2734     private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2735     int lo, int hi) {
2736     CompletableFuture<?> fst, snd;
2737     int mid = (lo + hi) >>> 1;
2738     if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2739     (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2740     throw new NullPointerException();
2741     CompletableFuture<Void> dst = new CompletableFuture<Void>();
2742     AndCompletion d = null;
2743     CompletionNode p = null, q = null;
2744     Object r = null, s = null;
2745     while ((r = fst.result) == null || (s = snd.result) == null) {
2746     if (d == null)
2747     d = new AndCompletion(fst, snd, dst);
2748     else if (p == null)
2749     p = new CompletionNode(d);
2750     else if (q == null) {
2751     if (UNSAFE.compareAndSwapObject
2752     (fst, COMPLETIONS, p.next = fst.completions, p))
2753     q = new CompletionNode(d);
2754     }
2755     else if (UNSAFE.compareAndSwapObject
2756     (snd, COMPLETIONS, q.next = snd.completions, q))
2757     break;
2758     }
2759 dl 1.101 if (d != null)
2760     d.tryComplete();
2761     else {
2762 dl 1.35 Throwable ex;
2763     if (r instanceof AltResult)
2764     ex = ((AltResult)r).ex;
2765     else
2766     ex = null;
2767     if (ex == null && (s instanceof AltResult))
2768     ex = ((AltResult)s).ex;
2769     dst.internalComplete(null, ex);
2770     }
2771     return dst;
2772     }
2773    
2774     /**
2775 dl 1.76 * Returns a new CompletableFuture that is completed when any of
2776 jsr166 1.79 * the given CompletableFutures complete, with the same result.
2777     * Otherwise, if it completed exceptionally, the returned
2778 dl 1.77 * CompletableFuture also does so, with a CompletionException
2779     * holding this exception as its cause. If no CompletableFutures
2780     * are provided, returns an incomplete CompletableFuture.
2781 dl 1.35 *
2782     * @param cfs the CompletableFutures
2783 dl 1.77 * @return a new CompletableFuture that is completed with the
2784     * result or exception of any of the given CompletableFutures when
2785     * one completes
2786 dl 1.35 * @throws NullPointerException if the array or any of its elements are
2787     * {@code null}
2788     */
2789 dl 1.77 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2790 dl 1.35 int len = cfs.length; // Same idea as allOf
2791     if (len > 1)
2792     return anyTree(cfs, 0, len - 1);
2793     else {
2794 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2795 dl 1.35 CompletableFuture<?> f;
2796     if (len == 0)
2797 dl 1.48 ; // skip
2798 dl 1.35 else if ((f = cfs[0]) == null)
2799     throw new NullPointerException();
2800     else {
2801 dl 1.77 ThenCopy<Object> d = null;
2802 dl 1.35 CompletionNode p = null;
2803     Object r;
2804     while ((r = f.result) == null) {
2805     if (d == null)
2806 dl 1.77 d = new ThenCopy<Object>(f, dst);
2807 dl 1.35 else if (p == null)
2808     p = new CompletionNode(d);
2809     else if (UNSAFE.compareAndSwapObject
2810     (f, COMPLETIONS, p.next = f.completions, p))
2811     break;
2812     }
2813 dl 1.101 if (d != null)
2814     d.tryComplete();
2815     else {
2816 dl 1.35 Throwable ex; Object t;
2817 dl 1.77 if (r instanceof AltResult) {
2818 dl 1.35 ex = ((AltResult)r).ex;
2819 dl 1.77 t = null;
2820     }
2821     else {
2822     ex = null;
2823     t = r;
2824     }
2825     dst.internalComplete(t, ex);
2826 dl 1.35 }
2827     }
2828     return dst;
2829     }
2830     }
2831    
2832     /**
2833 jsr166 1.44 * Recursively constructs an Or'ed tree of CompletableFutures.
2834 dl 1.35 */
2835 dl 1.77 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
2836 jsr166 1.79 int lo, int hi) {
2837 dl 1.35 CompletableFuture<?> fst, snd;
2838     int mid = (lo + hi) >>> 1;
2839     if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2840     (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2841     throw new NullPointerException();
2842 dl 1.77 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2843 dl 1.35 OrCompletion d = null;
2844     CompletionNode p = null, q = null;
2845     Object r;
2846     while ((r = fst.result) == null && (r = snd.result) == null) {
2847     if (d == null)
2848     d = new OrCompletion(fst, snd, dst);
2849     else if (p == null)
2850     p = new CompletionNode(d);
2851     else if (q == null) {
2852     if (UNSAFE.compareAndSwapObject
2853     (fst, COMPLETIONS, p.next = fst.completions, p))
2854     q = new CompletionNode(d);
2855     }
2856     else if (UNSAFE.compareAndSwapObject
2857     (snd, COMPLETIONS, q.next = snd.completions, q))
2858     break;
2859     }
2860 dl 1.101 if (d != null)
2861     d.tryComplete();
2862     else {
2863 dl 1.77 Throwable ex; Object t;
2864 dl 1.35 if (r instanceof AltResult) {
2865     ex = ((AltResult)r).ex;
2866 dl 1.77 t = null;
2867 dl 1.35 }
2868 dl 1.77 else {
2869 dl 1.35 ex = null;
2870 dl 1.77 t = r;
2871     }
2872     dst.internalComplete(t, ex);
2873 dl 1.35 }
2874     return dst;
2875     }
2876    
2877     /* ------------- Control and status methods -------------- */
2878    
2879 dl 1.28 /**
2880 dl 1.37 * If not already completed, completes this CompletableFuture with
2881     * a {@link CancellationException}. Dependent CompletableFutures
2882     * that have not already completed will also complete
2883     * exceptionally, with a {@link CompletionException} caused by
2884     * this {@code CancellationException}.
2885 dl 1.28 *
2886     * @param mayInterruptIfRunning this value has no effect in this
2887     * implementation because interrupts are not used to control
2888     * processing.
2889     *
2890     * @return {@code true} if this task is now cancelled
2891     */
2892     public boolean cancel(boolean mayInterruptIfRunning) {
2893 dl 1.46 boolean cancelled = (result == null) &&
2894     UNSAFE.compareAndSwapObject
2895     (this, RESULT, null, new AltResult(new CancellationException()));
2896 dl 1.101 postComplete(this);
2897 dl 1.48 return cancelled || isCancelled();
2898 dl 1.28 }
2899    
2900     /**
2901     * Returns {@code true} if this CompletableFuture was cancelled
2902     * before it completed normally.
2903     *
2904     * @return {@code true} if this CompletableFuture was cancelled
2905     * before it completed normally
2906     */
2907     public boolean isCancelled() {
2908     Object r;
2909 jsr166 1.43 return ((r = result) instanceof AltResult) &&
2910     (((AltResult)r).ex instanceof CancellationException);
2911 dl 1.28 }
2912    
2913     /**
2914 dl 1.88 * Returns {@code true} if this CompletableFuture completed
2915 dl 1.91 * exceptionally, in any way. Possible causes include
2916     * cancellation, explicit invocation of {@code
2917     * completeExceptionally}, and abrupt termination of a
2918     * CompletionStage action.
2919 dl 1.88 *
2920     * @return {@code true} if this CompletableFuture completed
2921     * exceptionally
2922     */
2923     public boolean isCompletedExceptionally() {
2924 dl 1.91 Object r;
2925     return ((r = result) instanceof AltResult) && r != NIL;
2926 dl 1.88 }
2927    
2928     /**
2929 dl 1.28 * Forcibly sets or resets the value subsequently returned by
2930 jsr166 1.42 * method {@link #get()} and related methods, whether or not
2931     * already completed. This method is designed for use only in
2932     * error recovery actions, and even in such situations may result
2933     * in ongoing dependent completions using established versus
2934 dl 1.30 * overwritten outcomes.
2935 dl 1.28 *
2936     * @param value the completion value
2937     */
2938     public void obtrudeValue(T value) {
2939     result = (value == null) ? NIL : value;
2940 dl 1.101 postComplete(this);
2941 dl 1.28 }
2942    
2943 dl 1.30 /**
2944 jsr166 1.41 * Forcibly causes subsequent invocations of method {@link #get()}
2945     * and related methods to throw the given exception, whether or
2946     * not already completed. This method is designed for use only in
2947 dl 1.30 * recovery actions, and even in such situations may result in
2948     * ongoing dependent completions using established versus
2949     * overwritten outcomes.
2950     *
2951     * @param ex the exception
2952     */
2953     public void obtrudeException(Throwable ex) {
2954     if (ex == null) throw new NullPointerException();
2955     result = new AltResult(ex);
2956 dl 1.101 postComplete(this);
2957 dl 1.30 }
2958    
2959 dl 1.35 /**
2960     * Returns the estimated number of CompletableFutures whose
2961     * completions are awaiting completion of this CompletableFuture.
2962     * This method is designed for use in monitoring system state, not
2963     * for synchronization control.
2964     *
2965     * @return the number of dependent CompletableFutures
2966     */
2967     public int getNumberOfDependents() {
2968     int count = 0;
2969     for (CompletionNode p = completions; p != null; p = p.next)
2970     ++count;
2971     return count;
2972     }
2973    
2974     /**
2975     * Returns a string identifying this CompletableFuture, as well as
2976 jsr166 1.40 * its completion state. The state, in brackets, contains the
2977 dl 1.35 * String {@code "Completed Normally"} or the String {@code
2978     * "Completed Exceptionally"}, or the String {@code "Not
2979     * completed"} followed by the number of CompletableFutures
2980     * dependent upon its completion, if any.
2981     *
2982     * @return a string identifying this CompletableFuture, as well as its state
2983     */
2984     public String toString() {
2985     Object r = result;
2986 jsr166 1.40 int count;
2987     return super.toString() +
2988     ((r == null) ?
2989     (((count = getNumberOfDependents()) == 0) ?
2990     "[Not completed]" :
2991     "[Not completed, " + count + " dependents]") :
2992     (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2993     "[Completed exceptionally]" :
2994     "[Completed normally]"));
2995 dl 1.35 }
2996    
2997 dl 1.1 // Unsafe mechanics
2998     private static final sun.misc.Unsafe UNSAFE;
2999     private static final long RESULT;
3000     private static final long WAITERS;
3001     private static final long COMPLETIONS;
3002     static {
3003     try {
3004     UNSAFE = sun.misc.Unsafe.getUnsafe();
3005     Class<?> k = CompletableFuture.class;
3006     RESULT = UNSAFE.objectFieldOffset
3007     (k.getDeclaredField("result"));
3008     WAITERS = UNSAFE.objectFieldOffset
3009     (k.getDeclaredField("waiters"));
3010     COMPLETIONS = UNSAFE.objectFieldOffset
3011     (k.getDeclaredField("completions"));
3012     } catch (Exception e) {
3013     throw new Error(e);
3014     }
3015     }
3016     }