ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.98
Committed: Tue Nov 26 18:36:16 2013 UTC (10 years, 6 months ago) by dl
Branch: MAIN
Changes since 1.97: +1 -1 lines
Log Message:
Fix bad conditional

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.locks.LockSupport;
26
27 /**
28 * A {@link Future} that may be explicitly completed (setting its
29 * value and status), and may be used as a {@link CompletionStage},
30 * supporting dependent functions and actions that trigger upon its
31 * completion.
32 *
33 * <p>When two or more threads attempt to
34 * {@link #complete complete},
35 * {@link #completeExceptionally completeExceptionally}, or
36 * {@link #cancel cancel}
37 * a CompletableFuture, only one of them succeeds.
38 *
39 * <p>In addition to these and related methods for directly
40 * manipulating status and results, CompletableFuture implements
41 * interface {@link CompletionStage} with the following policies: <ul>
42 *
43 * <li>Actions supplied for dependent completions of
44 * <em>non-async</em> methods may be performed by the thread that
45 * completes the current CompletableFuture, or by any other caller of
46 * a completion method.</li>
47 *
48 * <li>All <em>async</em> methods without an explicit Executor
49 * argument are performed using the {@link ForkJoinPool#commonPool()}
50 * (unless it does not support a parallelism level of at least two, in
51 * which case, a new Thread is used). To simplify monitoring,
52 * debugging, and tracking, all generated asynchronous tasks are
53 * instances of the marker interface {@link
54 * AsynchronousCompletionTask}. </li>
55 *
56 * <li>All CompletionStage methods are implemented independently of
57 * other public methods, so the behavior of one method is not impacted
58 * by overrides of others in subclasses. </li> </ul>
59 *
60 * <p>CompletableFuture also implements {@link Future} with the following
61 * policies: <ul>
62 *
63 * <li>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional
66 * completion. Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}. Method
68 * {@link #isCompletedExceptionally} can be used to determine if a
69 * CompletableFuture completed in any exceptional fashion.</li>
70 *
71 * <li>In case of exceptional completion with a CompletionException,
72 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
73 * {@link ExecutionException} with the same cause as held in the
74 * corresponding CompletionException. To simplify usage in most
75 * contexts, this class also defines methods {@link #join()} and
76 * {@link #getNow} that instead throw the CompletionException directly
77 * in these cases.</li> </ul>
78 *
79 * @author Doug Lea
80 * @since 1.8
81 */
82 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
83
84 /*
85 * Overview:
86 *
87 * 1. Non-nullness of field result (set via CAS) indicates done.
88 * An AltResult is used to box null as a result, as well as to
89 * hold exceptions. Using a single field makes completion fast
90 * and simple to detect and trigger, at the expense of a lot of
91 * encoding and decoding that infiltrates many methods. One minor
92 * simplification relies on the (static) NIL (to box null results)
93 * being the only AltResult with a null exception field, so we
94 * don't usually need explicit comparisons with NIL. The CF
95 * exception propagation mechanics surrounding decoding rely on
96 * unchecked casts of decoded results really being unchecked,
97 * where user type errors are caught at point of use, as is
98 * currently the case in Java. These are highlighted by using
99 * SuppressWarnings-annotated temporaries.
100 *
101 * 2. Waiters are held in a Treiber stack similar to the one used
102 * in FutureTask, Phaser, and SynchronousQueue. See their
103 * internal documentation for algorithmic details.
104 *
105 * 3. Completions are also kept in a list/stack, and pulled off
106 * and run when completion is triggered. (We could even use the
107 * same stack as for waiters, but would give up the potential
108 * parallelism obtained because woken waiters help release/run
109 * others -- see method postComplete). Because post-processing
110 * may race with direct calls, class Completion opportunistically
111 * extends AtomicInteger so callers can claim the action via
112 * compareAndSet(0, 1). The Completion.run methods are all
113 * written in a boringly similar uniform way (that sometimes includes
114 * unnecessary-looking checks, kept to maintain uniformity).
115 * There are enough dimensions upon which they differ that
116 * attempts to factor commonalities while maintaining efficiency
117 * require more lines of code than they would save.
118 *
119 * 4. The exported then/and/or methods do support a bit of
120 * factoring (see doThenApply etc). They must cope with the
121 * intrinsic races surrounding addition of a dependent action
122 * versus performing the action directly because the task is
123 * already complete. For example, a CF may not be complete upon
124 * entry, so a dependent completion is added, but by the time it
125 * is added, the target CF is complete, so must be directly
126 * executed. This is all done while avoiding unnecessary object
127 * construction in safe-bypass cases.
128 */
129
130 // preliminaries
131
132 static final class AltResult {
133 final Throwable ex; // null only for NIL
134 AltResult(Throwable ex) { this.ex = ex; }
135 }
136
137 static final AltResult NIL = new AltResult(null); // shared box standing in for a null result; the only AltResult with a null ex
138
139 // Fields
140
141 volatile Object result; // Either the result or boxed AltResult; non-null means done
142 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
143 volatile CompletionNode completions; // list (Treiber stack) of completions
144
145 // Basic utilities for triggering and processing completions
146
147 /**
148 * Removes and signals all waiting threads and runs all completions.
149 */
150 final void postComplete() {
151 WaitNode q; Thread t;
152 while ((q = waiters) != null) { // keep popping until the waiter stack is empty
153 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
154 (t = q.thread) != null) { // a node whose thread is already null needs no wakeup
155 q.thread = null;
156 LockSupport.unpark(t);
157 }
158 }
159
160 CompletionNode h; Completion c;
161 while ((h = completions) != null) { // same pop loop for the completion stack
162 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
163 (c = h.completion) != null)
164 c.run(); // only the thread that won the CAS runs this node's completion
165 }
166 }
167
168 /**
169 * Triggers completion with the encoding of the given arguments:
170 * if the exception is non-null, encodes it as a wrapped
171 * CompletionException unless it is one already. Otherwise uses
172 * the given result, boxed as NIL if null.
173 */
174 final void internalComplete(T v, Throwable ex) {
175 if (result == null)
176 UNSAFE.compareAndSwapObject
177 (this, RESULT, null,
178 (ex == null) ? (v == null) ? NIL : v :
179 new AltResult((ex instanceof CompletionException) ? ex :
180 new CompletionException(ex)));
181 postComplete(); // help out even if not triggered
182 }
183
184 /**
185 * If triggered, helps release and/or process completions.
186 */
187 final void helpPostComplete() {
188 if (result != null)
189 postComplete();
190 }
191
192 /* ------------- waiting for completions -------------- */
193
194 /** Number of processors (as reported by the Runtime at class initialization), for spin control */
195 static final int NCPU = Runtime.getRuntime().availableProcessors();
196
197 /**
198 * Heuristic spin value for waitingGet() before blocking on
199 * multiprocessors; zero when only one processor is reported
200 */
201 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0; // 256 when NCPU > 1
202
203 /**
204 * Linked nodes to record waiting threads in a Treiber stack. See
205 * other classes such as Phaser and SynchronousQueue for more
206 * detailed explanation. This class implements ManagedBlocker to
207 * avoid starvation when blocking actions pile up in
208 * ForkJoinPools.
209 */
210 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
211 long nanos; // wait time if timed
212 final long deadline; // non-zero if timed
213 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
214 volatile Thread thread; // the waiting thread; set to null once released, timed out, or removed
215 volatile WaitNode next; // next node down the stack
216 WaitNode(boolean interruptible, long nanos, long deadline) {
217 this.thread = Thread.currentThread(); // the constructing thread is the waiter
218 this.interruptControl = interruptible ? 1 : 0;
219 this.nanos = nanos;
220 this.deadline = deadline;
221 }
222 public boolean isReleasable() {
223 if (thread == null) // already released or cancelled
224 return true;
225 if (Thread.interrupted()) { // note: clears the thread's interrupt status
226 int i = interruptControl;
227 interruptControl = -1; // always record that an interrupt was seen
228 if (i > 0) // only an interruptible wait stops blocking because of it
229 return true;
230 }
231 if (deadline != 0L &&
232 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) { // timed wait: recompute remaining time
233 thread = null; // timed out
234 return true;
235 }
236 return false;
237 }
238 public boolean block() {
239 if (isReleasable())
240 return true;
241 else if (deadline == 0L) // untimed wait
242 LockSupport.park(this);
243 else if (nanos > 0L) // timed wait with time remaining
244 LockSupport.parkNanos(this, nanos);
245 return isReleasable(); // re-evaluate after waking
246 }
247 }
248
249 /**
250 * Returns raw result after waiting, or null if interruptible and
251 * interrupted. Spins first (up to SPINS), then enqueues a WaitNode and blocks.
252 */
253 private Object waitingGet(boolean interruptible) {
254 WaitNode q = null; // created lazily, only once spinning is exhausted
255 boolean queued = false; // true once q has been pushed onto the waiters stack
256 int spins = SPINS;
257 for (Object r;;) {
258 if ((r = result) != null) { // completed
259 if (q != null) { // suppress unpark
260 q.thread = null;
261 if (q.interruptControl < 0) { // an interrupt was seen while waiting
262 if (interruptible) {
263 removeWaiter(q);
264 return null;
265 }
266 Thread.currentThread().interrupt(); // non-interruptible wait: restore the interrupt status
267 }
268 }
269 postComplete(); // help release others
270 return r;
271 }
272 else if (spins > 0) {
273 int rnd = ThreadLocalRandom.nextSecondarySeed();
274 if (rnd == 0)
275 rnd = ThreadLocalRandom.current().nextInt();
276 if (rnd >= 0) // count a spin only when the random value is non-negative
277 --spins;
278 }
279 else if (q == null)
280 q = new WaitNode(interruptible, 0L, 0L); // zero deadline marks an untimed wait
281 else if (!queued)
282 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
283 q.next = waiters, q); // push q; a failed CAS is retried on the next iteration
284 else if (interruptible && q.interruptControl < 0) { // interrupted before completion
285 removeWaiter(q);
286 return null;
287 }
288 else if (q.thread != null && result == null) { // still waiting and not yet complete
289 try {
290 ForkJoinPool.managedBlock(q);
291 } catch (InterruptedException ex) {
292 q.interruptControl = -1; // record the interrupt; handled by the branches above
293 }
294 }
295 }
296 }
297
298 /**
299 * Awaits completion or aborts on interrupt or timeout.
300 *
301 * @param nanos time to wait, in nanoseconds; a non-positive value times out immediately unless already complete
302 * @return raw result
303 */
304 private Object timedAwaitDone(long nanos)
305 throws InterruptedException, TimeoutException {
306 WaitNode q = null; // created on the first pass that finds no result
307 boolean queued = false; // true once q has been pushed onto the waiters stack
308 for (Object r;;) {
309 if ((r = result) != null) { // completed
310 if (q != null) {
311 q.thread = null; // suppress unpark
312 if (q.interruptControl < 0) { // an interrupt seen while waiting takes precedence over the result
313 removeWaiter(q);
314 throw new InterruptedException();
315 }
316 }
317 postComplete(); // help release others
318 return r;
319 }
320 else if (q == null) {
321 if (nanos <= 0L)
322 throw new TimeoutException();
323 long d = System.nanoTime() + nanos;
324 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
325 }
326 else if (!queued)
327 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
328 q.next = waiters, q); // push q; a failed CAS is retried on the next iteration
329 else if (q.interruptControl < 0) { // interrupted before completion
330 removeWaiter(q);
331 throw new InterruptedException();
332 }
333 else if (q.nanos <= 0L) { // wait time exhausted
334 if (result == null) { // recheck so a just-arrived result is returned on the next pass instead
335 removeWaiter(q);
336 throw new TimeoutException();
337 }
338 }
339 else if (q.thread != null && result == null) { // still waiting and not yet complete
340 try {
341 ForkJoinPool.managedBlock(q);
342 } catch (InterruptedException ex) {
343 q.interruptControl = -1; // record the interrupt; handled by the branches above
344 }
345 }
346 }
347 }
348
349 /**
350 * Tries to unlink a timed-out or interrupted wait node to avoid
351 * accumulating garbage. Internal nodes are simply unspliced
352 * without CAS since it is harmless if they are traversed anyway
353 * by releasers. To avoid effects of unsplicing from already
354 * removed nodes, the list is retraversed in case of an apparent
355 * race. This is slow when there are a lot of nodes, but we don't
356 * expect lists to be long enough to outweigh higher-overhead
357 * schemes. A null argument is ignored.
358 */
359 private void removeWaiter(WaitNode node) {
360 if (node != null) {
361 node.thread = null; // a null thread marks the node as removable
362 retry:
363 for (;;) { // restart on removeWaiter race
364 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
365 s = q.next;
366 if (q.thread != null) // live node: remember it as the predecessor
367 pred = q;
368 else if (pred != null) { // dead interior node: splice it out without CAS
369 pred.next = s;
370 if (pred.thread == null) // check for race
371 continue retry;
372 }
373 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s)) // dead head node: must CAS the stack top
374 continue retry;
375 }
376 break; // full traversal finished without detecting a race
377 }
378 }
379 }
380
381 /* ------------- Async tasks -------------- */
382
/**
 * Marker interface implemented by the asynchronous tasks that
 * {@code async} methods produce. It declares no methods; it exists so
 * that such tasks can be recognized when monitoring, debugging, or
 * tracking asynchronous activities.
 *
 * @since 1.8
 */
