ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.100
Committed: Mon Apr 7 15:51:32 2014 UTC (10 years, 1 month ago) by jsr166
Branch: MAIN
Changes since 1.99: +3 -3 lines
Log Message:
punctuation

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.locks.LockSupport;
26
27 /**
28 * A {@link Future} that may be explicitly completed (setting its
29 * value and status), and may be used as a {@link CompletionStage},
30 * supporting dependent functions and actions that trigger upon its
31 * completion.
32 *
33 * <p>When two or more threads attempt to
34 * {@link #complete complete},
35 * {@link #completeExceptionally completeExceptionally}, or
36 * {@link #cancel cancel}
37 * a CompletableFuture, only one of them succeeds.
38 *
39 * <p>In addition to these and related methods for directly
40 * manipulating status and results, CompletableFuture implements
41 * interface {@link CompletionStage} with the following policies: <ul>
42 *
43 * <li>Actions supplied for dependent completions of
44 * <em>non-async</em> methods may be performed by the thread that
45 * completes the current CompletableFuture, or by any other caller of
46 * a completion method.</li>
47 *
48 * <li>All <em>async</em> methods without an explicit Executor
49 * argument are performed using the {@link ForkJoinPool#commonPool()}
50 * (unless it does not support a parallelism level of at least two, in
51 * which case, a new Thread is used). To simplify monitoring,
52 * debugging, and tracking, all generated asynchronous tasks are
53 * instances of the marker interface {@link
54 * AsynchronousCompletionTask}. </li>
55 *
56 * <li>All CompletionStage methods are implemented independently of
57 * other public methods, so the behavior of one method is not impacted
58 * by overrides of others in subclasses. </li> </ul>
59 *
60 * <p>CompletableFuture also implements {@link Future} with the following
61 * policies: <ul>
62 *
63 * <li>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional
66 * completion. Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}. Method
68 * {@link #isCompletedExceptionally} can be used to determine if a
69 * CompletableFuture completed in any exceptional fashion.</li>
70 *
71 * <li>In case of exceptional completion with a CompletionException,
72 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
73 * {@link ExecutionException} with the same cause as held in the
74 * corresponding CompletionException. To simplify usage in most
75 * contexts, this class also defines methods {@link #join()} and
76 * {@link #getNow} that instead throw the CompletionException directly
77 * in these cases.</li> </ul>
78 *
79 * @author Doug Lea
80 * @since 1.8
81 */
82 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
83
84 /*
85 * Overview:
86 *
87 * 1. Non-nullness of field result (set via CAS) indicates done.
88 * An AltResult is used to box null as a result, as well as to
89 * hold exceptions. Using a single field makes completion fast
90 * and simple to detect and trigger, at the expense of a lot of
91 * encoding and decoding that infiltrates many methods. One minor
92 * simplification relies on the (static) NIL (to box null results)
93 * being the only AltResult with a null exception field, so we
94 * don't usually need explicit comparisons with NIL. The CF
95 * exception propagation mechanics surrounding decoding rely on
96 * unchecked casts of decoded results really being unchecked,
97 * where user type errors are caught at point of use, as is
98 * currently the case in Java. These are highlighted by using
99 * SuppressWarnings-annotated temporaries.
100 *
101 * 2. Waiters are held in a Treiber stack similar to the one used
102 * in FutureTask, Phaser, and SynchronousQueue. See their
103 * internal documentation for algorithmic details.
104 *
105 * 3. Completions are also kept in a list/stack, and pulled off
106 * and run when completion is triggered. (We could even use the
107 * same stack as for waiters, but would give up the potential
108 * parallelism obtained because woken waiters help release/run
109 * others -- see method postComplete). Because post-processing
110 * may race with direct calls, class Completion opportunistically
111 * extends AtomicInteger so callers can claim the action via
112 * compareAndSet(0, 1). The Completion.trigger methods are all
113 * written a boringly similar uniform way (that sometimes includes
114 * unnecessary-looking checks, kept to maintain uniformity).
115 * There are enough dimensions upon which they differ that
116 * attempts to factor commonalities while maintaining efficiency
117 * require more lines of code than they would save.
118 *
119 * 4. The exported then/and/or methods do support a bit of
120 * factoring (see doThenApply etc). They must cope with the
121 * intrinsic races surrounding addition of a dependent action
122 * versus performing the action directly because the task is
123 * already complete. For example, a CF may not be complete upon
124 * entry, so a dependent completion is added, but by the time it
125 * is added, the target CF is complete, so must be directly
126 * executed. This is all done while avoiding unnecessary object
127 * construction in safe-bypass cases.
128 */
129
130 // preliminaries
131
132 static final class AltResult {
133 final Throwable ex; // null only for NIL
134 AltResult(Throwable ex) { this.ex = ex; }
135 }
136
137 static final AltResult NIL = new AltResult(null);
138
139 // Fields
140
141 volatile Object result; // Either the result or boxed AltResult
142 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
143 volatile CompletionNode completions; // list (Treiber stack) of completions
144
145 // Basic utilities for triggering and processing completions
146
147 /**
148 * Triggers completion with the encoding of the given arguments:
149 * if the exception is non-null, encodes it as a wrapped
150 * CompletionException unless it is one already. Otherwise uses
151 * the given result, boxed as NIL if null.
152 */
153 final void setInternalResult(T v, Throwable ex) {
154 if (result == null)
155 UNSAFE.compareAndSwapObject
156 (this, RESULT, null,
157 (ex == null) ? (v == null) ? NIL : v :
158 new AltResult((ex instanceof CompletionException) ? ex :
159 new CompletionException(ex)));
160 }
161
162 /**
163 * Removes and signals all waiting threads.
164 */
165 final void removeAndSignalWaiters() {
166 WaitNode q; Thread t;
167 while ((q = waiters) != null) {
168 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
169 (t = q.thread) != null) {
170 q.thread = null;
171 LockSupport.unpark(t);
172 }
173 }
174 }
175
176 /**
177 * Triggers all enabled completions reachable from b. Loopifies
178 * the final recursive call for each stage to avoid potential
179 * StackOverflowErrors in cases of long linear chains.
180 *
181 * @param b if non-null, a completed CompletableFuture
182 */
183 static final void removeAndTriggerCompletions(CompletableFuture<?> b) {
184 CompletionNode h; Completion c; CompletableFuture<?> f;
185 while (b != null && (h = b.completions) != null) {
186 if (UNSAFE.compareAndSwapObject(b, COMPLETIONS, h, h.next) &&
187 (c = h.completion) != null &&
188 (f = c.trigger()) != null &&
189 f.result != null) {
190 f.removeAndSignalWaiters();
191 if (f.completions != null) {
192 if (b.completions == null)
193 b = f; // tail-recurse
194 else
195 removeAndTriggerCompletions(f);
196 }
197 }
198 }
199 }
200
201 /**
202 * Sets result, signals waiters, and triggers dependents.
203 */
204 final void internalComplete(T v, Throwable ex) {
205 setInternalResult(v, ex);
206 removeAndSignalWaiters();
207 removeAndTriggerCompletions(this);
208 }
209
210
211 /**
212 * Signals waiters and triggers dependents. Call only if known to
213 * be completed.
214 */
215 final void postComplete() {
216 removeAndSignalWaiters();
217 removeAndTriggerCompletions(this);
218 }
219
220 /**
221 * If completed, helps signal waiters and trigger dependents.
222 */
223 final void helpPostComplete() {
224 if (result != null) {
225 removeAndSignalWaiters();
226 removeAndTriggerCompletions(this);
227 }
228 }
229
230 /* ------------- waiting for completions -------------- */
231
232 /** Number of processors, for spin control */
233 static final int NCPU = Runtime.getRuntime().availableProcessors();
234
235 /**
236 * Heuristic spin value for waitingGet() before blocking on
237 * multiprocessors
238 */
239 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
240
241 /**
242 * Linked nodes to record waiting threads in a Treiber stack. See
243 * other classes such as Phaser and SynchronousQueue for more
244 * detailed explanation. This class implements ManagedBlocker to
245 * avoid starvation when blocking actions pile up in
246 * ForkJoinPools.
247 */
248 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
249 long nanos; // wait time if timed
250 final long deadline; // non-zero if timed
251 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
252 volatile Thread thread;
253 volatile WaitNode next;
254 WaitNode(boolean interruptible, long nanos, long deadline) {
255 this.thread = Thread.currentThread();
256 this.interruptControl = interruptible ? 1 : 0;
257 this.nanos = nanos;
258 this.deadline = deadline;
259 }
260 public boolean isReleasable() {
261 if (thread == null)
262 return true;
263 if (Thread.interrupted()) {
264 int i = interruptControl;
265 interruptControl = -1;
266 if (i > 0)
267 return true;
268 }
269 if (deadline != 0L &&
270 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
271 thread = null;
272 return true;
273 }
274 return false;
275 }
276 public boolean block() {
277 if (isReleasable())
278 return true;
279 else if (deadline == 0L)
280 LockSupport.park(this);
281 else if (nanos > 0L)
282 LockSupport.parkNanos(this, nanos);
283 return isReleasable();
284 }
285 }
286
287 /**
288 * Returns raw result after waiting, or null if interruptible and
289 * interrupted.
290 */
291 private Object waitingGet(boolean interruptible) {
292 WaitNode q = null;
293 boolean queued = false;
294 int spins = SPINS;
295 for (Object r;;) {
296 if ((r = result) != null) {
297 if (q != null) { // suppress unpark
298 q.thread = null;
299 if (q.interruptControl < 0) {
300 if (interruptible) {
301 removeWaiter(q);
302 return null;
303 }
304 Thread.currentThread().interrupt();
305 }
306 }
307 postComplete(); // help release others
308 return r;
309 }
310 else if (spins > 0) {
311 int rnd = ThreadLocalRandom.nextSecondarySeed();
312 if (rnd == 0)
313 rnd = ThreadLocalRandom.current().nextInt();
314 if (rnd >= 0)
315 --spins;
316 }
317 else if (q == null)
318 q = new WaitNode(interruptible, 0L, 0L);
319 else if (!queued)
320 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
321 q.next = waiters, q);
322 else if (interruptible && q.interruptControl < 0) {
323 removeWaiter(q);
324 return null;
325 }
326 else if (q.thread != null && result == null) {
327 try {
328 ForkJoinPool.managedBlock(q);
329 } catch (InterruptedException ex) {
330 q.interruptControl = -1;
331 }
332 }
333 }
334 }
335
336 /**
337 * Awaits completion or aborts on interrupt or timeout.
338 *
339 * @param nanos time to wait
340 * @return raw result
341 */
342 private Object timedAwaitDone(long nanos)
343 throws InterruptedException, TimeoutException {
344 WaitNode q = null;
345 boolean queued = false;
346 for (Object r;;) {
347 if ((r = result) != null) {
348 if (q != null) {
349 q.thread = null;
350 if (q.interruptControl < 0) {
351 removeWaiter(q);
352 throw new InterruptedException();
353 }
354 }
355 postComplete();
356 return r;
357 }
358 else if (q == null) {
359 if (nanos <= 0L)
360 throw new TimeoutException();
361 long d = System.nanoTime() + nanos;
362 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
363 }
364 else if (!queued)
365 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
366 q.next = waiters, q);
367 else if (q.interruptControl < 0) {
368 removeWaiter(q);
369 throw new InterruptedException();
370 }
371 else if (q.nanos <= 0L) {
372 if (result == null) {
373 removeWaiter(q);
374 throw new TimeoutException();
375 }
376 }
377 else if (q.thread != null && result == null) {
378 try {
379 ForkJoinPool.managedBlock(q);
380 } catch (InterruptedException ex) {
381 q.interruptControl = -1;
382 }
383 }
384 }
385 }
386
387 /**
388 * Tries to unlink a timed-out or interrupted wait node to avoid
389 * accumulating garbage. Internal nodes are simply unspliced
390 * without CAS since it is harmless if they are traversed anyway
391 * by releasers. To avoid effects of unsplicing from already
392 * removed nodes, the list is retraversed in case of an apparent
393 * race. This is slow when there are a lot of nodes, but we don't
394 * expect lists to be long enough to outweigh higher-overhead
395 * schemes.
396 */
397 private void removeWaiter(WaitNode node) {
398 if (node != null) {
399 node.thread = null;
400 retry:
401 for (;;) { // restart on removeWaiter race
402 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
403 s = q.next;
404 if (q.thread != null)
405 pred = q;
406 else if (pred != null) {
407 pred.next = s;
408 if (pred.thread == null) // check for race
409 continue retry;
410 }
411 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
412 continue retry;
413 }
414 break;
415 }
416 }
417 }
418
419 /* ------------- Async tasks -------------- */
420
/**
 * Marker interface for the asynchronous tasks created by {@code async}
 * methods. It can be useful when monitoring, debugging, and tracking
 * asynchronous activities.
 *
 * @since 1.8
 */
