ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.96
Committed: Wed Jul 24 15:25:27 2013 UTC (10 years, 10 months ago) by dl
Branch: MAIN
Changes since 1.95: +48 -35 lines
Log Message:
Incorporate review suggeastions; cope with disabled commonPool

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.locks.LockSupport;
26
27 /**
28 * A {@link Future} that may be explicitly completed (setting its
29 * value and status), and may be used as a {@link CompletionStage},
30 * supporting dependent functions and actions that trigger upon its
31 * completion.
32 *
33 * <p>When two or more threads attempt to
34 * {@link #complete complete},
35 * {@link #completeExceptionally completeExceptionally}, or
36 * {@link #cancel cancel}
37 * a CompletableFuture, only one of them succeeds.
38 *
39 * <p>In addition to these and related methods for directly
40 * manipulating status and results, CompletableFuture implements
41 * interface {@link CompletionStage} with the following policies: <ul>
42 *
43 * <li>Actions supplied for dependent completions of
44 * <em>non-async</em> methods may be performed by the thread that
45 * completes the current CompletableFuture, or by any other caller of
46 * a completion method.</li>
47 *
48 * <li>All <em>async</em> methods without an explicit Executor
49 * argument are performed using the {@link ForkJoinPool#commonPool()}
50 * (unless it does not support a parallelism level of at least two, in
51 * which case, a new Thread is used). To simplify monitoring,
52 * debugging, and tracking, all generated asynchronous tasks are
53 * instances of the marker interface {@link
54 * AsynchronousCompletionTask}. </li>
55 *
56 * <li>All CompletionStage methods are implemented independently of
57 * other public methods, so the behavior of one method is not impacted
58 * by overrides of others in subclasses. </li> </ul>
59 *
60 * <p>CompletableFuture also implements {@link Future} with the following
61 * policies: <ul>
62 *
63 * <li>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional
66 * completion. Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}. Method
68 * {@link #isCompletedExceptionally} can be used to determine if a
69 * CompletableFuture completed in any exceptional fashion.</li>
70 *
71 * <li>In case of exceptional completion with a CompletionException,
72 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
73 * {@link ExecutionException} with the same cause as held in the
74 * corresponding CompletionException. To simplify usage in most
75 * contexts, this class also defines methods {@link #join()} and
76 * {@link #getNow} that instead throw the CompletionException directly
77 * in these cases.</li> </ul>
78 *
79 * @author Doug Lea
80 * @since 1.8
81 */
82 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
83
84 /*
85 * Overview:
86 *
87 * 1. Non-nullness of field result (set via CAS) indicates done.
88 * An AltResult is used to box null as a result, as well as to
89 * hold exceptions. Using a single field makes completion fast
90 * and simple to detect and trigger, at the expense of a lot of
91 * encoding and decoding that infiltrates many methods. One minor
92 * simplification relies on the (static) NIL (to box null results)
93 * being the only AltResult with a null exception field, so we
94 * don't usually need explicit comparisons with NIL. The CF
95 * exception propagation mechanics surrounding decoding rely on
96 * unchecked casts of decoded results really being unchecked,
97 * where user type errors are caught at point of use, as is
98 * currently the case in Java. These are highlighted by using
99 * SuppressWarnings-annotated temporaries.
100 *
101 * 2. Waiters are held in a Treiber stack similar to the one used
102 * in FutureTask, Phaser, and SynchronousQueue. See their
103 * internal documentation for algorithmic details.
104 *
105 * 3. Completions are also kept in a list/stack, and pulled off
106 * and run when completion is triggered. (We could even use the
107 * same stack as for waiters, but would give up the potential
108 * parallelism obtained because woken waiters help release/run
109 * others -- see method postComplete). Because post-processing
110 * may race with direct calls, class Completion opportunistically
111 * extends AtomicInteger so callers can claim the action via
112 * compareAndSet(0, 1). The Completion.run methods are all
113 * written a boringly similar uniform way (that sometimes includes
114 * unnecessary-looking checks, kept to maintain uniformity).
115 * There are enough dimensions upon which they differ that
116 * attempts to factor commonalities while maintaining efficiency
117 * require more lines of code than they would save.
118 *
119 * 4. The exported then/and/or methods do support a bit of
120 * factoring (see doThenApply etc). They must cope with the
121 * intrinsic races surrounding addition of a dependent action
122 * versus performing the action directly because the task is
123 * already complete. For example, a CF may not be complete upon
124 * entry, so a dependent completion is added, but by the time it
125 * is added, the target CF is complete, so must be directly
126 * executed. This is all done while avoiding unnecessary object
127 * construction in safe-bypass cases.
128 */
129
130 // preliminaries
131
132 static final class AltResult {
133 final Throwable ex; // null only for NIL
134 AltResult(Throwable ex) { this.ex = ex; }
135 }
136
137 static final AltResult NIL = new AltResult(null);
138
139 // Fields
140
141 volatile Object result; // Either the result or boxed AltResult
142 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
143 volatile CompletionNode completions; // list (Treiber stack) of completions
144
145 // Basic utilities for triggering and processing completions
146
147 /**
148 * Removes and signals all waiting threads and runs all completions.
149 */
150 final void postComplete() {
151 WaitNode q; Thread t;
152 while ((q = waiters) != null) {
153 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
154 (t = q.thread) != null) {
155 q.thread = null;
156 LockSupport.unpark(t);
157 }
158 }
159
160 CompletionNode h; Completion c;
161 while ((h = completions) != null) {
162 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
163 (c = h.completion) != null)
164 c.run();
165 }
166 }
167
168 /**
169 * Triggers completion with the encoding of the given arguments:
170 * if the exception is non-null, encodes it as a wrapped
171 * CompletionException unless it is one already. Otherwise uses
172 * the given result, boxed as NIL if null.
173 */
174 final void internalComplete(T v, Throwable ex) {
175 if (result == null)
176 UNSAFE.compareAndSwapObject
177 (this, RESULT, null,
178 (ex == null) ? (v == null) ? NIL : v :
179 new AltResult((ex instanceof CompletionException) ? ex :
180 new CompletionException(ex)));
181 postComplete(); // help out even if not triggered
182 }
183
184 /**
185 * If triggered, helps release and/or process completions.
186 */
187 final void helpPostComplete() {
188 if (result != null)
189 postComplete();
190 }
191
192 /* ------------- waiting for completions -------------- */
193
194 /** Number of processors, for spin control */
195 static final int NCPU = Runtime.getRuntime().availableProcessors();
196
197 /**
198 * Heuristic spin value for waitingGet() before blocking on
199 * multiprocessors
200 */
201 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
202
203 /**
204 * Linked nodes to record waiting threads in a Treiber stack. See
205 * other classes such as Phaser and SynchronousQueue for more
206 * detailed explanation. This class implements ManagedBlocker to
207 * avoid starvation when blocking actions pile up in
208 * ForkJoinPools.
209 */
210 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
211 long nanos; // wait time if timed
212 final long deadline; // non-zero if timed
213 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
214 volatile Thread thread;
215 volatile WaitNode next;
216 WaitNode(boolean interruptible, long nanos, long deadline) {
217 this.thread = Thread.currentThread();
218 this.interruptControl = interruptible ? 1 : 0;
219 this.nanos = nanos;
220 this.deadline = deadline;
221 }
222 public boolean isReleasable() {
223 if (thread == null)
224 return true;
225 if (Thread.interrupted()) {
226 int i = interruptControl;
227 interruptControl = -1;
228 if (i > 0)
229 return true;
230 }
231 if (deadline != 0L &&
232 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
233 thread = null;
234 return true;
235 }
236 return false;
237 }
238 public boolean block() {
239 if (isReleasable())
240 return true;
241 else if (deadline == 0L)
242 LockSupport.park(this);
243 else if (nanos > 0L)
244 LockSupport.parkNanos(this, nanos);
245 return isReleasable();
246 }
247 }
248
249 /**
250 * Returns raw result after waiting, or null if interruptible and
251 * interrupted.
252 */
253 private Object waitingGet(boolean interruptible) {
254 WaitNode q = null;
255 boolean queued = false;
256 int spins = SPINS;
257 for (Object r;;) {
258 if ((r = result) != null) {
259 if (q != null) { // suppress unpark
260 q.thread = null;
261 if (q.interruptControl < 0) {
262 if (interruptible) {
263 removeWaiter(q);
264 return null;
265 }
266 Thread.currentThread().interrupt();
267 }
268 }
269 postComplete(); // help release others
270 return r;
271 }
272 else if (spins > 0) {
273 int rnd = ThreadLocalRandom.nextSecondarySeed();
274 if (rnd == 0)
275 rnd = ThreadLocalRandom.current().nextInt();
276 if (rnd >= 0)
277 --spins;
278 }
279 else if (q == null)
280 q = new WaitNode(interruptible, 0L, 0L);
281 else if (!queued)
282 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
283 q.next = waiters, q);
284 else if (interruptible && q.interruptControl < 0) {
285 removeWaiter(q);
286 return null;
287 }
288 else if (q.thread != null && result == null) {
289 try {
290 ForkJoinPool.managedBlock(q);
291 } catch (InterruptedException ex) {
292 q.interruptControl = -1;
293 }
294 }
295 }
296 }
297
298 /**
299 * Awaits completion or aborts on interrupt or timeout.
300 *
301 * @param nanos time to wait
302 * @return raw result
303 */
304 private Object timedAwaitDone(long nanos)
305 throws InterruptedException, TimeoutException {
306 WaitNode q = null;
307 boolean queued = false;
308 for (Object r;;) {
309 if ((r = result) != null) {
310 if (q != null) {
311 q.thread = null;
312 if (q.interruptControl < 0) {
313 removeWaiter(q);
314 throw new InterruptedException();
315 }
316 }
317 postComplete();
318 return r;
319 }
320 else if (q == null) {
321 if (nanos <= 0L)
322 throw new TimeoutException();
323 long d = System.nanoTime() + nanos;
324 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
325 }
326 else if (!queued)
327 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
328 q.next = waiters, q);
329 else if (q.interruptControl < 0) {
330 removeWaiter(q);
331 throw new InterruptedException();
332 }
333 else if (q.nanos <= 0L) {
334 if (result == null) {
335 removeWaiter(q);
336 throw new TimeoutException();
337 }
338 }
339 else if (q.thread != null && result == null) {
340 try {
341 ForkJoinPool.managedBlock(q);
342 } catch (InterruptedException ex) {
343 q.interruptControl = -1;
344 }
345 }
346 }
347 }
348
349 /**
350 * Tries to unlink a timed-out or interrupted wait node to avoid
351 * accumulating garbage. Internal nodes are simply unspliced
352 * without CAS since it is harmless if they are traversed anyway
353 * by releasers. To avoid effects of unsplicing from already
354 * removed nodes, the list is retraversed in case of an apparent
355 * race. This is slow when there are a lot of nodes, but we don't
356 * expect lists to be long enough to outweigh higher-overhead
357 * schemes.
358 */
359 private void removeWaiter(WaitNode node) {
360 if (node != null) {
361 node.thread = null;
362 retry:
363 for (;;) { // restart on removeWaiter race
364 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
365 s = q.next;
366 if (q.thread != null)
367 pred = q;
368 else if (pred != null) {
369 pred.next = s;
370 if (pred.thread == null) // check for race
371 continue retry;
372 }
373 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
374 continue retry;
375 }
376 break;
377 }
378 }
379 }
380
381 /* ------------- Async tasks -------------- */
382
383 /**
384 * A marker interface identifying asynchronous tasks produced by
385 * {@code async} methods. This may be useful for monitoring,
386 * debugging, and tracking asynchronous activities.
387 *
388 * @since 1.8
389 */
390 public static interface AsynchronousCompletionTask {
391 }
392
393 /** Base class can act as either FJ or plain Runnable */
394 abstract static class Async extends ForkJoinTask<Void>
395 implements Runnable, AsynchronousCompletionTask {
396 public final Void getRawResult() { return null; }
397 public final void setRawResult(Void v) { }
398 public final void run() { exec(); }
399 }
400
401 /**
402 * Starts the given async task using the given executor, unless
403 * the executor is ForkJoinPool.commonPool and it has been
404 * disabled, in which case starts a new thread.
405 */
406 static void execAsync(Executor e, Async r) {
407 if (e == ForkJoinPool.commonPool() &&
408 ForkJoinPool.getCommonPoolParallelism() <= 1)
409 new Thread(r).start();
410 else
411 e.execute(r);
412 }
413
414 static final class AsyncRun extends Async {
415 final Runnable fn;
416 final CompletableFuture<Void> dst;
417 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
418 this.fn = fn; this.dst = dst;
419 }
420 public final boolean exec() {
421 CompletableFuture<Void> d; Throwable ex;
422 if ((d = this.dst) != null && d.result == null) {
423 try {
424 fn.run();
425 ex = null;
426 } catch (Throwable rex) {
427 ex = rex;
428 }
429 d.internalComplete(null, ex);
430 }
431 return true;
432 }
433 private static final long serialVersionUID = 5232453952276885070L;
434 }
435
436 static final class AsyncSupply<U> extends Async {
437 final Supplier<U> fn;
438 final CompletableFuture<U> dst;
439 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
440 this.fn = fn; this.dst = dst;
441 }
442 public final boolean exec() {
443 CompletableFuture<U> d; U u; Throwable ex;
444 if ((d = this.dst) != null && d.result == null) {
445 try {
446 u = fn.get();
447 ex = null;
448 } catch (Throwable rex) {
449 ex = rex;
450 u = null;
451 }
452 d.internalComplete(u, ex);
453 }
454 return true;
455 }
456 private static final long serialVersionUID = 5232453952276885070L;
457 }
458
459 static final class AsyncApply<T,U> extends Async {
460 final T arg;
461 final Function<? super T,? extends U> fn;
462 final CompletableFuture<U> dst;
463 AsyncApply(T arg, Function<? super T,? extends U> fn,
464 CompletableFuture<U> dst) {
465 this.arg = arg; this.fn = fn; this.dst = dst;
466 }
467 public final boolean exec() {
468 CompletableFuture<U> d; U u; Throwable ex;
469 if ((d = this.dst) != null && d.result == null) {
470 try {
471 u = fn.apply(arg);
472 ex = null;
473 } catch (Throwable rex) {
474 ex = rex;
475 u = null;
476 }
477 d.internalComplete(u, ex);
478 }
479 return true;
480 }
481 private static final long serialVersionUID = 5232453952276885070L;
482 }
483
484 static final class AsyncCombine<T,U,V> extends Async {
485 final T arg1;
486 final U arg2;
487 final BiFunction<? super T,? super U,? extends V> fn;
488 final CompletableFuture<V> dst;
489 AsyncCombine(T arg1, U arg2,
490 BiFunction<? super T,? super U,? extends V> fn,
491 CompletableFuture<V> dst) {
492 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
493 }
494 public final boolean exec() {
495 CompletableFuture<V> d; V v; Throwable ex;
496 if ((d = this.dst) != null && d.result == null) {
497 try {
498 v = fn.apply(arg1, arg2);
499 ex = null;
500 } catch (Throwable rex) {
501 ex = rex;
502 v = null;
503 }
504 d.internalComplete(v, ex);
505 }
506 return true;
507 }
508 private static final long serialVersionUID = 5232453952276885070L;
509 }
510
511 static final class AsyncAccept<T> extends Async {
512 final T arg;
513 final Consumer<? super T> fn;
514 final CompletableFuture<?> dst;
515 AsyncAccept(T arg, Consumer<? super T> fn,
516 CompletableFuture<?> dst) {
517 this.arg = arg; this.fn = fn; this.dst = dst;
518 }
519 public final boolean exec() {
520 CompletableFuture<?> d; Throwable ex;
521 if ((d = this.dst) != null && d.result == null) {
522 try {
523 fn.accept(arg);
524 ex = null;
525 } catch (Throwable rex) {
526 ex = rex;
527 }
528 d.internalComplete(null, ex);
529 }
530 return true;
531 }
532 private static final long serialVersionUID = 5232453952276885070L;
533 }
534
535 static final class AsyncAcceptBoth<T,U> extends Async {
536 final T arg1;
537 final U arg2;
538 final BiConsumer<? super T,? super U> fn;
539 final CompletableFuture<?> dst;
540 AsyncAcceptBoth(T arg1, U arg2,
541 BiConsumer<? super T,? super U> fn,
542 CompletableFuture<?> dst) {
543 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
544 }
545 public final boolean exec() {
546 CompletableFuture<?> d; Throwable ex;
547 if ((d = this.dst) != null && d.result == null) {
548 try {
549 fn.accept(arg1, arg2);
550 ex = null;
551 } catch (Throwable rex) {
552 ex = rex;
553 }
554 d.internalComplete(null, ex);
555 }
556 return true;
557 }
558 private static final long serialVersionUID = 5232453952276885070L;
559 }
560
561 static final class AsyncCompose<T,U> extends Async {
562 final T arg;
563 final Function<? super T, ? extends CompletionStage<U>> fn;
564 final CompletableFuture<U> dst;
565 AsyncCompose(T arg,
566 Function<? super T, ? extends CompletionStage<U>> fn,
567 CompletableFuture<U> dst) {
568 this.arg = arg; this.fn = fn; this.dst = dst;
569 }
570 public final boolean exec() {
571 CompletableFuture<U> d, fr; U u; Throwable ex;
572 if ((d = this.dst) != null && d.result == null) {
573 try {
574 CompletionStage<U> cs = fn.apply(arg);
575 fr = (cs == null) ? null : cs.toCompletableFuture();
576 ex = (fr == null) ? new NullPointerException() : null;
577 } catch (Throwable rex) {
578 ex = rex;
579 fr = null;
580 }
581 if (ex != null)
582 u = null;
583 else {
584 Object r = fr.result;
585 if (r == null)
586 r = fr.waitingGet(false);
587 if (r instanceof AltResult) {
588 ex = ((AltResult)r).ex;
589 u = null;
590 }
591 else {
592 @SuppressWarnings("unchecked") U ur = (U) r;
593 u = ur;
594 }
595 }
596 d.internalComplete(u, ex);
597 }
598 return true;
599 }
600 private static final long serialVersionUID = 5232453952276885070L;
601 }
602
603 static final class AsyncWhenComplete<T> extends Async {
604 final T arg1;
605 final Throwable arg2;
606 final BiConsumer<? super T,? super Throwable> fn;
607 final CompletableFuture<T> dst;
608 AsyncWhenComplete(T arg1, Throwable arg2,
609 BiConsumer<? super T,? super Throwable> fn,
610 CompletableFuture<T> dst) {
611 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
612 }
613 public final boolean exec() {
614 CompletableFuture<T> d;
615 if ((d = this.dst) != null && d.result == null) {
616 Throwable ex = arg2;
617 try {
618 fn.accept(arg1, ex);
619 } catch (Throwable rex) {
620 if (ex == null)
621 ex = rex;
622 }
623 d.internalComplete(arg1, ex);
624 }
625 return true;
626 }
627 private static final long serialVersionUID = 5232453952276885070L;
628 }
629
630 /* ------------- Completions -------------- */
631
632 /**
633 * Simple linked list nodes to record completions, used in
634 * basically the same way as WaitNodes. (We separate nodes from
635 * the Completions themselves mainly because for the And and Or
636 * methods, the same Completion object resides in two lists.)
637 */
638 static final class CompletionNode {
639 final Completion completion;
640 volatile CompletionNode next;
641 CompletionNode(Completion completion) { this.completion = completion; }
642 }
643
644 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
645 abstract static class Completion extends AtomicInteger implements Runnable {
646 }
647
648 static final class ThenApply<T,U> extends Completion {
649 final CompletableFuture<? extends T> src;
650 final Function<? super T,? extends U> fn;
651 final CompletableFuture<U> dst;
652 final Executor executor;
653 ThenApply(CompletableFuture<? extends T> src,
654 Function<? super T,? extends U> fn,
655 CompletableFuture<U> dst,
656 Executor executor) {
657 this.src = src; this.fn = fn; this.dst = dst;
658 this.executor = executor;
659 }
660 public final void run() {
661 final CompletableFuture<? extends T> a;
662 final Function<? super T,? extends U> fn;
663 final CompletableFuture<U> dst;
664 Object r; T t; Throwable ex;
665 if ((dst = this.dst) != null &&
666 (fn = this.fn) != null &&
667 (a = this.src) != null &&
668 (r = a.result) != null &&
669 compareAndSet(0, 1)) {
670 if (r instanceof AltResult) {
671 ex = ((AltResult)r).ex;
672 t = null;
673 }
674 else {
675 ex = null;
676 @SuppressWarnings("unchecked") T tr = (T) r;
677 t = tr;
678 }
679 Executor e = executor;
680 U u = null;
681 if (ex == null) {
682 try {
683 if (e != null)
684 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
685 else
686 u = fn.apply(t);
687 } catch (Throwable rex) {
688 ex = rex;
689 }
690 }
691 if (e == null || ex != null)
692 dst.internalComplete(u, ex);
693 }
694 }
695 private static final long serialVersionUID = 5232453952276885070L;
696 }
697
698 static final class ThenAccept<T> extends Completion {
699 final CompletableFuture<? extends T> src;
700 final Consumer<? super T> fn;
701 final CompletableFuture<?> dst;
702 final Executor executor;
703 ThenAccept(CompletableFuture<? extends T> src,
704 Consumer<? super T> fn,
705 CompletableFuture<?> dst,
706 Executor executor) {
707 this.src = src; this.fn = fn; this.dst = dst;
708 this.executor = executor;
709 }
710 public final void run() {
711 final CompletableFuture<? extends T> a;
712 final Consumer<? super T> fn;
713 final CompletableFuture<?> dst;
714 Object r; T t; Throwable ex;
715 if ((dst = this.dst) != null &&
716 (fn = this.fn) != null &&
717 (a = this.src) != null &&
718 (r = a.result) != null &&
719 compareAndSet(0, 1)) {
720 if (r instanceof AltResult) {
721 ex = ((AltResult)r).ex;
722 t = null;
723 }
724 else {
725 ex = null;
726 @SuppressWarnings("unchecked") T tr = (T) r;
727 t = tr;
728 }
729 Executor e = executor;
730 if (ex == null) {
731 try {
732 if (e != null)
733 execAsync(e, new AsyncAccept<T>(t, fn, dst));
734 else
735 fn.accept(t);
736 } catch (Throwable rex) {
737 ex = rex;
738 }
739 }
740 if (e == null || ex != null)
741 dst.internalComplete(null, ex);
742 }
743 }
744 private static final long serialVersionUID = 5232453952276885070L;
745 }
746
747 static final class ThenRun extends Completion {
748 final CompletableFuture<?> src;
749 final Runnable fn;
750 final CompletableFuture<Void> dst;
751 final Executor executor;
752 ThenRun(CompletableFuture<?> src,
753 Runnable fn,
754 CompletableFuture<Void> dst,
755 Executor executor) {
756 this.src = src; this.fn = fn; this.dst = dst;
757 this.executor = executor;
758 }
759 public final void run() {
760 final CompletableFuture<?> a;
761 final Runnable fn;
762 final CompletableFuture<Void> dst;
763 Object r; Throwable ex;
764 if ((dst = this.dst) != null &&
765 (fn = this.fn) != null &&
766 (a = this.src) != null &&
767 (r = a.result) != null &&
768 compareAndSet(0, 1)) {
769 if (r instanceof AltResult)
770 ex = ((AltResult)r).ex;
771 else
772 ex = null;
773 Executor e = executor;
774 if (ex == null) {
775 try {
776 if (e != null)
777 execAsync(e, new AsyncRun(fn, dst));
778 else
779 fn.run();
780 } catch (Throwable rex) {
781 ex = rex;
782 }
783 }
784 if (e == null || ex != null)
785 dst.internalComplete(null, ex);
786 }
787 }
788 private static final long serialVersionUID = 5232453952276885070L;
789 }
790
791 static final class ThenCombine<T,U,V> extends Completion {
792 final CompletableFuture<? extends T> src;
793 final CompletableFuture<? extends U> snd;
794 final BiFunction<? super T,? super U,? extends V> fn;
795 final CompletableFuture<V> dst;
796 final Executor executor;
797 ThenCombine(CompletableFuture<? extends T> src,
798 CompletableFuture<? extends U> snd,
799 BiFunction<? super T,? super U,? extends V> fn,
800 CompletableFuture<V> dst,
801 Executor executor) {
802 this.src = src; this.snd = snd;
803 this.fn = fn; this.dst = dst;
804 this.executor = executor;
805 }
806 public final void run() {
807 final CompletableFuture<? extends T> a;
808 final CompletableFuture<? extends U> b;
809 final BiFunction<? super T,? super U,? extends V> fn;
810 final CompletableFuture<V> dst;
811 Object r, s; T t; U u; Throwable ex;
812 if ((dst = this.dst) != null &&
813 (fn = this.fn) != null &&
814 (a = this.src) != null &&
815 (r = a.result) != null &&
816 (b = this.snd) != null &&
817 (s = b.result) != null &&
818 compareAndSet(0, 1)) {
819 if (r instanceof AltResult) {
820 ex = ((AltResult)r).ex;
821 t = null;
822 }
823 else {
824 ex = null;
825 @SuppressWarnings("unchecked") T tr = (T) r;
826 t = tr;
827 }
828 if (ex != null)
829 u = null;
830 else if (s instanceof AltResult) {
831 ex = ((AltResult)s).ex;
832 u = null;
833 }
834 else {
835 @SuppressWarnings("unchecked") U us = (U) s;
836 u = us;
837 }
838 Executor e = executor;
839 V v = null;
840 if (ex == null) {
841 try {
842 if (e != null)
843 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
844 else
845 v = fn.apply(t, u);
846 } catch (Throwable rex) {
847 ex = rex;
848 }
849 }
850 if (e == null || ex != null)
851 dst.internalComplete(v, ex);
852 }
853 }
854 private static final long serialVersionUID = 5232453952276885070L;
855 }
856
857 static final class ThenAcceptBoth<T,U> extends Completion {
858 final CompletableFuture<? extends T> src;
859 final CompletableFuture<? extends U> snd;
860 final BiConsumer<? super T,? super U> fn;
861 final CompletableFuture<Void> dst;
862 final Executor executor;
863 ThenAcceptBoth(CompletableFuture<? extends T> src,
864 CompletableFuture<? extends U> snd,
865 BiConsumer<? super T,? super U> fn,
866 CompletableFuture<Void> dst,
867 Executor executor) {
868 this.src = src; this.snd = snd;
869 this.fn = fn; this.dst = dst;
870 this.executor = executor;
871 }
872 public final void run() {
873 final CompletableFuture<? extends T> a;
874 final CompletableFuture<? extends U> b;
875 final BiConsumer<? super T,? super U> fn;
876 final CompletableFuture<Void> dst;
877 Object r, s; T t; U u; Throwable ex;
878 if ((dst = this.dst) != null &&
879 (fn = this.fn) != null &&
880 (a = this.src) != null &&
881 (r = a.result) != null &&
882 (b = this.snd) != null &&
883 (s = b.result) != null &&
884 compareAndSet(0, 1)) {
885 if (r instanceof AltResult) {
886 ex = ((AltResult)r).ex;
887 t = null;
888 }
889 else {
890 ex = null;
891 @SuppressWarnings("unchecked") T tr = (T) r;
892 t = tr;
893 }
894 if (ex != null)
895 u = null;
896 else if (s instanceof AltResult) {
897 ex = ((AltResult)s).ex;
898 u = null;
899 }
900 else {
901 @SuppressWarnings("unchecked") U us = (U) s;
902 u = us;
903 }
904 Executor e = executor;
905 if (ex == null) {
906 try {
907 if (e != null)
908 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
909 else
910 fn.accept(t, u);
911 } catch (Throwable rex) {
912 ex = rex;
913 }
914 }
915 if (e == null || ex != null)
916 dst.internalComplete(null, ex);
917 }
918 }
919 private static final long serialVersionUID = 5232453952276885070L;
920 }
921
922 static final class RunAfterBoth extends Completion {
923 final CompletableFuture<?> src;
924 final CompletableFuture<?> snd;
925 final Runnable fn;
926 final CompletableFuture<Void> dst;
927 final Executor executor;
928 RunAfterBoth(CompletableFuture<?> src,
929 CompletableFuture<?> snd,
930 Runnable fn,
931 CompletableFuture<Void> dst,
932 Executor executor) {
933 this.src = src; this.snd = snd;
934 this.fn = fn; this.dst = dst;
935 this.executor = executor;
936 }
937 public final void run() {
938 final CompletableFuture<?> a;
939 final CompletableFuture<?> b;
940 final Runnable fn;
941 final CompletableFuture<Void> dst;
942 Object r, s; Throwable ex;
943 if ((dst = this.dst) != null &&
944 (fn = this.fn) != null &&
945 (a = this.src) != null &&
946 (r = a.result) != null &&
947 (b = this.snd) != null &&
948 (s = b.result) != null &&
949 compareAndSet(0, 1)) {
950 if (r instanceof AltResult)
951 ex = ((AltResult)r).ex;
952 else
953 ex = null;
954 if (ex == null && (s instanceof AltResult))
955 ex = ((AltResult)s).ex;
956 Executor e = executor;
957 if (ex == null) {
958 try {
959 if (e != null)
960 execAsync(e, new AsyncRun(fn, dst));
961 else
962 fn.run();
963 } catch (Throwable rex) {
964 ex = rex;
965 }
966 }
967 if (e == null || ex != null)
968 dst.internalComplete(null, ex);
969 }
970 }
971 private static final long serialVersionUID = 5232453952276885070L;
972 }
973
974 static final class AndCompletion extends Completion {
975 final CompletableFuture<?> src;
976 final CompletableFuture<?> snd;
977 final CompletableFuture<Void> dst;
978 AndCompletion(CompletableFuture<?> src,
979 CompletableFuture<?> snd,
980 CompletableFuture<Void> dst) {
981 this.src = src; this.snd = snd; this.dst = dst;
982 }
983 public final void run() {
984 final CompletableFuture<?> a;
985 final CompletableFuture<?> b;
986 final CompletableFuture<Void> dst;
987 Object r, s; Throwable ex;
988 if ((dst = this.dst) != null &&
989 (a = this.src) != null &&
990 (r = a.result) != null &&
991 (b = this.snd) != null &&
992 (s = b.result) != null &&
993 compareAndSet(0, 1)) {
994 if (r instanceof AltResult)
995 ex = ((AltResult)r).ex;
996 else
997 ex = null;
998 if (ex == null && (s instanceof AltResult))
999 ex = ((AltResult)s).ex;
1000 dst.internalComplete(null, ex);
1001 }
1002 }
1003 private static final long serialVersionUID = 5232453952276885070L;
1004 }
1005
1006 static final class ApplyToEither<T,U> extends Completion {
1007 final CompletableFuture<? extends T> src;
1008 final CompletableFuture<? extends T> snd;
1009 final Function<? super T,? extends U> fn;
1010 final CompletableFuture<U> dst;
1011 final Executor executor;
1012 ApplyToEither(CompletableFuture<? extends T> src,
1013 CompletableFuture<? extends T> snd,
1014 Function<? super T,? extends U> fn,
1015 CompletableFuture<U> dst,
1016 Executor executor) {
1017 this.src = src; this.snd = snd;
1018 this.fn = fn; this.dst = dst;
1019 this.executor = executor;
1020 }
1021 public final void run() {
1022 final CompletableFuture<? extends T> a;
1023 final CompletableFuture<? extends T> b;
1024 final Function<? super T,? extends U> fn;
1025 final CompletableFuture<U> dst;
1026 Object r; T t; Throwable ex;
1027 if ((dst = this.dst) != null &&
1028 (fn = this.fn) != null &&
1029 (((a = this.src) != null && (r = a.result) != null) ||
1030 ((b = this.snd) != null && (r = b.result) != null)) &&
1031 compareAndSet(0, 1)) {
1032 if (r instanceof AltResult) {
1033 ex = ((AltResult)r).ex;
1034 t = null;
1035 }
1036 else {
1037 ex = null;
1038 @SuppressWarnings("unchecked") T tr = (T) r;
1039 t = tr;
1040 }
1041 Executor e = executor;
1042 U u = null;
1043 if (ex == null) {
1044 try {
1045 if (e != null)
1046 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1047 else
1048 u = fn.apply(t);
1049 } catch (Throwable rex) {
1050 ex = rex;
1051 }
1052 }
1053 if (e == null || ex != null)
1054 dst.internalComplete(u, ex);
1055 }
1056 }
1057 private static final long serialVersionUID = 5232453952276885070L;
1058 }
1059
1060 static final class AcceptEither<T> extends Completion {
1061 final CompletableFuture<? extends T> src;
1062 final CompletableFuture<? extends T> snd;
1063 final Consumer<? super T> fn;
1064 final CompletableFuture<Void> dst;
1065 final Executor executor;
1066 AcceptEither(CompletableFuture<? extends T> src,
1067 CompletableFuture<? extends T> snd,
1068 Consumer<? super T> fn,
1069 CompletableFuture<Void> dst,
1070 Executor executor) {
1071 this.src = src; this.snd = snd;
1072 this.fn = fn; this.dst = dst;
1073 this.executor = executor;
1074 }
1075 public final void run() {
1076 final CompletableFuture<? extends T> a;
1077 final CompletableFuture<? extends T> b;
1078 final Consumer<? super T> fn;
1079 final CompletableFuture<Void> dst;
1080 Object r; T t; Throwable ex;
1081 if ((dst = this.dst) != null &&
1082 (fn = this.fn) != null &&
1083 (((a = this.src) != null && (r = a.result) != null) ||
1084 ((b = this.snd) != null && (r = b.result) != null)) &&
1085 compareAndSet(0, 1)) {
1086 if (r instanceof AltResult) {
1087 ex = ((AltResult)r).ex;
1088 t = null;
1089 }
1090 else {
1091 ex = null;
1092 @SuppressWarnings("unchecked") T tr = (T) r;
1093 t = tr;
1094 }
1095 Executor e = executor;
1096 if (ex == null) {
1097 try {
1098 if (e != null)
1099 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1100 else
1101 fn.accept(t);
1102 } catch (Throwable rex) {
1103 ex = rex;
1104 }
1105 }
1106 if (e == null || ex != null)
1107 dst.internalComplete(null, ex);
1108 }
1109 }
1110 private static final long serialVersionUID = 5232453952276885070L;
1111 }
1112
1113 static final class RunAfterEither extends Completion {
1114 final CompletableFuture<?> src;
1115 final CompletableFuture<?> snd;
1116 final Runnable fn;
1117 final CompletableFuture<Void> dst;
1118 final Executor executor;
1119 RunAfterEither(CompletableFuture<?> src,
1120 CompletableFuture<?> snd,
1121 Runnable fn,
1122 CompletableFuture<Void> dst,
1123 Executor executor) {
1124 this.src = src; this.snd = snd;
1125 this.fn = fn; this.dst = dst;
1126 this.executor = executor;
1127 }
1128 public final void run() {
1129 final CompletableFuture<?> a;
1130 final CompletableFuture<?> b;
1131 final Runnable fn;
1132 final CompletableFuture<Void> dst;
1133 Object r; Throwable ex;
1134 if ((dst = this.dst) != null &&
1135 (fn = this.fn) != null &&
1136 (((a = this.src) != null && (r = a.result) != null) ||
1137 ((b = this.snd) != null && (r = b.result) != null)) &&
1138 compareAndSet(0, 1)) {
1139 if (r instanceof AltResult)
1140 ex = ((AltResult)r).ex;
1141 else
1142 ex = null;
1143 Executor e = executor;
1144 if (ex == null) {
1145 try {
1146 if (e != null)
1147 execAsync(e, new AsyncRun(fn, dst));
1148 else
1149 fn.run();
1150 } catch (Throwable rex) {
1151 ex = rex;
1152 }
1153 }
1154 if (e == null || ex != null)
1155 dst.internalComplete(null, ex);
1156 }
1157 }
1158 private static final long serialVersionUID = 5232453952276885070L;
1159 }
1160
1161 static final class OrCompletion extends Completion {
1162 final CompletableFuture<?> src;
1163 final CompletableFuture<?> snd;
1164 final CompletableFuture<Object> dst;
1165 OrCompletion(CompletableFuture<?> src,
1166 CompletableFuture<?> snd,
1167 CompletableFuture<Object> dst) {
1168 this.src = src; this.snd = snd; this.dst = dst;
1169 }
1170 public final void run() {
1171 final CompletableFuture<?> a;
1172 final CompletableFuture<?> b;
1173 final CompletableFuture<Object> dst;
1174 Object r, t; Throwable ex;
1175 if ((dst = this.dst) != null &&
1176 (((a = this.src) != null && (r = a.result) != null) ||
1177 ((b = this.snd) != null && (r = b.result) != null)) &&
1178 compareAndSet(0, 1)) {
1179 if (r instanceof AltResult) {
1180 ex = ((AltResult)r).ex;
1181 t = null;
1182 }
1183 else {
1184 ex = null;
1185 t = r;
1186 }
1187 dst.internalComplete(t, ex);
1188 }
1189 }
1190 private static final long serialVersionUID = 5232453952276885070L;
1191 }
1192
1193 static final class ExceptionCompletion<T> extends Completion {
1194 final CompletableFuture<? extends T> src;
1195 final Function<? super Throwable, ? extends T> fn;
1196 final CompletableFuture<T> dst;
1197 ExceptionCompletion(CompletableFuture<? extends T> src,
1198 Function<? super Throwable, ? extends T> fn,
1199 CompletableFuture<T> dst) {
1200 this.src = src; this.fn = fn; this.dst = dst;
1201 }
1202 public final void run() {
1203 final CompletableFuture<? extends T> a;
1204 final Function<? super Throwable, ? extends T> fn;
1205 final CompletableFuture<T> dst;
1206 Object r; T t = null; Throwable ex, dx = null;
1207 if ((dst = this.dst) != null &&
1208 (fn = this.fn) != null &&
1209 (a = this.src) != null &&
1210 (r = a.result) != null &&
1211 compareAndSet(0, 1)) {
1212 if ((r instanceof AltResult) &&
1213 (ex = ((AltResult)r).ex) != null) {
1214 try {
1215 t = fn.apply(ex);
1216 } catch (Throwable rex) {
1217 dx = rex;
1218 }
1219 }
1220 else {
1221 @SuppressWarnings("unchecked") T tr = (T) r;
1222 t = tr;
1223 }
1224 dst.internalComplete(t, dx);
1225 }
1226 }
1227 private static final long serialVersionUID = 5232453952276885070L;
1228 }
1229
1230 static final class WhenCompleteCompletion<T> extends Completion {
1231 final CompletableFuture<? extends T> src;
1232 final BiConsumer<? super T, ? super Throwable> fn;
1233 final CompletableFuture<T> dst;
1234 final Executor executor;
1235 WhenCompleteCompletion(CompletableFuture<? extends T> src,
1236 BiConsumer<? super T, ? super Throwable> fn,
1237 CompletableFuture<T> dst,
1238 Executor executor) {
1239 this.src = src; this.fn = fn; this.dst = dst;
1240 this.executor = executor;
1241 }
1242 public final void run() {
1243 final CompletableFuture<? extends T> a;
1244 final BiConsumer<? super T, ? super Throwable> fn;
1245 final CompletableFuture<T> dst;
1246 Object r; T t; Throwable ex;
1247 if ((dst = this.dst) != null &&
1248 (fn = this.fn) != null &&
1249 (a = this.src) != null &&
1250 (r = a.result) != null &&
1251 compareAndSet(0, 1)) {
1252 if (r instanceof AltResult) {
1253 ex = ((AltResult)r).ex;
1254 t = null;
1255 }
1256 else {
1257 ex = null;
1258 @SuppressWarnings("unchecked") T tr = (T) r;
1259 t = tr;
1260 }
1261 Executor e = executor;
1262 Throwable dx = null;
1263 try {
1264 if (e != null)
1265 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
1266 else
1267 fn.accept(t, ex);
1268 } catch (Throwable rex) {
1269 dx = rex;
1270 }
1271 if (e == null || dx != null)
1272 dst.internalComplete(t, ex != null ? ex : dx);
1273 }
1274 }
1275 private static final long serialVersionUID = 5232453952276885070L;
1276 }
1277
1278 static final class ThenCopy<T> extends Completion {
1279 final CompletableFuture<?> src;
1280 final CompletableFuture<T> dst;
1281 ThenCopy(CompletableFuture<?> src,
1282 CompletableFuture<T> dst) {
1283 this.src = src; this.dst = dst;
1284 }
1285 public final void run() {
1286 final CompletableFuture<?> a;
1287 final CompletableFuture<T> dst;
1288 Object r; T t; Throwable ex;
1289 if ((dst = this.dst) != null &&
1290 (a = this.src) != null &&
1291 (r = a.result) != null &&
1292 compareAndSet(0, 1)) {
1293 if (r instanceof AltResult) {
1294 ex = ((AltResult)r).ex;
1295 t = null;
1296 }
1297 else {
1298 ex = null;
1299 @SuppressWarnings("unchecked") T tr = (T) r;
1300 t = tr;
1301 }
1302 dst.internalComplete(t, ex);
1303 }
1304 }
1305 private static final long serialVersionUID = 5232453952276885070L;
1306 }
1307
1308 // version of ThenCopy for CompletableFuture<Void> dst
1309 static final class ThenPropagate extends Completion {
1310 final CompletableFuture<?> src;
1311 final CompletableFuture<Void> dst;
1312 ThenPropagate(CompletableFuture<?> src,
1313 CompletableFuture<Void> dst) {
1314 this.src = src; this.dst = dst;
1315 }
1316 public final void run() {
1317 final CompletableFuture<?> a;
1318 final CompletableFuture<Void> dst;
1319 Object r; Throwable ex;
1320 if ((dst = this.dst) != null &&
1321 (a = this.src) != null &&
1322 (r = a.result) != null &&
1323 compareAndSet(0, 1)) {
1324 if (r instanceof AltResult)
1325 ex = ((AltResult)r).ex;
1326 else
1327 ex = null;
1328 dst.internalComplete(null, ex);
1329 }
1330 }
1331 private static final long serialVersionUID = 5232453952276885070L;
1332 }
1333
1334 static final class HandleCompletion<T,U> extends Completion {
1335 final CompletableFuture<? extends T> src;
1336 final BiFunction<? super T, Throwable, ? extends U> fn;
1337 final CompletableFuture<U> dst;
1338 final Executor executor;
1339 HandleCompletion(CompletableFuture<? extends T> src,
1340 BiFunction<? super T, Throwable, ? extends U> fn,
1341 CompletableFuture<U> dst,
1342 Executor executor) {
1343 this.src = src; this.fn = fn; this.dst = dst;
1344 this.executor = executor;
1345 }
1346 public final void run() {
1347 final CompletableFuture<? extends T> a;
1348 final BiFunction<? super T, Throwable, ? extends U> fn;
1349 final CompletableFuture<U> dst;
1350 Object r; T t; Throwable ex;
1351 if ((dst = this.dst) != null &&
1352 (fn = this.fn) != null &&
1353 (a = this.src) != null &&
1354 (r = a.result) != null &&
1355 compareAndSet(0, 1)) {
1356 if (r instanceof AltResult) {
1357 ex = ((AltResult)r).ex;
1358 t = null;
1359 }
1360 else {
1361 ex = null;
1362 @SuppressWarnings("unchecked") T tr = (T) r;
1363 t = tr;
1364 }
1365 Executor e = executor;
1366 U u = null;
1367 Throwable dx = null;
1368 try {
1369 if (e != null)
1370 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1371 else
1372 u = fn.apply(t, ex);
1373 } catch (Throwable rex) {
1374 dx = rex;
1375 }
1376 if (e == null || dx != null)
1377 dst.internalComplete(u, dx);
1378 }
1379 }
1380 private static final long serialVersionUID = 5232453952276885070L;
1381 }
1382
1383 static final class ThenCompose<T,U> extends Completion {
1384 final CompletableFuture<? extends T> src;
1385 final Function<? super T, ? extends CompletionStage<U>> fn;
1386 final CompletableFuture<U> dst;
1387 final Executor executor;
1388 ThenCompose(CompletableFuture<? extends T> src,
1389 Function<? super T, ? extends CompletionStage<U>> fn,
1390 CompletableFuture<U> dst,
1391 Executor executor) {
1392 this.src = src; this.fn = fn; this.dst = dst;
1393 this.executor = executor;
1394 }
1395 public final void run() {
1396 final CompletableFuture<? extends T> a;
1397 final Function<? super T, ? extends CompletionStage<U>> fn;
1398 final CompletableFuture<U> dst;
1399 Object r; T t; Throwable ex; Executor e;
1400 if ((dst = this.dst) != null &&
1401 (fn = this.fn) != null &&
1402 (a = this.src) != null &&
1403 (r = a.result) != null &&
1404 compareAndSet(0, 1)) {
1405 if (r instanceof AltResult) {
1406 ex = ((AltResult)r).ex;
1407 t = null;
1408 }
1409 else {
1410 ex = null;
1411 @SuppressWarnings("unchecked") T tr = (T) r;
1412 t = tr;
1413 }
1414 CompletableFuture<U> c = null;
1415 U u = null;
1416 boolean complete = false;
1417 if (ex == null) {
1418 if ((e = executor) != null)
1419 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1420 else {
1421 try {
1422 CompletionStage<U> cs = fn.apply(t);
1423 c = (cs == null) ? null : cs.toCompletableFuture();
1424 if (c == null)
1425 ex = new NullPointerException();
1426 } catch (Throwable rex) {
1427 ex = rex;
1428 }
1429 }
1430 }
1431 if (c != null) {
1432 ThenCopy<U> d = null;
1433 Object s;
1434 if ((s = c.result) == null) {
1435 CompletionNode p = new CompletionNode
1436 (d = new ThenCopy<U>(c, dst));
1437 while ((s = c.result) == null) {
1438 if (UNSAFE.compareAndSwapObject
1439 (c, COMPLETIONS, p.next = c.completions, p))
1440 break;
1441 }
1442 }
1443 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1444 complete = true;
1445 if (s instanceof AltResult) {
1446 ex = ((AltResult)s).ex; // no rewrap
1447 u = null;
1448 }
1449 else {
1450 @SuppressWarnings("unchecked") U us = (U) s;
1451 u = us;
1452 }
1453 }
1454 }
1455 if (complete || ex != null)
1456 dst.internalComplete(u, ex);
1457 if (c != null)
1458 c.helpPostComplete();
1459 }
1460 }
1461 private static final long serialVersionUID = 5232453952276885070L;
1462 }
1463
1464 // Implementations of stage methods with (plain, async, Executor) forms
1465
1466 private <U> CompletableFuture<U> doThenApply
1467 (Function<? super T,? extends U> fn,
1468 Executor e) {
1469 if (fn == null) throw new NullPointerException();
1470 CompletableFuture<U> dst = new CompletableFuture<U>();
1471 ThenApply<T,U> d = null;
1472 Object r;
1473 if ((r = result) == null) {
1474 CompletionNode p = new CompletionNode
1475 (d = new ThenApply<T,U>(this, fn, dst, e));
1476 while ((r = result) == null) {
1477 if (UNSAFE.compareAndSwapObject
1478 (this, COMPLETIONS, p.next = completions, p))
1479 break;
1480 }
1481 }
1482 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1483 T t; Throwable ex;
1484 if (r instanceof AltResult) {
1485 ex = ((AltResult)r).ex;
1486 t = null;
1487 }
1488 else {
1489 ex = null;
1490 @SuppressWarnings("unchecked") T tr = (T) r;
1491 t = tr;
1492 }
1493 U u = null;
1494 if (ex == null) {
1495 try {
1496 if (e != null)
1497 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1498 else
1499 u = fn.apply(t);
1500 } catch (Throwable rex) {
1501 ex = rex;
1502 }
1503 }
1504 if (e == null || ex != null)
1505 dst.internalComplete(u, ex);
1506 }
1507 helpPostComplete();
1508 return dst;
1509 }
1510
1511 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1512 Executor e) {
1513 if (fn == null) throw new NullPointerException();
1514 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1515 ThenAccept<T> d = null;
1516 Object r;
1517 if ((r = result) == null) {
1518 CompletionNode p = new CompletionNode
1519 (d = new ThenAccept<T>(this, fn, dst, e));
1520 while ((r = result) == null) {
1521 if (UNSAFE.compareAndSwapObject
1522 (this, COMPLETIONS, p.next = completions, p))
1523 break;
1524 }
1525 }
1526 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1527 T t; Throwable ex;
1528 if (r instanceof AltResult) {
1529 ex = ((AltResult)r).ex;
1530 t = null;
1531 }
1532 else {
1533 ex = null;
1534 @SuppressWarnings("unchecked") T tr = (T) r;
1535 t = tr;
1536 }
1537 if (ex == null) {
1538 try {
1539 if (e != null)
1540 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1541 else
1542 fn.accept(t);
1543 } catch (Throwable rex) {
1544 ex = rex;
1545 }
1546 }
1547 if (e == null || ex != null)
1548 dst.internalComplete(null, ex);
1549 }
1550 helpPostComplete();
1551 return dst;
1552 }
1553
1554 private CompletableFuture<Void> doThenRun(Runnable action,
1555 Executor e) {
1556 if (action == null) throw new NullPointerException();
1557 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1558 ThenRun d = null;
1559 Object r;
1560 if ((r = result) == null) {
1561 CompletionNode p = new CompletionNode
1562 (d = new ThenRun(this, action, dst, e));
1563 while ((r = result) == null) {
1564 if (UNSAFE.compareAndSwapObject
1565 (this, COMPLETIONS, p.next = completions, p))
1566 break;
1567 }
1568 }
1569 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1570 Throwable ex;
1571 if (r instanceof AltResult)
1572 ex = ((AltResult)r).ex;
1573 else
1574 ex = null;
1575 if (ex == null) {
1576 try {
1577 if (e != null)
1578 execAsync(e, new AsyncRun(action, dst));
1579 else
1580 action.run();
1581 } catch (Throwable rex) {
1582 ex = rex;
1583 }
1584 }
1585 if (e == null || ex != null)
1586 dst.internalComplete(null, ex);
1587 }
1588 helpPostComplete();
1589 return dst;
1590 }
1591
1592 private <U,V> CompletableFuture<V> doThenCombine
1593 (CompletableFuture<? extends U> other,
1594 BiFunction<? super T,? super U,? extends V> fn,
1595 Executor e) {
1596 if (other == null || fn == null) throw new NullPointerException();
1597 CompletableFuture<V> dst = new CompletableFuture<V>();
1598 ThenCombine<T,U,V> d = null;
1599 Object r, s = null;
1600 if ((r = result) == null || (s = other.result) == null) {
1601 d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
1602 CompletionNode q = null, p = new CompletionNode(d);
1603 while ((r == null && (r = result) == null) ||
1604 (s == null && (s = other.result) == null)) {
1605 if (q != null) {
1606 if (s != null ||
1607 UNSAFE.compareAndSwapObject
1608 (other, COMPLETIONS, q.next = other.completions, q))
1609 break;
1610 }
1611 else if (r != null ||
1612 UNSAFE.compareAndSwapObject
1613 (this, COMPLETIONS, p.next = completions, p)) {
1614 if (s != null)
1615 break;
1616 q = new CompletionNode(d);
1617 }
1618 }
1619 }
1620 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1621 T t; U u; Throwable ex;
1622 if (r instanceof AltResult) {
1623 ex = ((AltResult)r).ex;
1624 t = null;
1625 }
1626 else {
1627 ex = null;
1628 @SuppressWarnings("unchecked") T tr = (T) r;
1629 t = tr;
1630 }
1631 if (ex != null)
1632 u = null;
1633 else if (s instanceof AltResult) {
1634 ex = ((AltResult)s).ex;
1635 u = null;
1636 }
1637 else {
1638 @SuppressWarnings("unchecked") U us = (U) s;
1639 u = us;
1640 }
1641 V v = null;
1642 if (ex == null) {
1643 try {
1644 if (e != null)
1645 execAsync(e, new AsyncCombine<T,U,V>(t, u, fn, dst));
1646 else
1647 v = fn.apply(t, u);
1648 } catch (Throwable rex) {
1649 ex = rex;
1650 }
1651 }
1652 if (e == null || ex != null)
1653 dst.internalComplete(v, ex);
1654 }
1655 helpPostComplete();
1656 other.helpPostComplete();
1657 return dst;
1658 }
1659
1660 private <U> CompletableFuture<Void> doThenAcceptBoth
1661 (CompletableFuture<? extends U> other,
1662 BiConsumer<? super T,? super U> fn,
1663 Executor e) {
1664 if (other == null || fn == null) throw new NullPointerException();
1665 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1666 ThenAcceptBoth<T,U> d = null;
1667 Object r, s = null;
1668 if ((r = result) == null || (s = other.result) == null) {
1669 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1670 CompletionNode q = null, p = new CompletionNode(d);
1671 while ((r == null && (r = result) == null) ||
1672 (s == null && (s = other.result) == null)) {
1673 if (q != null) {
1674 if (s != null ||
1675 UNSAFE.compareAndSwapObject
1676 (other, COMPLETIONS, q.next = other.completions, q))
1677 break;
1678 }
1679 else if (r != null ||
1680 UNSAFE.compareAndSwapObject
1681 (this, COMPLETIONS, p.next = completions, p)) {
1682 if (s != null)
1683 break;
1684 q = new CompletionNode(d);
1685 }
1686 }
1687 }
1688 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1689 T t; U u; Throwable ex;
1690 if (r instanceof AltResult) {
1691 ex = ((AltResult)r).ex;
1692 t = null;
1693 }
1694 else {
1695 ex = null;
1696 @SuppressWarnings("unchecked") T tr = (T) r;
1697 t = tr;
1698 }
1699 if (ex != null)
1700 u = null;
1701 else if (s instanceof AltResult) {
1702 ex = ((AltResult)s).ex;
1703 u = null;
1704 }
1705 else {
1706 @SuppressWarnings("unchecked") U us = (U) s;
1707 u = us;
1708 }
1709 if (ex == null) {
1710 try {
1711 if (e != null)
1712 execAsync(e, new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1713 else
1714 fn.accept(t, u);
1715 } catch (Throwable rex) {
1716 ex = rex;
1717 }
1718 }
1719 if (e == null || ex != null)
1720 dst.internalComplete(null, ex);
1721 }
1722 helpPostComplete();
1723 other.helpPostComplete();
1724 return dst;
1725 }
1726
1727 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1728 Runnable action,
1729 Executor e) {
1730 if (other == null || action == null) throw new NullPointerException();
1731 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1732 RunAfterBoth d = null;
1733 Object r, s = null;
1734 if ((r = result) == null || (s = other.result) == null) {
1735 d = new RunAfterBoth(this, other, action, dst, e);
1736 CompletionNode q = null, p = new CompletionNode(d);
1737 while ((r == null && (r = result) == null) ||
1738 (s == null && (s = other.result) == null)) {
1739 if (q != null) {
1740 if (s != null ||
1741 UNSAFE.compareAndSwapObject
1742 (other, COMPLETIONS, q.next = other.completions, q))
1743 break;
1744 }
1745 else if (r != null ||
1746 UNSAFE.compareAndSwapObject
1747 (this, COMPLETIONS, p.next = completions, p)) {
1748 if (s != null)
1749 break;
1750 q = new CompletionNode(d);
1751 }
1752 }
1753 }
1754 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1755 Throwable ex;
1756 if (r instanceof AltResult)
1757 ex = ((AltResult)r).ex;
1758 else
1759 ex = null;
1760 if (ex == null && (s instanceof AltResult))
1761 ex = ((AltResult)s).ex;
1762 if (ex == null) {
1763 try {
1764 if (e != null)
1765 execAsync(e, new AsyncRun(action, dst));
1766 else
1767 action.run();
1768 } catch (Throwable rex) {
1769 ex = rex;
1770 }
1771 }
1772 if (e == null || ex != null)
1773 dst.internalComplete(null, ex);
1774 }
1775 helpPostComplete();
1776 other.helpPostComplete();
1777 return dst;
1778 }
1779
1780 private <U> CompletableFuture<U> doApplyToEither
1781 (CompletableFuture<? extends T> other,
1782 Function<? super T, U> fn,
1783 Executor e) {
1784 if (other == null || fn == null) throw new NullPointerException();
1785 CompletableFuture<U> dst = new CompletableFuture<U>();
1786 ApplyToEither<T,U> d = null;
1787 Object r;
1788 if ((r = result) == null && (r = other.result) == null) {
1789 d = new ApplyToEither<T,U>(this, other, fn, dst, e);
1790 CompletionNode q = null, p = new CompletionNode(d);
1791 while ((r = result) == null && (r = other.result) == null) {
1792 if (q != null) {
1793 if (UNSAFE.compareAndSwapObject
1794 (other, COMPLETIONS, q.next = other.completions, q))
1795 break;
1796 }
1797 else if (UNSAFE.compareAndSwapObject
1798 (this, COMPLETIONS, p.next = completions, p))
1799 q = new CompletionNode(d);
1800 }
1801 }
1802 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1803 T t; Throwable ex;
1804 if (r instanceof AltResult) {
1805 ex = ((AltResult)r).ex;
1806 t = null;
1807 }
1808 else {
1809 ex = null;
1810 @SuppressWarnings("unchecked") T tr = (T) r;
1811 t = tr;
1812 }
1813 U u = null;
1814 if (ex == null) {
1815 try {
1816 if (e != null)
1817 execAsync(e, new AsyncApply<T,U>(t, fn, dst));
1818 else
1819 u = fn.apply(t);
1820 } catch (Throwable rex) {
1821 ex = rex;
1822 }
1823 }
1824 if (e == null || ex != null)
1825 dst.internalComplete(u, ex);
1826 }
1827 helpPostComplete();
1828 other.helpPostComplete();
1829 return dst;
1830 }
1831
1832 private CompletableFuture<Void> doAcceptEither
1833 (CompletableFuture<? extends T> other,
1834 Consumer<? super T> fn,
1835 Executor e) {
1836 if (other == null || fn == null) throw new NullPointerException();
1837 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1838 AcceptEither<T> d = null;
1839 Object r;
1840 if ((r = result) == null && (r = other.result) == null) {
1841 d = new AcceptEither<T>(this, other, fn, dst, e);
1842 CompletionNode q = null, p = new CompletionNode(d);
1843 while ((r = result) == null && (r = other.result) == null) {
1844 if (q != null) {
1845 if (UNSAFE.compareAndSwapObject
1846 (other, COMPLETIONS, q.next = other.completions, q))
1847 break;
1848 }
1849 else if (UNSAFE.compareAndSwapObject
1850 (this, COMPLETIONS, p.next = completions, p))
1851 q = new CompletionNode(d);
1852 }
1853 }
1854 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1855 T t; Throwable ex;
1856 if (r instanceof AltResult) {
1857 ex = ((AltResult)r).ex;
1858 t = null;
1859 }
1860 else {
1861 ex = null;
1862 @SuppressWarnings("unchecked") T tr = (T) r;
1863 t = tr;
1864 }
1865 if (ex == null) {
1866 try {
1867 if (e != null)
1868 execAsync(e, new AsyncAccept<T>(t, fn, dst));
1869 else
1870 fn.accept(t);
1871 } catch (Throwable rex) {
1872 ex = rex;
1873 }
1874 }
1875 if (e == null || ex != null)
1876 dst.internalComplete(null, ex);
1877 }
1878 helpPostComplete();
1879 other.helpPostComplete();
1880 return dst;
1881 }
1882
1883 private CompletableFuture<Void> doRunAfterEither
1884 (CompletableFuture<?> other,
1885 Runnable action,
1886 Executor e) {
1887 if (other == null || action == null) throw new NullPointerException();
1888 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1889 RunAfterEither d = null;
1890 Object r;
1891 if ((r = result) == null && (r = other.result) == null) {
1892 d = new RunAfterEither(this, other, action, dst, e);
1893 CompletionNode q = null, p = new CompletionNode(d);
1894 while ((r = result) == null && (r = other.result) == null) {
1895 if (q != null) {
1896 if (UNSAFE.compareAndSwapObject
1897 (other, COMPLETIONS, q.next = other.completions, q))
1898 break;
1899 }
1900 else if (UNSAFE.compareAndSwapObject
1901 (this, COMPLETIONS, p.next = completions, p))
1902 q = new CompletionNode(d);
1903 }
1904 }
1905 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1906 Throwable ex;
1907 if (r instanceof AltResult)
1908 ex = ((AltResult)r).ex;
1909 else
1910 ex = null;
1911 if (ex == null) {
1912 try {
1913 if (e != null)
1914 execAsync(e, new AsyncRun(action, dst));
1915 else
1916 action.run();
1917 } catch (Throwable rex) {
1918 ex = rex;
1919 }
1920 }
1921 if (e == null || ex != null)
1922 dst.internalComplete(null, ex);
1923 }
1924 helpPostComplete();
1925 other.helpPostComplete();
1926 return dst;
1927 }
1928
1929 private <U> CompletableFuture<U> doThenCompose
1930 (Function<? super T, ? extends CompletionStage<U>> fn,
1931 Executor e) {
1932 if (fn == null) throw new NullPointerException();
1933 CompletableFuture<U> dst = null;
1934 ThenCompose<T,U> d = null;
1935 Object r;
1936 if ((r = result) == null) {
1937 dst = new CompletableFuture<U>();
1938 CompletionNode p = new CompletionNode
1939 (d = new ThenCompose<T,U>(this, fn, dst, e));
1940 while ((r = result) == null) {
1941 if (UNSAFE.compareAndSwapObject
1942 (this, COMPLETIONS, p.next = completions, p))
1943 break;
1944 }
1945 }
1946 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1947 T t; Throwable ex;
1948 if (r instanceof AltResult) {
1949 ex = ((AltResult)r).ex;
1950 t = null;
1951 }
1952 else {
1953 ex = null;
1954 @SuppressWarnings("unchecked") T tr = (T) r;
1955 t = tr;
1956 }
1957 if (ex == null) {
1958 if (e != null) {
1959 if (dst == null)
1960 dst = new CompletableFuture<U>();
1961 execAsync(e, new AsyncCompose<T,U>(t, fn, dst));
1962 }
1963 else {
1964 try {
1965 CompletionStage<U> cs = fn.apply(t);
1966 if (cs == null ||
1967 (dst = cs.toCompletableFuture()) == null)
1968 ex = new NullPointerException();
1969 } catch (Throwable rex) {
1970 ex = rex;
1971 }
1972 }
1973 }
1974 if (dst == null)
1975 dst = new CompletableFuture<U>();
1976 if (e == null || ex != null)
1977 dst.internalComplete(null, ex);
1978 }
1979 helpPostComplete();
1980 dst.helpPostComplete();
1981 return dst;
1982 }
1983
1984 private CompletableFuture<T> doWhenComplete
1985 (BiConsumer<? super T, ? super Throwable> fn,
1986 Executor e) {
1987 if (fn == null) throw new NullPointerException();
1988 CompletableFuture<T> dst = new CompletableFuture<T>();
1989 WhenCompleteCompletion<T> d = null;
1990 Object r;
1991 if ((r = result) == null) {
1992 CompletionNode p =
1993 new CompletionNode(d = new WhenCompleteCompletion<T>
1994 (this, fn, dst, e));
1995 while ((r = result) == null) {
1996 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1997 p.next = completions, p))
1998 break;
1999 }
2000 }
2001 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2002 T t; Throwable ex;
2003 if (r instanceof AltResult) {
2004 ex = ((AltResult)r).ex;
2005 t = null;
2006 }
2007 else {
2008 ex = null;
2009 @SuppressWarnings("unchecked") T tr = (T) r;
2010 t = tr;
2011 }
2012 Throwable dx = null;
2013 try {
2014 if (e != null)
2015 execAsync(e, new AsyncWhenComplete<T>(t, ex, fn, dst));
2016 else
2017 fn.accept(t, ex);
2018 } catch (Throwable rex) {
2019 dx = rex;
2020 }
2021 if (e == null || dx != null)
2022 dst.internalComplete(t, ex != null ? ex : dx);
2023 }
2024 helpPostComplete();
2025 return dst;
2026 }
2027
2028 private <U> CompletableFuture<U> doHandle
2029 (BiFunction<? super T, Throwable, ? extends U> fn,
2030 Executor e) {
2031 if (fn == null) throw new NullPointerException();
2032 CompletableFuture<U> dst = new CompletableFuture<U>();
2033 HandleCompletion<T,U> d = null;
2034 Object r;
2035 if ((r = result) == null) {
2036 CompletionNode p =
2037 new CompletionNode(d = new HandleCompletion<T,U>
2038 (this, fn, dst, e));
2039 while ((r = result) == null) {
2040 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2041 p.next = completions, p))
2042 break;
2043 }
2044 }
2045 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2046 T t; Throwable ex;
2047 if (r instanceof AltResult) {
2048 ex = ((AltResult)r).ex;
2049 t = null;
2050 }
2051 else {
2052 ex = null;
2053 @SuppressWarnings("unchecked") T tr = (T) r;
2054 t = tr;
2055 }
2056 U u = null;
2057 Throwable dx = null;
2058 try {
2059 if (e != null)
2060 execAsync(e, new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2061 else {
2062 u = fn.apply(t, ex);
2063 dx = null;
2064 }
2065 } catch (Throwable rex) {
2066 dx = rex;
2067 u = null;
2068 }
2069 if (e == null || dx != null)
2070 dst.internalComplete(u, dx);
2071 }
2072 helpPostComplete();
2073 return dst;
2074 }
2075
2076
2077 // public methods
2078
2079 /**
2080 * Creates a new incomplete CompletableFuture.
2081 */
2082 public CompletableFuture() {
2083 }
2084
2085 /**
2086 * Returns a new CompletableFuture that is asynchronously completed
2087 * by a task running in the {@link ForkJoinPool#commonPool()} with
2088 * the value obtained by calling the given Supplier.
2089 *
2090 * @param supplier a function returning the value to be used
2091 * to complete the returned CompletableFuture
2092 * @param <U> the function's return type
2093 * @return the new CompletableFuture
2094 */
2095 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2096 if (supplier == null) throw new NullPointerException();
2097 CompletableFuture<U> f = new CompletableFuture<U>();
2098 execAsync(ForkJoinPool.commonPool(), new AsyncSupply<U>(supplier, f));
2099 return f;
2100 }
2101
2102 /**
2103 * Returns a new CompletableFuture that is asynchronously completed
2104 * by a task running in the given executor with the value obtained
2105 * by calling the given Supplier.
2106 *
2107 * @param supplier a function returning the value to be used
2108 * to complete the returned CompletableFuture
2109 * @param executor the executor to use for asynchronous execution
2110 * @param <U> the function's return type
2111 * @return the new CompletableFuture
2112 */
2113 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2114 Executor executor) {
2115 if (executor == null || supplier == null)
2116 throw new NullPointerException();
2117 CompletableFuture<U> f = new CompletableFuture<U>();
2118 execAsync(executor, new AsyncSupply<U>(supplier, f));
2119 return f;
2120 }
2121
2122 /**
2123 * Returns a new CompletableFuture that is asynchronously completed
2124 * by a task running in the {@link ForkJoinPool#commonPool()} after
2125 * it runs the given action.
2126 *
2127 * @param runnable the action to run before completing the
2128 * returned CompletableFuture
2129 * @return the new CompletableFuture
2130 */
2131 public static CompletableFuture<Void> runAsync(Runnable runnable) {
2132 if (runnable == null) throw new NullPointerException();
2133 CompletableFuture<Void> f = new CompletableFuture<Void>();
2134 execAsync(ForkJoinPool.commonPool(), new AsyncRun(runnable, f));
2135 return f;
2136 }
2137
2138 /**
2139 * Returns a new CompletableFuture that is asynchronously completed
2140 * by a task running in the given executor after it runs the given
2141 * action.
2142 *
2143 * @param runnable the action to run before completing the
2144 * returned CompletableFuture
2145 * @param executor the executor to use for asynchronous execution
2146 * @return the new CompletableFuture
2147 */
2148 public static CompletableFuture<Void> runAsync(Runnable runnable,
2149 Executor executor) {
2150 if (executor == null || runnable == null)
2151 throw new NullPointerException();
2152 CompletableFuture<Void> f = new CompletableFuture<Void>();
2153 execAsync(executor, new AsyncRun(runnable, f));
2154 return f;
2155 }
2156
2157 /**
2158 * Returns a new CompletableFuture that is already completed with
2159 * the given value.
2160 *
2161 * @param value the value
2162 * @param <U> the type of the value
2163 * @return the completed CompletableFuture
2164 */
2165 public static <U> CompletableFuture<U> completedFuture(U value) {
2166 CompletableFuture<U> f = new CompletableFuture<U>();
2167 f.result = (value == null) ? NIL : value;
2168 return f;
2169 }
2170
2171 /**
2172 * Returns {@code true} if completed in any fashion: normally,
2173 * exceptionally, or via cancellation.
2174 *
2175 * @return {@code true} if completed
2176 */
2177 public boolean isDone() {
2178 return result != null;
2179 }
2180
2181 /**
2182 * Waits if necessary for this future to complete, and then
2183 * returns its result.
2184 *
2185 * @return the result value
2186 * @throws CancellationException if this future was cancelled
2187 * @throws ExecutionException if this future completed exceptionally
2188 * @throws InterruptedException if the current thread was interrupted
2189 * while waiting
2190 */
2191 public T get() throws InterruptedException, ExecutionException {
2192 Object r; Throwable ex, cause;
2193 if ((r = result) == null && (r = waitingGet(true)) == null)
2194 throw new InterruptedException();
2195 if (!(r instanceof AltResult)) {
2196 @SuppressWarnings("unchecked") T tr = (T) r;
2197 return tr;
2198 }
2199 if ((ex = ((AltResult)r).ex) == null)
2200 return null;
2201 if (ex instanceof CancellationException)
2202 throw (CancellationException)ex;
2203 if ((ex instanceof CompletionException) &&
2204 (cause = ex.getCause()) != null)
2205 ex = cause;
2206 throw new ExecutionException(ex);
2207 }
2208
2209 /**
2210 * Waits if necessary for at most the given time for this future
2211 * to complete, and then returns its result, if available.
2212 *
2213 * @param timeout the maximum time to wait
2214 * @param unit the time unit of the timeout argument
2215 * @return the result value
2216 * @throws CancellationException if this future was cancelled
2217 * @throws ExecutionException if this future completed exceptionally
2218 * @throws InterruptedException if the current thread was interrupted
2219 * while waiting
2220 * @throws TimeoutException if the wait timed out
2221 */
2222 public T get(long timeout, TimeUnit unit)
2223 throws InterruptedException, ExecutionException, TimeoutException {
2224 Object r; Throwable ex, cause;
2225 long nanos = unit.toNanos(timeout);
2226 if (Thread.interrupted())
2227 throw new InterruptedException();
2228 if ((r = result) == null)
2229 r = timedAwaitDone(nanos);
2230 if (!(r instanceof AltResult)) {
2231 @SuppressWarnings("unchecked") T tr = (T) r;
2232 return tr;
2233 }
2234 if ((ex = ((AltResult)r).ex) == null)
2235 return null;
2236 if (ex instanceof CancellationException)
2237 throw (CancellationException)ex;
2238 if ((ex instanceof CompletionException) &&
2239 (cause = ex.getCause()) != null)
2240 ex = cause;
2241 throw new ExecutionException(ex);
2242 }
2243
2244 /**
2245 * Returns the result value when complete, or throws an
2246 * (unchecked) exception if completed exceptionally. To better
2247 * conform with the use of common functional forms, if a
2248 * computation involved in the completion of this
2249 * CompletableFuture threw an exception, this method throws an
2250 * (unchecked) {@link CompletionException} with the underlying
2251 * exception as its cause.
2252 *
2253 * @return the result value
2254 * @throws CancellationException if the computation was cancelled
2255 * @throws CompletionException if this future completed
2256 * exceptionally or a completion computation threw an exception
2257 */
2258 public T join() {
2259 Object r; Throwable ex;
2260 if ((r = result) == null)
2261 r = waitingGet(false);
2262 if (!(r instanceof AltResult)) {
2263 @SuppressWarnings("unchecked") T tr = (T) r;
2264 return tr;
2265 }
2266 if ((ex = ((AltResult)r).ex) == null)
2267 return null;
2268 if (ex instanceof CancellationException)
2269 throw (CancellationException)ex;
2270 if (ex instanceof CompletionException)
2271 throw (CompletionException)ex;
2272 throw new CompletionException(ex);
2273 }
2274
2275 /**
2276 * Returns the result value (or throws any encountered exception)
2277 * if completed, else returns the given valueIfAbsent.
2278 *
2279 * @param valueIfAbsent the value to return if not completed
2280 * @return the result value, if completed, else the given valueIfAbsent
2281 * @throws CancellationException if the computation was cancelled
2282 * @throws CompletionException if this future completed
2283 * exceptionally or a completion computation threw an exception
2284 */
2285 public T getNow(T valueIfAbsent) {
2286 Object r; Throwable ex;
2287 if ((r = result) == null)
2288 return valueIfAbsent;
2289 if (!(r instanceof AltResult)) {
2290 @SuppressWarnings("unchecked") T tr = (T) r;
2291 return tr;
2292 }
2293 if ((ex = ((AltResult)r).ex) == null)
2294 return null;
2295 if (ex instanceof CancellationException)
2296 throw (CancellationException)ex;
2297 if (ex instanceof CompletionException)
2298 throw (CompletionException)ex;
2299 throw new CompletionException(ex);
2300 }
2301
2302 /**
2303 * If not already completed, sets the value returned by {@link
2304 * #get()} and related methods to the given value.
2305 *
2306 * @param value the result value
2307 * @return {@code true} if this invocation caused this CompletableFuture
2308 * to transition to a completed state, else {@code false}
2309 */
2310 public boolean complete(T value) {
2311 boolean triggered = result == null &&
2312 UNSAFE.compareAndSwapObject(this, RESULT, null,
2313 value == null ? NIL : value);
2314 postComplete();
2315 return triggered;
2316 }
2317
2318 /**
2319 * If not already completed, causes invocations of {@link #get()}
2320 * and related methods to throw the given exception.
2321 *
2322 * @param ex the exception
2323 * @return {@code true} if this invocation caused this CompletableFuture
2324 * to transition to a completed state, else {@code false}
2325 */
2326 public boolean completeExceptionally(Throwable ex) {
2327 if (ex == null) throw new NullPointerException();
2328 boolean triggered = result == null &&
2329 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2330 postComplete();
2331 return triggered;
2332 }
2333
2334 // CompletionStage methods
2335
2336 public <U> CompletableFuture<U> thenApply
2337 (Function<? super T,? extends U> fn) {
2338 return doThenApply(fn, null);
2339 }
2340
2341 public <U> CompletableFuture<U> thenApplyAsync
2342 (Function<? super T,? extends U> fn) {
2343 return doThenApply(fn, ForkJoinPool.commonPool());
2344 }
2345
2346 public <U> CompletableFuture<U> thenApplyAsync
2347 (Function<? super T,? extends U> fn,
2348 Executor executor) {
2349 if (executor == null) throw new NullPointerException();
2350 return doThenApply(fn, executor);
2351 }
2352
2353 public CompletableFuture<Void> thenAccept
2354 (Consumer<? super T> action) {
2355 return doThenAccept(action, null);
2356 }
2357
2358 public CompletableFuture<Void> thenAcceptAsync
2359 (Consumer<? super T> action) {
2360 return doThenAccept(action, ForkJoinPool.commonPool());
2361 }
2362
2363 public CompletableFuture<Void> thenAcceptAsync
2364 (Consumer<? super T> action,
2365 Executor executor) {
2366 if (executor == null) throw new NullPointerException();
2367 return doThenAccept(action, executor);
2368 }
2369
2370 public CompletableFuture<Void> thenRun
2371 (Runnable action) {
2372 return doThenRun(action, null);
2373 }
2374
2375 public CompletableFuture<Void> thenRunAsync
2376 (Runnable action) {
2377 return doThenRun(action, ForkJoinPool.commonPool());
2378 }
2379
2380 public CompletableFuture<Void> thenRunAsync
2381 (Runnable action,
2382 Executor executor) {
2383 if (executor == null) throw new NullPointerException();
2384 return doThenRun(action, executor);
2385 }
2386
2387 public <U,V> CompletableFuture<V> thenCombine
2388 (CompletionStage<? extends U> other,
2389 BiFunction<? super T,? super U,? extends V> fn) {
2390 return doThenCombine(other.toCompletableFuture(), fn, null);
2391 }
2392
2393 public <U,V> CompletableFuture<V> thenCombineAsync
2394 (CompletionStage<? extends U> other,
2395 BiFunction<? super T,? super U,? extends V> fn) {
2396 return doThenCombine(other.toCompletableFuture(), fn,
2397 ForkJoinPool.commonPool());
2398 }
2399
2400 public <U,V> CompletableFuture<V> thenCombineAsync
2401 (CompletionStage<? extends U> other,
2402 BiFunction<? super T,? super U,? extends V> fn,
2403 Executor executor) {
2404 if (executor == null) throw new NullPointerException();
2405 return doThenCombine(other.toCompletableFuture(), fn, executor);
2406 }
2407
2408 public <U> CompletableFuture<Void> thenAcceptBoth
2409 (CompletionStage<? extends U> other,
2410 BiConsumer<? super T, ? super U> action) {
2411 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2412 }
2413
2414 public <U> CompletableFuture<Void> thenAcceptBothAsync
2415 (CompletionStage<? extends U> other,
2416 BiConsumer<? super T, ? super U> action) {
2417 return doThenAcceptBoth(other.toCompletableFuture(), action,
2418 ForkJoinPool.commonPool());
2419 }
2420
2421 public <U> CompletableFuture<Void> thenAcceptBothAsync
2422 (CompletionStage<? extends U> other,
2423 BiConsumer<? super T, ? super U> action,
2424 Executor executor) {
2425 if (executor == null) throw new NullPointerException();
2426 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2427 }
2428
2429 public CompletableFuture<Void> runAfterBoth
2430 (CompletionStage<?> other,
2431 Runnable action) {
2432 return doRunAfterBoth(other.toCompletableFuture(), action, null);
2433 }
2434
2435 public CompletableFuture<Void> runAfterBothAsync
2436 (CompletionStage<?> other,
2437 Runnable action) {
2438 return doRunAfterBoth(other.toCompletableFuture(), action,
2439 ForkJoinPool.commonPool());
2440 }
2441
2442 public CompletableFuture<Void> runAfterBothAsync
2443 (CompletionStage<?> other,
2444 Runnable action,
2445 Executor executor) {
2446 if (executor == null) throw new NullPointerException();
2447 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2448 }
2449
2450
2451 public <U> CompletableFuture<U> applyToEither
2452 (CompletionStage<? extends T> other,
2453 Function<? super T, U> fn) {
2454 return doApplyToEither(other.toCompletableFuture(), fn, null);
2455 }
2456
2457 public <U> CompletableFuture<U> applyToEitherAsync
2458 (CompletionStage<? extends T> other,
2459 Function<? super T, U> fn) {
2460 return doApplyToEither(other.toCompletableFuture(), fn,
2461 ForkJoinPool.commonPool());
2462 }
2463
2464 public <U> CompletableFuture<U> applyToEitherAsync
2465 (CompletionStage<? extends T> other,
2466 Function<? super T, U> fn,
2467 Executor executor) {
2468 if (executor == null) throw new NullPointerException();
2469 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2470 }
2471
2472 public CompletableFuture<Void> acceptEither
2473 (CompletionStage<? extends T> other,
2474 Consumer<? super T> action) {
2475 return doAcceptEither(other.toCompletableFuture(), action, null);
2476 }
2477
2478 public CompletableFuture<Void> acceptEitherAsync
2479 (CompletionStage<? extends T> other,
2480 Consumer<? super T> action) {
2481 return doAcceptEither(other.toCompletableFuture(), action,
2482 ForkJoinPool.commonPool());
2483 }
2484
2485 public CompletableFuture<Void> acceptEitherAsync
2486 (CompletionStage<? extends T> other,
2487 Consumer<? super T> action,
2488 Executor executor) {
2489 if (executor == null) throw new NullPointerException();
2490 return doAcceptEither(other.toCompletableFuture(), action, executor);
2491 }
2492
2493 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2494 Runnable action) {
2495 return doRunAfterEither(other.toCompletableFuture(), action, null);
2496 }
2497
2498 public CompletableFuture<Void> runAfterEitherAsync
2499 (CompletionStage<?> other,
2500 Runnable action) {
2501 return doRunAfterEither(other.toCompletableFuture(), action,
2502 ForkJoinPool.commonPool());
2503 }
2504
2505 public CompletableFuture<Void> runAfterEitherAsync
2506 (CompletionStage<?> other,
2507 Runnable action,
2508 Executor executor) {
2509 if (executor == null) throw new NullPointerException();
2510 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2511 }
2512
2513 public <U> CompletableFuture<U> thenCompose
2514 (Function<? super T, ? extends CompletionStage<U>> fn) {
2515 return doThenCompose(fn, null);
2516 }
2517
2518 public <U> CompletableFuture<U> thenComposeAsync
2519 (Function<? super T, ? extends CompletionStage<U>> fn) {
2520 return doThenCompose(fn, ForkJoinPool.commonPool());
2521 }
2522
2523 public <U> CompletableFuture<U> thenComposeAsync
2524 (Function<? super T, ? extends CompletionStage<U>> fn,
2525 Executor executor) {
2526 if (executor == null) throw new NullPointerException();
2527 return doThenCompose(fn, executor);
2528 }
2529
2530 public CompletableFuture<T> whenComplete
2531 (BiConsumer<? super T, ? super Throwable> action) {
2532 return doWhenComplete(action, null);
2533 }
2534
2535 public CompletableFuture<T> whenCompleteAsync
2536 (BiConsumer<? super T, ? super Throwable> action) {
2537 return doWhenComplete(action, ForkJoinPool.commonPool());
2538 }
2539
2540 public CompletableFuture<T> whenCompleteAsync
2541 (BiConsumer<? super T, ? super Throwable> action,
2542 Executor executor) {
2543 if (executor == null) throw new NullPointerException();
2544 return doWhenComplete(action, executor);
2545 }
2546
2547 public <U> CompletableFuture<U> handle
2548 (BiFunction<? super T, Throwable, ? extends U> fn) {
2549 return doHandle(fn, null);
2550 }
2551
2552 public <U> CompletableFuture<U> handleAsync
2553 (BiFunction<? super T, Throwable, ? extends U> fn) {
2554 return doHandle(fn, ForkJoinPool.commonPool());
2555 }
2556
2557 public <U> CompletableFuture<U> handleAsync
2558 (BiFunction<? super T, Throwable, ? extends U> fn,
2559 Executor executor) {
2560 if (executor == null) throw new NullPointerException();
2561 return doHandle(fn, executor);
2562 }
2563
2564 /**
2565 * Returns this CompletableFuture
2566 *
2567 * @return this CompletableFuture
2568 */
2569 public CompletableFuture<T> toCompletableFuture() {
2570 return this;
2571 }
2572
2573 // not in interface CompletionStage
2574
2575 /**
2576 * Returns a new CompletableFuture that is completed when this
2577 * CompletableFuture completes, with the result of the given
2578 * function of the exception triggering this CompletableFuture's
2579 * completion when it completes exceptionally; otherwise, if this
2580 * CompletableFuture completes normally, then the returned
2581 * CompletableFuture also completes normally with the same value.
2582 * Note: More flexible versions of this functionality are
2583 * available using methods {@code whenComplete} and {@code handle}.
2584 *
2585 * @param fn the function to use to compute the value of the
2586 * returned CompletableFuture if this CompletableFuture completed
2587 * exceptionally
2588 * @return the new CompletableFuture
2589 */
2590 public CompletableFuture<T> exceptionally
2591 (Function<Throwable, ? extends T> fn) {
2592 if (fn == null) throw new NullPointerException();
2593 CompletableFuture<T> dst = new CompletableFuture<T>();
2594 ExceptionCompletion<T> d = null;
2595 Object r;
2596 if ((r = result) == null) {
2597 CompletionNode p =
2598 new CompletionNode(d = new ExceptionCompletion<T>
2599 (this, fn, dst));
2600 while ((r = result) == null) {
2601 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2602 p.next = completions, p))
2603 break;
2604 }
2605 }
2606 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2607 T t = null; Throwable ex, dx = null;
2608 if (r instanceof AltResult) {
2609 if ((ex = ((AltResult)r).ex) != null) {
2610 try {
2611 t = fn.apply(ex);
2612 } catch (Throwable rex) {
2613 dx = rex;
2614 }
2615 }
2616 }
2617 else {
2618 @SuppressWarnings("unchecked") T tr = (T) r;
2619 t = tr;
2620 }
2621 dst.internalComplete(t, dx);
2622 }
2623 helpPostComplete();
2624 return dst;
2625 }
2626
2627 /* ------------- Arbitrary-arity constructions -------------- */
2628
2629 /*
2630 * The basic plan of attack is to recursively form binary
2631 * completion trees of elements. This can be overkill for small
2632 * sets, but scales nicely. The And/All vs Or/Any forms use the
2633 * same idea, but details differ.
2634 */
2635
2636 /**
2637 * Returns a new CompletableFuture that is completed when all of
2638 * the given CompletableFutures complete. If any of the given
2639 * CompletableFutures complete exceptionally, then the returned
2640 * CompletableFuture also does so, with a CompletionException
2641 * holding this exception as its cause. Otherwise, the results,
2642 * if any, of the given CompletableFutures are not reflected in
2643 * the returned CompletableFuture, but may be obtained by
2644 * inspecting them individually. If no CompletableFutures are
2645 * provided, returns a CompletableFuture completed with the value
2646 * {@code null}.
2647 *
2648 * <p>Among the applications of this method is to await completion
2649 * of a set of independent CompletableFutures before continuing a
2650 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2651 * c3).join();}.
2652 *
2653 * @param cfs the CompletableFutures
2654 * @return a new CompletableFuture that is completed when all of the
2655 * given CompletableFutures complete
2656 * @throws NullPointerException if the array or any of its elements are
2657 * {@code null}
2658 */
2659 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2660 int len = cfs.length; // Directly handle empty and singleton cases
2661 if (len > 1)
2662 return allTree(cfs, 0, len - 1);
2663 else {
2664 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2665 CompletableFuture<?> f;
2666 if (len == 0)
2667 dst.result = NIL;
2668 else if ((f = cfs[0]) == null)
2669 throw new NullPointerException();
2670 else {
2671 ThenPropagate d = null;
2672 CompletionNode p = null;
2673 Object r;
2674 while ((r = f.result) == null) {
2675 if (d == null)
2676 d = new ThenPropagate(f, dst);
2677 else if (p == null)
2678 p = new CompletionNode(d);
2679 else if (UNSAFE.compareAndSwapObject
2680 (f, COMPLETIONS, p.next = f.completions, p))
2681 break;
2682 }
2683 if (r != null && (d == null || d.compareAndSet(0, 1)))
2684 dst.internalComplete(null, (r instanceof AltResult) ?
2685 ((AltResult)r).ex : null);
2686 f.helpPostComplete();
2687 }
2688 return dst;
2689 }
2690 }
2691
2692 /**
2693 * Recursively constructs an And'ed tree of CompletableFutures.
2694 * Called only when array known to have at least two elements.
2695 */
2696 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2697 int lo, int hi) {
2698 CompletableFuture<?> fst, snd;
2699 int mid = (lo + hi) >>> 1;
2700 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2701 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2702 throw new NullPointerException();
2703 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2704 AndCompletion d = null;
2705 CompletionNode p = null, q = null;
2706 Object r = null, s = null;
2707 while ((r = fst.result) == null || (s = snd.result) == null) {
2708 if (d == null)
2709 d = new AndCompletion(fst, snd, dst);
2710 else if (p == null)
2711 p = new CompletionNode(d);
2712 else if (q == null) {
2713 if (UNSAFE.compareAndSwapObject
2714 (fst, COMPLETIONS, p.next = fst.completions, p))
2715 q = new CompletionNode(d);
2716 }
2717 else if (UNSAFE.compareAndSwapObject
2718 (snd, COMPLETIONS, q.next = snd.completions, q))
2719 break;
2720 }
2721 if ((r != null || (r = fst.result) != null) &&
2722 (s != null || (s = snd.result) != null) &&
2723 (d == null || d.compareAndSet(0, 1))) {
2724 Throwable ex;
2725 if (r instanceof AltResult)
2726 ex = ((AltResult)r).ex;
2727 else
2728 ex = null;
2729 if (ex == null && (s instanceof AltResult))
2730 ex = ((AltResult)s).ex;
2731 dst.internalComplete(null, ex);
2732 }
2733 fst.helpPostComplete();
2734 snd.helpPostComplete();
2735 return dst;
2736 }
2737
2738 /**
2739 * Returns a new CompletableFuture that is completed when any of
2740 * the given CompletableFutures complete, with the same result.
2741 * Otherwise, if it completed exceptionally, the returned
2742 * CompletableFuture also does so, with a CompletionException
2743 * holding this exception as its cause. If no CompletableFutures
2744 * are provided, returns an incomplete CompletableFuture.
2745 *
2746 * @param cfs the CompletableFutures
2747 * @return a new CompletableFuture that is completed with the
2748 * result or exception of any of the given CompletableFutures when
2749 * one completes
2750 * @throws NullPointerException if the array or any of its elements are
2751 * {@code null}
2752 */
2753 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2754 int len = cfs.length; // Same idea as allOf
2755 if (len > 1)
2756 return anyTree(cfs, 0, len - 1);
2757 else {
2758 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2759 CompletableFuture<?> f;
2760 if (len == 0)
2761 ; // skip
2762 else if ((f = cfs[0]) == null)
2763 throw new NullPointerException();
2764 else {
2765 ThenCopy<Object> d = null;
2766 CompletionNode p = null;
2767 Object r;
2768 while ((r = f.result) == null) {
2769 if (d == null)
2770 d = new ThenCopy<Object>(f, dst);
2771 else if (p == null)
2772 p = new CompletionNode(d);
2773 else if (UNSAFE.compareAndSwapObject
2774 (f, COMPLETIONS, p.next = f.completions, p))
2775 break;
2776 }
2777 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2778 Throwable ex; Object t;
2779 if (r instanceof AltResult) {
2780 ex = ((AltResult)r).ex;
2781 t = null;
2782 }
2783 else {
2784 ex = null;
2785 t = r;
2786 }
2787 dst.internalComplete(t, ex);
2788 }
2789 f.helpPostComplete();
2790 }
2791 return dst;
2792 }
2793 }
2794
2795 /**
2796 * Recursively constructs an Or'ed tree of CompletableFutures.
2797 */
2798 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
2799 int lo, int hi) {
2800 CompletableFuture<?> fst, snd;
2801 int mid = (lo + hi) >>> 1;
2802 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2803 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2804 throw new NullPointerException();
2805 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2806 OrCompletion d = null;
2807 CompletionNode p = null, q = null;
2808 Object r;
2809 while ((r = fst.result) == null && (r = snd.result) == null) {
2810 if (d == null)
2811 d = new OrCompletion(fst, snd, dst);
2812 else if (p == null)
2813 p = new CompletionNode(d);
2814 else if (q == null) {
2815 if (UNSAFE.compareAndSwapObject
2816 (fst, COMPLETIONS, p.next = fst.completions, p))
2817 q = new CompletionNode(d);
2818 }
2819 else if (UNSAFE.compareAndSwapObject
2820 (snd, COMPLETIONS, q.next = snd.completions, q))
2821 break;
2822 }
2823 if ((r != null || (r = fst.result) != null ||
2824 (r = snd.result) != null) &&
2825 (d == null || d.compareAndSet(0, 1))) {
2826 Throwable ex; Object t;
2827 if (r instanceof AltResult) {
2828 ex = ((AltResult)r).ex;
2829 t = null;
2830 }
2831 else {
2832 ex = null;
2833 t = r;
2834 }
2835 dst.internalComplete(t, ex);
2836 }
2837 fst.helpPostComplete();
2838 snd.helpPostComplete();
2839 return dst;
2840 }
2841
2842 /* ------------- Control and status methods -------------- */
2843
2844 /**
2845 * If not already completed, completes this CompletableFuture with
2846 * a {@link CancellationException}. Dependent CompletableFutures
2847 * that have not already completed will also complete
2848 * exceptionally, with a {@link CompletionException} caused by
2849 * this {@code CancellationException}.
2850 *
2851 * @param mayInterruptIfRunning this value has no effect in this
2852 * implementation because interrupts are not used to control
2853 * processing.
2854 *
2855 * @return {@code true} if this task is now cancelled
2856 */
2857 public boolean cancel(boolean mayInterruptIfRunning) {
2858 boolean cancelled = (result == null) &&
2859 UNSAFE.compareAndSwapObject
2860 (this, RESULT, null, new AltResult(new CancellationException()));
2861 postComplete();
2862 return cancelled || isCancelled();
2863 }
2864
2865 /**
2866 * Returns {@code true} if this CompletableFuture was cancelled
2867 * before it completed normally.
2868 *
2869 * @return {@code true} if this CompletableFuture was cancelled
2870 * before it completed normally
2871 */
2872 public boolean isCancelled() {
2873 Object r;
2874 return ((r = result) instanceof AltResult) &&
2875 (((AltResult)r).ex instanceof CancellationException);
2876 }
2877
2878 /**
2879 * Returns {@code true} if this CompletableFuture completed
2880 * exceptionally, in any way. Possible causes include
2881 * cancellation, explicit invocation of {@code
2882 * completeExceptionally}, and abrupt termination of a
2883 * CompletionStage action.
2884 *
2885 * @return {@code true} if this CompletableFuture completed
2886 * exceptionally
2887 */
2888 public boolean isCompletedExceptionally() {
2889 Object r;
2890 return ((r = result) instanceof AltResult) && r != NIL;
2891 }
2892
2893 /**
2894 * Forcibly sets or resets the value subsequently returned by
2895 * method {@link #get()} and related methods, whether or not
2896 * already completed. This method is designed for use only in
2897 * error recovery actions, and even in such situations may result
2898 * in ongoing dependent completions using established versus
2899 * overwritten outcomes.
2900 *
2901 * @param value the completion value
2902 */
2903 public void obtrudeValue(T value) {
2904 result = (value == null) ? NIL : value;
2905 postComplete();
2906 }
2907
2908 /**
2909 * Forcibly causes subsequent invocations of method {@link #get()}
2910 * and related methods to throw the given exception, whether or
2911 * not already completed. This method is designed for use only in
2912 * recovery actions, and even in such situations may result in
2913 * ongoing dependent completions using established versus
2914 * overwritten outcomes.
2915 *
2916 * @param ex the exception
2917 */
2918 public void obtrudeException(Throwable ex) {
2919 if (ex == null) throw new NullPointerException();
2920 result = new AltResult(ex);
2921 postComplete();
2922 }
2923
2924 /**
2925 * Returns the estimated number of CompletableFutures whose
2926 * completions are awaiting completion of this CompletableFuture.
2927 * This method is designed for use in monitoring system state, not
2928 * for synchronization control.
2929 *
2930 * @return the number of dependent CompletableFutures
2931 */
2932 public int getNumberOfDependents() {
2933 int count = 0;
2934 for (CompletionNode p = completions; p != null; p = p.next)
2935 ++count;
2936 return count;
2937 }
2938
2939 /**
2940 * Returns a string identifying this CompletableFuture, as well as
2941 * its completion state. The state, in brackets, contains the
2942 * String {@code "Completed Normally"} or the String {@code
2943 * "Completed Exceptionally"}, or the String {@code "Not
2944 * completed"} followed by the number of CompletableFutures
2945 * dependent upon its completion, if any.
2946 *
2947 * @return a string identifying this CompletableFuture, as well as its state
2948 */
2949 public String toString() {
2950 Object r = result;
2951 int count;
2952 return super.toString() +
2953 ((r == null) ?
2954 (((count = getNumberOfDependents()) == 0) ?
2955 "[Not completed]" :
2956 "[Not completed, " + count + " dependents]") :
2957 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2958 "[Completed exceptionally]" :
2959 "[Completed normally]"));
2960 }
2961
2962 // Unsafe mechanics
2963 private static final sun.misc.Unsafe UNSAFE;
2964 private static final long RESULT;
2965 private static final long WAITERS;
2966 private static final long COMPLETIONS;
2967 static {
2968 try {
2969 UNSAFE = sun.misc.Unsafe.getUnsafe();
2970 Class<?> k = CompletableFuture.class;
2971 RESULT = UNSAFE.objectFieldOffset
2972 (k.getDeclaredField("result"));
2973 WAITERS = UNSAFE.objectFieldOffset
2974 (k.getDeclaredField("waiters"));
2975 COMPLETIONS = UNSAFE.objectFieldOffset
2976 (k.getDeclaredField("completions"));
2977 } catch (Exception e) {
2978 throw new Error(e);
2979 }
2980 }
2981 }