public interface AsynchronousCompletionTask {
}
392
393 /** Base class can act as either FJ or plain Runnable */
394 @SuppressWarnings("serial")
395 abstract static class Async extends ForkJoinTask<Void>
396 implements Runnable, AsynchronousCompletionTask {
397 public final Void getRawResult() { return null; }
398 public final void setRawResult(Void v) { }
399 public final void run() { exec(); }
400 }
401
402 /**
403 * Starts the given async task using the given executor, unless
404 * the executor is ForkJoinPool.commonPool and it has been
405 * disabled, in which case starts a new thread.
406 */
407 static void execAsync(Executor e, Async r) {
408 if (e == ForkJoinPool.commonPool() &&
409 ForkJoinPool.getCommonPoolParallelism() <= 1)
410 new Thread(r).start();
411 else
412 e.execute(r);
413 }
414
415 static final class AsyncRun extends Async {
416 final Runnable fn;
417 final CompletableFuture<Void> dst;
418 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
419 this.fn = fn; this.dst = dst;
420 }
421 public final boolean exec() {
422 CompletableFuture<Void> d; Throwable ex;
423 if ((d = this.dst) != null && d.result == null) {
424 try {
425 fn.run();
426 ex = null;
427 } catch (Throwable rex) {
428 ex = rex;
429 }
430 d.internalComplete(null, ex);
431 }
432 return true;
433 }
434 private static final long serialVersionUID = 5232453952276885070L;
435 }
436
437 static final class AsyncSupply<U> extends Async {
438 final Supplier<U> fn;
439 final CompletableFuture<U> dst;
440 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
441 this.fn = fn; this.dst = dst;
442 }
443 public final boolean exec() {
444 CompletableFuture<U> d; U u; Throwable ex;
445 if ((d = this.dst) != null && d.result == null) {
446 try {
447 u = fn.get();
448 ex = null;
449 } catch (Throwable rex) {
450 ex = rex;
451 u = null;
452 }
453 d.internalComplete(u, ex);
454 }
455 return true;
456 }
457 private static final long serialVersionUID = 5232453952276885070L;
458 }
459
460 static final class AsyncApply<T,U> extends Async {
461 final T arg;
462 final Function<? super T,? extends U> fn;
463 final CompletableFuture<U> dst;
464 AsyncApply(T arg, Function<? super T,? extends U> fn,
465 CompletableFuture<U> dst) {
466 this.arg = arg; this.fn = fn; this.dst = dst;
467 }
468 public final boolean exec() {
469 CompletableFuture<U> d; U u; Throwable ex;
470 if ((d = this.dst) != null && d.result == null) {
471 try {
472 u = fn.apply(arg);
473 ex = null;
474 } catch (Throwable rex) {
475 ex = rex;
476 u = null;
477 }
478 d.internalComplete(u, ex);
479 }
480 return true;
481 }
482 private static final long serialVersionUID = 5232453952276885070L;
483 }
484
485 static final class AsyncCombine<T,U,V> extends Async {
486 final T arg1;
487 final U arg2;
488 final BiFunction<? super T,? super U,? extends V> fn;
489 final CompletableFuture<V> dst;
490 AsyncCombine(T arg1, U arg2,
491 BiFunction<? super T,? super U,? extends V> fn,
492 CompletableFuture<V> dst) {
493 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
494 }
495 public final boolean exec() {
496 CompletableFuture<V> d; V v; Throwable ex;
497 if ((d = this.dst) != null && d.result == null) {
498 try {
499 v = fn.apply(arg1, arg2);
500 ex = null;
501 } catch (Throwable rex) {
502 ex = rex;
503 v = null;
504 }
505 d.internalComplete(v, ex);
506 }
507 return true;
508 }
509 private static final long serialVersionUID = 5232453952276885070L;
510 }
511
512 static final class AsyncAccept<T> extends Async {
513 final T arg;
514 final Consumer<? super T> fn;
515 final CompletableFuture<?> dst;
516 AsyncAccept(T arg, Consumer<? super T> fn,
517 CompletableFuture<?> dst) {
518 this.arg = arg; this.fn = fn; this.dst = dst;
519 }
520 public final boolean exec() {
521 CompletableFuture<?> d; Throwable ex;
522 if ((d = this.dst) != null && d.result == null) {
523 try {
524 fn.accept(arg);
525 ex = null;
526 } catch (Throwable rex) {
527 ex = rex;
528 }
529 d.internalComplete(null, ex);
530 }
531 return true;
532 }
533 private static final long serialVersionUID = 5232453952276885070L;
534 }
535
536 static final class AsyncAcceptBoth<T,U> extends Async {
537 final T arg1;
538 final U arg2;
539 final BiConsumer<? super T,? super U> fn;
540 final CompletableFuture<?> dst;
541 AsyncAcceptBoth(T arg1, U arg2,
542 BiConsumer<? super T,? super U> fn,
543 CompletableFuture<?> dst) {
544 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
545 }
546 public final boolean exec() {
547 CompletableFuture<?> d; Throwable ex;
548 if ((d = this.dst) != null && d.result == null) {
549 try {
550 fn.accept(arg1, arg2);
551 ex = null;
552 } catch (Throwable rex) {
553 ex = rex;
554 }
555 d.internalComplete(null, ex);
556 }
557 return true;
558 }
559 private static final long serialVersionUID = 5232453952276885070L;
560 }
561
562 static final class AsyncCompose<T,U> extends Async {
563 final T arg;
564 final Function<? super T, ? extends CompletionStage<U>> fn;
565 final CompletableFuture<U> dst;
566 AsyncCompose(T arg,
567 Function<? super T, ? extends CompletionStage<U>> fn,
568 CompletableFuture<U> dst) {
569 this.arg = arg; this.fn = fn; this.dst = dst;
570 }
571 public final boolean exec() {
572 CompletableFuture<U> d, fr; U u; Throwable ex;
573 if ((d = this.dst) != null && d.result == null) {
574 try {
575 CompletionStage<U> cs = fn.apply(arg);
576 fr = (cs == null) ? null : cs.toCompletableFuture();
577 ex = (fr == null) ? new NullPointerException() : null;
578 } catch (Throwable rex) {
579 ex = rex;
580 fr = null;
581 }
582 if (ex != null)
583 u = null;
584 else {
585 Object r = fr.result;
586 if (r == null)
587 r = fr.waitingGet(false);
588 if (r instanceof AltResult) {
589 ex = ((AltResult)r).ex;
590 u = null;
591 }
592 else {
593 @SuppressWarnings("unchecked") U ur = (U) r;
594 u = ur;
595 }
596 }
597 d.internalComplete(u, ex);
598 }
599 return true;
600 }
601 private static final long serialVersionUID = 5232453952276885070L;
602 }
603
604 static final class AsyncWhenComplete<T> extends Async {
605 final T arg1;
606 final Throwable arg2;
607 final BiConsumer<? super T,? super Throwable> fn;
608 final CompletableFuture<T> dst;
609 AsyncWhenComplete(T arg1, Throwable arg2,
610 BiConsumer<? super T,? super Throwable> fn,
611 CompletableFuture<T> dst) {
612 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
613 }
614 public final boolean exec() {
615 CompletableFuture<T> d;
616 if ((d = this.dst) != null && d.result == null) {
617 Throwable ex = arg2;
618 try {
619 fn.accept(arg1, ex);
620 } catch (Throwable rex) {
621 if (ex == null)
622 ex = rex;
623 }
624 d.internalComplete(arg1, ex);
625 }
626 return true;
627 }
628 private static final long serialVersionUID = 5232453952276885070L;
629 }
630
631 /* ------------- Completions -------------- */
632
633 /**
634 * Simple linked list nodes to record completions, used in
635 * basically the same way as WaitNodes. (We separate nodes from
636 * the Completions themselves mainly because for the And and Or
637 * methods, the same Completion object resides in two lists.)
638 */
639 static final class CompletionNode {
640 final Completion completion;
641 volatile CompletionNode next;
642 CompletionNode(Completion completion) { this.completion = completion; }
643 }
644
645 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
646 @SuppressWarnings("serial")
647 abstract static class Completion extends AtomicInteger implements Runnable {
648 }
649
650 static final class ThenApply<T,U> extends Completion {
651 final CompletableFuture<? extends T> src;
652 final Function<? super T,? extends U> fn;
653 final CompletableFuture<U> dst;
654 final Executor executor;
655 ThenApply(CompletableFuture<? extends T> src,
656 Function<? super T,? extends U> fn,
657 CompletableFuture<U> dst,
658 Executor executor) {
659 this.src = src; this.fn = fn; this.dst = dst;
660 this.executor = executor;
661 }
662 public final void run() {
663 final CompletableFuture<? extends T> a;
664 final Function<? super T,? extends U> fn;
665 final CompletableFuture<U> dst;
666 Object r; T t; Throwable ex;
667 if ((dst = this.dst) != null &&
668 (fn = this.fn) != null &&
669 (a = this.src) != null &&
670 (r = a.result) != null &&
671 compareAndSet(0, 1)) {
672 if (r instanceof AltResult) {
673 ex = ((AltResult)r).ex;
674 t = null;
675 }
676 else {
677 ex = null;
678 @SuppressWarnings("unchecked") T tr = (T) r;
679 t = tr;
680 }
681 Executor e = executor;
682 U u = null;
683 if (ex == null) {
684 try {
685 if (e != null)
686 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
687 else
688 u = fn.apply(t);
689 } catch (Throwable rex) {
690 ex = rex;
691 }
692 }
693 if (e == null || ex != null)
694 dst.internalComplete(u, ex);
695 }
696 }
697 private static final long serialVersionUID = 5232453952276885070L;
698 }
699
700 static final class ThenAccept<T> extends Completion {
701 final CompletableFuture<? extends T> src;
702 final Consumer<? super T> fn;
703 final CompletableFuture<?> dst;
704 final Executor executor;
705 ThenAccept(CompletableFuture<? extends T> src,
706 Consumer<? super T> fn,
707 CompletableFuture<?> dst,
708 Executor executor) {
709 this.src = src; this.fn = fn; this.dst = dst;
710 this.executor = executor;
711 }
712 public final void run() {
713 final CompletableFuture<? extends T> a;
714 final Consumer<? super T> fn;
715 final CompletableFuture<?> dst;
716 Object r; T t; Throwable ex;
717 if ((dst = this.dst) != null &&
718 (fn = this.fn) != null &&
719 (a = this.src) != null &&
720 (r = a.result) != null &&
721 compareAndSet(0, 1)) {
722 if (r instanceof AltResult) {
723 ex = ((AltResult)r).ex;
724 t = null;
725 }
726 else {
727 ex = null;
728 @SuppressWarnings("unchecked") T tr = (T) r;
729 t = tr;
730 }
731 Executor e = executor;
732 if (ex == null) {
733 try {
734 if (e != null)
735 execAsync(e, new AsyncAccept<T>(t, fn, dst));
736 else
737 fn.accept(t);
738 } catch (Throwable rex) {
739 ex = rex;
740 }
741 }
742 if (e == null || ex != null)
743 dst.internalComplete(null, ex);
744 }
745 }
746 private static final long serialVersionUID = 5232453952276885070L;
747 }
748
749 static final class ThenRun extends Completion {
750 final CompletableFuture<?> src;
751 final Runnable fn;
752 final CompletableFuture<Void> dst;
753 final Executor executor;
754 ThenRun(CompletableFuture<?> src,
755 Runnable fn,
756 CompletableFuture<Void> dst,
757 Executor executor) {
758 this.src = src; this.fn = fn; this.dst = dst;
759 this.executor = executor;
760 }
761 public final void run() {
762 final CompletableFuture<?> a;
763 final Runnable fn;
764 final CompletableFuture<Void> dst;
765 Object r; Throwable ex;
766 if ((dst = this.dst) != null &&
767 (fn = this.fn) != null &&
768 (a = this.src) != null &&
769 (r = a.result) != null &&
770 compareAndSet(0, 1)) {
771 if (r instanceof AltResult)
772 ex = ((AltResult)r).ex;
773 else
774 ex = null;
775 Executor e = executor;
776 if (ex == null) {
777 try {
778 if (e != null)
779 execAsync(e, new AsyncRun(fn, dst));
780 else
781 fn.run();
782 } catch (Throwable rex) {
783 ex = rex;
784 }
785 }
786 if (e == null || ex != null)
787 dst.internalComplete(null, ex);
788 }
789 }
790 private static final long serialVersionUID = 5232453952276885070L;
791 }
792
793 static final class ThenCombine<T,U,V> extends Completion {
794 final CompletableFuture<? extends T> src;
795 final CompletableFuture<? extends U> snd;
796 final BiFunction<? super T,? super U,? extends V> fn;
797 final CompletableFuture<V> dst;
798 final Executor executor;
799 ThenCombine(CompletableFuture<? extends T> src,
800 CompletableFuture<? extends U> snd,
801 BiFunction<? super T,? super U,? extends V> fn,
802 CompletableFuture<V> dst,
803 Executor executor) {
804 this.src = src; this.snd = snd;
805 this.fn = fn; this.dst = dst;
806 this.executor = executor;
807 }
808 public final void run() {
809 final CompletableFuture<? extends T> a;
810 final CompletableFuture<? extends U> b;
811 final BiFunction<? super T,? super U,? extends V> fn;
812 final CompletableFuture<V> dst;
813 Object r, s; T t; U u; Throwable ex;
814 if ((dst = this.dst) != null &&
815 (fn = this.fn) != null &&
816 (a = this.src) != null &&
817 (r = a.result) != null &&
818 (b = this.snd) != null &&
819 (s = b.result) != null &&
820 compareAndSet(0, 1)) {
821 if (r instanceof AltResult) {
822 ex = ((AltResult)r).ex;
823 t = null;
824 }
825 else {
826 ex = null;
827 @SuppressWarnings("unchecked") T tr = (T) r;
828 t = tr;
829 }
830 if (ex != null)
831 u = null;
832 else if (s instanceof AltResult) {
833 ex = ((AltResult)s).ex;
834 u = null;
835 }
836 else {
837 @SuppressWarnings("unchecked") U us = (U) s;
838 u = us;
839 }
840 Executor e = executor;
841 V v = null;
842 if (ex == null) {
843 try {
844 if (e != null)
845 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
846 else
847 v = fn.apply(t, u);
848 } catch (Throwable rex) {
849 ex = rex;
850 }
851 }
852 if (e == null || ex != null)
853 dst.internalComplete(v, ex);
854 }
855 }
856 private static final long serialVersionUID = 5232453952276885070L;
857 }
858
859 static final class ThenAcceptBoth<T,U> extends Completion {
860 final CompletableFuture<? extends T> src;
861 final CompletableFuture<? extends U> snd;
862 final BiConsumer<? super T,? super U> fn;
863 final CompletableFuture<Void> dst;
864 final Executor executor;
865 ThenAcceptBoth(CompletableFuture<? extends T> src,
866 CompletableFuture<? extends U> snd,
867 BiConsumer<? super T,? super U> fn,
868 CompletableFuture<Void> dst,
869 Executor executor) {
870 this.src = src; this.snd = snd;
871 this.fn = fn; this.dst = dst;
872 this.executor = executor;
873 }
874 public final void run() {
875 final CompletableFuture<? extends T> a;
876 final CompletableFuture<? extends U> b;
877 final BiConsumer<? super T,? super U> fn;
878 final CompletableFuture<Void> dst;
879 Object r, s; T t; U u; Throwable ex;
880 if ((dst = this.dst) != null &&
881 (fn = this.fn) != null &&
882 (a = this.src) != null &&
883 (r = a.result) != null &&
884 (b = this.snd) != null &&
885 (s = b.result) != null &&
886 compareAndSet(0, 1)) {
887 if (r instanceof AltResult) {
888 ex = ((AltResult)r).ex;
889 t = null;
890 }
891 else {
892 ex = null;
893 @SuppressWarnings("unchecked") T tr = (T) r;
894 t = tr;
895 }
896 if (ex != null)
897 u = null;
898 else if (s instanceof AltResult) {
899 ex = ((AltResult)s).ex;
900 u = null;
901 }
902 else {
903 @SuppressWarnings("unchecked") U us = (U) s;
904 u = us;
905 }
906 Executor e = executor;
907 if (ex == null) {
908 try {
909 if (e != null)
910 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
911 else
912 fn.accept(t, u);
913 } catch (Throwable rex) {
914 ex = rex;
915 }
916 }
917 if (e == null || ex != null)
918 dst.internalComplete(null, ex);
919 }
920 }
921 private static final long serialVersionUID = 5232453952276885070L;
922 }
923
924 static final class RunAfterBoth extends Completion {
925 final CompletableFuture<?> src;
926 final CompletableFuture<?> snd;
927 final Runnable fn;
928 final CompletableFuture<Void> dst;
929 final Executor executor;
930 RunAfterBoth(CompletableFuture<?> src,
931 CompletableFuture<?> snd,
932 Runnable fn,
933 CompletableFuture<Void> dst,
934 Executor executor) {
935 this.src = src; this.snd = snd;
936 this.fn = fn; this.dst = dst;
937 this.executor = executor;
938 }
939 public final void run() {
940 final CompletableFuture<?> a;
941 final CompletableFuture<?> b;
942 final Runnable fn;
943 final CompletableFuture<Void> dst;
944 Object r, s; Throwable ex;
945 if ((dst = this.dst) != null &&
946 (fn = this.fn) != null &&
947 (a = this.src) != null &&
948 (r = a.result) != null &&
949 (b = this.snd) != null &&
950 (s = b.result) != null &&
951 compareAndSet(0, 1)) {
952 if (r instanceof AltResult)
953 ex = ((AltResult)r).ex;
954 else
955 ex = null;
956 if (ex == null && (s instanceof AltResult))
957 ex = ((AltResult)s).ex;
958 Executor e = executor;
959 if (ex == null) {
960 try {
961 if (e != null)
962 execAsync(e, new AsyncRun(fn, dst));
963 else
964 fn.run();
965 } catch (Throwable rex) {
966 ex = rex;
967 }
968 }
969 if (e == null || ex != null)
970 dst.internalComplete(null, ex);
971 }
972 }
973 private static final long serialVersionUID = 5232453952276885070L;
974 }
975
976 static final class AndCompletion extends Completion {
977 final CompletableFuture<?> src;
978 final CompletableFuture<?> snd;
979 final CompletableFuture<Void> dst;
980 AndCompletion(CompletableFuture<?> src,
981 CompletableFuture<?> snd,
982 CompletableFuture<Void> dst) {
983 this.src = src; this.snd = snd; this.dst = dst;
984 }
985 public final void run() {
986 final CompletableFuture<?> a;
987 final CompletableFuture<?> b;
988 final CompletableFuture<Void> dst;
989 Object r, s; Throwable ex;
990 if ((dst = this.dst) != null &&
991 (a = this.src) != null &&
992 (r = a.result) != null &&
993 (b = this.snd) != null &&
994 (s = b.result) != null &&
995 compareAndSet(0, 1)) {
996 if (r instanceof AltResult)
997 ex = ((AltResult)r).ex;
998 else
999 ex = null;
1000 if (ex == null && (s instanceof AltResult))
1001 ex = ((AltResult)s).ex;
1002 dst.internalComplete(null, ex);
1003 }
1004 }
1005 private static final long serialVersionUID = 5232453952276885070L;
1006 }
1007
1008 static final class ApplyToEither<T,U> extends Completion {
1009 final CompletableFuture<? extends T> src;
1010 final CompletableFuture<? extends T> snd;
1011 final Function<? super T,? extends U> fn;
1012 final CompletableFuture<U> dst;
1013 final Executor executor;
1014 ApplyToEither(CompletableFuture<? extends T> src,
1015 CompletableFuture<? extends T> snd,
1016 Function<? super T,? extends U> fn,
1017 CompletableFuture<U> dst,
1018 Executor executor) {
1019 this.src = src; this.snd = snd;
1020 this.fn = fn; this.dst = dst;
1021 this.executor = executor;
1022 }
1023 public final void run() {
1024 final CompletableFuture<? extends T> a;
1025 final CompletableFuture<? extends T> b;
1026 final Function<? super T,? extends U> fn;
1027 final CompletableFuture<U> dst;
1028 Object r; T t; Throwable ex;
1029 if ((dst = this.dst) != null &&
1030 (fn = this.fn) != null &&
1031 (((a = this.src) != null && (r = a.result) != null) ||
1032 ((b = this.snd) != null && (r = b.result) != null)) &&
1033 compareAndSet(0, 1)) {
1034 if (r instanceof AltResult) {
1035 ex = ((AltResult)r).ex;
1036 t = null;
1037 }
1038 else {
1039 ex = null;
1040 @SuppressWarnings("unchecked") T tr = (T) r;
1041 t = tr;
1042 }
1043 Executor e = executor;
1044 U u = null;
1045 if (ex == null) {
1046 try {
1047 if (e != null)
1048 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1049 else
1050 u = fn.apply(t);
1051 } catch (Throwable rex) {
1052 ex = rex;
1053 }
1054 }
1055 if (e == null || ex != null)
1056 dst.internalComplete(u, ex);
1057 }
1058 }
1059 private static final long serialVersionUID = 5232453952276885070L;
1060 }
1061
1062 static final class AcceptEither<T> extends Completion {
1063 final CompletableFuture<? extends T> src;
1064 final CompletableFuture<? extends T> snd;
1065 final Consumer<? super T> fn;
1066 final CompletableFuture<Void> dst;
1067 final Executor executor;
1068 AcceptEither(CompletableFuture<? extends T> src,
1069 CompletableFuture<? extends T> snd,
1070 Consumer<? super T> fn,
1071 CompletableFuture<Void> dst,
1072 Executor executor) {
1073 this.src = src; this.snd = snd;
1074 this.fn = fn; this.dst = dst;
1075 this.executor = executor;
1076 }
1077 public final void run() {
1078 final CompletableFuture<? extends T> a;
1079 final CompletableFuture<? extends T> b;
1080 final Consumer<? super T> fn;
1081 final CompletableFuture<Void> dst;
1082 Object r; T t; Throwable ex;
1083 if ((dst = this.dst) != null &&
1084 (fn = this.fn) != null &&
1085 (((a = this.src) != null && (r = a.result) != null) ||
1086 ((b = this.snd) != null && (r = b.result) != null)) &&
1087 compareAndSet(0, 1)) {
1088 if (r instanceof AltResult) {
1089 ex = ((AltResult)r).ex;
1090 t = null;
1091 }
1092 else {
1093 ex = null;
1094 @SuppressWarnings("unchecked") T tr = (T) r;
1095 t = tr;
1096 }
1097 Executor e = executor;
1098 if (ex == null) {
1099 try {
1100 if (e != null)
1101 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1102 else
1103 fn.accept(t);
1104 } catch (Throwable rex) {
1105 ex = rex;
1106 }
1107 }
1108 if (e == null || ex != null)
1109 dst.internalComplete(null, ex);
1110 }
1111 }
1112 private static final long serialVersionUID = 5232453952276885070L;
1113 }
1114
1115 static final class RunAfterEither extends Completion {
1116 final CompletableFuture<?> src;
1117 final CompletableFuture<?> snd;
1118 final Runnable fn;
1119 final CompletableFuture<Void> dst;
1120 final Executor executor;
1121 RunAfterEither(CompletableFuture<?> src,
1122 CompletableFuture<?> snd,
1123 Runnable fn,
1124 CompletableFuture<Void> dst,
1125 Executor executor) {
1126 this.src = src; this.snd = snd;
1127 this.fn = fn; this.dst = dst;
1128 this.executor = executor;
1129 }
1130 public final void run() {
1131 final CompletableFuture<?> a;
1132 final CompletableFuture<?> b;
1133 final Runnable fn;
1134 final CompletableFuture<Void> dst;
1135 Object r; Throwable ex;
1136 if ((dst = this.dst) != null &&
1137 (fn = this.fn) != null &&
1138 (((a = this.src) != null && (r = a.result) != null) ||
1139 ((b = this.snd) != null && (r = b.result) != null)) &&
1140 compareAndSet(0, 1)) {
1141 if (r instanceof AltResult)
1142 ex = ((AltResult)r).ex;
1143 else
1144 ex = null;
1145 Executor e = executor;
1146 if (ex == null) {
1147 try {
1148 if (e != null)
1149 execAsync(e, new AsyncRun(fn, dst));
1150 else
1151 fn.run();
1152 } catch (Throwable rex) {
1153 ex = rex;
1154 }
1155 }
1156 if (e == null || ex != null)
1157 dst.internalComplete(null, ex);
1158 }
1159 }
1160 private static final long serialVersionUID = 5232453952276885070L;
1161 }
1162
1163 static final class OrCompletion extends Completion {
1164 final CompletableFuture<?> src;
1165 final CompletableFuture<?> snd;
1166 final CompletableFuture<Object> dst;
1167 OrCompletion(CompletableFuture<?> src,
1168 CompletableFuture<?> snd,
1169 CompletableFuture<Object> dst) {
1170 this.src = src; this.snd = snd; this.dst = dst;
1171 }
1172 public final void run() {
1173 final CompletableFuture<?> a;
1174 final CompletableFuture<?> b;
1175 final CompletableFuture<Object> dst;
1176 Object r, t; Throwable ex;
1177 if ((dst = this.dst) != null &&
1178 (((a = this.src) != null && (r = a.result) != null) ||
1179 ((b = this.snd) != null && (r = b.result) != null)) &&
1180 compareAndSet(0, 1)) {
1181 if (r instanceof AltResult) {
1182 ex = ((AltResult)r).ex;
1183 t = null;
1184 }
1185 else {
1186 ex = null;
1187 t = r;
1188 }
1189 dst.internalComplete(t, ex);
1190 }
1191 }
1192 private static final long serialVersionUID = 5232453952276885070L;
1193 }
1194
1195 static final class ExceptionCompletion<T> extends Completion {
1196 final CompletableFuture<? extends T> src;
1197 final Function<? super Throwable, ? extends T> fn;
1198 final CompletableFuture<T> dst;
1199 ExceptionCompletion(CompletableFuture<? extends T> src,
1200 Function<? super Throwable, ? extends T> fn,
1201 CompletableFuture<T> dst) {
1202 this.src = src; this.fn = fn; this.dst = dst;
1203 }
1204 public final void run() {
1205 final CompletableFuture<? extends T> a;
1206 final Function<? super Throwable, ? extends T> fn;
1207 final CompletableFuture<T> dst;
1208 Object r; T t = null; Throwable ex, dx = null;
1209 if ((dst = this.dst) != null &&
1210 (fn = this.fn) != null &&
1211 (a = this.src) != null &&
1212 (r = a.result) != null &&
1213 compareAndSet(0, 1)) {
1214 if ((r instanceof AltResult) &&
1215 (ex = ((AltResult)r).ex) != null) {
1216 try {
1217 t = fn.apply(ex);
1218 } catch (Throwable rex) {
1219 dx = rex;
1220 }
1221 }
1222 else {
1223 @SuppressWarnings("unchecked") T tr = (T) r;
1224 t = tr;
1225 }
1226 dst.internalComplete(t, dx);
1227 }
1228 }
1229 private static final long serialVersionUID = 5232453952276885070L;
1230 }
1231
1232 static final class WhenCompleteCompletion<T> extends Completion {
1233 final CompletableFuture<? extends T> src;
1234 final BiConsumer<? super T, ? super Throwable> fn;
1235 final CompletableFuture<T> dst;
1236 final Executor executor;
1237 WhenCompleteCompletion(CompletableFuture<? extends T> src,
1238 BiConsumer<? super T, ? super Throwable> fn,
1239 CompletableFuture<T> dst,
1240 Executor executor) {
1241 this.src = src; this.fn = fn; this.dst = dst;
1242 this.executor = executor;
1243 }
1244 public final void run() {
1245 final CompletableFuture<? extends T> a;
1246 final BiConsumer<? super T, ? super Throwable> fn;
1247 final CompletableFuture<T> dst;
1248 Object r; T t; Throwable ex;
1249 if ((dst = this.dst) != null &&
1250 (fn = this.fn) != null &&
1251 (a = this.src) != null &&
1252 (r = a.result) != null &&
1253 compareAndSet(0, 1)) {
1254 if (r instanceof AltResult) {
1255 ex = ((AltResult)r).ex;
1256 t = null;
1257 }
1258 else {
1259 ex = null;
1260 @SuppressWarnings("unchecked") T tr = (T) r;
1261 t = tr;
1262 }
1263 Executor e = executor;
1264 Throwable dx = null;
1265 try {
1266 if (e != null)
1267 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
1268 else
1269 fn.accept(t, ex);
1270 } catch (Throwable rex) {
1271 dx = rex;
1272 }
1273 if (e == null || dx != null)
1274 dst.internalComplete(t, ex != null ? ex : dx);
1275 }
1276 }
1277 private static final long serialVersionUID = 5232453952276885070L;
1278 }
1279
1280 static final class ThenCopy<T> extends Completion {
1281 final CompletableFuture<?> src;
1282 final CompletableFuture<T> dst;
1283 ThenCopy(CompletableFuture<?> src,
1284 CompletableFuture<T> dst) {
1285 this.src = src; this.dst = dst;
1286 }
1287 public final void run() {
1288 final CompletableFuture<?> a;
1289 final CompletableFuture<T> dst;
1290 Object r; T t; Throwable ex;
1291 if ((dst = this.dst) != null &&
1292 (a = this.src) != null &&
1293 (r = a.result) != null &&
1294 compareAndSet(0, 1)) {
1295 if (r instanceof AltResult) {
1296 ex = ((AltResult)r).ex;
1297 t = null;
1298 }
1299 else {
1300 ex = null;
1301 @SuppressWarnings("unchecked") T tr = (T) r;
1302 t = tr;
1303 }
1304 dst.internalComplete(t, ex);
1305 }
1306 }
1307 private static final long serialVersionUID = 5232453952276885070L;
1308 }
1309
1310 // version of ThenCopy for CompletableFuture<Void> dst
1311 static final class ThenPropagate extends Completion {
1312 final CompletableFuture<?> src;
1313 final CompletableFuture<Void> dst;
1314 ThenPropagate(CompletableFuture<?> src,
1315 CompletableFuture<Void> dst) {
1316 this.src = src; this.dst = dst;
1317 }
1318 public final void run() {
1319 final CompletableFuture<?> a;
1320 final CompletableFuture<Void> dst;
1321 Object r; Throwable ex;
1322 if ((dst = this.dst) != null &&
1323 (a = this.src) != null &&
1324 (r = a.result) != null &&
1325 compareAndSet(0, 1)) {
1326 if (r instanceof AltResult)
1327 ex = ((AltResult)r).ex;
1328 else
1329 ex = null;
1330 dst.internalComplete(null, ex);
1331 }
1332 }
1333 private static final long serialVersionUID = 5232453952276885070L;
1334 }
1335
1336 static final class HandleCompletion<T,U> extends Completion {
1337 final CompletableFuture<? extends T> src;
1338 final BiFunction<? super T, Throwable, ? extends U> fn;
1339 final CompletableFuture<U> dst;
1340 final Executor executor;
1341 HandleCompletion(CompletableFuture<? extends T> src,
1342 BiFunction<? super T, Throwable, ? extends U> fn,
1343 CompletableFuture<U> dst,
1344 Executor executor) {
1345 this.src = src; this.fn = fn; this.dst = dst;
1346 this.executor = executor;
1347 }
1348 public final void run() {
1349 final CompletableFuture<? extends T> a;
1350 final BiFunction<? super T, Throwable, ? extends U> fn;
1351 final CompletableFuture<U> dst;
1352 Object r; T t; Throwable ex;
1353 if ((dst = this.dst) != null &&
1354 (fn = this.fn) != null &&
1355 (a = this.src) != null &&
1356 (r = a.result) != null &&
1357 compareAndSet(0, 1)) {
1358 if (r instanceof AltResult) {
1359 ex = ((AltResult)r).ex;
1360 t = null;
1361 }
1362 else {
1363 ex = null;
1364 @SuppressWarnings("unchecked") T tr = (T) r;
1365 t = tr;
1366 }
1367 Executor e = executor;
1368 U u = null;
1369 Throwable dx = null;
1370 try {
1371 if (e != null)
1372 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1373 else
1374 u = fn.apply(t, ex);
1375 } catch (Throwable rex) {
1376 dx = rex;
1377 }
1378 if (e == null || dx != null)
1379 dst.internalComplete(u, dx);
1380 }
1381 }
1382 private static final long serialVersionUID = 5232453952276885070L;
1383 }
1384
1385 static final class ThenCompose<T,U> extends Completion {
1386 final CompletableFuture<? extends T> src;
1387 final Function<? super T, ? extends CompletionStage<U>> fn;
1388 final CompletableFuture<U> dst;
1389 final Executor executor;
1390 ThenCompose(CompletableFuture<? extends T> src,
1391 Function<? super T, ? extends CompletionStage<U>> fn,
1392 CompletableFuture<U> dst,
1393 Executor executor) {
1394 this.src = src; this.fn = fn; this.dst = dst;
1395 this.executor = executor;
1396 }
1397 public final void run() {
1398 final CompletableFuture<? extends T> a;
1399 final Function<? super T, ? extends CompletionStage<U>> fn;
1400 final CompletableFuture<U> dst;
1401 Object r; T t; Throwable ex; Executor e;
1402 if ((dst = this.dst) != null &&
1403 (fn = this.fn) != null &&
1404 (a = this.src) != null &&
1405 (r = a.result) != null &&
1406 compareAndSet(0, 1)) {
1407 if (r instanceof AltResult) {
1408 ex = ((AltResult)r).ex;
1409 t = null;
1410 }
1411 else {
1412 ex = null;
1413 @SuppressWarnings("unchecked") T tr = (T) r;
1414 t = tr;
1415 }
1416 CompletableFuture<U> c = null;
1417 U u = null;
1418 boolean complete = false;
1419 if (ex == null) {
1420 if ((e = executor) != null)
1421 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1422 else {
1423 try {
1424 CompletionStage<U> cs = fn.apply(t);
1425 c = (cs == null) ? null : cs.toCompletableFuture();
1426 if (c == null)
1427 ex = new NullPointerException();
1428 } catch (Throwable rex) {
1429 ex = rex;
1430 }
1431 }
1432 }
1433 if (c != null) {
1434 ThenCopy<U> d = null;
1435 Object s;
1436 if ((s = c.result) == null) {
1437 CompletionNode p = new CompletionNode
1438 (d = new ThenCopy<U>(c, dst));
1439 while ((s = c.result) == null) {
1440 if (UNSAFE.compareAndSwapObject
1441 (c, COMPLETIONS, p.next = c.completions, p))
1442 break;
1443 }
1444 }
1445 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1446 complete = true;
1447 if (s instanceof AltResult) {
1448 ex = ((AltResult)s).ex; // no rewrap
1449 u = null;
1450 }
1451 else {
1452 @SuppressWarnings("unchecked") U us = (U) s;
1453 u = us;
1454 }
1455 }
1456 }
1457 if (complete || ex != null)
1458 dst.internalComplete(u, ex);
1459 if (c != null)
1460 c.helpPostComplete();
1461 }
1462 }
1463 private static final long serialVersionUID = 5232453952276885070L;
1464 }
1465
1466 // Implementations of stage methods with (plain, async, Executor) forms
1467
1468 private <U> CompletableFuture<U> doThenApply
1469 (Function<? super T,? extends U> fn,
1470 Executor e) {
1471 if (fn == null) throw new NullPointerException();
1472 CompletableFuture<U> dst = new CompletableFuture<U>();
1473 ThenApply<T,U> d = null;
1474 Object r;
1475 if ((r = result) == null) {
1476 CompletionNode p = new CompletionNode
1477 (d = new ThenApply<T,U>(this, fn, dst, e));
1478 while ((r = result) == null) {
1479 if (UNSAFE.compareAndSwapObject
1480 (this, COMPLETIONS, p.next = completions, p))
1481 break;
1482 }
1483 }
1484 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1485 T t; Throwable ex;
1486 if (r instanceof AltResult) {
1487 ex = ((AltResult)r).ex;
1488 t = null;
1489 }
1490 else {
1491 ex = null;
1492 @SuppressWarnings("unchecked") T tr = (T) r;
1493 t = tr;
1494 }
1495 U u = null;
1496 if (ex == null) {
1497 try {
1498 if (e != null)
1499 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1500 else
1501 u = fn.apply(t);
1502 } catch (Throwable rex) {
1503 ex = rex;
1504 }
1505 }
1506 if (e == null || ex != null)
1507 dst.internalComplete(u, ex);
1508 }
1509 helpPostComplete();
1510 return dst;
1511 }
1512
1513 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1514 Executor e) {
1515 if (fn == null) throw new NullPointerException();
1516 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1517 ThenAccept<T> d = null;
1518 Object r;
1519 if ((r = result) == null) {
1520 CompletionNode p = new CompletionNode
1521 (d = new ThenAccept<T>(this, fn, dst, e));
1522 while ((r = result) == null) {
1523 if (UNSAFE.compareAndSwapObject
1524 (this, COMPLETIONS, p.next = completions, p))
1525 break;
1526 }
1527 }
1528 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1529 T t; Throwable ex;
1530 if (r instanceof AltResult) {
1531 ex = ((AltResult)r).ex;
1532 t = null;
1533 }
1534 else {
1535 ex = null;
1536 @SuppressWarnings("unchecked") T tr = (T) r;
1537 t = tr;
1538 }
1539 if (ex == null) {
1540 try {
1541 if (e != null)
1542 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1543 else
1544 fn.accept(t);
1545 } catch (Throwable rex) {
1546 ex = rex;
1547 }
1548 }
1549 if (e == null || ex != null)
1550 dst.internalComplete(null, ex);
1551 }
1552 helpPostComplete();
1553 return dst;
1554 }
1555
1556 private CompletableFuture<Void> doThenRun(Runnable action,
1557 Executor e) {
1558 if (action == null) throw new NullPointerException();
1559 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1560 ThenRun d = null;
1561 Object r;
1562 if ((r = result) == null) {
1563 CompletionNode p = new CompletionNode
1564 (d = new ThenRun(this, action, dst, e));
1565 while ((r = result) == null) {
1566 if (UNSAFE.compareAndSwapObject
1567 (this, COMPLETIONS, p.next = completions, p))
1568 break;
1569 }
1570 }
1571 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1572 Throwable ex;
1573 if (r instanceof AltResult)
1574 ex = ((AltResult)r).ex;
1575 else
1576 ex = null;
1577 if (ex == null) {
1578 try {
1579 if (e != null)
1580 execAsync(e, new AsyncRun(action, dst));
1581 else
1582 action.run();
1583 } catch (Throwable rex) {
1584 ex = rex;
1585 }
1586 }
1587 if (e == null || ex != null)
1588 dst.internalComplete(null, ex);
1589 }
1590 helpPostComplete();
1591 return dst;
1592 }
1593
1594 private <U,V> CompletableFuture<V> doThenCombine
1595 (CompletableFuture<? extends U> other,
1596 BiFunction<? super T,? super U,? extends V> fn,
1597 Executor e) {
1598 if (other == null || fn == null) throw new NullPointerException();
1599 CompletableFuture<V> dst = new CompletableFuture<V>();
1600 ThenCombine<T,U,V> d = null;
1601 Object r, s = null;
1602 if ((r = result) == null || (s = other.result) == null) {
1603 d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
1604 CompletionNode q = null, p = new CompletionNode(d);
1605 while ((r == null && (r = result) == null) ||
1606 (s == null && (s = other.result) == null)) {
1607 if (q != null) {
1608 if (s != null ||
1609 UNSAFE.compareAndSwapObject
1610 (other, COMPLETIONS, q.next = other.completions, q))
1611 break;
1612 }
1613 else if (r != null ||
1614 UNSAFE.compareAndSwapObject
1615 (this, COMPLETIONS, p.next = completions, p)) {
1616 if (s != null)
1617 break;
1618 q = new CompletionNode(d);
1619 }
1620 }
1621 }
1622 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1623 T t; U u; Throwable ex;
1624 if (r instanceof AltResult) {
1625 ex = ((AltResult)r).ex;
1626 t = null;
1627 }
1628 else {
1629 ex = null;
1630 @SuppressWarnings("unchecked") T tr = (T) r;
1631 t = tr;
1632 }
1633 if (ex != null)
1634 u = null;
1635 else if (s instanceof AltResult) {
1636 ex = ((AltResult)s).ex;
1637 u = null;
1638 }
1639 else {
1640 @SuppressWarnings("unchecked") U us = (U) s;
1641 u = us;
1642 }
1643 V v = null;
1644 if (ex == null) {
1645 try {
1646 if (e != null)
1647 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
1648 else
1649 v = fn.apply(t, u);
1650 } catch (Throwable rex) {
1651 ex = rex;
1652 }
1653 }
1654 if (e == null || ex != null)
1655 dst.internalComplete(v, ex);
1656 }
1657 helpPostComplete();
1658 other.helpPostComplete();
1659 return dst;
1660 }
1661
1662 private <U> CompletableFuture<Void> doThenAcceptBoth
1663 (CompletableFuture<? extends U> other,
1664 BiConsumer<? super T,? super U> fn,
1665 Executor e) {
1666 if (other == null || fn == null) throw new NullPointerException();
1667 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1668 ThenAcceptBoth<T,U> d = null;
1669 Object r, s = null;
1670 if ((r = result) == null || (s = other.result) == null) {
1671 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1672 CompletionNode q = null, p = new CompletionNode(d);
1673 while ((r == null && (r = result) == null) ||
1674 (s == null && (s = other.result) == null)) {
1675 if (q != null) {
1676 if (s != null ||
1677 UNSAFE.compareAndSwapObject
1678 (other, COMPLETIONS, q.next = other.completions, q))
1679 break;
1680 }
1681 else if (r != null ||
1682 UNSAFE.compareAndSwapObject
1683 (this, COMPLETIONS, p.next = completions, p)) {
1684 if (s != null)
1685 break;
1686 q = new CompletionNode(d);
1687 }
1688 }
1689 }
1690 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1691 T t; U u; Throwable ex;
1692 if (r instanceof AltResult) {
1693 ex = ((AltResult)r).ex;
1694 t = null;
1695 }
1696 else {
1697 ex = null;
1698 @SuppressWarnings("unchecked") T tr = (T) r;
1699 t = tr;
1700 }
1701 if (ex != null)
1702 u = null;
1703 else if (s instanceof AltResult) {
1704 ex = ((AltResult)s).ex;
1705 u = null;
1706 }
1707 else {
1708 @SuppressWarnings("unchecked") U us = (U) s;
1709 u = us;
1710 }
1711 if (ex == null) {
1712 try {
1713 if (e != null)
1714 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1715 else
1716 fn.accept(t, u);
1717 } catch (Throwable rex) {
1718 ex = rex;
1719 }
1720 }
1721 if (e == null || ex != null)
1722 dst.internalComplete(null, ex);
1723 }
1724 helpPostComplete();
1725 other.helpPostComplete();
1726 return dst;
1727 }
1728
1729 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1730 Runnable action,
1731 Executor e) {
1732 if (other == null || action == null) throw new NullPointerException();
1733 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1734 RunAfterBoth d = null;
1735 Object r, s = null;
1736 if ((r = result) == null || (s = other.result) == null) {
1737 d = new RunAfterBoth(this, other, action, dst, e);
1738 CompletionNode q = null, p = new CompletionNode(d);
1739 while ((r == null && (r = result) == null) ||
1740 (s == null && (s = other.result) == null)) {
1741 if (q != null) {
1742 if (s != null ||
1743 UNSAFE.compareAndSwapObject
1744 (other, COMPLETIONS, q.next = other.completions, q))
1745 break;
1746 }
1747 else if (r != null ||
1748 UNSAFE.compareAndSwapObject
1749 (this, COMPLETIONS, p.next = completions, p)) {
1750 if (s != null)
1751 break;
1752 q = new CompletionNode(d);
1753 }
1754 }
1755 }
1756 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1757 Throwable ex;
1758 if (r instanceof AltResult)
1759 ex = ((AltResult)r).ex;
1760 else
1761 ex = null;
1762 if (ex == null && (s instanceof AltResult))
1763 ex = ((AltResult)s).ex;
1764 if (ex == null) {
1765 try {
1766 if (e != null)
1767 execAsync(e, new AsyncRun(action, dst));
1768 else
1769 action.run();
1770 } catch (Throwable rex) {
1771 ex = rex;
1772 }
1773 }
1774 if (e == null || ex != null)
1775 dst.internalComplete(null, ex);
1776 }
1777 helpPostComplete();
1778 other.helpPostComplete();
1779 return dst;
1780 }
1781
1782 private <U> CompletableFuture<U> doApplyToEither
1783 (CompletableFuture<? extends T> other,
1784 Function<? super T, U> fn,
1785 Executor e) {
1786 if (other == null || fn == null) throw new NullPointerException();
1787 CompletableFuture<U> dst = new CompletableFuture<U>();
1788 ApplyToEither<T,U> d = null;
1789 Object r;
1790 if ((r = result) == null && (r = other.result) == null) {
1791 d = new ApplyToEither<T,U>(this, other, fn, dst, e);
1792 CompletionNode q = null, p = new CompletionNode(d);
1793 while ((r = result) == null && (r = other.result) == null) {
1794 if (q != null) {
1795 if (UNSAFE.compareAndSwapObject
1796 (other, COMPLETIONS, q.next = other.completions, q))
1797 break;
1798 }
1799 else if (UNSAFE.compareAndSwapObject
1800 (this, COMPLETIONS, p.next = completions, p))
1801 q = new CompletionNode(d);
1802 }
1803 }
1804 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1805 T t; Throwable ex;
1806 if (r instanceof AltResult) {
1807 ex = ((AltResult)r).ex;
1808 t = null;
1809 }
1810 else {
1811 ex = null;
1812 @SuppressWarnings("unchecked") T tr = (T) r;
1813 t = tr;
1814 }
1815 U u = null;
1816 if (ex == null) {
1817 try {
1818 if (e != null)
1819 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1820 else
1821 u = fn.apply(t);
1822 } catch (Throwable rex) {
1823 ex = rex;
1824 }
1825 }
1826 if (e == null || ex != null)
1827 dst.internalComplete(u, ex);
1828 }
1829 helpPostComplete();
1830 other.helpPostComplete();
1831 return dst;
1832 }
1833
1834 private CompletableFuture<Void> doAcceptEither
1835 (CompletableFuture<? extends T> other,
1836 Consumer<? super T> fn,
1837 Executor e) {
1838 if (other == null || fn == null) throw new NullPointerException();
1839 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1840 AcceptEither<T> d = null;
1841 Object r;
1842 if ((r = result) == null && (r = other.result) == null) {
1843 d = new AcceptEither<T>(this, other, fn, dst, e);
1844 CompletionNode q = null, p = new CompletionNode(d);
1845 while ((r = result) == null && (r = other.result) == null) {
1846 if (q != null) {
1847 if (UNSAFE.compareAndSwapObject
1848 (other, COMPLETIONS, q.next = other.completions, q))
1849 break;
1850 }
1851 else if (UNSAFE.compareAndSwapObject
1852 (this, COMPLETIONS, p.next = completions, p))
1853 q = new CompletionNode(d);
1854 }
1855 }
1856 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1857 T t; Throwable ex;
1858 if (r instanceof AltResult) {
1859 ex = ((AltResult)r).ex;
1860 t = null;
1861 }
1862 else {
1863 ex = null;
1864 @SuppressWarnings("unchecked") T tr = (T) r;
1865 t = tr;
1866 }
1867 if (ex == null) {
1868 try {
1869 if (e != null)
1870 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1871 else
1872 fn.accept(t);
1873 } catch (Throwable rex) {
1874 ex = rex;
1875 }
1876 }
1877 if (e == null || ex != null)
1878 dst.internalComplete(null, ex);
1879 }
1880 helpPostComplete();
1881 other.helpPostComplete();
1882 return dst;
1883 }
1884
1885 private CompletableFuture<Void> doRunAfterEither
1886 (CompletableFuture<?> other,
1887 Runnable action,
1888 Executor e) {
1889 if (other == null || action == null) throw new NullPointerException();
1890 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1891 RunAfterEither d = null;
1892 Object r;
1893 if ((r = result) == null && (r = other.result) == null) {
1894 d = new RunAfterEither(this, other, action, dst, e);
1895 CompletionNode q = null, p = new CompletionNode(d);
1896 while ((r = result) == null && (r = other.result) == null) {
1897 if (q != null) {
1898 if (UNSAFE.compareAndSwapObject
1899 (other, COMPLETIONS, q.next = other.completions, q))
1900 break;
1901 }
1902 else if (UNSAFE.compareAndSwapObject
1903 (this, COMPLETIONS, p.next = completions, p))
1904 q = new CompletionNode(d);
1905 }
1906 }
1907 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1908 Throwable ex;
1909 if (r instanceof AltResult)
1910 ex = ((AltResult)r).ex;
1911 else
1912 ex = null;
1913 if (ex == null) {
1914 try {
1915 if (e != null)
1916 execAsync(e, new AsyncRun(action, dst));
1917 else
1918 action.run();
1919 } catch (Throwable rex) {
1920 ex = rex;
1921 }
1922 }
1923 if (e == null || ex != null)
1924 dst.internalComplete(null, ex);
1925 }
1926 helpPostComplete();
1927 other.helpPostComplete();
1928 return dst;
1929 }
1930
1931 private <U> CompletableFuture<U> doThenCompose
1932 (Function<? super T, ? extends CompletionStage<U>> fn,
1933 Executor e) {
1934 if (fn == null) throw new NullPointerException();
1935 CompletableFuture<U> dst = null;
1936 ThenCompose<T,U> d = null;
1937 Object r;
1938 if ((r = result) == null) {
1939 dst = new CompletableFuture<U>();
1940 CompletionNode p = new CompletionNode
1941 (d = new ThenCompose<T,U>(this, fn, dst, e));
1942 while ((r = result) == null) {
1943 if (UNSAFE.compareAndSwapObject
1944 (this, COMPLETIONS, p.next = completions, p))
1945 break;
1946 }
1947 }
1948 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1949 T t; Throwable ex;
1950 if (r instanceof AltResult) {
1951 ex = ((AltResult)r).ex;
1952 t = null;
1953 }
1954 else {
1955 ex = null;
1956 @SuppressWarnings("unchecked") T tr = (T) r;
1957 t = tr;
1958 }
1959 if (ex == null) {
1960 if (e != null) {
1961 if (dst == null)
1962 dst = new CompletableFuture<U>();
1963 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1964 }
1965 else {
1966 try {
1967 CompletionStage<U> cs = fn.apply(t);
1968 if (cs == null ||
1969 (dst = cs.toCompletableFuture()) == null)
1970 ex = new NullPointerException();
1971 } catch (Throwable rex) {
1972 ex = rex;
1973 }
1974 }
1975 }
1976 if (dst == null)
1977 dst = new CompletableFuture<U>();
1978 if (ex != null)
1979 dst.internalComplete(null, ex);
1980 }
1981 helpPostComplete();
1982 dst.helpPostComplete();
1983 return dst;
1984 }
1985
1986 private CompletableFuture<T> doWhenComplete
1987 (BiConsumer<? super T, ? super Throwable> fn,
1988 Executor e) {
1989 if (fn == null) throw new NullPointerException();
1990 CompletableFuture<T> dst = new CompletableFuture<T>();
1991 WhenCompleteCompletion<T> d = null;
1992 Object r;
1993 if ((r = result) == null) {
1994 CompletionNode p =
1995 new CompletionNode(d = new WhenCompleteCompletion<T>
1996 (this, fn, dst, e));
1997 while ((r = result) == null) {
1998 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1999 p.next = completions, p))
2000 break;
2001 }
2002 }
2003 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2004 T t; Throwable ex;
2005 if (r instanceof AltResult) {
2006 ex = ((AltResult)r).ex;
2007 t = null;
2008 }
2009 else {
2010 ex = null;
2011 @SuppressWarnings("unchecked") T tr = (T) r;
2012 t = tr;
2013 }
2014 Throwable dx = null;
2015 try {
2016 if (e != null)
2017 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
2018 else
2019 fn.accept(t, ex);
2020 } catch (Throwable rex) {
2021 dx = rex;
2022 }
2023 if (e == null || dx != null)
2024 dst.internalComplete(t, ex != null ? ex : dx);
2025 }
2026 helpPostComplete();
2027 return dst;
2028 }
2029
2030 private <U> CompletableFuture<U> doHandle
2031 (BiFunction<? super T, Throwable, ? extends U> fn,
2032 Executor e) {
2033 if (fn == null) throw new NullPointerException();
2034 CompletableFuture<U> dst = new CompletableFuture<U>();
2035 HandleCompletion<T,U> d = null;
2036 Object r;
2037 if ((r = result) == null) {
2038 CompletionNode p =
2039 new CompletionNode(d = new HandleCompletion<T,U>
2040 (this, fn, dst, e));
2041 while ((r = result) == null) {
2042 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2043 p.next = completions, p))
2044 break;
2045 }
2046 }
2047 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2048 T t; Throwable ex;
2049 if (r instanceof AltResult) {
2050 ex = ((AltResult)r).ex;
2051 t = null;
2052 }
2053 else {
2054 ex = null;
2055 @SuppressWarnings("unchecked") T tr = (T) r;
2056 t = tr;
2057 }
2058 U u = null;
2059 Throwable dx = null;
2060 try {
2061 if (e != null)
2062 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2063 else {
2064 u = fn.apply(t, ex);
2065 dx = null;
2066 }
2067 } catch (Throwable rex) {
2068 dx = rex;
2069 u = null;
2070 }
2071 if (e == null || dx != null)
2072 dst.internalComplete(u, dx);
2073 }
2074 helpPostComplete();
2075 return dst;
2076 }
2077
2078
2079 // public methods
2080
2081 /**
2082 * Creates a new incomplete CompletableFuture.
2083 */
2084 public CompletableFuture() {
2085 }
2086
2087 /**
2088 * Returns a new CompletableFuture that is asynchronously completed
2089 * by a task running in the {@link ForkJoinPool#commonPool()} with
2090 * the value obtained by calling the given Supplier.
2091 *
2092 * @param supplier a function returning the value to be used
2093 * to complete the returned CompletableFuture
2094 * @param <U> the function's return type
2095 * @return the new CompletableFuture
2096 */
2097 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2098 if (supplier == null) throw new NullPointerException();
2099 CompletableFuture<U> f = new CompletableFuture<U>();
2100 execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
2101 return f;
2102 }
2103
2104 /**
2105 * Returns a new CompletableFuture that is asynchronously completed
2106 * by a task running in the given executor with the value obtained
2107 * by calling the given Supplier.
2108 *
2109 * @param supplier a function returning the value to be used
2110 * to complete the returned CompletableFuture
2111 * @param executor the executor to use for asynchronous execution
2112 * @param <U> the function's return type
2113 * @return the new CompletableFuture
2114 */
2115 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2116 Executor executor) {
2117 if (executor == null || supplier == null)
2118 throw new NullPointerException();
2119 CompletableFuture<U> f = new CompletableFuture<U>();
2120 execAsync(executor, new AsyncSupply<U>(supplier, f));
2121 return f;
2122 }
2123
2124 /**
2125 * Returns a new CompletableFuture that is asynchronously completed
2126 * by a task running in the {@link ForkJoinPool#commonPool()} after
2127 * it runs the given action.
2128 *
2129 * @param runnable the action to run before completing the
2130 * returned CompletableFuture
2131 * @return the new CompletableFuture
2132 */
2133 public static CompletableFuture<Void> runAsync(Runnable runnable) {
2134 if (runnable == null) throw new NullPointerException();
2135 CompletableFuture<Void> f = new CompletableFuture<Void>();
2136 execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f));
2137 return f;
2138 }
2139
2140 /**
2141 * Returns a new CompletableFuture that is asynchronously completed
2142 * by a task running in the given executor after it runs the given
2143 * action.
2144 *
2145 * @param runnable the action to run before completing the
2146 * returned CompletableFuture
2147 * @param executor the executor to use for asynchronous execution
2148 * @return the new CompletableFuture
2149 */
2150 public static CompletableFuture<Void> runAsync(Runnable runnable,
2151 Executor executor) {
2152 if (executor == null || runnable == null)
2153 throw new NullPointerException();
2154 CompletableFuture<Void> f = new CompletableFuture<Void>();
2155 execAsync(executor, new AsyncRun(runnable, f));
2156 return f;
2157 }
2158
2159 /**
2160 * Returns a new CompletableFuture that is already completed with
2161 * the given value.
2162 *
2163 * @param value the value
2164 * @param <U> the type of the value
2165 * @return the completed CompletableFuture
2166 */
2167 public static <U> CompletableFuture<U> completedFuture(U value) {
2168 CompletableFuture<U> f = new CompletableFuture<U>();
2169 f.result = (value == null) ? NIL : value;
2170 return f;
2171 }
2172
2173 /**
2174 * Returns {@code true} if completed in any fashion: normally,
2175 * exceptionally, or via cancellation.
2176 *
2177 * @return {@code true} if completed
2178 */
2179 public boolean isDone() {
2180 return result != null;
2181 }
2182
2183 /**
2184 * Waits if necessary for this future to complete, and then
2185 * returns its result.
2186 *
2187 * @return the result value
2188 * @throws CancellationException if this future was cancelled
2189 * @throws ExecutionException if this future completed exceptionally
2190 * @throws InterruptedException if the current thread was interrupted
2191 * while waiting
2192 */
2193 public T get() throws InterruptedException, ExecutionException {
2194 Object r; Throwable ex, cause;
2195 if ((r = result) == null && (r = waitingGet(true)) == null)
2196 throw new InterruptedException();
2197 if (!(r instanceof AltResult)) {
2198 @SuppressWarnings("unchecked") T tr = (T) r;
2199 return tr;
2200 }
2201 if ((ex = ((AltResult)r).ex) == null)
2202 return null;
2203 if (ex instanceof CancellationException)
2204 throw (CancellationException)ex;
2205 if ((ex instanceof CompletionException) &&
2206 (cause = ex.getCause()) != null)
2207 ex = cause;
2208 throw new ExecutionException(ex);
2209 }
2210
2211 /**
2212 * Waits if necessary for at most the given time for this future
2213 * to complete, and then returns its result, if available.
2214 *
2215 * @param timeout the maximum time to wait
2216 * @param unit the time unit of the timeout argument
2217 * @return the result value
2218 * @throws CancellationException if this future was cancelled
2219 * @throws ExecutionException if this future completed exceptionally
2220 * @throws InterruptedException if the current thread was interrupted
2221 * while waiting
2222 * @throws TimeoutException if the wait timed out
2223 */
2224 public T get(long timeout, TimeUnit unit)
2225 throws InterruptedException, ExecutionException, TimeoutException {
2226 Object r; Throwable ex, cause;
2227 long nanos = unit.toNanos(timeout);
2228 if (Thread.interrupted())
2229 throw new InterruptedException();
2230 if ((r = result) == null)
2231 r = timedAwaitDone(nanos);
2232 if (!(r instanceof AltResult)) {
2233 @SuppressWarnings("unchecked") T tr = (T) r;
2234 return tr;
2235 }
2236 if ((ex = ((AltResult)r).ex) == null)
2237 return null;
2238 if (ex instanceof CancellationException)
2239 throw (CancellationException)ex;
2240 if ((ex instanceof CompletionException) &&
2241 (cause = ex.getCause()) != null)
2242 ex = cause;
2243 throw new ExecutionException(ex);
2244 }
2245
2246 /**
2247 * Returns the result value when complete, or throws an
2248 * (unchecked) exception if completed exceptionally. To better
2249 * conform with the use of common functional forms, if a
2250 * computation involved in the completion of this
2251 * CompletableFuture threw an exception, this method throws an
2252 * (unchecked) {@link CompletionException} with the underlying
2253 * exception as its cause.
2254 *
2255 * @return the result value
2256 * @throws CancellationException if the computation was cancelled
2257 * @throws CompletionException if this future completed
2258 * exceptionally or a completion computation threw an exception
2259 */
2260 public T join() {
2261 Object r; Throwable ex;
2262 if ((r = result) == null)
2263 r = waitingGet(false);
2264 if (!(r instanceof AltResult)) {
2265 @SuppressWarnings("unchecked") T tr = (T) r;
2266 return tr;
2267 }
2268 if ((ex = ((AltResult)r).ex) == null)
2269 return null;
2270 if (ex instanceof CancellationException)
2271 throw (CancellationException)ex;
2272 if (ex instanceof CompletionException)
2273 throw (CompletionException)ex;
2274 throw new CompletionException(ex);
2275 }
2276
2277 /**
2278 * Returns the result value (or throws any encountered exception)
2279 * if completed, else returns the given valueIfAbsent.
2280 *
2281 * @param valueIfAbsent the value to return if not completed
2282 * @return the result value, if completed, else the given valueIfAbsent
2283 * @throws CancellationException if the computation was cancelled
2284 * @throws CompletionException if this future completed
2285 * exceptionally or a completion computation threw an exception
2286 */
2287 public T getNow(T valueIfAbsent) {
2288 Object r; Throwable ex;
2289 if ((r = result) == null)
2290 return valueIfAbsent;
2291 if (!(r instanceof AltResult)) {
2292 @SuppressWarnings("unchecked") T tr = (T) r;
2293 return tr;
2294 }
2295 if ((ex = ((AltResult)r).ex) == null)
2296 return null;
2297 if (ex instanceof CancellationException)
2298 throw (CancellationException)ex;
2299 if (ex instanceof CompletionException)
2300 throw (CompletionException)ex;
2301 throw new CompletionException(ex);
2302 }
2303
2304 /**
2305 * If not already completed, sets the value returned by {@link
2306 * #get()} and related methods to the given value.
2307 *
2308 * @param value the result value
2309 * @return {@code true} if this invocation caused this CompletableFuture
2310 * to transition to a completed state, else {@code false}
2311 */
2312 public boolean complete(T value) {
2313 boolean triggered = result == null &&
2314 UNSAFE.compareAndSwapObject(this, RESULT, null,
2315 value == null ? NIL : value);
2316 postComplete();
2317 return triggered;
2318 }
2319
2320 /**
2321 * If not already completed, causes invocations of {@link #get()}
2322 * and related methods to throw the given exception.
2323 *
2324 * @param ex the exception
2325 * @return {@code true} if this invocation caused this CompletableFuture
2326 * to transition to a completed state, else {@code false}
2327 */
2328 public boolean completeExceptionally(Throwable ex) {
2329 if (ex == null) throw new NullPointerException();
2330 boolean triggered = result == null &&
2331 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2332 postComplete();
2333 return triggered;
2334 }
2335
2336 // CompletionStage methods
2337
2338 public <U> CompletableFuture<U> thenApply
2339 (Function<? super T,? extends U> fn) {
2340 return doThenApply(fn, null);
2341 }
2342
2343 public <U> CompletableFuture<U> thenApplyAsync
2344 (Function<? super T,? extends U> fn) {
2345 return doThenApply(fn, ForkJoinPool.commonPool());
2346 }
2347
2348 public <U> CompletableFuture<U> thenApplyAsync
2349 (Function<? super T,? extends U> fn,
2350 Executor executor) {
2351 if (executor == null) throw new NullPointerException();
2352 return doThenApply(fn, executor);
2353 }
2354
2355 public CompletableFuture<Void> thenAccept
2356 (Consumer<? super T> action) {
2357 return doThenAccept(action, null);
2358 }
2359
2360 public CompletableFuture<Void> thenAcceptAsync
2361 (Consumer<? super T> action) {
2362 return doThenAccept(action, ForkJoinPool.commonPool());
2363 }
2364
2365 public CompletableFuture<Void> thenAcceptAsync
2366 (Consumer<? super T> action,
2367 Executor executor) {
2368 if (executor == null) throw new NullPointerException();
2369 return doThenAccept(action, executor);
2370 }
2371
2372 public CompletableFuture<Void> thenRun
2373 (Runnable action) {
2374 return doThenRun(action, null);
2375 }
2376
2377 public CompletableFuture<Void> thenRunAsync
2378 (Runnable action) {
2379 return doThenRun(action, ForkJoinPool.commonPool());
2380 }
2381
2382 public CompletableFuture<Void> thenRunAsync
2383 (Runnable action,
2384 Executor executor) {
2385 if (executor == null) throw new NullPointerException();
2386 return doThenRun(action, executor);
2387 }
2388
2389 public <U,V> CompletableFuture<V> thenCombine
2390 (CompletionStage<? extends U> other,
2391 BiFunction<? super T,? super U,? extends V> fn) {
2392 return doThenCombine(other.toCompletableFuture(), fn, null);
2393 }
2394
2395 public <U,V> CompletableFuture<V> thenCombineAsync
2396 (CompletionStage<? extends U> other,
2397 BiFunction<? super T,? super U,? extends V> fn) {
2398 return doThenCombine(other.toCompletableFuture(), fn,
2399 ForkJoinPool.commonPool());
2400 }
2401
2402 public <U,V> CompletableFuture<V> thenCombineAsync
2403 (CompletionStage<? extends U> other,
2404 BiFunction<? super T,? super U,? extends V> fn,
2405 Executor executor) {
2406 if (executor == null) throw new NullPointerException();
2407 return doThenCombine(other.toCompletableFuture(), fn, executor);
2408 }
2409
2410 public <U> CompletableFuture<Void> thenAcceptBoth
2411 (CompletionStage<? extends U> other,
2412 BiConsumer<? super T, ? super U> action) {
2413 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2414 }
2415
2416 public <U> CompletableFuture<Void> thenAcceptBothAsync
2417 (CompletionStage<? extends U> other,
2418 BiConsumer<? super T, ? super U> action) {
2419 return doThenAcceptBoth(other.toCompletableFuture(), action,
2420 ForkJoinPool.commonPool());
2421 }
2422
2423 public <U> CompletableFuture<Void> thenAcceptBothAsync
2424 (CompletionStage<? extends U> other,
2425 BiConsumer<? super T, ? super U> action,
2426 Executor executor) {
2427 if (executor == null) throw new NullPointerException();
2428 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2429 }
2430
2431 public CompletableFuture<Void> runAfterBoth
2432 (CompletionStage<?> other,
2433 Runnable action) {
2434 return doRunAfterBoth(other.toCompletableFuture(), action, null);
2435 }
2436
2437 public CompletableFuture<Void> runAfterBothAsync
2438 (CompletionStage<?> other,
2439 Runnable action) {
2440 return doRunAfterBoth(other.toCompletableFuture(), action,
2441 ForkJoinPool.commonPool());
2442 }
2443
2444 public CompletableFuture<Void> runAfterBothAsync
2445 (CompletionStage<?> other,
2446 Runnable action,
2447 Executor executor) {
2448 if (executor == null) throw new NullPointerException();
2449 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2450 }
2451
2452
2453 public <U> CompletableFuture<U> applyToEither
2454 (CompletionStage<? extends T> other,
2455 Function<? super T, U> fn) {
2456 return doApplyToEither(other.toCompletableFuture(), fn, null);
2457 }
2458
2459 public <U> CompletableFuture<U> applyToEitherAsync
2460 (CompletionStage<? extends T> other,
2461 Function<? super T, U> fn) {
2462 return doApplyToEither(other.toCompletableFuture(), fn,
2463 ForkJoinPool.commonPool());
2464 }
2465
2466 public <U> CompletableFuture<U> applyToEitherAsync
2467 (CompletionStage<? extends T> other,
2468 Function<? super T, U> fn,
2469 Executor executor) {
2470 if (executor == null) throw new NullPointerException();
2471 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2472 }
2473
2474 public CompletableFuture<Void> acceptEither
2475 (CompletionStage<? extends T> other,
2476 Consumer<? super T> action) {
2477 return doAcceptEither(other.toCompletableFuture(), action, null);
2478 }
2479
2480 public CompletableFuture<Void> acceptEitherAsync
2481 (CompletionStage<? extends T> other,
2482 Consumer<? super T> action) {
2483 return doAcceptEither(other.toCompletableFuture(), action,
2484 ForkJoinPool.commonPool());
2485 }
2486
2487 public CompletableFuture<Void> acceptEitherAsync
2488 (CompletionStage<? extends T> other,
2489 Consumer<? super T> action,
2490 Executor executor) {
2491 if (executor == null) throw new NullPointerException();
2492 return doAcceptEither(other.toCompletableFuture(), action, executor);
2493 }
2494
2495 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2496 Runnable action) {
2497 return doRunAfterEither(other.toCompletableFuture(), action, null);
2498 }
2499
2500 public CompletableFuture<Void> runAfterEitherAsync
2501 (CompletionStage<?> other,
2502 Runnable action) {
2503 return doRunAfterEither(other.toCompletableFuture(), action,
2504 ForkJoinPool.commonPool());
2505 }
2506
2507 public CompletableFuture<Void> runAfterEitherAsync
2508 (CompletionStage<?> other,
2509 Runnable action,
2510 Executor executor) {
2511 if (executor == null) throw new NullPointerException();
2512 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2513 }
2514
2515 public <U> CompletableFuture<U> thenCompose
2516 (Function<? super T, ? extends CompletionStage<U>> fn) {
2517 return doThenCompose(fn, null);
2518 }
2519
2520 public <U> CompletableFuture<U> thenComposeAsync
2521 (Function<? super T, ? extends CompletionStage<U>> fn) {
2522 return doThenCompose(fn, ForkJoinPool.commonPool());
2523 }
2524
2525 public <U> CompletableFuture<U> thenComposeAsync
2526 (Function<? super T, ? extends CompletionStage<U>> fn,
2527 Executor executor) {
2528 if (executor == null) throw new NullPointerException();
2529 return doThenCompose(fn, executor);
2530 }
2531
2532 public CompletableFuture<T> whenComplete
2533 (BiConsumer<? super T, ? super Throwable> action) {
2534 return doWhenComplete(action, null);
2535 }
2536
2537 public CompletableFuture<T> whenCompleteAsync
2538 (BiConsumer<? super T, ? super Throwable> action) {
2539 return doWhenComplete(action, ForkJoinPool.commonPool());
2540 }
2541
2542 public CompletableFuture<T> whenCompleteAsync
2543 (BiConsumer<? super T, ? super Throwable> action,
2544 Executor executor) {
2545 if (executor == null) throw new NullPointerException();
2546 return doWhenComplete(action, executor);
2547 }
2548
2549 public <U> CompletableFuture<U> handle
2550 (BiFunction<? super T, Throwable, ? extends U> fn) {
2551 return doHandle(fn, null);
2552 }
2553
2554 public <U> CompletableFuture<U> handleAsync
2555 (BiFunction<? super T, Throwable, ? extends U> fn) {
2556 return doHandle(fn, ForkJoinPool.commonPool());
2557 }
2558
2559 public <U> CompletableFuture<U> handleAsync
2560 (BiFunction<? super T, Throwable, ? extends U> fn,
2561 Executor executor) {
2562 if (executor == null) throw new NullPointerException();
2563 return doHandle(fn, executor);
2564 }
2565
2566 /**
2567 * Returns this CompletableFuture
2568 *
2569 * @return this CompletableFuture
2570 */
2571 public CompletableFuture<T> toCompletableFuture() {
2572 return this;
2573 }
2574
2575 // not in interface CompletionStage
2576
/**
 * Returns a new CompletableFuture that is completed when this
 * CompletableFuture completes, with the result of the given
 * function of the exception triggering this CompletableFuture's
 * completion when it completes exceptionally; otherwise, if this
 * CompletableFuture completes normally, then the returned
 * CompletableFuture also completes normally with the same value.
 * Note: More flexible versions of this functionality are
 * available using methods {@code whenComplete} and {@code handle}.
 *
 * <p>If {@code fn} itself throws, the thrown object is passed to the
 * returned future's {@code internalComplete} as its exception
 * argument instead of a value.
 *
 * @param fn the function to use to compute the value of the
 * returned CompletableFuture if this CompletableFuture completed
 * exceptionally
 * @return the new CompletableFuture
 * @throws NullPointerException if {@code fn} is {@code null}
 */