public interface AsynchronousCompletionTask {
}
430
431 /** Base class can act as either FJ or plain Runnable */
432 @SuppressWarnings("serial")
433 abstract static class Async extends ForkJoinTask<Void>
434 implements Runnable, AsynchronousCompletionTask {
435 public final Void getRawResult() { return null; }
436 public final void setRawResult(Void v) { }
437 public final void run() { exec(); }
438 }
439
440 /**
441 * Starts the given async task using the given executor, unless
442 * the executor is ForkJoinPool.commonPool and it has been
443 * disabled, in which case starts a new thread.
444 */
445 static void execAsync(Executor e, Async r) {
446 if (e == ForkJoinPool.commonPool() &&
447 ForkJoinPool.getCommonPoolParallelism() <= 1)
448 new Thread(r).start();
449 else
450 e.execute(r);
451 }
452
453 static final class AsyncRun extends Async {
454 final Runnable fn;
455 final CompletableFuture<Void> dst;
456 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
457 this.fn = fn; this.dst = dst;
458 }
459 public final boolean exec() {
460 CompletableFuture<Void> d; Throwable ex;
461 if ((d = this.dst) != null && d.result == null) {
462 try {
463 fn.run();
464 ex = null;
465 } catch (Throwable rex) {
466 ex = rex;
467 }
468 d.internalComplete(null, ex);
469 }
470 return true;
471 }
472 private static final long serialVersionUID = 5232453952276885070L;
473 }
474
475 static final class AsyncSupply<U> extends Async {
476 final Supplier<U> fn;
477 final CompletableFuture<U> dst;
478 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
479 this.fn = fn; this.dst = dst;
480 }
481 public final boolean exec() {
482 CompletableFuture<U> d; U u; Throwable ex;
483 if ((d = this.dst) != null && d.result == null) {
484 try {
485 u = fn.get();
486 ex = null;
487 } catch (Throwable rex) {
488 ex = rex;
489 u = null;
490 }
491 d.internalComplete(u, ex);
492 }
493 return true;
494 }
495 private static final long serialVersionUID = 5232453952276885070L;
496 }
497
498 static final class AsyncApply<T,U> extends Async {
499 final T arg;
500 final Function<? super T,? extends U> fn;
501 final CompletableFuture<U> dst;
502 AsyncApply(T arg, Function<? super T,? extends U> fn,
503 CompletableFuture<U> dst) {
504 this.arg = arg; this.fn = fn; this.dst = dst;
505 }
506 public final boolean exec() {
507 CompletableFuture<U> d; U u; Throwable ex;
508 if ((d = this.dst) != null && d.result == null) {
509 try {
510 u = fn.apply(arg);
511 ex = null;
512 } catch (Throwable rex) {
513 ex = rex;
514 u = null;
515 }
516 d.internalComplete(u, ex);
517 }
518 return true;
519 }
520 private static final long serialVersionUID = 5232453952276885070L;
521 }
522
523 static final class AsyncCombine<T,U,V> extends Async {
524 final T arg1;
525 final U arg2;
526 final BiFunction<? super T,? super U,? extends V> fn;
527 final CompletableFuture<V> dst;
528 AsyncCombine(T arg1, U arg2,
529 BiFunction<? super T,? super U,? extends V> fn,
530 CompletableFuture<V> dst) {
531 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
532 }
533 public final boolean exec() {
534 CompletableFuture<V> d; V v; Throwable ex;
535 if ((d = this.dst) != null && d.result == null) {
536 try {
537 v = fn.apply(arg1, arg2);
538 ex = null;
539 } catch (Throwable rex) {
540 ex = rex;
541 v = null;
542 }
543 d.internalComplete(v, ex);
544 }
545 return true;
546 }
547 private static final long serialVersionUID = 5232453952276885070L;
548 }
549
550 static final class AsyncAccept<T> extends Async {
551 final T arg;
552 final Consumer<? super T> fn;
553 final CompletableFuture<?> dst;
554 AsyncAccept(T arg, Consumer<? super T> fn,
555 CompletableFuture<?> dst) {
556 this.arg = arg; this.fn = fn; this.dst = dst;
557 }
558 public final boolean exec() {
559 CompletableFuture<?> d; Throwable ex;
560 if ((d = this.dst) != null && d.result == null) {
561 try {
562 fn.accept(arg);
563 ex = null;
564 } catch (Throwable rex) {
565 ex = rex;
566 }
567 d.internalComplete(null, ex);
568 }
569 return true;
570 }
571 private static final long serialVersionUID = 5232453952276885070L;
572 }
573
574 static final class AsyncAcceptBoth<T,U> extends Async {
575 final T arg1;
576 final U arg2;
577 final BiConsumer<? super T,? super U> fn;
578 final CompletableFuture<?> dst;
579 AsyncAcceptBoth(T arg1, U arg2,
580 BiConsumer<? super T,? super U> fn,
581 CompletableFuture<?> dst) {
582 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
583 }
584 public final boolean exec() {
585 CompletableFuture<?> d; Throwable ex;
586 if ((d = this.dst) != null && d.result == null) {
587 try {
588 fn.accept(arg1, arg2);
589 ex = null;
590 } catch (Throwable rex) {
591 ex = rex;
592 }
593 d.internalComplete(null, ex);
594 }
595 return true;
596 }
597 private static final long serialVersionUID = 5232453952276885070L;
598 }
599
600 static final class AsyncCompose<T,U> extends Async {
601 final T arg;
602 final Function<? super T, ? extends CompletionStage<U>> fn;
603 final CompletableFuture<U> dst;
604 AsyncCompose(T arg,
605 Function<? super T, ? extends CompletionStage<U>> fn,
606 CompletableFuture<U> dst) {
607 this.arg = arg; this.fn = fn; this.dst = dst;
608 }
609 public final boolean exec() {
610 CompletableFuture<U> d, fr; U u; Throwable ex;
611 if ((d = this.dst) != null && d.result == null) {
612 try {
613 CompletionStage<U> cs = fn.apply(arg);
614 fr = (cs == null) ? null : cs.toCompletableFuture();
615 ex = (fr == null) ? new NullPointerException() : null;
616 } catch (Throwable rex) {
617 ex = rex;
618 fr = null;
619 }
620 if (ex != null)
621 u = null;
622 else {
623 Object r = fr.result;
624 if (r == null)
625 r = fr.waitingGet(false);
626 if (r instanceof AltResult) {
627 ex = ((AltResult)r).ex;
628 u = null;
629 }
630 else {
631 @SuppressWarnings("unchecked") U ur = (U) r;
632 u = ur;
633 }
634 }
635 d.internalComplete(u, ex);
636 }
637 return true;
638 }
639 private static final long serialVersionUID = 5232453952276885070L;
640 }
641
642 static final class AsyncWhenComplete<T> extends Async {
643 final T arg1;
644 final Throwable arg2;
645 final BiConsumer<? super T,? super Throwable> fn;
646 final CompletableFuture<T> dst;
647 AsyncWhenComplete(T arg1, Throwable arg2,
648 BiConsumer<? super T,? super Throwable> fn,
649 CompletableFuture<T> dst) {
650 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
651 }
652 public final boolean exec() {
653 CompletableFuture<T> d;
654 if ((d = this.dst) != null && d.result == null) {
655 Throwable ex = arg2;
656 try {
657 fn.accept(arg1, ex);
658 } catch (Throwable rex) {
659 if (ex == null)
660 ex = rex;
661 }
662 d.internalComplete(arg1, ex);
663 }
664 return true;
665 }
666 private static final long serialVersionUID = 5232453952276885070L;
667 }
668
669 /* ------------- Completions -------------- */
670
671 /**
672 * Simple linked list nodes to record completions, used in
673 * basically the same way as WaitNodes. (We separate nodes from
674 * the Completions themselves mainly because for the And and Or
675 * methods, the same Completion object resides in two lists.)
676 */
677 static final class CompletionNode {
678 final Completion completion;
679 volatile CompletionNode next;
680 CompletionNode(Completion completion) { this.completion = completion; }
681 }
682
683 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
684 @SuppressWarnings("serial")
685 abstract static class Completion extends AtomicInteger {
686 /**
687 * Complete a dependent Completablefuture if enabled
688 * @return the dependent Completablefuture
689 */
690 public abstract CompletableFuture<?> trigger();
691 }
692
693 static final class ThenApply<T,U> extends Completion {
694 final CompletableFuture<? extends T> src;
695 final Function<? super T,? extends U> fn;
696 final CompletableFuture<U> dst;
697 final Executor executor;
698 ThenApply(CompletableFuture<? extends T> src,
699 Function<? super T,? extends U> fn,
700 CompletableFuture<U> dst,
701 Executor executor) {
702 this.src = src; this.fn = fn; this.dst = dst;
703 this.executor = executor;
704 }
705 public final CompletableFuture<?> trigger() {
706 final CompletableFuture<? extends T> a;
707 final Function<? super T,? extends U> fn;
708 final CompletableFuture<U> dst;
709 Object r; T t; Throwable ex;
710 if ((dst = this.dst) != null &&
711 (fn = this.fn) != null &&
712 (a = this.src) != null &&
713 (r = a.result) != null &&
714 compareAndSet(0, 1)) {
715 if (r instanceof AltResult) {
716 ex = ((AltResult)r).ex;
717 t = null;
718 }
719 else {
720 ex = null;
721 @SuppressWarnings("unchecked") T tr = (T) r;
722 t = tr;
723 }
724 Executor e = executor;
725 U u = null;
726 if (ex == null) {
727 try {
728 if (e != null)
729 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
730 else
731 u = fn.apply(t);
732 } catch (Throwable rex) {
733 ex = rex;
734 }
735 }
736 if (e == null || ex != null)
737 dst.setInternalResult(u, ex);
738 }
739 return dst;
740 }
741 private static final long serialVersionUID = 5232453952276885070L;
742 }
743
744 static final class ThenAccept<T> extends Completion {
745 final CompletableFuture<? extends T> src;
746 final Consumer<? super T> fn;
747 final CompletableFuture<?> dst;
748 final Executor executor;
749 ThenAccept(CompletableFuture<? extends T> src,
750 Consumer<? super T> fn,
751 CompletableFuture<?> dst,
752 Executor executor) {
753 this.src = src; this.fn = fn; this.dst = dst;
754 this.executor = executor;
755 }
756 public final CompletableFuture<?> trigger() {
757 final CompletableFuture<? extends T> a;
758 final Consumer<? super T> fn;
759 final CompletableFuture<?> dst;
760 Object r; T t; Throwable ex;
761 if ((dst = this.dst) != null &&
762 (fn = this.fn) != null &&
763 (a = this.src) != null &&
764 (r = a.result) != null &&
765 compareAndSet(0, 1)) {
766 if (r instanceof AltResult) {
767 ex = ((AltResult)r).ex;
768 t = null;
769 }
770 else {
771 ex = null;
772 @SuppressWarnings("unchecked") T tr = (T) r;
773 t = tr;
774 }
775 Executor e = executor;
776 if (ex == null) {
777 try {
778 if (e != null)
779 execAsync(e, new AsyncAccept<T>(t, fn, dst));
780 else
781 fn.accept(t);
782 } catch (Throwable rex) {
783 ex = rex;
784 }
785 }
786 if (e == null || ex != null)
787 dst.setInternalResult(null, ex);
788 }
789 return dst;
790 }
791 private static final long serialVersionUID = 5232453952276885070L;
792 }
793
794 static final class ThenRun extends Completion {
795 final CompletableFuture<?> src;
796 final Runnable fn;
797 final CompletableFuture<Void> dst;
798 final Executor executor;
799 ThenRun(CompletableFuture<?> src,
800 Runnable fn,
801 CompletableFuture<Void> dst,
802 Executor executor) {
803 this.src = src; this.fn = fn; this.dst = dst;
804 this.executor = executor;
805 }
806 public final CompletableFuture<?> trigger() {
807 final CompletableFuture<?> a;
808 final Runnable fn;
809 final CompletableFuture<Void> dst;
810 Object r; Throwable ex;
811 if ((dst = this.dst) != null &&
812 (fn = this.fn) != null &&
813 (a = this.src) != null &&
814 (r = a.result) != null &&
815 compareAndSet(0, 1)) {
816 if (r instanceof AltResult)
817 ex = ((AltResult)r).ex;
818 else
819 ex = null;
820 Executor e = executor;
821 if (ex == null) {
822 try {
823 if (e != null)
824 execAsync(e, new AsyncRun(fn, dst));
825 else
826 fn.run();
827 } catch (Throwable rex) {
828 ex = rex;
829 }
830 }
831 if (e == null || ex != null)
832 dst.setInternalResult(null, ex);
833 }
834 return dst;
835 }
836 private static final long serialVersionUID = 5232453952276885070L;
837 }
838
839 static final class ThenCombine<T,U,V> extends Completion {
840 final CompletableFuture<? extends T> src;
841 final CompletableFuture<? extends U> snd;
842 final BiFunction<? super T,? super U,? extends V> fn;
843 final CompletableFuture<V> dst;
844 final Executor executor;
845 ThenCombine(CompletableFuture<? extends T> src,
846 CompletableFuture<? extends U> snd,
847 BiFunction<? super T,? super U,? extends V> fn,
848 CompletableFuture<V> dst,
849 Executor executor) {
850 this.src = src; this.snd = snd;
851 this.fn = fn; this.dst = dst;
852 this.executor = executor;
853 }
854 public final CompletableFuture<?> trigger() {
855 final CompletableFuture<? extends T> a;
856 final CompletableFuture<? extends U> b;
857 final BiFunction<? super T,? super U,? extends V> fn;
858 final CompletableFuture<V> dst;
859 Object r, s; T t; U u; Throwable ex;
860 if ((dst = this.dst) != null &&
861 (fn = this.fn) != null &&
862 (a = this.src) != null &&
863 (r = a.result) != null &&
864 (b = this.snd) != null &&
865 (s = b.result) != null &&
866 compareAndSet(0, 1)) {
867 if (r instanceof AltResult) {
868 ex = ((AltResult)r).ex;
869 t = null;
870 }
871 else {
872 ex = null;
873 @SuppressWarnings("unchecked") T tr = (T) r;
874 t = tr;
875 }
876 if (ex != null)
877 u = null;
878 else if (s instanceof AltResult) {
879 ex = ((AltResult)s).ex;
880 u = null;
881 }
882 else {
883 @SuppressWarnings("unchecked") U us = (U) s;
884 u = us;
885 }
886 Executor e = executor;
887 V v = null;
888 if (ex == null) {
889 try {
890 if (e != null)
891 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
892 else
893 v = fn.apply(t, u);
894 } catch (Throwable rex) {
895 ex = rex;
896 }
897 }
898 if (e == null || ex != null)
899 dst.setInternalResult(v, ex);
900 }
901 return dst;
902 }
903 private static final long serialVersionUID = 5232453952276885070L;
904 }
905
906 static final class ThenAcceptBoth<T,U> extends Completion {
907 final CompletableFuture<? extends T> src;
908 final CompletableFuture<? extends U> snd;
909 final BiConsumer<? super T,? super U> fn;
910 final CompletableFuture<Void> dst;
911 final Executor executor;
912 ThenAcceptBoth(CompletableFuture<? extends T> src,
913 CompletableFuture<? extends U> snd,
914 BiConsumer<? super T,? super U> fn,
915 CompletableFuture<Void> dst,
916 Executor executor) {
917 this.src = src; this.snd = snd;
918 this.fn = fn; this.dst = dst;
919 this.executor = executor;
920 }
921 public final CompletableFuture<?> trigger() {
922 final CompletableFuture<? extends T> a;
923 final CompletableFuture<? extends U> b;
924 final BiConsumer<? super T,? super U> fn;
925 final CompletableFuture<Void> dst;
926 Object r, s; T t; U u; Throwable ex;
927 if ((dst = this.dst) != null &&
928 (fn = this.fn) != null &&
929 (a = this.src) != null &&
930 (r = a.result) != null &&
931 (b = this.snd) != null &&
932 (s = b.result) != null &&
933 compareAndSet(0, 1)) {
934 if (r instanceof AltResult) {
935 ex = ((AltResult)r).ex;
936 t = null;
937 }
938 else {
939 ex = null;
940 @SuppressWarnings("unchecked") T tr = (T) r;
941 t = tr;
942 }
943 if (ex != null)
944 u = null;
945 else if (s instanceof AltResult) {
946 ex = ((AltResult)s).ex;
947 u = null;
948 }
949 else {
950 @SuppressWarnings("unchecked") U us = (U) s;
951 u = us;
952 }
953 Executor e = executor;
954 if (ex == null) {
955 try {
956 if (e != null)
957 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
958 else
959 fn.accept(t, u);
960 } catch (Throwable rex) {
961 ex = rex;
962 }
963 }
964 if (e == null || ex != null)
965 dst.setInternalResult(null, ex);
966 }
967 return dst;
968 }
969 private static final long serialVersionUID = 5232453952276885070L;
970 }
971
972 static final class RunAfterBoth extends Completion {
973 final CompletableFuture<?> src;
974 final CompletableFuture<?> snd;
975 final Runnable fn;
976 final CompletableFuture<Void> dst;
977 final Executor executor;
978 RunAfterBoth(CompletableFuture<?> src,
979 CompletableFuture<?> snd,
980 Runnable fn,
981 CompletableFuture<Void> dst,
982 Executor executor) {
983 this.src = src; this.snd = snd;
984 this.fn = fn; this.dst = dst;
985 this.executor = executor;
986 }
987 public final CompletableFuture<?> trigger() {
988 final CompletableFuture<?> a;
989 final CompletableFuture<?> b;
990 final Runnable fn;
991 final CompletableFuture<Void> dst;
992 Object r, s; Throwable ex;
993 if ((dst = this.dst) != null &&
994 (fn = this.fn) != null &&
995 (a = this.src) != null &&
996 (r = a.result) != null &&
997 (b = this.snd) != null &&
998 (s = b.result) != null &&
999 compareAndSet(0, 1)) {
1000 if (r instanceof AltResult)
1001 ex = ((AltResult)r).ex;
1002 else
1003 ex = null;
1004 if (ex == null && (s instanceof AltResult))
1005 ex = ((AltResult)s).ex;
1006 Executor e = executor;
1007 if (ex == null) {
1008 try {
1009 if (e != null)
1010 execAsync(e, new AsyncRun(fn, dst));
1011 else
1012 fn.run();
1013 } catch (Throwable rex) {
1014 ex = rex;
1015 }
1016 }
1017 if (e == null || ex != null)
1018 dst.setInternalResult(null, ex);
1019 }
1020 return dst;
1021 }
1022 private static final long serialVersionUID = 5232453952276885070L;
1023 }
1024
1025 static final class AndCompletion extends Completion {
1026 final CompletableFuture<?> src;
1027 final CompletableFuture<?> snd;
1028 final CompletableFuture<Void> dst;
1029 AndCompletion(CompletableFuture<?> src,
1030 CompletableFuture<?> snd,
1031 CompletableFuture<Void> dst) {
1032 this.src = src; this.snd = snd; this.dst = dst;
1033 }
1034 public final CompletableFuture<?> trigger() {
1035 final CompletableFuture<?> a;
1036 final CompletableFuture<?> b;
1037 final CompletableFuture<Void> dst;
1038 Object r, s; Throwable ex;
1039 if ((dst = this.dst) != null &&
1040 (a = this.src) != null &&
1041 (r = a.result) != null &&
1042 (b = this.snd) != null &&
1043 (s = b.result) != null &&
1044 compareAndSet(0, 1)) {
1045 if (r instanceof AltResult)
1046 ex = ((AltResult)r).ex;
1047 else
1048 ex = null;
1049 if (ex == null && (s instanceof AltResult))
1050 ex = ((AltResult)s).ex;
1051 dst.setInternalResult(null, ex);
1052 }
1053 return dst;
1054 }
1055 private static final long serialVersionUID = 5232453952276885070L;
1056 }
1057
1058 static final class ApplyToEither<T,U> extends Completion {
1059 final CompletableFuture<? extends T> src;
1060 final CompletableFuture<? extends T> snd;
1061 final Function<? super T,? extends U> fn;
1062 final CompletableFuture<U> dst;
1063 final Executor executor;
1064 ApplyToEither(CompletableFuture<? extends T> src,
1065 CompletableFuture<? extends T> snd,
1066 Function<? super T,? extends U> fn,
1067 CompletableFuture<U> dst,
1068 Executor executor) {
1069 this.src = src; this.snd = snd;
1070 this.fn = fn; this.dst = dst;
1071 this.executor = executor;
1072 }
1073 public final CompletableFuture<?> trigger() {
1074 final CompletableFuture<? extends T> a;
1075 final CompletableFuture<? extends T> b;
1076 final Function<? super T,? extends U> fn;
1077 final CompletableFuture<U> dst;
1078 Object r; T t; Throwable ex;
1079 if ((dst = this.dst) != null &&
1080 (fn = this.fn) != null &&
1081 (((a = this.src) != null && (r = a.result) != null) ||
1082 ((b = this.snd) != null && (r = b.result) != null)) &&
1083 compareAndSet(0, 1)) {
1084 if (r instanceof AltResult) {
1085 ex = ((AltResult)r).ex;
1086 t = null;
1087 }
1088 else {
1089 ex = null;
1090 @SuppressWarnings("unchecked") T tr = (T) r;
1091 t = tr;
1092 }
1093 Executor e = executor;
1094 U u = null;
1095 if (ex == null) {
1096 try {
1097 if (e != null)
1098 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1099 else
1100 u = fn.apply(t);
1101 } catch (Throwable rex) {
1102 ex = rex;
1103 }
1104 }
1105 if (e == null || ex != null)
1106 dst.setInternalResult(u, ex);
1107 }
1108 return dst;
1109 }
1110 private static final long serialVersionUID = 5232453952276885070L;
1111 }
1112
1113 static final class AcceptEither<T> extends Completion {
1114 final CompletableFuture<? extends T> src;
1115 final CompletableFuture<? extends T> snd;
1116 final Consumer<? super T> fn;
1117 final CompletableFuture<Void> dst;
1118 final Executor executor;
1119 AcceptEither(CompletableFuture<? extends T> src,
1120 CompletableFuture<? extends T> snd,
1121 Consumer<? super T> fn,
1122 CompletableFuture<Void> dst,
1123 Executor executor) {
1124 this.src = src; this.snd = snd;
1125 this.fn = fn; this.dst = dst;
1126 this.executor = executor;
1127 }
1128 public final CompletableFuture<?> trigger() {
1129 final CompletableFuture<? extends T> a;
1130 final CompletableFuture<? extends T> b;
1131 final Consumer<? super T> fn;
1132 final CompletableFuture<Void> dst;
1133 Object r; T t; Throwable ex;
1134 if ((dst = this.dst) != null &&
1135 (fn = this.fn) != null &&
1136 (((a = this.src) != null && (r = a.result) != null) ||
1137 ((b = this.snd) != null && (r = b.result) != null)) &&
1138 compareAndSet(0, 1)) {
1139 if (r instanceof AltResult) {
1140 ex = ((AltResult)r).ex;
1141 t = null;
1142 }
1143 else {
1144 ex = null;
1145 @SuppressWarnings("unchecked") T tr = (T) r;
1146 t = tr;
1147 }
1148 Executor e = executor;
1149 if (ex == null) {
1150 try {
1151 if (e != null)
1152 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1153 else
1154 fn.accept(t);
1155 } catch (Throwable rex) {
1156 ex = rex;
1157 }
1158 }
1159 if (e == null || ex != null)
1160 dst.setInternalResult(null, ex);
1161 }
1162 return dst;
1163 }
1164 private static final long serialVersionUID = 5232453952276885070L;
1165 }
1166
1167 static final class RunAfterEither extends Completion {
1168 final CompletableFuture<?> src;
1169 final CompletableFuture<?> snd;
1170 final Runnable fn;
1171 final CompletableFuture<Void> dst;
1172 final Executor executor;
1173 RunAfterEither(CompletableFuture<?> src,
1174 CompletableFuture<?> snd,
1175 Runnable fn,
1176 CompletableFuture<Void> dst,
1177 Executor executor) {
1178 this.src = src; this.snd = snd;
1179 this.fn = fn; this.dst = dst;
1180 this.executor = executor;
1181 }
1182 public final CompletableFuture<?> trigger() {
1183 final CompletableFuture<?> a;
1184 final CompletableFuture<?> b;
1185 final Runnable fn;
1186 final CompletableFuture<Void> dst;
1187 Object r; Throwable ex;
1188 if ((dst = this.dst) != null &&
1189 (fn = this.fn) != null &&
1190 (((a = this.src) != null && (r = a.result) != null) ||
1191 ((b = this.snd) != null && (r = b.result) != null)) &&
1192 compareAndSet(0, 1)) {
1193 if (r instanceof AltResult)
1194 ex = ((AltResult)r).ex;
1195 else
1196 ex = null;
1197 Executor e = executor;
1198 if (ex == null) {
1199 try {
1200 if (e != null)
1201 execAsync(e, new AsyncRun(fn, dst));
1202 else
1203 fn.run();
1204 } catch (Throwable rex) {
1205 ex = rex;
1206 }
1207 }
1208 if (e == null || ex != null)
1209 dst.setInternalResult(null, ex);
1210 }
1211 return dst;
1212 }
1213 private static final long serialVersionUID = 5232453952276885070L;
1214 }
1215
1216 static final class OrCompletion extends Completion {
1217 final CompletableFuture<?> src;
1218 final CompletableFuture<?> snd;
1219 final CompletableFuture<Object> dst;
1220 OrCompletion(CompletableFuture<?> src,
1221 CompletableFuture<?> snd,
1222 CompletableFuture<Object> dst) {
1223 this.src = src; this.snd = snd; this.dst = dst;
1224 }
1225 public final CompletableFuture<?> trigger() {
1226 final CompletableFuture<?> a;
1227 final CompletableFuture<?> b;
1228 final CompletableFuture<Object> dst;
1229 Object r, t; Throwable ex;
1230 if ((dst = this.dst) != null &&
1231 (((a = this.src) != null && (r = a.result) != null) ||
1232 ((b = this.snd) != null && (r = b.result) != null)) &&
1233 compareAndSet(0, 1)) {
1234 if (r instanceof AltResult) {
1235 ex = ((AltResult)r).ex;
1236 t = null;
1237 }
1238 else {
1239 ex = null;
1240 t = r;
1241 }
1242 dst.setInternalResult(t, ex);
1243 }
1244 return dst;
1245 }
1246 private static final long serialVersionUID = 5232453952276885070L;
1247 }
1248
1249 static final class ExceptionCompletion<T> extends Completion {
1250 final CompletableFuture<? extends T> src;
1251 final Function<? super Throwable, ? extends T> fn;
1252 final CompletableFuture<T> dst;
1253 ExceptionCompletion(CompletableFuture<? extends T> src,
1254 Function<? super Throwable, ? extends T> fn,
1255 CompletableFuture<T> dst) {
1256 this.src = src; this.fn = fn; this.dst = dst;
1257 }
1258 public final CompletableFuture<?> trigger() {
1259 final CompletableFuture<? extends T> a;
1260 final Function<? super Throwable, ? extends T> fn;
1261 final CompletableFuture<T> dst;
1262 Object r; T t = null; Throwable ex, dx = null;
1263 if ((dst = this.dst) != null &&
1264 (fn = this.fn) != null &&
1265 (a = this.src) != null &&
1266 (r = a.result) != null &&
1267 compareAndSet(0, 1)) {
1268 if ((r instanceof AltResult) &&
1269 (ex = ((AltResult)r).ex) != null) {
1270 try {
1271 t = fn.apply(ex);
1272 } catch (Throwable rex) {
1273 dx = rex;
1274 }
1275 }
1276 else {
1277 @SuppressWarnings("unchecked") T tr = (T) r;
1278 t = tr;
1279 }
1280 dst.setInternalResult(t, dx);
1281 }
1282 return dst;
1283 }
1284 private static final long serialVersionUID = 5232453952276885070L;
1285 }
1286
1287 static final class WhenCompleteCompletion<T> extends Completion {
1288 final CompletableFuture<? extends T> src;
1289 final BiConsumer<? super T, ? super Throwable> fn;
1290 final CompletableFuture<T> dst;
1291 final Executor executor;
1292 WhenCompleteCompletion(CompletableFuture<? extends T> src,
1293 BiConsumer<? super T, ? super Throwable> fn,
1294 CompletableFuture<T> dst,
1295 Executor executor) {
1296 this.src = src; this.fn = fn; this.dst = dst;
1297 this.executor = executor;
1298 }
1299 public final CompletableFuture<?> trigger() {
1300 final CompletableFuture<? extends T> a;
1301 final BiConsumer<? super T, ? super Throwable> fn;
1302 final CompletableFuture<T> dst;
1303 Object r; T t; Throwable ex;
1304 if ((dst = this.dst) != null &&
1305 (fn = this.fn) != null &&
1306 (a = this.src) != null &&
1307 (r = a.result) != null &&
1308 compareAndSet(0, 1)) {
1309 if (r instanceof AltResult) {
1310 ex = ((AltResult)r).ex;
1311 t = null;
1312 }
1313 else {
1314 ex = null;
1315 @SuppressWarnings("unchecked") T tr = (T) r;
1316 t = tr;
1317 }
1318 Executor e = executor;
1319 Throwable dx = null;
1320 try {
1321 if (e != null)
1322 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
1323 else
1324 fn.accept(t, ex);
1325 } catch (Throwable rex) {
1326 dx = rex;
1327 }
1328 if (e == null || dx != null)
1329 dst.setInternalResult(t, ex != null ? ex : dx);
1330 }
1331 return dst;
1332 }
1333 private static final long serialVersionUID = 5232453952276885070L;
1334 }
1335
1336 static final class ThenCopy<T> extends Completion {
1337 final CompletableFuture<?> src;
1338 final CompletableFuture<T> dst;
1339 ThenCopy(CompletableFuture<?> src,
1340 CompletableFuture<T> dst) {
1341 this.src = src; this.dst = dst;
1342 }
1343 public final CompletableFuture<?> trigger() {
1344 final CompletableFuture<?> a;
1345 final CompletableFuture<T> dst;
1346 Object r; T t; Throwable ex;
1347 if ((dst = this.dst) != null &&
1348 (a = this.src) != null &&
1349 (r = a.result) != null &&
1350 compareAndSet(0, 1)) {
1351 if (r instanceof AltResult) {
1352 ex = ((AltResult)r).ex;
1353 t = null;
1354 }
1355 else {
1356 ex = null;
1357 @SuppressWarnings("unchecked") T tr = (T) r;
1358 t = tr;
1359 }
1360 dst.setInternalResult(t, ex);
1361 }
1362 return dst;
1363 }
1364 private static final long serialVersionUID = 5232453952276885070L;
1365 }
1366
1367 // version of ThenCopy for CompletableFuture<Void> dst
1368 static final class ThenPropagate extends Completion {
1369 final CompletableFuture<?> src;
1370 final CompletableFuture<Void> dst;
1371 ThenPropagate(CompletableFuture<?> src,
1372 CompletableFuture<Void> dst) {
1373 this.src = src; this.dst = dst;
1374 }
1375 public final CompletableFuture<?> trigger() {
1376 final CompletableFuture<?> a;
1377 final CompletableFuture<Void> dst;
1378 Object r; Throwable ex;
1379 if ((dst = this.dst) != null &&
1380 (a = this.src) != null &&
1381 (r = a.result) != null &&
1382 compareAndSet(0, 1)) {
1383 if (r instanceof AltResult)
1384 ex = ((AltResult)r).ex;
1385 else
1386 ex = null;
1387 dst.setInternalResult(null, ex);
1388 }
1389 return dst;
1390 }
1391 private static final long serialVersionUID = 5232453952276885070L;
1392 }
1393
1394 static final class HandleCompletion<T,U> extends Completion {
1395 final CompletableFuture<? extends T> src;
1396 final BiFunction<? super T, Throwable, ? extends U> fn;
1397 final CompletableFuture<U> dst;
1398 final Executor executor;
1399 HandleCompletion(CompletableFuture<? extends T> src,
1400 BiFunction<? super T, Throwable, ? extends U> fn,
1401 CompletableFuture<U> dst,
1402 Executor executor) {
1403 this.src = src; this.fn = fn; this.dst = dst;
1404 this.executor = executor;
1405 }
1406 public final CompletableFuture<?> trigger() {
1407 final CompletableFuture<? extends T> a;
1408 final BiFunction<? super T, Throwable, ? extends U> fn;
1409 final CompletableFuture<U> dst;
1410 Object r; T t; Throwable ex;
1411 if ((dst = this.dst) != null &&
1412 (fn = this.fn) != null &&
1413 (a = this.src) != null &&
1414 (r = a.result) != null &&
1415 compareAndSet(0, 1)) {
1416 if (r instanceof AltResult) {
1417 ex = ((AltResult)r).ex;
1418 t = null;
1419 }
1420 else {
1421 ex = null;
1422 @SuppressWarnings("unchecked") T tr = (T) r;
1423 t = tr;
1424 }
1425 Executor e = executor;
1426 U u = null;
1427 Throwable dx = null;
1428 try {
1429 if (e != null)
1430 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1431 else
1432 u = fn.apply(t, ex);
1433 } catch (Throwable rex) {
1434 dx = rex;
1435 }
1436 if (e == null || dx != null)
1437 dst.setInternalResult(u, dx);
1438 }
1439 return dst;
1440 }
1441 private static final long serialVersionUID = 5232453952276885070L;
1442 }
1443
1444 static final class ThenCompose<T,U> extends Completion {
1445 final CompletableFuture<? extends T> src;
1446 final Function<? super T, ? extends CompletionStage<U>> fn;
1447 final CompletableFuture<U> dst;
1448 final Executor executor;
1449 ThenCompose(CompletableFuture<? extends T> src,
1450 Function<? super T, ? extends CompletionStage<U>> fn,
1451 CompletableFuture<U> dst,
1452 Executor executor) {
1453 this.src = src; this.fn = fn; this.dst = dst;
1454 this.executor = executor;
1455 }
1456 public final CompletableFuture<?> trigger() {
1457 final CompletableFuture<? extends T> a;
1458 final Function<? super T, ? extends CompletionStage<U>> fn;
1459 final CompletableFuture<U> dst;
1460 Object r; T t; Throwable ex; Executor e;
1461 if ((dst = this.dst) != null &&
1462 (fn = this.fn) != null &&
1463 (a = this.src) != null &&
1464 (r = a.result) != null &&
1465 compareAndSet(0, 1)) {
1466 if (r instanceof AltResult) {
1467 ex = ((AltResult)r).ex;
1468 t = null;
1469 }
1470 else {
1471 ex = null;
1472 @SuppressWarnings("unchecked") T tr = (T) r;
1473 t = tr;
1474 }
1475 CompletableFuture<U> c = null;
1476 U u = null;
1477 boolean complete = false;
1478 if (ex == null) {
1479 if ((e = executor) != null)
1480 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1481 else {
1482 try {
1483 CompletionStage<U> cs = fn.apply(t);
1484 c = (cs == null) ? null : cs.toCompletableFuture();
1485 if (c == null)
1486 ex = new NullPointerException();
1487 } catch (Throwable rex) {
1488 ex = rex;
1489 }
1490 }
1491 }
1492 if (c != null) {
1493 ThenCopy<U> d = null;
1494 Object s;
1495 if ((s = c.result) == null) {
1496 CompletionNode p = new CompletionNode
1497 (d = new ThenCopy<U>(c, dst));
1498 while ((s = c.result) == null) {
1499 if (UNSAFE.compareAndSwapObject
1500 (c, COMPLETIONS, p.next = c.completions, p))
1501 break;
1502 }
1503 }
1504 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1505 complete = true;
1506 if (s instanceof AltResult) {
1507 ex = ((AltResult)s).ex; // no rewrap
1508 u = null;
1509 }
1510 else {
1511 @SuppressWarnings("unchecked") U us = (U) s;
1512 u = us;
1513 }
1514 }
1515 }
1516 if (complete || ex != null)
1517 dst.setInternalResult(u, ex);
1518 if (c != null)
1519 c.helpPostComplete();
1520 }
1521 return dst;
1522 }
1523 private static final long serialVersionUID = 5232453952276885070L;
1524 }
1525
1526 // Implementations of stage methods with (plain, async, Executor) forms
1527
1528 private <U> CompletableFuture<U> doThenApply
1529 (Function<? super T,? extends U> fn,
1530 Executor e) {
1531 if (fn == null) throw new NullPointerException();
1532 CompletableFuture<U> dst = new CompletableFuture<U>();
1533 ThenApply<T,U> d = null;
1534 Object r;
1535 if ((r = result) == null) {
1536 CompletionNode p = new CompletionNode
1537 (d = new ThenApply<T,U>(this, fn, dst, e));
1538 while ((r = result) == null) {
1539 if (UNSAFE.compareAndSwapObject
1540 (this, COMPLETIONS, p.next = completions, p))
1541 break;
1542 }
1543 }
1544 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1545 T t; Throwable ex;
1546 if (r instanceof AltResult) {
1547 ex = ((AltResult)r).ex;
1548 t = null;
1549 }
1550 else {
1551 ex = null;
1552 @SuppressWarnings("unchecked") T tr = (T) r;
1553 t = tr;
1554 }
1555 U u = null;
1556 if (ex == null) {
1557 try {
1558 if (e != null)
1559 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1560 else
1561 u = fn.apply(t);
1562 } catch (Throwable rex) {
1563 ex = rex;
1564 }
1565 }
1566 if (e == null || ex != null)
1567 dst.internalComplete(u, ex);
1568 }
1569 helpPostComplete();
1570 return dst;
1571 }
1572
1573 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1574 Executor e) {
1575 if (fn == null) throw new NullPointerException();
1576 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1577 ThenAccept<T> d = null;
1578 Object r;
1579 if ((r = result) == null) {
1580 CompletionNode p = new CompletionNode
1581 (d = new ThenAccept<T>(this, fn, dst, e));
1582 while ((r = result) == null) {
1583 if (UNSAFE.compareAndSwapObject
1584 (this, COMPLETIONS, p.next = completions, p))
1585 break;
1586 }
1587 }
1588 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1589 T t; Throwable ex;
1590 if (r instanceof AltResult) {
1591 ex = ((AltResult)r).ex;
1592 t = null;
1593 }
1594 else {
1595 ex = null;
1596 @SuppressWarnings("unchecked") T tr = (T) r;
1597 t = tr;
1598 }
1599 if (ex == null) {
1600 try {
1601 if (e != null)
1602 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1603 else
1604 fn.accept(t);
1605 } catch (Throwable rex) {
1606 ex = rex;
1607 }
1608 }
1609 if (e == null || ex != null)
1610 dst.internalComplete(null, ex);
1611 }
1612 helpPostComplete();
1613 return dst;
1614 }
1615
1616 private CompletableFuture<Void> doThenRun(Runnable action,
1617 Executor e) {
1618 if (action == null) throw new NullPointerException();
1619 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1620 ThenRun d = null;
1621 Object r;
1622 if ((r = result) == null) {
1623 CompletionNode p = new CompletionNode
1624 (d = new ThenRun(this, action, dst, e));
1625 while ((r = result) == null) {
1626 if (UNSAFE.compareAndSwapObject
1627 (this, COMPLETIONS, p.next = completions, p))
1628 break;
1629 }
1630 }
1631 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1632 Throwable ex;
1633 if (r instanceof AltResult)
1634 ex = ((AltResult)r).ex;
1635 else
1636 ex = null;
1637 if (ex == null) {
1638 try {
1639 if (e != null)
1640 execAsync(e, new AsyncRun(action, dst));
1641 else
1642 action.run();
1643 } catch (Throwable rex) {
1644 ex = rex;
1645 }
1646 }
1647 if (e == null || ex != null)
1648 dst.internalComplete(null, ex);
1649 }
1650 helpPostComplete();
1651 return dst;
1652 }
1653
1654 private <U,V> CompletableFuture<V> doThenCombine
1655 (CompletableFuture<? extends U> other,
1656 BiFunction<? super T,? super U,? extends V> fn,
1657 Executor e) {
1658 if (other == null || fn == null) throw new NullPointerException();
1659 CompletableFuture<V> dst = new CompletableFuture<V>();
1660 ThenCombine<T,U,V> d = null;
1661 Object r, s = null;
1662 if ((r = result) == null || (s = other.result) == null) {
1663 d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
1664 CompletionNode q = null, p = new CompletionNode(d);
1665 while ((r == null && (r = result) == null) ||
1666 (s == null && (s = other.result) == null)) {
1667 if (q != null) {
1668 if (s != null ||
1669 UNSAFE.compareAndSwapObject
1670 (other, COMPLETIONS, q.next = other.completions, q))
1671 break;
1672 }
1673 else if (r != null ||
1674 UNSAFE.compareAndSwapObject
1675 (this, COMPLETIONS, p.next = completions, p)) {
1676 if (s != null)
1677 break;
1678 q = new CompletionNode(d);
1679 }
1680 }
1681 }
1682 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1683 T t; U u; Throwable ex;
1684 if (r instanceof AltResult) {
1685 ex = ((AltResult)r).ex;
1686 t = null;
1687 }
1688 else {
1689 ex = null;
1690 @SuppressWarnings("unchecked") T tr = (T) r;
1691 t = tr;
1692 }
1693 if (ex != null)
1694 u = null;
1695 else if (s instanceof AltResult) {
1696 ex = ((AltResult)s).ex;
1697 u = null;
1698 }
1699 else {
1700 @SuppressWarnings("unchecked") U us = (U) s;
1701 u = us;
1702 }
1703 V v = null;
1704 if (ex == null) {
1705 try {
1706 if (e != null)
1707 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
1708 else
1709 v = fn.apply(t, u);
1710 } catch (Throwable rex) {
1711 ex = rex;
1712 }
1713 }
1714 if (e == null || ex != null)
1715 dst.internalComplete(v, ex);
1716 }
1717 helpPostComplete();
1718 other.helpPostComplete();
1719 return dst;
1720 }
1721
1722 private <U> CompletableFuture<Void> doThenAcceptBoth
1723 (CompletableFuture<? extends U> other,
1724 BiConsumer<? super T,? super U> fn,
1725 Executor e) {
1726 if (other == null || fn == null) throw new NullPointerException();
1727 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1728 ThenAcceptBoth<T,U> d = null;
1729 Object r, s = null;
1730 if ((r = result) == null || (s = other.result) == null) {
1731 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1732 CompletionNode q = null, p = new CompletionNode(d);
1733 while ((r == null && (r = result) == null) ||
1734 (s == null && (s = other.result) == null)) {
1735 if (q != null) {
1736 if (s != null ||
1737 UNSAFE.compareAndSwapObject
1738 (other, COMPLETIONS, q.next = other.completions, q))
1739 break;
1740 }
1741 else if (r != null ||
1742 UNSAFE.compareAndSwapObject
1743 (this, COMPLETIONS, p.next = completions, p)) {
1744 if (s != null)
1745 break;
1746 q = new CompletionNode(d);
1747 }
1748 }
1749 }
1750 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1751 T t; U u; Throwable ex;
1752 if (r instanceof AltResult) {
1753 ex = ((AltResult)r).ex;
1754 t = null;
1755 }
1756 else {
1757 ex = null;
1758 @SuppressWarnings("unchecked") T tr = (T) r;
1759 t = tr;
1760 }
1761 if (ex != null)
1762 u = null;
1763 else if (s instanceof AltResult) {
1764 ex = ((AltResult)s).ex;
1765 u = null;
1766 }
1767 else {
1768 @SuppressWarnings("unchecked") U us = (U) s;
1769 u = us;
1770 }
1771 if (ex == null) {
1772 try {
1773 if (e != null)
1774 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1775 else
1776 fn.accept(t, u);
1777 } catch (Throwable rex) {
1778 ex = rex;
1779 }
1780 }
1781 if (e == null || ex != null)
1782 dst.internalComplete(null, ex);
1783 }
1784 helpPostComplete();
1785 other.helpPostComplete();
1786 return dst;
1787 }
1788
1789 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1790 Runnable action,
1791 Executor e) {
1792 if (other == null || action == null) throw new NullPointerException();
1793 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1794 RunAfterBoth d = null;
1795 Object r, s = null;
1796 if ((r = result) == null || (s = other.result) == null) {
1797 d = new RunAfterBoth(this, other, action, dst, e);
1798 CompletionNode q = null, p = new CompletionNode(d);
1799 while ((r == null && (r = result) == null) ||
1800 (s == null && (s = other.result) == null)) {
1801 if (q != null) {
1802 if (s != null ||
1803 UNSAFE.compareAndSwapObject
1804 (other, COMPLETIONS, q.next = other.completions, q))
1805 break;
1806 }
1807 else if (r != null ||
1808 UNSAFE.compareAndSwapObject
1809 (this, COMPLETIONS, p.next = completions, p)) {
1810 if (s != null)
1811 break;
1812 q = new CompletionNode(d);
1813 }
1814 }
1815 }
1816 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1817 Throwable ex;
1818 if (r instanceof AltResult)
1819 ex = ((AltResult)r).ex;
1820 else
1821 ex = null;
1822 if (ex == null && (s instanceof AltResult))
1823 ex = ((AltResult)s).ex;
1824 if (ex == null) {
1825 try {
1826 if (e != null)
1827 execAsync(e, new AsyncRun(action, dst));
1828 else
1829 action.run();
1830 } catch (Throwable rex) {
1831 ex = rex;
1832 }
1833 }
1834 if (e == null || ex != null)
1835 dst.internalComplete(null, ex);
1836 }
1837 helpPostComplete();
1838 other.helpPostComplete();
1839 return dst;
1840 }
1841
1842 private <U> CompletableFuture<U> doApplyToEither
1843 (CompletableFuture<? extends T> other,
1844 Function<? super T, U> fn,
1845 Executor e) {
1846 if (other == null || fn == null) throw new NullPointerException();
1847 CompletableFuture<U> dst = new CompletableFuture<U>();
1848 ApplyToEither<T,U> d = null;
1849 Object r;
1850 if ((r = result) == null && (r = other.result) == null) {
1851 d = new ApplyToEither<T,U>(this, other, fn, dst, e);
1852 CompletionNode q = null, p = new CompletionNode(d);
1853 while ((r = result) == null && (r = other.result) == null) {
1854 if (q != null) {
1855 if (UNSAFE.compareAndSwapObject
1856 (other, COMPLETIONS, q.next = other.completions, q))
1857 break;
1858 }
1859 else if (UNSAFE.compareAndSwapObject
1860 (this, COMPLETIONS, p.next = completions, p))
1861 q = new CompletionNode(d);
1862 }
1863 }
1864 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1865 T t; Throwable ex;
1866 if (r instanceof AltResult) {
1867 ex = ((AltResult)r).ex;
1868 t = null;
1869 }
1870 else {
1871 ex = null;
1872 @SuppressWarnings("unchecked") T tr = (T) r;
1873 t = tr;
1874 }
1875 U u = null;
1876 if (ex == null) {
1877 try {
1878 if (e != null)
1879 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1880 else
1881 u = fn.apply(t);
1882 } catch (Throwable rex) {
1883 ex = rex;
1884 }
1885 }
1886 if (e == null || ex != null)
1887 dst.internalComplete(u, ex);
1888 }
1889 helpPostComplete();
1890 other.helpPostComplete();
1891 return dst;
1892 }
1893
1894 private CompletableFuture<Void> doAcceptEither
1895 (CompletableFuture<? extends T> other,
1896 Consumer<? super T> fn,
1897 Executor e) {
1898 if (other == null || fn == null) throw new NullPointerException();
1899 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1900 AcceptEither<T> d = null;
1901 Object r;
1902 if ((r = result) == null && (r = other.result) == null) {
1903 d = new AcceptEither<T>(this, other, fn, dst, e);
1904 CompletionNode q = null, p = new CompletionNode(d);
1905 while ((r = result) == null && (r = other.result) == null) {
1906 if (q != null) {
1907 if (UNSAFE.compareAndSwapObject
1908 (other, COMPLETIONS, q.next = other.completions, q))
1909 break;
1910 }
1911 else if (UNSAFE.compareAndSwapObject
1912 (this, COMPLETIONS, p.next = completions, p))
1913 q = new CompletionNode(d);
1914 }
1915 }
1916 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1917 T t; Throwable ex;
1918 if (r instanceof AltResult) {
1919 ex = ((AltResult)r).ex;
1920 t = null;
1921 }
1922 else {
1923 ex = null;
1924 @SuppressWarnings("unchecked") T tr = (T) r;
1925 t = tr;
1926 }
1927 if (ex == null) {
1928 try {
1929 if (e != null)
1930 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1931 else
1932 fn.accept(t);
1933 } catch (Throwable rex) {
1934 ex = rex;
1935 }
1936 }
1937 if (e == null || ex != null)
1938 dst.internalComplete(null, ex);
1939 }
1940 helpPostComplete();
1941 other.helpPostComplete();
1942 return dst;
1943 }
1944
1945 private CompletableFuture<Void> doRunAfterEither
1946 (CompletableFuture<?> other,
1947 Runnable action,
1948 Executor e) {
1949 if (other == null || action == null) throw new NullPointerException();
1950 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1951 RunAfterEither d = null;
1952 Object r;
1953 if ((r = result) == null && (r = other.result) == null) {
1954 d = new RunAfterEither(this, other, action, dst, e);
1955 CompletionNode q = null, p = new CompletionNode(d);
1956 while ((r = result) == null && (r = other.result) == null) {
1957 if (q != null) {
1958 if (UNSAFE.compareAndSwapObject
1959 (other, COMPLETIONS, q.next = other.completions, q))
1960 break;
1961 }
1962 else if (UNSAFE.compareAndSwapObject
1963 (this, COMPLETIONS, p.next = completions, p))
1964 q = new CompletionNode(d);
1965 }
1966 }
1967 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1968 Throwable ex;
1969 if (r instanceof AltResult)
1970 ex = ((AltResult)r).ex;
1971 else
1972 ex = null;
1973 if (ex == null) {
1974 try {
1975 if (e != null)
1976 execAsync(e, new AsyncRun(action, dst));
1977 else
1978 action.run();
1979 } catch (Throwable rex) {
1980 ex = rex;
1981 }
1982 }
1983 if (e == null || ex != null)
1984 dst.internalComplete(null, ex);
1985 }
1986 helpPostComplete();
1987 other.helpPostComplete();
1988 return dst;
1989 }
1990
1991 private <U> CompletableFuture<U> doThenCompose
1992 (Function<? super T, ? extends CompletionStage<U>> fn,
1993 Executor e) {
1994 if (fn == null) throw new NullPointerException();
1995 CompletableFuture<U> dst = null;
1996 ThenCompose<T,U> d = null;
1997 Object r;
1998 if ((r = result) == null) {
1999 dst = new CompletableFuture<U>();
2000 CompletionNode p = new CompletionNode
2001 (d = new ThenCompose<T,U>(this, fn, dst, e));
2002 while ((r = result) == null) {
2003 if (UNSAFE.compareAndSwapObject
2004 (this, COMPLETIONS, p.next = completions, p))
2005 break;
2006 }
2007 }
2008 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2009 T t; Throwable ex;
2010 if (r instanceof AltResult) {
2011 ex = ((AltResult)r).ex;
2012 t = null;
2013 }
2014 else {
2015 ex = null;
2016 @SuppressWarnings("unchecked") T tr = (T) r;
2017 t = tr;
2018 }
2019 if (ex == null) {
2020 if (e != null) {
2021 if (dst == null)
2022 dst = new CompletableFuture<U>();
2023 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
2024 }
2025 else {
2026 try {
2027 CompletionStage<U> cs = fn.apply(t);
2028 if (cs == null ||
2029 (dst = cs.toCompletableFuture()) == null)
2030 ex = new NullPointerException();
2031 } catch (Throwable rex) {
2032 ex = rex;
2033 }
2034 }
2035 }
2036 if (dst == null)
2037 dst = new CompletableFuture<U>();
2038 if (ex != null)
2039 dst.internalComplete(null, ex);
2040 }
2041 helpPostComplete();
2042 dst.helpPostComplete();
2043 return dst;
2044 }
2045
2046 private CompletableFuture<T> doWhenComplete
2047 (BiConsumer<? super T, ? super Throwable> fn,
2048 Executor e) {
2049 if (fn == null) throw new NullPointerException();
2050 CompletableFuture<T> dst = new CompletableFuture<T>();
2051 WhenCompleteCompletion<T> d = null;
2052 Object r;
2053 if ((r = result) == null) {
2054 CompletionNode p =
2055 new CompletionNode(d = new WhenCompleteCompletion<T>
2056 (this, fn, dst, e));
2057 while ((r = result) == null) {
2058 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2059 p.next = completions, p))
2060 break;
2061 }
2062 }
2063 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2064 T t; Throwable ex;
2065 if (r instanceof AltResult) {
2066 ex = ((AltResult)r).ex;
2067 t = null;
2068 }
2069 else {
2070 ex = null;
2071 @SuppressWarnings("unchecked") T tr = (T) r;
2072 t = tr;
2073 }
2074 Throwable dx = null;
2075 try {
2076 if (e != null)
2077 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
2078 else
2079 fn.accept(t, ex);
2080 } catch (Throwable rex) {
2081 dx = rex;
2082 }
2083 if (e == null || dx != null)
2084 dst.internalComplete(t, ex != null ? ex : dx);
2085 }
2086 helpPostComplete();
2087 return dst;
2088 }
2089
2090 private <U> CompletableFuture<U> doHandle
2091 (BiFunction<? super T, Throwable, ? extends U> fn,
2092 Executor e) {
2093 if (fn == null) throw new NullPointerException();
2094 CompletableFuture<U> dst = new CompletableFuture<U>();
2095 HandleCompletion<T,U> d = null;
2096 Object r;
2097 if ((r = result) == null) {
2098 CompletionNode p =
2099 new CompletionNode(d = new HandleCompletion<T,U>
2100 (this, fn, dst, e));
2101 while ((r = result) == null) {
2102 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2103 p.next = completions, p))
2104 break;
2105 }
2106 }
2107 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2108 T t; Throwable ex;
2109 if (r instanceof AltResult) {
2110 ex = ((AltResult)r).ex;
2111 t = null;
2112 }
2113 else {
2114 ex = null;
2115 @SuppressWarnings("unchecked") T tr = (T) r;
2116 t = tr;
2117 }
2118 U u = null;
2119 Throwable dx = null;
2120 try {
2121 if (e != null)
2122 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2123 else {
2124 u = fn.apply(t, ex);
2125 dx = null;
2126 }
2127 } catch (Throwable rex) {
2128 dx = rex;
2129 u = null;
2130 }
2131 if (e == null || dx != null)
2132 dst.internalComplete(u, dx);
2133 }
2134 helpPostComplete();
2135 return dst;
2136 }
2137
2138
2139 // public methods
2140
2141 /**
2142 * Creates a new incomplete CompletableFuture.
2143 */
2144 public CompletableFuture() {
2145 }
2146
2147 /**
2148 * Returns a new CompletableFuture that is asynchronously completed
2149 * by a task running in the {@link ForkJoinPool#commonPool()} with
2150 * the value obtained by calling the given Supplier.
2151 *
2152 * @param supplier a function returning the value to be used
2153 * to complete the returned CompletableFuture
2154 * @param <U> the function's return type
2155 * @return the new CompletableFuture
2156 */
2157 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2158 if (supplier == null) throw new NullPointerException();
2159 CompletableFuture<U> f = new CompletableFuture<U>();
2160 execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
2161 return f;
2162 }
2163
2164 /**
2165 * Returns a new CompletableFuture that is asynchronously completed
2166 * by a task running in the given executor with the value obtained
2167 * by calling the given Supplier.
2168 *
2169 * @param supplier a function returning the value to be used
2170 * to complete the returned CompletableFuture
2171 * @param executor the executor to use for asynchronous execution
2172 * @param <U> the function's return type
2173 * @return the new CompletableFuture
2174 */
2175 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2176 Executor executor) {
2177 if (executor == null || supplier == null)
2178 throw new NullPointerException();
2179 CompletableFuture<U> f = new CompletableFuture<U>();
2180 execAsync(executor, new AsyncSupply<U>(supplier, f));
2181 return f;
2182 }
2183
2184 /**
2185 * Returns a new CompletableFuture that is asynchronously completed
2186 * by a task running in the {@link ForkJoinPool#commonPool()} after
2187 * it runs the given action.
2188 *
2189 * @param runnable the action to run before completing the
2190 * returned CompletableFuture
2191 * @return the new CompletableFuture
2192 */
2193 public static CompletableFuture<Void> runAsync(Runnable runnable) {
2194 if (runnable == null) throw new NullPointerException();
2195 CompletableFuture<Void> f = new CompletableFuture<Void>();
2196 execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f));
2197 return f;
2198 }
2199
2200 /**
2201 * Returns a new CompletableFuture that is asynchronously completed
2202 * by a task running in the given executor after it runs the given
2203 * action.
2204 *
2205 * @param runnable the action to run before completing the
2206 * returned CompletableFuture
2207 * @param executor the executor to use for asynchronous execution
2208 * @return the new CompletableFuture
2209 */
2210 public static CompletableFuture<Void> runAsync(Runnable runnable,
2211 Executor executor) {
2212 if (executor == null || runnable == null)
2213 throw new NullPointerException();
2214 CompletableFuture<Void> f = new CompletableFuture<Void>();
2215 execAsync(executor, new AsyncRun(runnable, f));
2216 return f;
2217 }
2218
2219 /**
2220 * Returns a new CompletableFuture that is already completed with
2221 * the given value.
2222 *
2223 * @param value the value
2224 * @param <U> the type of the value
2225 * @return the completed CompletableFuture
2226 */
2227 public static <U> CompletableFuture<U> completedFuture(U value) {
2228 CompletableFuture<U> f = new CompletableFuture<U>();
2229 f.result = (value == null) ? NIL : value;
2230 return f;
2231 }
2232
2233 /**
2234 * Returns {@code true} if completed in any fashion: normally,
2235 * exceptionally, or via cancellation.
2236 *
2237 * @return {@code true} if completed
2238 */
2239 public boolean isDone() {
2240 return result != null;
2241 }
2242
2243 /**
2244 * Waits if necessary for this future to complete, and then
2245 * returns its result.
2246 *
2247 * @return the result value
2248 * @throws CancellationException if this future was cancelled
2249 * @throws ExecutionException if this future completed exceptionally
2250 * @throws InterruptedException if the current thread was interrupted
2251 * while waiting
2252 */
2253 public T get() throws InterruptedException, ExecutionException {
2254 Object r; Throwable ex, cause;
2255 if ((r = result) == null && (r = waitingGet(true)) == null)
2256 throw new InterruptedException();
2257 if (!(r instanceof AltResult)) {
2258 @SuppressWarnings("unchecked") T tr = (T) r;
2259 return tr;
2260 }
2261 if ((ex = ((AltResult)r).ex) == null)
2262 return null;
2263 if (ex instanceof CancellationException)
2264 throw (CancellationException)ex;
2265 if ((ex instanceof CompletionException) &&
2266 (cause = ex.getCause()) != null)
2267 ex = cause;
2268 throw new ExecutionException(ex);
2269 }
2270
2271 /**
2272 * Waits if necessary for at most the given time for this future
2273 * to complete, and then returns its result, if available.
2274 *
2275 * @param timeout the maximum time to wait
2276 * @param unit the time unit of the timeout argument
2277 * @return the result value
2278 * @throws CancellationException if this future was cancelled
2279 * @throws ExecutionException if this future completed exceptionally
2280 * @throws InterruptedException if the current thread was interrupted
2281 * while waiting
2282 * @throws TimeoutException if the wait timed out
2283 */
2284 public T get(long timeout, TimeUnit unit)
2285 throws InterruptedException, ExecutionException, TimeoutException {
2286 Object r; Throwable ex, cause;
2287 long nanos = unit.toNanos(timeout);
2288 if (Thread.interrupted())
2289 throw new InterruptedException();
2290 if ((r = result) == null)
2291 r = timedAwaitDone(nanos);
2292 if (!(r instanceof AltResult)) {
2293 @SuppressWarnings("unchecked") T tr = (T) r;
2294 return tr;
2295 }
2296 if ((ex = ((AltResult)r).ex) == null)
2297 return null;
2298 if (ex instanceof CancellationException)
2299 throw (CancellationException)ex;
2300 if ((ex instanceof CompletionException) &&
2301 (cause = ex.getCause()) != null)
2302 ex = cause;
2303 throw new ExecutionException(ex);
2304 }
2305
2306 /**
2307 * Returns the result value when complete, or throws an
2308 * (unchecked) exception if completed exceptionally. To better
2309 * conform with the use of common functional forms, if a
2310 * computation involved in the completion of this
2311 * CompletableFuture threw an exception, this method throws an
2312 * (unchecked) {@link CompletionException} with the underlying
2313 * exception as its cause.
2314 *
2315 * @return the result value
2316 * @throws CancellationException if the computation was cancelled
2317 * @throws CompletionException if this future completed
2318 * exceptionally or a completion computation threw an exception
2319 */
2320 public T join() {
2321 Object r; Throwable ex;
2322 if ((r = result) == null)
2323 r = waitingGet(false);
2324 if (!(r instanceof AltResult)) {
2325 @SuppressWarnings("unchecked") T tr = (T) r;
2326 return tr;
2327 }
2328 if ((ex = ((AltResult)r).ex) == null)
2329 return null;
2330 if (ex instanceof CancellationException)
2331 throw (CancellationException)ex;
2332 if (ex instanceof CompletionException)
2333 throw (CompletionException)ex;
2334 throw new CompletionException(ex);
2335 }
2336
2337 /**
2338 * Returns the result value (or throws any encountered exception)
2339 * if completed, else returns the given valueIfAbsent.
2340 *
2341 * @param valueIfAbsent the value to return if not completed
2342 * @return the result value, if completed, else the given valueIfAbsent
2343 * @throws CancellationException if the computation was cancelled
2344 * @throws CompletionException if this future completed
2345 * exceptionally or a completion computation threw an exception
2346 */
2347 public T getNow(T valueIfAbsent) {
2348 Object r; Throwable ex;
2349 if ((r = result) == null)
2350 return valueIfAbsent;
2351 if (!(r instanceof AltResult)) {
2352 @SuppressWarnings("unchecked") T tr = (T) r;
2353 return tr;
2354 }
2355 if ((ex = ((AltResult)r).ex) == null)
2356 return null;
2357 if (ex instanceof CancellationException)
2358 throw (CancellationException)ex;
2359 if (ex instanceof CompletionException)
2360 throw (CompletionException)ex;
2361 throw new CompletionException(ex);
2362 }
2363
2364 /**
2365 * If not already completed, sets the value returned by {@link
2366 * #get()} and related methods to the given value.
2367 *
2368 * @param value the result value
2369 * @return {@code true} if this invocation caused this CompletableFuture
2370 * to transition to a completed state, else {@code false}
2371 */
2372 public boolean complete(T value) {
2373 boolean triggered = result == null &&
2374 UNSAFE.compareAndSwapObject(this, RESULT, null,
2375 value == null ? NIL : value);
2376 postComplete();
2377 return triggered;
2378 }
2379
2380 /**
2381 * If not already completed, causes invocations of {@link #get()}
2382 * and related methods to throw the given exception.
2383 *
2384 * @param ex the exception
2385 * @return {@code true} if this invocation caused this CompletableFuture
2386 * to transition to a completed state, else {@code false}
2387 */
2388 public boolean completeExceptionally(Throwable ex) {
2389 if (ex == null) throw new NullPointerException();
2390 boolean triggered = result == null &&
2391 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2392 postComplete();
2393 return triggered;
2394 }
2395
2396 // CompletionStage methods
2397
2398 public <U> CompletableFuture<U> thenApply
2399 (Function<? super T,? extends U> fn) {
2400 return doThenApply(fn, null);
2401 }
2402
2403 public <U> CompletableFuture<U> thenApplyAsync
2404 (Function<? super T,? extends U> fn) {
2405 return doThenApply(fn, ForkJoinPool.commonPool());
2406 }
2407
2408 public <U> CompletableFuture<U> thenApplyAsync
2409 (Function<? super T,? extends U> fn,
2410 Executor executor) {
2411 if (executor == null) throw new NullPointerException();
2412 return doThenApply(fn, executor);
2413 }
2414
2415 public CompletableFuture<Void> thenAccept
2416 (Consumer<? super T> action) {
2417 return doThenAccept(action, null);
2418 }
2419
2420 public CompletableFuture<Void> thenAcceptAsync
2421 (Consumer<? super T> action) {
2422 return doThenAccept(action, ForkJoinPool.commonPool());
2423 }
2424
2425 public CompletableFuture<Void> thenAcceptAsync
2426 (Consumer<? super T> action,
2427 Executor executor) {
2428 if (executor == null) throw new NullPointerException();
2429 return doThenAccept(action, executor);
2430 }
2431
2432 public CompletableFuture<Void> thenRun
2433 (Runnable action) {
2434 return doThenRun(action, null);
2435 }
2436
2437 public CompletableFuture<Void> thenRunAsync
2438 (Runnable action) {
2439 return doThenRun(action, ForkJoinPool.commonPool());
2440 }
2441
2442 public CompletableFuture<Void> thenRunAsync
2443 (Runnable action,
2444 Executor executor) {
2445 if (executor == null) throw new NullPointerException();
2446 return doThenRun(action, executor);
2447 }
2448
2449 public <U,V> CompletableFuture<V> thenCombine
2450 (CompletionStage<? extends U> other,
2451 BiFunction<? super T,? super U,? extends V> fn) {
2452 return doThenCombine(other.toCompletableFuture(), fn, null);
2453 }
2454
2455 public <U,V> CompletableFuture<V> thenCombineAsync
2456 (CompletionStage<? extends U> other,
2457 BiFunction<? super T,? super U,? extends V> fn) {
2458 return doThenCombine(other.toCompletableFuture(), fn,
2459 ForkJoinPool.commonPool());
2460 }
2461
2462 public <U,V> CompletableFuture<V> thenCombineAsync
2463 (CompletionStage<? extends U> other,
2464 BiFunction<? super T,? super U,? extends V> fn,
2465 Executor executor) {
2466 if (executor == null) throw new NullPointerException();
2467 return doThenCombine(other.toCompletableFuture(), fn, executor);
2468 }
2469
2470 public <U> CompletableFuture<Void> thenAcceptBoth
2471 (CompletionStage<? extends U> other,
2472 BiConsumer<? super T, ? super U> action) {
2473 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2474 }
2475
2476 public <U> CompletableFuture<Void> thenAcceptBothAsync
2477 (CompletionStage<? extends U> other,
2478 BiConsumer<? super T, ? super U> action) {
2479 return doThenAcceptBoth(other.toCompletableFuture(), action,
2480 ForkJoinPool.commonPool());
2481 }
2482
2483 public <U> CompletableFuture<Void> thenAcceptBothAsync
2484 (CompletionStage<? extends U> other,
2485 BiConsumer<? super T, ? super U> action,
2486 Executor executor) {
2487 if (executor == null) throw new NullPointerException();
2488 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2489 }
2490
2491 public CompletableFuture<Void> runAfterBoth
2492 (CompletionStage<?> other,
2493 Runnable action) {
2494 return doRunAfterBoth(other.toCompletableFuture(), action, null);
2495 }
2496
2497 public CompletableFuture<Void> runAfterBothAsync
2498 (CompletionStage<?> other,
2499 Runnable action) {
2500 return doRunAfterBoth(other.toCompletableFuture(), action,
2501 ForkJoinPool.commonPool());
2502 }
2503
2504 public CompletableFuture<Void> runAfterBothAsync
2505 (CompletionStage<?> other,
2506 Runnable action,
2507 Executor executor) {
2508 if (executor == null) throw new NullPointerException();
2509 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2510 }
2511
2512
2513 public <U> CompletableFuture<U> applyToEither
2514 (CompletionStage<? extends T> other,
2515 Function<? super T, U> fn) {
2516 return doApplyToEither(other.toCompletableFuture(), fn, null);
2517 }
2518
2519 public <U> CompletableFuture<U> applyToEitherAsync
2520 (CompletionStage<? extends T> other,
2521 Function<? super T, U> fn) {
2522 return doApplyToEither(other.toCompletableFuture(), fn,
2523 ForkJoinPool.commonPool());
2524 }
2525
2526 public <U> CompletableFuture<U> applyToEitherAsync
2527 (CompletionStage<? extends T> other,
2528 Function<? super T, U> fn,
2529 Executor executor) {
2530 if (executor == null) throw new NullPointerException();
2531 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2532 }
2533
2534 public CompletableFuture<Void> acceptEither
2535 (CompletionStage<? extends T> other,
2536 Consumer<? super T> action) {
2537 return doAcceptEither(other.toCompletableFuture(), action, null);
2538 }
2539
2540 public CompletableFuture<Void> acceptEitherAsync
2541 (CompletionStage<? extends T> other,
2542 Consumer<? super T> action) {
2543 return doAcceptEither(other.toCompletableFuture(), action,
2544 ForkJoinPool.commonPool());
2545 }
2546
2547 public CompletableFuture<Void> acceptEitherAsync
2548 (CompletionStage<? extends T> other,
2549 Consumer<? super T> action,
2550 Executor executor) {
2551 if (executor == null) throw new NullPointerException();
2552 return doAcceptEither(other.toCompletableFuture(), action, executor);
2553 }
2554
2555 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2556 Runnable action) {
2557 return doRunAfterEither(other.toCompletableFuture(), action, null);
2558 }
2559
2560 public CompletableFuture<Void> runAfterEitherAsync
2561 (CompletionStage<?> other,
2562 Runnable action) {
2563 return doRunAfterEither(other.toCompletableFuture(), action,
2564 ForkJoinPool.commonPool());
2565 }
2566
2567 public CompletableFuture<Void> runAfterEitherAsync
2568 (CompletionStage<?> other,
2569 Runnable action,
2570 Executor executor) {
2571 if (executor == null) throw new NullPointerException();
2572 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2573 }
2574
2575 public <U> CompletableFuture<U> thenCompose
2576 (Function<? super T, ? extends CompletionStage<U>> fn) {
2577 return doThenCompose(fn, null);
2578 }
2579
2580 public <U> CompletableFuture<U> thenComposeAsync
2581 (Function<? super T, ? extends CompletionStage<U>> fn) {
2582 return doThenCompose(fn, ForkJoinPool.commonPool());
2583 }
2584
2585 public <U> CompletableFuture<U> thenComposeAsync
2586 (Function<? super T, ? extends CompletionStage<U>> fn,
2587 Executor executor) {
2588 if (executor == null) throw new NullPointerException();
2589 return doThenCompose(fn, executor);
2590 }
2591
2592 public CompletableFuture<T> whenComplete
2593 (BiConsumer<? super T, ? super Throwable> action) {
2594 return doWhenComplete(action, null);
2595 }
2596
2597 public CompletableFuture<T> whenCompleteAsync
2598 (BiConsumer<? super T, ? super Throwable> action) {
2599 return doWhenComplete(action, ForkJoinPool.commonPool());
2600 }
2601
2602 public CompletableFuture<T> whenCompleteAsync
2603 (BiConsumer<? super T, ? super Throwable> action,
2604 Executor executor) {
2605 if (executor == null) throw new NullPointerException();
2606 return doWhenComplete(action, executor);
2607 }
2608
2609 public <U> CompletableFuture<U> handle
2610 (BiFunction<? super T, Throwable, ? extends U> fn) {
2611 return doHandle(fn, null);
2612 }
2613
2614 public <U> CompletableFuture<U> handleAsync
2615 (BiFunction<? super T, Throwable, ? extends U> fn) {
2616 return doHandle(fn, ForkJoinPool.commonPool());
2617 }
2618
2619 public <U> CompletableFuture<U> handleAsync
2620 (BiFunction<? super T, Throwable, ? extends U> fn,
2621 Executor executor) {
2622 if (executor == null) throw new NullPointerException();
2623 return doHandle(fn, executor);
2624 }
2625
2626 /**
2627 * Returns this CompletableFuture
2628 *
2629 * @return this CompletableFuture
2630 */
2631 public CompletableFuture<T> toCompletableFuture() {
2632 return this;
2633 }
2634
2635 // not in interface CompletionStage
2636
2637 /**
2638 * Returns a new CompletableFuture that is completed when this
2639 * CompletableFuture completes, with the result of the given
2640 * function of the exception triggering this CompletableFuture's
2641 * completion when it completes exceptionally; otherwise, if this
2642 * CompletableFuture completes normally, then the returned
2643 * CompletableFuture also completes normally with the same value.
2644 * Note: More flexible versions of this functionality are
2645 * available using methods {@code whenComplete} and {@code handle}.
2646 *
2647 * @param fn the function to use to compute the value of the
2648 * returned CompletableFuture if this CompletableFuture completed
2649 * exceptionally
2650 * @return the new CompletableFuture
2651 */
2652 public CompletableFuture<T> exceptionally
2653 (Function<Throwable, ? extends T> fn) {
2654 if (fn == null) throw new NullPointerException();
2655 CompletableFuture<T> dst = new CompletableFuture<T>();
2656 ExceptionCompletion<T> d = null;
2657 Object r;
2658 if ((r = result) == null) {
2659 CompletionNode p =
2660 new CompletionNode(d = new ExceptionCompletion<T>
2661 (this, fn, dst));
2662 while ((r = result) == null) {
2663 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2664 p.next = completions, p))
2665 break;
2666 }
2667 }
2668 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2669 T t = null; Throwable ex, dx = null;
2670 if (r instanceof AltResult) {
2671 if ((ex = ((AltResult)r).ex) != null) {
2672 try {
2673 t = fn.apply(ex);
2674 } catch (Throwable rex) {
2675 dx = rex;
2676 }
2677 }
2678 }
2679 else {
2680 @SuppressWarnings("unchecked") T tr = (T) r;
2681 t = tr;
2682 }
2683 dst.internalComplete(t, dx);
2684 }
2685 helpPostComplete();
2686 return dst;
2687 }
2688
2689 /* ------------- Arbitrary-arity constructions -------------- */
2690
2691 /*
2692 * The basic plan of attack is to recursively form binary
2693 * completion trees of elements. This can be overkill for small
2694 * sets, but scales nicely. The And/All vs Or/Any forms use the
2695 * same idea, but details differ.
2696 */
2697
2698 /**
2699 * Returns a new CompletableFuture that is completed when all of
2700 * the given CompletableFutures complete. If any of the given
2701 * CompletableFutures complete exceptionally, then the returned
2702 * CompletableFuture also does so, with a CompletionException
2703 * holding this exception as its cause. Otherwise, the results,
2704 * if any, of the given CompletableFutures are not reflected in
2705 * the returned CompletableFuture, but may be obtained by
2706 * inspecting them individually. If no CompletableFutures are
2707 * provided, returns a CompletableFuture completed with the value
2708 * {@code null}.
2709 *
2710 * <p>Among the applications of this method is to await completion
2711 * of a set of independent CompletableFutures before continuing a
2712 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2713 * c3).join();}.
2714 *
2715 * @param cfs the CompletableFutures
2716 * @return a new CompletableFuture that is completed when all of the
2717 * given CompletableFutures complete
2718 * @throws NullPointerException if the array or any of its elements are
2719 * {@code null}
2720 */
2721 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2722 int len = cfs.length; // Directly handle empty and singleton cases
2723 if (len > 1)
2724 return allTree(cfs, 0, len - 1);
2725 else {
2726 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2727 CompletableFuture<?> f;
2728 if (len == 0)
2729 dst.result = NIL;
2730 else if ((f = cfs[0]) == null)
2731 throw new NullPointerException();
2732 else {
2733 ThenPropagate d = null;
2734 CompletionNode p = null;
2735 Object r;
2736 while ((r = f.result) == null) {
2737 if (d == null)
2738 d = new ThenPropagate(f, dst);
2739 else if (p == null)
2740 p = new CompletionNode(d);
2741 else if (UNSAFE.compareAndSwapObject
2742 (f, COMPLETIONS, p.next = f.completions, p))
2743 break;
2744 }
2745 if (r != null && (d == null || d.compareAndSet(0, 1)))
2746 dst.internalComplete(null, (r instanceof AltResult) ?
2747 ((AltResult)r).ex : null);
2748 f.helpPostComplete();
2749 }
2750 return dst;
2751 }
2752 }
2753
2754 /**
2755 * Recursively constructs an And'ed tree of CompletableFutures.
2756 * Called only when array known to have at least two elements.
2757 */
2758 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2759 int lo, int hi) {
2760 CompletableFuture<?> fst, snd;
2761 int mid = (lo + hi) >>> 1;
2762 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2763 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2764 throw new NullPointerException();
2765 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2766 AndCompletion d = null;
2767 CompletionNode p = null, q = null;
2768 Object r = null, s = null;
2769 while ((r = fst.result) == null || (s = snd.result) == null) {
2770 if (d == null)
2771 d = new AndCompletion(fst, snd, dst);
2772 else if (p == null)
2773 p = new CompletionNode(d);
2774 else if (q == null) {
2775 if (UNSAFE.compareAndSwapObject
2776 (fst, COMPLETIONS, p.next = fst.completions, p))
2777 q = new CompletionNode(d);
2778 }
2779 else if (UNSAFE.compareAndSwapObject
2780 (snd, COMPLETIONS, q.next = snd.completions, q))
2781 break;
2782 }
2783 if ((r != null || (r = fst.result) != null) &&
2784 (s != null || (s = snd.result) != null) &&
2785 (d == null || d.compareAndSet(0, 1))) {
2786 Throwable ex;
2787 if (r instanceof AltResult)
2788 ex = ((AltResult)r).ex;
2789 else
2790 ex = null;
2791 if (ex == null && (s instanceof AltResult))
2792 ex = ((AltResult)s).ex;
2793 dst.internalComplete(null, ex);
2794 }
2795 fst.helpPostComplete();
2796 snd.helpPostComplete();
2797 return dst;
2798 }
2799
2800 /**
2801 * Returns a new CompletableFuture that is completed when any of
2802 * the given CompletableFutures complete, with the same result.
2803 * Otherwise, if it completed exceptionally, the returned
2804 * CompletableFuture also does so, with a CompletionException
2805 * holding this exception as its cause. If no CompletableFutures
2806 * are provided, returns an incomplete CompletableFuture.
2807 *
2808 * @param cfs the CompletableFutures
2809 * @return a new CompletableFuture that is completed with the
2810 * result or exception of any of the given CompletableFutures when
2811 * one completes
2812 * @throws NullPointerException if the array or any of its elements are
2813 * {@code null}
2814 */
2815 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2816 int len = cfs.length; // Same idea as allOf
2817 if (len > 1)
2818 return anyTree(cfs, 0, len - 1);
2819 else {
2820 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2821 CompletableFuture<?> f;
2822 if (len == 0)
2823 ; // skip
2824 else if ((f = cfs[0]) == null)
2825 throw new NullPointerException();
2826 else {
2827 ThenCopy<Object> d = null;
2828 CompletionNode p = null;
2829 Object r;
2830 while ((r = f.result) == null) {
2831 if (d == null)
2832 d = new ThenCopy<Object>(f, dst);
2833 else if (p == null)
2834 p = new CompletionNode(d);
2835 else if (UNSAFE.compareAndSwapObject
2836 (f, COMPLETIONS, p.next = f.completions, p))
2837 break;
2838 }
2839 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2840 Throwable ex; Object t;
2841 if (r instanceof AltResult) {
2842 ex = ((AltResult)r).ex;
2843 t = null;
2844 }
2845 else {
2846 ex = null;
2847 t = r;
2848 }
2849 dst.internalComplete(t, ex);
2850 }
2851 f.helpPostComplete();
2852 }
2853 return dst;
2854 }
2855 }
2856
2857 /**
2858 * Recursively constructs an Or'ed tree of CompletableFutures.
2859 */
2860 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
2861 int lo, int hi) {
2862 CompletableFuture<?> fst, snd;
2863 int mid = (lo + hi) >>> 1;
2864 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2865 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2866 throw new NullPointerException();
2867 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2868 OrCompletion d = null;
2869 CompletionNode p = null, q = null;
2870 Object r;
2871 while ((r = fst.result) == null && (r = snd.result) == null) {
2872 if (d == null)
2873 d = new OrCompletion(fst, snd, dst);
2874 else if (p == null)
2875 p = new CompletionNode(d);
2876 else if (q == null) {
2877 if (UNSAFE.compareAndSwapObject
2878 (fst, COMPLETIONS, p.next = fst.completions, p))
2879 q = new CompletionNode(d);
2880 }
2881 else if (UNSAFE.compareAndSwapObject
2882 (snd, COMPLETIONS, q.next = snd.completions, q))
2883 break;
2884 }
2885 if ((r != null || (r = fst.result) != null ||
2886 (r = snd.result) != null) &&
2887 (d == null || d.compareAndSet(0, 1))) {
2888 Throwable ex; Object t;
2889 if (r instanceof AltResult) {
2890 ex = ((AltResult)r).ex;
2891 t = null;
2892 }
2893 else {
2894 ex = null;
2895 t = r;
2896 }
2897 dst.internalComplete(t, ex);
2898 }
2899 fst.helpPostComplete();
2900 snd.helpPostComplete();
2901 return dst;
2902 }
2903
2904 /* ------------- Control and status methods -------------- */
2905
2906 /**
2907 * If not already completed, completes this CompletableFuture with
2908 * a {@link CancellationException}. Dependent CompletableFutures
2909 * that have not already completed will also complete
2910 * exceptionally, with a {@link CompletionException} caused by
2911 * this {@code CancellationException}.
2912 *
2913 * @param mayInterruptIfRunning this value has no effect in this
2914 * implementation because interrupts are not used to control
2915 * processing.
2916 *
2917 * @return {@code true} if this task is now cancelled
2918 */
2919 public boolean cancel(boolean mayInterruptIfRunning) {
2920 boolean cancelled = (result == null) &&
2921 UNSAFE.compareAndSwapObject
2922 (this, RESULT, null, new AltResult(new CancellationException()));
2923 postComplete();
2924 return cancelled || isCancelled();
2925 }
2926
2927 /**
2928 * Returns {@code true} if this CompletableFuture was cancelled
2929 * before it completed normally.
2930 *
2931 * @return {@code true} if this CompletableFuture was cancelled
2932 * before it completed normally
2933 */
2934 public boolean isCancelled() {
2935 Object r;
2936 return ((r = result) instanceof AltResult) &&
2937 (((AltResult)r).ex instanceof CancellationException);
2938 }
2939
2940 /**
2941 * Returns {@code true} if this CompletableFuture completed
2942 * exceptionally, in any way. Possible causes include
2943 * cancellation, explicit invocation of {@code
2944 * completeExceptionally}, and abrupt termination of a
2945 * CompletionStage action.
2946 *
2947 * @return {@code true} if this CompletableFuture completed
2948 * exceptionally
2949 */
2950 public boolean isCompletedExceptionally() {
2951 Object r;
2952 return ((r = result) instanceof AltResult) && r != NIL;
2953 }
2954
2955 /**
2956 * Forcibly sets or resets the value subsequently returned by
2957 * method {@link #get()} and related methods, whether or not
2958 * already completed. This method is designed for use only in
2959 * error recovery actions, and even in such situations may result
2960 * in ongoing dependent completions using established versus
2961 * overwritten outcomes.
2962 *
2963 * @param value the completion value
2964 */
2965 public void obtrudeValue(T value) {
2966 result = (value == null) ? NIL : value;
2967 postComplete();
2968 }
2969
2970 /**
2971 * Forcibly causes subsequent invocations of method {@link #get()}
2972 * and related methods to throw the given exception, whether or
2973 * not already completed. This method is designed for use only in
2974 * recovery actions, and even in such situations may result in
2975 * ongoing dependent completions using established versus
2976 * overwritten outcomes.
2977 *
2978 * @param ex the exception
2979 */
2980 public void obtrudeException(Throwable ex) {
2981 if (ex == null) throw new NullPointerException();
2982 result = new AltResult(ex);
2983 postComplete();
2984 }
2985
2986 /**
2987 * Returns the estimated number of CompletableFutures whose
2988 * completions are awaiting completion of this CompletableFuture.
2989 * This method is designed for use in monitoring system state, not
2990 * for synchronization control.
2991 *
2992 * @return the number of dependent CompletableFutures
2993 */
2994 public int getNumberOfDependents() {
2995 int count = 0;
2996 for (CompletionNode p = completions; p != null; p = p.next)
2997 ++count;
2998 return count;
2999 }
3000
3001 /**
3002 * Returns a string identifying this CompletableFuture, as well as
3003 * its completion state. The state, in brackets, contains the
3004 * String {@code "Completed Normally"} or the String {@code
3005 * "Completed Exceptionally"}, or the String {@code "Not
3006 * completed"} followed by the number of CompletableFutures
3007 * dependent upon its completion, if any.
3008 *
3009 * @return a string identifying this CompletableFuture, as well as its state
3010 */
3011 public String toString() {
3012 Object r = result;
3013 int count;
3014 return super.toString() +
3015 ((r == null) ?
3016 (((count = getNumberOfDependents()) == 0) ?
3017 "[Not completed]" :
3018 "[Not completed, " + count + " dependents]") :
3019 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3020 "[Completed exceptionally]" :
3021 "[Completed normally]"));
3022 }
3023
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" passed to its
// compare-and-swap calls.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    sun.misc.Unsafe unsafe;
    long resultOffset;
    long waitersOffset;
    long completionsOffset;
    try {
        unsafe = sun.misc.Unsafe.getUnsafe();
        Class<?> self = CompletableFuture.class;
        resultOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("result"));
        waitersOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        completionsOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
    } catch (Exception cause) {
        // Any failure here is rethrown as an Error wrapping the cause.
        throw new Error(cause);
    }
    UNSAFE = unsafe;
    RESULT = resultOffset;
    WAITERS = waitersOffset;
    COMPLETIONS = completionsOffset;
}
3043 }