public CompletableFuture<T> exceptionally
    (Function<Throwable, ? extends T> fn) {
    if (fn == null) throw new NullPointerException();
    CompletableFuture<T> dst = new CompletableFuture<T>();
    ExceptionCompletion<T> d = null;
    Object r;
    if ((r = result) == null) {
        // Not complete yet: build a completion record and try to push its
        // node onto the head of this future's completions list.
        CompletionNode p =
            new CompletionNode(d = new ExceptionCompletion<T>
                               (this, fn, dst));
        // Retry the CAS until it succeeds or a result shows up meanwhile.
        while ((r = result) == null) {
            if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
                                            p.next = completions, p))
                break;
        }
    }
    // Run inline only when a result was observed. If a completion record
    // was created, it must first be claimed via compareAndSet(0, 1) --
    // presumably so that the record and this thread cannot both run the
    // action (confirm against ExceptionCompletion).
    if (r != null && (d == null || d.compareAndSet(0, 1))) {
        T t = null; Throwable ex, dx = null;
        if (r instanceof AltResult) {
            // An AltResult with a null exception leaves t == null.
            if ((ex = ((AltResult)r).ex) != null) {
                try {
                    t = fn.apply(ex);
                } catch (Throwable rex) {
                    dx = rex;   // failure of fn becomes dst's exception
                }
            }
        }
        else {
            // Normal completion: pass the value through unchanged.
            @SuppressWarnings("unchecked") T tr = (T) r;
            t = tr;
        }
        dst.internalComplete(t, dx);
    }
    helpPostComplete();
    return dst;
}
2628
2629 /* ------------- Arbitrary-arity constructions -------------- */
2630
2631 /*
2632 * The basic plan of attack is to recursively form binary
2633 * completion trees of elements. This can be overkill for small
2634 * sets, but scales nicely. The And/All vs Or/Any forms use the
2635 * same idea, but details differ.
2636 */
2637
/**
 * Returns a new CompletableFuture that is completed when all of
 * the given CompletableFutures complete. If any of the given
 * CompletableFutures complete exceptionally, then the returned
 * CompletableFuture also does so, with a CompletionException
 * holding this exception as its cause. Otherwise, the results,
 * if any, of the given CompletableFutures are not reflected in
 * the returned CompletableFuture, but may be obtained by
 * inspecting them individually. If no CompletableFutures are
 * provided, returns a CompletableFuture completed with the value
 * {@code null}.
 *
 * <p>Among the applications of this method is to await completion
 * of a set of independent CompletableFutures before continuing a
 * program, as in: {@code CompletableFuture.allOf(c1, c2,
 * c3).join();}.
 *
 * <p>Arrays of two or more elements are handled by {@code allTree};
 * the empty and single-element cases are handled directly here.
 *
 * @param cfs the CompletableFutures
 * @return a new CompletableFuture that is completed when all of the
 * given CompletableFutures complete
 * @throws NullPointerException if the array or any of its elements are
 * {@code null}
 */
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    int len = cfs.length; // Directly handle empty and singleton cases
    if (len > 1)
        return allTree(cfs, 0, len - 1);
    else {
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        CompletableFuture<?> f;
        if (len == 0)
            dst.result = NIL;   // nothing to wait for: already complete
        else if ((f = cfs[0]) == null)
            throw new NullPointerException();
        else {
            ThenPropagate d = null;
            CompletionNode p = null;
            Object r;
            // While f has no result, advance one step per iteration:
            // allocate the completion record, then its list node, then
            // try to CAS the node onto the head of f.completions.
            while ((r = f.result) == null) {
                if (d == null)
                    d = new ThenPropagate(f, dst);
                else if (p == null)
                    p = new CompletionNode(d);
                else if (UNSAFE.compareAndSwapObject
                         (f, COMPLETIONS, p.next = f.completions, p))
                    break;
            }
            // If a result was observed, complete dst here, after claiming
            // the record (when one exists) via compareAndSet(0, 1). Only
            // the exception, if any, is forwarded; the value is dropped.
            if (r != null && (d == null || d.compareAndSet(0, 1)))
                dst.internalComplete(null, (r instanceof AltResult) ?
                                     ((AltResult)r).ex : null);
            f.helpPostComplete();
        }
        return dst;
    }
}
2693
/**
 * Recursively constructs an And'ed tree of CompletableFutures.
 * Called only when array known to have at least two elements.
 *
 * <p>The range {@code [lo, hi]} is split at its midpoint; each half is
 * either a single array element or the future returned by a recursive
 * call. The returned future is completed once both halves have
 * results.
 *
 * @param cfs the source array
 * @param lo lowest index (inclusive) of the range to combine
 * @param hi highest index (inclusive) of the range to combine
 * @return a future for the conjunction of the range
 * @throws NullPointerException if either half is {@code null}
 */
private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                               int lo, int hi) {
    CompletableFuture<?> fst, snd;
    int mid = (lo + hi) >>> 1;
    // A half covering one index uses the element itself; otherwise recurse.
    if ((fst = (lo == mid   ? cfs[lo] : allTree(cfs, lo,    mid))) == null ||
        (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
        throw new NullPointerException();
    CompletableFuture<Void> dst = new CompletableFuture<Void>();
    AndCompletion d = null;
    CompletionNode p = null, q = null;
    Object r = null, s = null;
    // While at least one side lacks a result, advance one step per
    // iteration: allocate the record, allocate fst's node, CAS it onto
    // fst.completions (then allocate snd's node), CAS that onto
    // snd.completions, and leave the loop once both are registered.
    while ((r = fst.result) == null || (s = snd.result) == null) {
        if (d == null)
            d = new AndCompletion(fst, snd, dst);
        else if (p == null)
            p = new CompletionNode(d);
        else if (q == null) {
            if (UNSAFE.compareAndSwapObject
                (fst, COMPLETIONS, p.next = fst.completions, p))
                q = new CompletionNode(d);
        }
        else if (UNSAFE.compareAndSwapObject
                 (snd, COMPLETIONS, q.next = snd.completions, q))
            break;
    }
    // Re-read whichever result the loop condition did not capture, and
    // complete dst here only if both sides are done and the record (when
    // one exists) is claimed via compareAndSet(0, 1).
    if ((r != null || (r = fst.result) != null) &&
        (s != null || (s = snd.result) != null) &&
        (d == null || d.compareAndSet(0, 1))) {
        Throwable ex;
        // The first side's exception takes precedence; the second side is
        // consulted only when the first has none.
        if (r instanceof AltResult)
            ex = ((AltResult)r).ex;
        else
            ex = null;
        if (ex == null && (s instanceof AltResult))
            ex = ((AltResult)s).ex;
        dst.internalComplete(null, ex);
    }
    fst.helpPostComplete();
    snd.helpPostComplete();
    return dst;
}
2739
/**
 * Returns a new CompletableFuture that is completed when any of
 * the given CompletableFutures complete, with the same result.
 * Otherwise, if it completed exceptionally, the returned
 * CompletableFuture also does so, with a CompletionException
 * holding this exception as its cause. If no CompletableFutures
 * are provided, returns an incomplete CompletableFuture.
 *
 * <p>Arrays of two or more elements are handled by {@code anyTree};
 * the empty and single-element cases are handled directly here.
 *
 * @param cfs the CompletableFutures
 * @return a new CompletableFuture that is completed with the
 * result or exception of any of the given CompletableFutures when
 * one completes
 * @throws NullPointerException if the array or any of its elements are
 * {@code null}
 */
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    int len = cfs.length; // Same idea as allOf
    if (len > 1)
        return anyTree(cfs, 0, len - 1);
    else {
        CompletableFuture<Object> dst = new CompletableFuture<Object>();
        CompletableFuture<?> f;
        if (len == 0)
            ; // skip
        else if ((f = cfs[0]) == null)
            throw new NullPointerException();
        else {
            ThenCopy<Object> d = null;
            CompletionNode p = null;
            Object r;
            // While f has no result, advance one step per iteration:
            // allocate the completion record, then its list node, then
            // try to CAS the node onto the head of f.completions.
            while ((r = f.result) == null) {
                if (d == null)
                    d = new ThenCopy<Object>(f, dst);
                else if (p == null)
                    p = new CompletionNode(d);
                else if (UNSAFE.compareAndSwapObject
                         (f, COMPLETIONS, p.next = f.completions, p))
                    break;
            }
            // If a result was observed, complete dst here, after claiming
            // the record (when one exists) via compareAndSet(0, 1).
            if (r != null && (d == null || d.compareAndSet(0, 1))) {
                Throwable ex; Object t;
                // Split the raw result into (value, exception): an
                // AltResult contributes only its exception field.
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    t = r;
                }
                dst.internalComplete(t, ex);
            }
            f.helpPostComplete();
        }
        return dst;
    }
}
2796
/**
 * Recursively constructs an Or'ed tree of CompletableFutures.
 *
 * <p>The range {@code [lo, hi]} is split at its midpoint; each half is
 * either a single array element or the future returned by a recursive
 * call. The returned future is completed as soon as either half has a
 * result.
 *
 * @param cfs the source array
 * @param lo lowest index (inclusive) of the range to combine
 * @param hi highest index (inclusive) of the range to combine
 * @return a future for the disjunction of the range
 * @throws NullPointerException if either half is {@code null}
 */
private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
                                                 int lo, int hi) {
    CompletableFuture<?> fst, snd;
    int mid = (lo + hi) >>> 1;
    // A half covering one index uses the element itself; otherwise recurse.
    if ((fst = (lo == mid   ? cfs[lo] : anyTree(cfs, lo,    mid))) == null ||
        (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
        throw new NullPointerException();
    CompletableFuture<Object> dst = new CompletableFuture<Object>();
    OrCompletion d = null;
    CompletionNode p = null, q = null;
    Object r;
    // While neither side has a result, advance one step per iteration:
    // allocate the record, allocate fst's node, CAS it onto
    // fst.completions (then allocate snd's node), CAS that onto
    // snd.completions, and leave the loop once both are registered.
    while ((r = fst.result) == null && (r = snd.result) == null) {
        if (d == null)
            d = new OrCompletion(fst, snd, dst);
        else if (p == null)
            p = new CompletionNode(d);
        else if (q == null) {
            if (UNSAFE.compareAndSwapObject
                (fst, COMPLETIONS, p.next = fst.completions, p))
                q = new CompletionNode(d);
        }
        else if (UNSAFE.compareAndSwapObject
                 (snd, COMPLETIONS, q.next = snd.completions, q))
            break;
    }
    // Take the loop's result or re-read fst then snd; complete dst here
    // only if some result exists and the record (when one exists) is
    // claimed via compareAndSet(0, 1).
    if ((r != null || (r = fst.result) != null ||
         (r = snd.result) != null) &&
        (d == null || d.compareAndSet(0, 1))) {
        Throwable ex; Object t;
        // Split the raw result into (value, exception): an AltResult
        // contributes only its exception field.
        if (r instanceof AltResult) {
            ex = ((AltResult)r).ex;
            t = null;
        }
        else {
            ex = null;
            t = r;
        }
        dst.internalComplete(t, ex);
    }
    fst.helpPostComplete();
    snd.helpPostComplete();
    return dst;
}
2843
2844 /* ------------- Control and status methods -------------- */
2845
2846 /**
2847 * If not already completed, completes this CompletableFuture with
2848 * a {@link CancellationException}. Dependent CompletableFutures
2849 * that have not already completed will also complete
2850 * exceptionally, with a {@link CompletionException} caused by
2851 * this {@code CancellationException}.
2852 *
2853 * @param mayInterruptIfRunning this value has no effect in this
2854 * implementation because interrupts are not used to control
2855 * processing.
2856 *
2857 * @return {@code true} if this task is now cancelled
2858 */
2859 public boolean cancel(boolean mayInterruptIfRunning) {
2860 boolean cancelled = (result == null) &&
2861 UNSAFE.compareAndSwapObject
2862 (this, RESULT, null, new AltResult(new CancellationException()));
2863 postComplete();
2864 return cancelled || isCancelled();
2865 }
2866
2867 /**
2868 * Returns {@code true} if this CompletableFuture was cancelled
2869 * before it completed normally.
2870 *
2871 * @return {@code true} if this CompletableFuture was cancelled
2872 * before it completed normally
2873 */
2874 public boolean isCancelled() {
2875 Object r;
2876 return ((r = result) instanceof AltResult) &&
2877 (((AltResult)r).ex instanceof CancellationException);
2878 }
2879
2880 /**
2881 * Returns {@code true} if this CompletableFuture completed
2882 * exceptionally, in any way. Possible causes include
2883 * cancellation, explicit invocation of {@code
2884 * completeExceptionally}, and abrupt termination of a
2885 * CompletionStage action.
2886 *
2887 * @return {@code true} if this CompletableFuture completed
2888 * exceptionally
2889 */
2890 public boolean isCompletedExceptionally() {
2891 Object r;
2892 return ((r = result) instanceof AltResult) && r != NIL;
2893 }
2894
2895 /**
2896 * Forcibly sets or resets the value subsequently returned by
2897 * method {@link #get()} and related methods, whether or not
2898 * already completed. This method is designed for use only in
2899 * error recovery actions, and even in such situations may result
2900 * in ongoing dependent completions using established versus
2901 * overwritten outcomes.
2902 *
2903 * @param value the completion value
2904 */
2905 public void obtrudeValue(T value) {
2906 result = (value == null) ? NIL : value;
2907 postComplete();
2908 }
2909
2910 /**
2911 * Forcibly causes subsequent invocations of method {@link #get()}
2912 * and related methods to throw the given exception, whether or
2913 * not already completed. This method is designed for use only in
2914 * recovery actions, and even in such situations may result in
2915 * ongoing dependent completions using established versus
2916 * overwritten outcomes.
2917 *
2918 * @param ex the exception
2919 */
2920 public void obtrudeException(Throwable ex) {
2921 if (ex == null) throw new NullPointerException();
2922 result = new AltResult(ex);
2923 postComplete();
2924 }
2925
2926 /**
2927 * Returns the estimated number of CompletableFutures whose
2928 * completions are awaiting completion of this CompletableFuture.
2929 * This method is designed for use in monitoring system state, not
2930 * for synchronization control.
2931 *
2932 * @return the number of dependent CompletableFutures
2933 */
2934 public int getNumberOfDependents() {
2935 int count = 0;
2936 for (CompletionNode p = completions; p != null; p = p.next)
2937 ++count;
2938 return count;
2939 }
2940
2941 /**
2942 * Returns a string identifying this CompletableFuture, as well as
2943 * its completion state. The state, in brackets, contains the
2944 * String {@code "Completed Normally"} or the String {@code
2945 * "Completed Exceptionally"}, or the String {@code "Not
2946 * completed"} followed by the number of CompletableFutures
2947 * dependent upon its completion, if any.
2948 *
2949 * @return a string identifying this CompletableFuture, as well as its state
2950 */
2951 public String toString() {
2952 Object r = result;
2953 int count;
2954 return super.toString() +
2955 ((r == null) ?
2956 (((count = getNumberOfDependents()) == 0) ?
2957 "[Not completed]" :
2958 "[Not completed, " + count + " dependents]") :
2959 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2960 "[Completed exceptionally]" :
2961 "[Completed normally]"));
2962 }
2963
2964 // Unsafe mechanics
// Unsafe instance plus the raw field offsets used for CAS on the
// result, waiters and completions fields.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    sun.misc.Unsafe unsafe;
    long resultOffset;
    long waitersOffset;
    long completionsOffset;
    try {
        unsafe = sun.misc.Unsafe.getUnsafe();
        Class<?> self = CompletableFuture.class;
        resultOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("result"));
        waitersOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        completionsOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
    } catch (Exception e) {
        // Any failure here makes the class unusable.
        throw new Error(e);
    }
    UNSAFE = unsafe;
    RESULT = resultOffset;
    WAITERS = waitersOffset;
    COMPLETIONS = completionsOffset;
}
2983 }