ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.89
Committed: Mon Jul 1 19:08:00 2013 UTC (10 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.88: +1 -1 lines
Log Message:
Sync with JDK

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.locks.LockSupport;
26
27 /**
28 * A {@link Future} that may be explicitly completed (setting its
29 * value and status), and may be used as a {@link CompletionStage},
30 * supporting dependent functions and actions that trigger upon its
31 * completion.
32 *
33 * <p>When two or more threads attempt to
34 * {@link #complete complete},
35 * {@link #completeExceptionally completeExceptionally}, or
36 * {@link #cancel cancel}
37 * a CompletableFuture, only one of them succeeds.
38 *
39 * <p>CompletableFuture implements {@link CompletionStage} with the
40 * following policies: <ul>
41 *
42 * <li>Actions supplied for dependent completions of
43 * <em>non-async</em> methods may be performed by the thread that
44 * completes the current CompletableFuture, or by any other caller of
45 * these methods.</li>
46 *
47 * <li>All <em>async</em> methods without an explicit Executor
48 * argument are performed using the {@link
49 * ForkJoinPool#commonPool()}. To simplify monitoring, debugging, and
50 * tracking, all generated asynchronous tasks are instances of the
51 * marker interface {@link AsynchronousCompletionTask}. </li>
52 *
53 * <li>All CompletionStage methods are implemented independently of
54 * other public methods, so the behavior of one method is not impacted
55 * by overrides of others in subclasses. </li> </ul>
56 *
57 * <p>CompletableFuture implements {@link Future} with the following
58 * policies: <ul>
59 *
60 * <li>Since (unlike {@link FutureTask}) this class has no direct
61 * control over the computation that causes it to be completed,
62 * cancellation is treated as just another form of exceptional
63 * completion. Method {@link #cancel cancel} has the same effect as
64 * {@code completeExceptionally(new CancellationException())}. Method
65 * {@link #isCompletedExceptionally} can be used to determine if a
66 * CompletableFuture completed in any exceptional fashion.</li>
67 *
68 * <li>In case of exceptional completion with a CompletionException,
69 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
70 * {@link ExecutionException} with the same cause as held in the
71 * corresponding CompletionException. To simplify usage in most
72 * contexts, this class also defines methods {@link #join()} and
73 * {@link #getNow} that instead throw the CompletionException directly
74 * in these cases.</li> </ul>
75 *
76 * @author Doug Lea
77 * @since 1.8
78 */
79 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
80
81 /*
82 * Overview:
83 *
84 * 1. Non-nullness of field result (set via CAS) indicates done.
85 * An AltResult is used to box null as a result, as well as to
86 * hold exceptions. Using a single field makes completion fast
87 * and simple to detect and trigger, at the expense of a lot of
88 * encoding and decoding that infiltrates many methods. One minor
89 * simplification relies on the (static) NIL (to box null results)
90 * being the only AltResult with a null exception field, so we
91 * don't usually need explicit comparisons with NIL. The CF
92 * exception propagation mechanics surrounding decoding rely on
93 * unchecked casts of decoded results really being unchecked,
94 * where user type errors are caught at point of use, as is
95 * currently the case in Java. These are highlighted by using
96 * SuppressWarnings-annotated temporaries.
97 *
98 * 2. Waiters are held in a Treiber stack similar to the one used
99 * in FutureTask, Phaser, and SynchronousQueue. See their
100 * internal documentation for algorithmic details.
101 *
102 * 3. Completions are also kept in a list/stack, and pulled off
103 * and run when completion is triggered. (We could even use the
104 * same stack as for waiters, but would give up the potential
105 * parallelism obtained because woken waiters help release/run
106 * others -- see method postComplete). Because post-processing
107 * may race with direct calls, class Completion opportunistically
108 * extends AtomicInteger so callers can claim the action via
109 * compareAndSet(0, 1). The Completion.run methods are all
110 * written a boringly similar uniform way (that sometimes includes
111 * unnecessary-looking checks, kept to maintain uniformity).
112 * There are enough dimensions upon which they differ that
113 * attempts to factor commonalities while maintaining efficiency
114 * require more lines of code than they would save.
115 *
116 * 4. The exported then/and/or methods do support a bit of
117 * factoring (see doThenApply etc). They must cope with the
118 * intrinsic races surrounding addition of a dependent action
119 * versus performing the action directly because the task is
120 * already complete. For example, a CF may not be complete upon
121 * entry, so a dependent completion is added, but by the time it
122 * is added, the target CF is complete, so must be directly
123 * executed. This is all done while avoiding unnecessary object
124 * construction in safe-bypass cases.
125 */
126
127 // preliminaries
128
129 static final class AltResult {
130 final Throwable ex; // null only for NIL
131 AltResult(Throwable ex) { this.ex = ex; }
132 }
133
134 static final AltResult NIL = new AltResult(null);
135
136 // Fields
137
138 volatile Object result; // Either the result or boxed AltResult
139 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
140 volatile CompletionNode completions; // list (Treiber stack) of completions
141
142 // Basic utilities for triggering and processing completions
143
144 /**
145 * Removes and signals all waiting threads and runs all completions.
146 */
147 final void postComplete() {
148 WaitNode q; Thread t;
149 while ((q = waiters) != null) {
150 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
151 (t = q.thread) != null) {
152 q.thread = null;
153 LockSupport.unpark(t);
154 }
155 }
156
157 CompletionNode h; Completion c;
158 while ((h = completions) != null) {
159 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
160 (c = h.completion) != null)
161 c.run();
162 }
163 }
164
165 /**
166 * Triggers completion with the encoding of the given arguments:
167 * if the exception is non-null, encodes it as a wrapped
168 * CompletionException unless it is one already. Otherwise uses
169 * the given result, boxed as NIL if null.
170 */
171 final void internalComplete(T v, Throwable ex) {
172 if (result == null)
173 UNSAFE.compareAndSwapObject
174 (this, RESULT, null,
175 (ex == null) ? (v == null) ? NIL : v :
176 new AltResult((ex instanceof CompletionException) ? ex :
177 new CompletionException(ex)));
178 postComplete(); // help out even if not triggered
179 }
180
181 /**
182 * If triggered, helps release and/or process completions.
183 */
184 final void helpPostComplete() {
185 if (result != null)
186 postComplete();
187 }
188
189 /* ------------- waiting for completions -------------- */
190
191 /** Number of processors, for spin control */
192 static final int NCPU = Runtime.getRuntime().availableProcessors();
193
194 /**
195 * Heuristic spin value for waitingGet() before blocking on
196 * multiprocessors
197 */
198 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
199
200 /**
201 * Linked nodes to record waiting threads in a Treiber stack. See
202 * other classes such as Phaser and SynchronousQueue for more
203 * detailed explanation. This class implements ManagedBlocker to
204 * avoid starvation when blocking actions pile up in
205 * ForkJoinPools.
206 */
207 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
208 long nanos; // wait time if timed
209 final long deadline; // non-zero if timed
210 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
211 volatile Thread thread;
212 volatile WaitNode next;
213 WaitNode(boolean interruptible, long nanos, long deadline) {
214 this.thread = Thread.currentThread();
215 this.interruptControl = interruptible ? 1 : 0;
216 this.nanos = nanos;
217 this.deadline = deadline;
218 }
219 public boolean isReleasable() {
220 if (thread == null)
221 return true;
222 if (Thread.interrupted()) {
223 int i = interruptControl;
224 interruptControl = -1;
225 if (i > 0)
226 return true;
227 }
228 if (deadline != 0L &&
229 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
230 thread = null;
231 return true;
232 }
233 return false;
234 }
235 public boolean block() {
236 if (isReleasable())
237 return true;
238 else if (deadline == 0L)
239 LockSupport.park(this);
240 else if (nanos > 0L)
241 LockSupport.parkNanos(this, nanos);
242 return isReleasable();
243 }
244 }
245
246 /**
247 * Returns raw result after waiting, or null if interruptible and
248 * interrupted.
249 */
250 private Object waitingGet(boolean interruptible) {
251 WaitNode q = null;
252 boolean queued = false;
253 int spins = SPINS;
254 for (Object r;;) {
255 if ((r = result) != null) {
256 if (q != null) { // suppress unpark
257 q.thread = null;
258 if (q.interruptControl < 0) {
259 if (interruptible) {
260 removeWaiter(q);
261 return null;
262 }
263 Thread.currentThread().interrupt();
264 }
265 }
266 postComplete(); // help release others
267 return r;
268 }
269 else if (spins > 0) {
270 int rnd = ThreadLocalRandom.nextSecondarySeed();
271 if (rnd == 0)
272 rnd = ThreadLocalRandom.current().nextInt();
273 if (rnd >= 0)
274 --spins;
275 }
276 else if (q == null)
277 q = new WaitNode(interruptible, 0L, 0L);
278 else if (!queued)
279 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
280 q.next = waiters, q);
281 else if (interruptible && q.interruptControl < 0) {
282 removeWaiter(q);
283 return null;
284 }
285 else if (q.thread != null && result == null) {
286 try {
287 ForkJoinPool.managedBlock(q);
288 } catch (InterruptedException ex) {
289 q.interruptControl = -1;
290 }
291 }
292 }
293 }
294
295 /**
296 * Awaits completion or aborts on interrupt or timeout.
297 *
298 * @param nanos time to wait
299 * @return raw result
300 */
301 private Object timedAwaitDone(long nanos)
302 throws InterruptedException, TimeoutException {
303 WaitNode q = null;
304 boolean queued = false;
305 for (Object r;;) {
306 if ((r = result) != null) {
307 if (q != null) {
308 q.thread = null;
309 if (q.interruptControl < 0) {
310 removeWaiter(q);
311 throw new InterruptedException();
312 }
313 }
314 postComplete();
315 return r;
316 }
317 else if (q == null) {
318 if (nanos <= 0L)
319 throw new TimeoutException();
320 long d = System.nanoTime() + nanos;
321 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
322 }
323 else if (!queued)
324 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
325 q.next = waiters, q);
326 else if (q.interruptControl < 0) {
327 removeWaiter(q);
328 throw new InterruptedException();
329 }
330 else if (q.nanos <= 0L) {
331 if (result == null) {
332 removeWaiter(q);
333 throw new TimeoutException();
334 }
335 }
336 else if (q.thread != null && result == null) {
337 try {
338 ForkJoinPool.managedBlock(q);
339 } catch (InterruptedException ex) {
340 q.interruptControl = -1;
341 }
342 }
343 }
344 }
345
346 /**
347 * Tries to unlink a timed-out or interrupted wait node to avoid
348 * accumulating garbage. Internal nodes are simply unspliced
349 * without CAS since it is harmless if they are traversed anyway
350 * by releasers. To avoid effects of unsplicing from already
351 * removed nodes, the list is retraversed in case of an apparent
352 * race. This is slow when there are a lot of nodes, but we don't
353 * expect lists to be long enough to outweigh higher-overhead
354 * schemes.
355 */
356 private void removeWaiter(WaitNode node) {
357 if (node != null) {
358 node.thread = null;
359 retry:
360 for (;;) { // restart on removeWaiter race
361 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
362 s = q.next;
363 if (q.thread != null)
364 pred = q;
365 else if (pred != null) {
366 pred.next = s;
367 if (pred.thread == null) // check for race
368 continue retry;
369 }
370 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
371 continue retry;
372 }
373 break;
374 }
375 }
376 }
377
378 /* ------------- Async tasks -------------- */
379
380 /**
381 * A marker interface identifying asynchronous tasks produced by
382 * {@code async} methods. This may be useful for monitoring,
383 * debugging, and tracking asynchronous activities.
384 *
385 * @since 1.8
386 */
public interface AsynchronousCompletionTask {
    // Marker only: declares no members.
}
389
390 /** Base class can act as either FJ or plain Runnable */
391 abstract static class Async extends ForkJoinTask<Void>
392 implements Runnable, AsynchronousCompletionTask {
393 public final Void getRawResult() { return null; }
394 public final void setRawResult(Void v) { }
395 public final void run() { exec(); }
396 }
397
398 static final class AsyncRun extends Async {
399 final Runnable fn;
400 final CompletableFuture<Void> dst;
401 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
402 this.fn = fn; this.dst = dst;
403 }
404 public final boolean exec() {
405 CompletableFuture<Void> d; Throwable ex;
406 if ((d = this.dst) != null && d.result == null) {
407 try {
408 fn.run();
409 ex = null;
410 } catch (Throwable rex) {
411 ex = rex;
412 }
413 d.internalComplete(null, ex);
414 }
415 return true;
416 }
417 private static final long serialVersionUID = 5232453952276885070L;
418 }
419
420 static final class AsyncSupply<U> extends Async {
421 final Supplier<U> fn;
422 final CompletableFuture<U> dst;
423 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
424 this.fn = fn; this.dst = dst;
425 }
426 public final boolean exec() {
427 CompletableFuture<U> d; U u; Throwable ex;
428 if ((d = this.dst) != null && d.result == null) {
429 try {
430 u = fn.get();
431 ex = null;
432 } catch (Throwable rex) {
433 ex = rex;
434 u = null;
435 }
436 d.internalComplete(u, ex);
437 }
438 return true;
439 }
440 private static final long serialVersionUID = 5232453952276885070L;
441 }
442
443 static final class AsyncApply<T,U> extends Async {
444 final T arg;
445 final Function<? super T,? extends U> fn;
446 final CompletableFuture<U> dst;
447 AsyncApply(T arg, Function<? super T,? extends U> fn,
448 CompletableFuture<U> dst) {
449 this.arg = arg; this.fn = fn; this.dst = dst;
450 }
451 public final boolean exec() {
452 CompletableFuture<U> d; U u; Throwable ex;
453 if ((d = this.dst) != null && d.result == null) {
454 try {
455 u = fn.apply(arg);
456 ex = null;
457 } catch (Throwable rex) {
458 ex = rex;
459 u = null;
460 }
461 d.internalComplete(u, ex);
462 }
463 return true;
464 }
465 private static final long serialVersionUID = 5232453952276885070L;
466 }
467
468 static final class AsyncCombine<T,U,V> extends Async {
469 final T arg1;
470 final U arg2;
471 final BiFunction<? super T,? super U,? extends V> fn;
472 final CompletableFuture<V> dst;
473 AsyncCombine(T arg1, U arg2,
474 BiFunction<? super T,? super U,? extends V> fn,
475 CompletableFuture<V> dst) {
476 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
477 }
478 public final boolean exec() {
479 CompletableFuture<V> d; V v; Throwable ex;
480 if ((d = this.dst) != null && d.result == null) {
481 try {
482 v = fn.apply(arg1, arg2);
483 ex = null;
484 } catch (Throwable rex) {
485 ex = rex;
486 v = null;
487 }
488 d.internalComplete(v, ex);
489 }
490 return true;
491 }
492 private static final long serialVersionUID = 5232453952276885070L;
493 }
494
495 static final class AsyncAccept<T> extends Async {
496 final T arg;
497 final Consumer<? super T> fn;
498 final CompletableFuture<?> dst;
499 AsyncAccept(T arg, Consumer<? super T> fn,
500 CompletableFuture<?> dst) {
501 this.arg = arg; this.fn = fn; this.dst = dst;
502 }
503 public final boolean exec() {
504 CompletableFuture<?> d; Throwable ex;
505 if ((d = this.dst) != null && d.result == null) {
506 try {
507 fn.accept(arg);
508 ex = null;
509 } catch (Throwable rex) {
510 ex = rex;
511 }
512 d.internalComplete(null, ex);
513 }
514 return true;
515 }
516 private static final long serialVersionUID = 5232453952276885070L;
517 }
518
519 static final class AsyncAcceptBoth<T,U> extends Async {
520 final T arg1;
521 final U arg2;
522 final BiConsumer<? super T,? super U> fn;
523 final CompletableFuture<?> dst;
524 AsyncAcceptBoth(T arg1, U arg2,
525 BiConsumer<? super T,? super U> fn,
526 CompletableFuture<?> dst) {
527 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
528 }
529 public final boolean exec() {
530 CompletableFuture<?> d; Throwable ex;
531 if ((d = this.dst) != null && d.result == null) {
532 try {
533 fn.accept(arg1, arg2);
534 ex = null;
535 } catch (Throwable rex) {
536 ex = rex;
537 }
538 d.internalComplete(null, ex);
539 }
540 return true;
541 }
542 private static final long serialVersionUID = 5232453952276885070L;
543 }
544
545 static final class AsyncCompose<T,U> extends Async {
546 final T arg;
547 final Function<? super T, ? extends CompletionStage<U>> fn;
548 final CompletableFuture<U> dst;
549 AsyncCompose(T arg,
550 Function<? super T, ? extends CompletionStage<U>> fn,
551 CompletableFuture<U> dst) {
552 this.arg = arg; this.fn = fn; this.dst = dst;
553 }
554 public final boolean exec() {
555 CompletableFuture<U> d, fr; U u; Throwable ex;
556 if ((d = this.dst) != null && d.result == null) {
557 try {
558 CompletionStage<U> cs = fn.apply(arg);
559 fr = (cs == null) ? null : cs.toCompletableFuture();
560 ex = (fr == null) ? new NullPointerException() : null;
561 } catch (Throwable rex) {
562 ex = rex;
563 fr = null;
564 }
565 if (ex != null)
566 u = null;
567 else {
568 Object r = fr.result;
569 if (r == null)
570 r = fr.waitingGet(false);
571 if (r instanceof AltResult) {
572 ex = ((AltResult)r).ex;
573 u = null;
574 }
575 else {
576 @SuppressWarnings("unchecked") U ur = (U) r;
577 u = ur;
578 }
579 }
580 d.internalComplete(u, ex);
581 }
582 return true;
583 }
584 private static final long serialVersionUID = 5232453952276885070L;
585 }
586
587 /* ------------- Completions -------------- */
588
589 /**
590 * Simple linked list nodes to record completions, used in
591 * basically the same way as WaitNodes. (We separate nodes from
592 * the Completions themselves mainly because for the And and Or
593 * methods, the same Completion object resides in two lists.)
594 */
595 static final class CompletionNode {
596 final Completion completion;
597 volatile CompletionNode next;
598 CompletionNode(Completion completion) { this.completion = completion; }
599 }
600
601 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
602 abstract static class Completion extends AtomicInteger implements Runnable {
603 }
604
605 static final class ThenApply<T,U> extends Completion {
606 final CompletableFuture<? extends T> src;
607 final Function<? super T,? extends U> fn;
608 final CompletableFuture<U> dst;
609 final Executor executor;
610 ThenApply(CompletableFuture<? extends T> src,
611 Function<? super T,? extends U> fn,
612 CompletableFuture<U> dst,
613 Executor executor) {
614 this.src = src; this.fn = fn; this.dst = dst;
615 this.executor = executor;
616 }
617 public final void run() {
618 final CompletableFuture<? extends T> a;
619 final Function<? super T,? extends U> fn;
620 final CompletableFuture<U> dst;
621 Object r; T t; Throwable ex;
622 if ((dst = this.dst) != null &&
623 (fn = this.fn) != null &&
624 (a = this.src) != null &&
625 (r = a.result) != null &&
626 compareAndSet(0, 1)) {
627 if (r instanceof AltResult) {
628 ex = ((AltResult)r).ex;
629 t = null;
630 }
631 else {
632 ex = null;
633 @SuppressWarnings("unchecked") T tr = (T) r;
634 t = tr;
635 }
636 Executor e = executor;
637 U u = null;
638 if (ex == null) {
639 try {
640 if (e != null)
641 e.execute(new AsyncApply<T,U>(t, fn, dst));
642 else
643 u = fn.apply(t);
644 } catch (Throwable rex) {
645 ex = rex;
646 }
647 }
648 if (e == null || ex != null)
649 dst.internalComplete(u, ex);
650 }
651 }
652 private static final long serialVersionUID = 5232453952276885070L;
653 }
654
655 static final class ThenAccept<T> extends Completion {
656 final CompletableFuture<? extends T> src;
657 final Consumer<? super T> fn;
658 final CompletableFuture<?> dst;
659 final Executor executor;
660 ThenAccept(CompletableFuture<? extends T> src,
661 Consumer<? super T> fn,
662 CompletableFuture<?> dst,
663 Executor executor) {
664 this.src = src; this.fn = fn; this.dst = dst;
665 this.executor = executor;
666 }
667 public final void run() {
668 final CompletableFuture<? extends T> a;
669 final Consumer<? super T> fn;
670 final CompletableFuture<?> dst;
671 Object r; T t; Throwable ex;
672 if ((dst = this.dst) != null &&
673 (fn = this.fn) != null &&
674 (a = this.src) != null &&
675 (r = a.result) != null &&
676 compareAndSet(0, 1)) {
677 if (r instanceof AltResult) {
678 ex = ((AltResult)r).ex;
679 t = null;
680 }
681 else {
682 ex = null;
683 @SuppressWarnings("unchecked") T tr = (T) r;
684 t = tr;
685 }
686 Executor e = executor;
687 if (ex == null) {
688 try {
689 if (e != null)
690 e.execute(new AsyncAccept<T>(t, fn, dst));
691 else
692 fn.accept(t);
693 } catch (Throwable rex) {
694 ex = rex;
695 }
696 }
697 if (e == null || ex != null)
698 dst.internalComplete(null, ex);
699 }
700 }
701 private static final long serialVersionUID = 5232453952276885070L;
702 }
703
704 static final class ThenRun extends Completion {
705 final CompletableFuture<?> src;
706 final Runnable fn;
707 final CompletableFuture<Void> dst;
708 final Executor executor;
709 ThenRun(CompletableFuture<?> src,
710 Runnable fn,
711 CompletableFuture<Void> dst,
712 Executor executor) {
713 this.src = src; this.fn = fn; this.dst = dst;
714 this.executor = executor;
715 }
716 public final void run() {
717 final CompletableFuture<?> a;
718 final Runnable fn;
719 final CompletableFuture<Void> dst;
720 Object r; Throwable ex;
721 if ((dst = this.dst) != null &&
722 (fn = this.fn) != null &&
723 (a = this.src) != null &&
724 (r = a.result) != null &&
725 compareAndSet(0, 1)) {
726 if (r instanceof AltResult)
727 ex = ((AltResult)r).ex;
728 else
729 ex = null;
730 Executor e = executor;
731 if (ex == null) {
732 try {
733 if (e != null)
734 e.execute(new AsyncRun(fn, dst));
735 else
736 fn.run();
737 } catch (Throwable rex) {
738 ex = rex;
739 }
740 }
741 if (e == null || ex != null)
742 dst.internalComplete(null, ex);
743 }
744 }
745 private static final long serialVersionUID = 5232453952276885070L;
746 }
747
748 static final class ThenCombine<T,U,V> extends Completion {
749 final CompletableFuture<? extends T> src;
750 final CompletableFuture<? extends U> snd;
751 final BiFunction<? super T,? super U,? extends V> fn;
752 final CompletableFuture<V> dst;
753 final Executor executor;
754 ThenCombine(CompletableFuture<? extends T> src,
755 CompletableFuture<? extends U> snd,
756 BiFunction<? super T,? super U,? extends V> fn,
757 CompletableFuture<V> dst,
758 Executor executor) {
759 this.src = src; this.snd = snd;
760 this.fn = fn; this.dst = dst;
761 this.executor = executor;
762 }
763 public final void run() {
764 final CompletableFuture<? extends T> a;
765 final CompletableFuture<? extends U> b;
766 final BiFunction<? super T,? super U,? extends V> fn;
767 final CompletableFuture<V> dst;
768 Object r, s; T t; U u; Throwable ex;
769 if ((dst = this.dst) != null &&
770 (fn = this.fn) != null &&
771 (a = this.src) != null &&
772 (r = a.result) != null &&
773 (b = this.snd) != null &&
774 (s = b.result) != null &&
775 compareAndSet(0, 1)) {
776 if (r instanceof AltResult) {
777 ex = ((AltResult)r).ex;
778 t = null;
779 }
780 else {
781 ex = null;
782 @SuppressWarnings("unchecked") T tr = (T) r;
783 t = tr;
784 }
785 if (ex != null)
786 u = null;
787 else if (s instanceof AltResult) {
788 ex = ((AltResult)s).ex;
789 u = null;
790 }
791 else {
792 @SuppressWarnings("unchecked") U us = (U) s;
793 u = us;
794 }
795 Executor e = executor;
796 V v = null;
797 if (ex == null) {
798 try {
799 if (e != null)
800 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
801 else
802 v = fn.apply(t, u);
803 } catch (Throwable rex) {
804 ex = rex;
805 }
806 }
807 if (e == null || ex != null)
808 dst.internalComplete(v, ex);
809 }
810 }
811 private static final long serialVersionUID = 5232453952276885070L;
812 }
813
814 static final class ThenAcceptBoth<T,U> extends Completion {
815 final CompletableFuture<? extends T> src;
816 final CompletableFuture<? extends U> snd;
817 final BiConsumer<? super T,? super U> fn;
818 final CompletableFuture<Void> dst;
819 final Executor executor;
820 ThenAcceptBoth(CompletableFuture<? extends T> src,
821 CompletableFuture<? extends U> snd,
822 BiConsumer<? super T,? super U> fn,
823 CompletableFuture<Void> dst,
824 Executor executor) {
825 this.src = src; this.snd = snd;
826 this.fn = fn; this.dst = dst;
827 this.executor = executor;
828 }
829 public final void run() {
830 final CompletableFuture<? extends T> a;
831 final CompletableFuture<? extends U> b;
832 final BiConsumer<? super T,? super U> fn;
833 final CompletableFuture<Void> dst;
834 Object r, s; T t; U u; Throwable ex;
835 if ((dst = this.dst) != null &&
836 (fn = this.fn) != null &&
837 (a = this.src) != null &&
838 (r = a.result) != null &&
839 (b = this.snd) != null &&
840 (s = b.result) != null &&
841 compareAndSet(0, 1)) {
842 if (r instanceof AltResult) {
843 ex = ((AltResult)r).ex;
844 t = null;
845 }
846 else {
847 ex = null;
848 @SuppressWarnings("unchecked") T tr = (T) r;
849 t = tr;
850 }
851 if (ex != null)
852 u = null;
853 else if (s instanceof AltResult) {
854 ex = ((AltResult)s).ex;
855 u = null;
856 }
857 else {
858 @SuppressWarnings("unchecked") U us = (U) s;
859 u = us;
860 }
861 Executor e = executor;
862 if (ex == null) {
863 try {
864 if (e != null)
865 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
866 else
867 fn.accept(t, u);
868 } catch (Throwable rex) {
869 ex = rex;
870 }
871 }
872 if (e == null || ex != null)
873 dst.internalComplete(null, ex);
874 }
875 }
876 private static final long serialVersionUID = 5232453952276885070L;
877 }
878
879 static final class RunAfterBoth extends Completion {
880 final CompletableFuture<?> src;
881 final CompletableFuture<?> snd;
882 final Runnable fn;
883 final CompletableFuture<Void> dst;
884 final Executor executor;
885 RunAfterBoth(CompletableFuture<?> src,
886 CompletableFuture<?> snd,
887 Runnable fn,
888 CompletableFuture<Void> dst,
889 Executor executor) {
890 this.src = src; this.snd = snd;
891 this.fn = fn; this.dst = dst;
892 this.executor = executor;
893 }
894 public final void run() {
895 final CompletableFuture<?> a;
896 final CompletableFuture<?> b;
897 final Runnable fn;
898 final CompletableFuture<Void> dst;
899 Object r, s; Throwable ex;
900 if ((dst = this.dst) != null &&
901 (fn = this.fn) != null &&
902 (a = this.src) != null &&
903 (r = a.result) != null &&
904 (b = this.snd) != null &&
905 (s = b.result) != null &&
906 compareAndSet(0, 1)) {
907 if (r instanceof AltResult)
908 ex = ((AltResult)r).ex;
909 else
910 ex = null;
911 if (ex == null && (s instanceof AltResult))
912 ex = ((AltResult)s).ex;
913 Executor e = executor;
914 if (ex == null) {
915 try {
916 if (e != null)
917 e.execute(new AsyncRun(fn, dst));
918 else
919 fn.run();
920 } catch (Throwable rex) {
921 ex = rex;
922 }
923 }
924 if (e == null || ex != null)
925 dst.internalComplete(null, ex);
926 }
927 }
928 private static final long serialVersionUID = 5232453952276885070L;
929 }
930
931 static final class AndCompletion extends Completion {
932 final CompletableFuture<?> src;
933 final CompletableFuture<?> snd;
934 final CompletableFuture<Void> dst;
935 AndCompletion(CompletableFuture<?> src,
936 CompletableFuture<?> snd,
937 CompletableFuture<Void> dst) {
938 this.src = src; this.snd = snd; this.dst = dst;
939 }
940 public final void run() {
941 final CompletableFuture<?> a;
942 final CompletableFuture<?> b;
943 final CompletableFuture<Void> dst;
944 Object r, s; Throwable ex;
945 if ((dst = this.dst) != null &&
946 (a = this.src) != null &&
947 (r = a.result) != null &&
948 (b = this.snd) != null &&
949 (s = b.result) != null &&
950 compareAndSet(0, 1)) {
951 if (r instanceof AltResult)
952 ex = ((AltResult)r).ex;
953 else
954 ex = null;
955 if (ex == null && (s instanceof AltResult))
956 ex = ((AltResult)s).ex;
957 dst.internalComplete(null, ex);
958 }
959 }
960 private static final long serialVersionUID = 5232453952276885070L;
961 }
962
963 static final class ApplyToEither<T,U> extends Completion {
964 final CompletableFuture<? extends T> src;
965 final CompletableFuture<? extends T> snd;
966 final Function<? super T,? extends U> fn;
967 final CompletableFuture<U> dst;
968 final Executor executor;
969 ApplyToEither(CompletableFuture<? extends T> src,
970 CompletableFuture<? extends T> snd,
971 Function<? super T,? extends U> fn,
972 CompletableFuture<U> dst,
973 Executor executor) {
974 this.src = src; this.snd = snd;
975 this.fn = fn; this.dst = dst;
976 this.executor = executor;
977 }
978 public final void run() {
979 final CompletableFuture<? extends T> a;
980 final CompletableFuture<? extends T> b;
981 final Function<? super T,? extends U> fn;
982 final CompletableFuture<U> dst;
983 Object r; T t; Throwable ex;
984 if ((dst = this.dst) != null &&
985 (fn = this.fn) != null &&
986 (((a = this.src) != null && (r = a.result) != null) ||
987 ((b = this.snd) != null && (r = b.result) != null)) &&
988 compareAndSet(0, 1)) {
989 if (r instanceof AltResult) {
990 ex = ((AltResult)r).ex;
991 t = null;
992 }
993 else {
994 ex = null;
995 @SuppressWarnings("unchecked") T tr = (T) r;
996 t = tr;
997 }
998 Executor e = executor;
999 U u = null;
1000 if (ex == null) {
1001 try {
1002 if (e != null)
1003 e.execute(new AsyncApply<T,U>(t, fn, dst));
1004 else
1005 u = fn.apply(t);
1006 } catch (Throwable rex) {
1007 ex = rex;
1008 }
1009 }
1010 if (e == null || ex != null)
1011 dst.internalComplete(u, ex);
1012 }
1013 }
1014 private static final long serialVersionUID = 5232453952276885070L;
1015 }
1016
1017 static final class AcceptEither<T> extends Completion {
1018 final CompletableFuture<? extends T> src;
1019 final CompletableFuture<? extends T> snd;
1020 final Consumer<? super T> fn;
1021 final CompletableFuture<Void> dst;
1022 final Executor executor;
1023 AcceptEither(CompletableFuture<? extends T> src,
1024 CompletableFuture<? extends T> snd,
1025 Consumer<? super T> fn,
1026 CompletableFuture<Void> dst,
1027 Executor executor) {
1028 this.src = src; this.snd = snd;
1029 this.fn = fn; this.dst = dst;
1030 this.executor = executor;
1031 }
1032 public final void run() {
1033 final CompletableFuture<? extends T> a;
1034 final CompletableFuture<? extends T> b;
1035 final Consumer<? super T> fn;
1036 final CompletableFuture<Void> dst;
1037 Object r; T t; Throwable ex;
1038 if ((dst = this.dst) != null &&
1039 (fn = this.fn) != null &&
1040 (((a = this.src) != null && (r = a.result) != null) ||
1041 ((b = this.snd) != null && (r = b.result) != null)) &&
1042 compareAndSet(0, 1)) {
1043 if (r instanceof AltResult) {
1044 ex = ((AltResult)r).ex;
1045 t = null;
1046 }
1047 else {
1048 ex = null;
1049 @SuppressWarnings("unchecked") T tr = (T) r;
1050 t = tr;
1051 }
1052 Executor e = executor;
1053 if (ex == null) {
1054 try {
1055 if (e != null)
1056 e.execute(new AsyncAccept<T>(t, fn, dst));
1057 else
1058 fn.accept(t);
1059 } catch (Throwable rex) {
1060 ex = rex;
1061 }
1062 }
1063 if (e == null || ex != null)
1064 dst.internalComplete(null, ex);
1065 }
1066 }
1067 private static final long serialVersionUID = 5232453952276885070L;
1068 }
1069
1070 static final class RunAfterEither extends Completion {
1071 final CompletableFuture<?> src;
1072 final CompletableFuture<?> snd;
1073 final Runnable fn;
1074 final CompletableFuture<Void> dst;
1075 final Executor executor;
1076 RunAfterEither(CompletableFuture<?> src,
1077 CompletableFuture<?> snd,
1078 Runnable fn,
1079 CompletableFuture<Void> dst,
1080 Executor executor) {
1081 this.src = src; this.snd = snd;
1082 this.fn = fn; this.dst = dst;
1083 this.executor = executor;
1084 }
1085 public final void run() {
1086 final CompletableFuture<?> a;
1087 final CompletableFuture<?> b;
1088 final Runnable fn;
1089 final CompletableFuture<Void> dst;
1090 Object r; Throwable ex;
1091 if ((dst = this.dst) != null &&
1092 (fn = this.fn) != null &&
1093 (((a = this.src) != null && (r = a.result) != null) ||
1094 ((b = this.snd) != null && (r = b.result) != null)) &&
1095 compareAndSet(0, 1)) {
1096 if (r instanceof AltResult)
1097 ex = ((AltResult)r).ex;
1098 else
1099 ex = null;
1100 Executor e = executor;
1101 if (ex == null) {
1102 try {
1103 if (e != null)
1104 e.execute(new AsyncRun(fn, dst));
1105 else
1106 fn.run();
1107 } catch (Throwable rex) {
1108 ex = rex;
1109 }
1110 }
1111 if (e == null || ex != null)
1112 dst.internalComplete(null, ex);
1113 }
1114 }
1115 private static final long serialVersionUID = 5232453952276885070L;
1116 }
1117
1118 static final class OrCompletion extends Completion {
1119 final CompletableFuture<?> src;
1120 final CompletableFuture<?> snd;
1121 final CompletableFuture<Object> dst;
1122 OrCompletion(CompletableFuture<?> src,
1123 CompletableFuture<?> snd,
1124 CompletableFuture<Object> dst) {
1125 this.src = src; this.snd = snd; this.dst = dst;
1126 }
1127 public final void run() {
1128 final CompletableFuture<?> a;
1129 final CompletableFuture<?> b;
1130 final CompletableFuture<Object> dst;
1131 Object r, t; Throwable ex;
1132 if ((dst = this.dst) != null &&
1133 (((a = this.src) != null && (r = a.result) != null) ||
1134 ((b = this.snd) != null && (r = b.result) != null)) &&
1135 compareAndSet(0, 1)) {
1136 if (r instanceof AltResult) {
1137 ex = ((AltResult)r).ex;
1138 t = null;
1139 }
1140 else {
1141 ex = null;
1142 t = r;
1143 }
1144 dst.internalComplete(t, ex);
1145 }
1146 }
1147 private static final long serialVersionUID = 5232453952276885070L;
1148 }
1149
1150 static final class ExceptionCompletion<T> extends Completion {
1151 final CompletableFuture<? extends T> src;
1152 final Function<? super Throwable, ? extends T> fn;
1153 final CompletableFuture<T> dst;
1154 ExceptionCompletion(CompletableFuture<? extends T> src,
1155 Function<? super Throwable, ? extends T> fn,
1156 CompletableFuture<T> dst) {
1157 this.src = src; this.fn = fn; this.dst = dst;
1158 }
1159 public final void run() {
1160 final CompletableFuture<? extends T> a;
1161 final Function<? super Throwable, ? extends T> fn;
1162 final CompletableFuture<T> dst;
1163 Object r; T t = null; Throwable ex, dx = null;
1164 if ((dst = this.dst) != null &&
1165 (fn = this.fn) != null &&
1166 (a = this.src) != null &&
1167 (r = a.result) != null &&
1168 compareAndSet(0, 1)) {
1169 if ((r instanceof AltResult) &&
1170 (ex = ((AltResult)r).ex) != null) {
1171 try {
1172 t = fn.apply(ex);
1173 } catch (Throwable rex) {
1174 dx = rex;
1175 }
1176 }
1177 else {
1178 @SuppressWarnings("unchecked") T tr = (T) r;
1179 t = tr;
1180 }
1181 dst.internalComplete(t, dx);
1182 }
1183 }
1184 private static final long serialVersionUID = 5232453952276885070L;
1185 }
1186
1187 static final class WhenCompleteCompletion<T> extends Completion {
1188 final CompletableFuture<? extends T> src;
1189 final BiConsumer<? super T, ? super Throwable> fn;
1190 final CompletableFuture<T> dst;
1191 final Executor executor;
1192 WhenCompleteCompletion(CompletableFuture<? extends T> src,
1193 BiConsumer<? super T, ? super Throwable> fn,
1194 CompletableFuture<T> dst,
1195 Executor executor) {
1196 this.src = src; this.fn = fn; this.dst = dst;
1197 this.executor = executor;
1198 }
1199 public final void run() {
1200 final CompletableFuture<? extends T> a;
1201 final BiConsumer<? super T, ? super Throwable> fn;
1202 final CompletableFuture<T> dst;
1203 Object r; T t; Throwable ex;
1204 if ((dst = this.dst) != null &&
1205 (fn = this.fn) != null &&
1206 (a = this.src) != null &&
1207 (r = a.result) != null &&
1208 compareAndSet(0, 1)) {
1209 if (r instanceof AltResult) {
1210 ex = ((AltResult)r).ex;
1211 t = null;
1212 }
1213 else {
1214 ex = null;
1215 @SuppressWarnings("unchecked") T tr = (T) r;
1216 t = tr;
1217 }
1218 Executor e = executor;
1219 Throwable dx = null;
1220 try {
1221 if (e != null)
1222 e.execute(new AsyncAcceptBoth<T,Throwable>(t, ex, fn, dst));
1223 else
1224 fn.accept(t, ex);
1225 } catch (Throwable rex) {
1226 dx = rex;
1227 }
1228 if (e == null || dx != null)
1229 dst.internalComplete(t, ex != null ? ex : dx);
1230 }
1231 }
1232 private static final long serialVersionUID = 5232453952276885070L;
1233 }
1234
1235 static final class ThenCopy<T> extends Completion {
1236 final CompletableFuture<?> src;
1237 final CompletableFuture<T> dst;
1238 ThenCopy(CompletableFuture<?> src,
1239 CompletableFuture<T> dst) {
1240 this.src = src; this.dst = dst;
1241 }
1242 public final void run() {
1243 final CompletableFuture<?> a;
1244 final CompletableFuture<T> dst;
1245 Object r; T t; Throwable ex;
1246 if ((dst = this.dst) != null &&
1247 (a = this.src) != null &&
1248 (r = a.result) != null &&
1249 compareAndSet(0, 1)) {
1250 if (r instanceof AltResult) {
1251 ex = ((AltResult)r).ex;
1252 t = null;
1253 }
1254 else {
1255 ex = null;
1256 @SuppressWarnings("unchecked") T tr = (T) r;
1257 t = tr;
1258 }
1259 dst.internalComplete(t, ex);
1260 }
1261 }
1262 private static final long serialVersionUID = 5232453952276885070L;
1263 }
1264
1265 // version of ThenCopy for CompletableFuture<Void> dst
1266 static final class ThenPropagate extends Completion {
1267 final CompletableFuture<?> src;
1268 final CompletableFuture<Void> dst;
1269 ThenPropagate(CompletableFuture<?> src,
1270 CompletableFuture<Void> dst) {
1271 this.src = src; this.dst = dst;
1272 }
1273 public final void run() {
1274 final CompletableFuture<?> a;
1275 final CompletableFuture<Void> dst;
1276 Object r; Throwable ex;
1277 if ((dst = this.dst) != null &&
1278 (a = this.src) != null &&
1279 (r = a.result) != null &&
1280 compareAndSet(0, 1)) {
1281 if (r instanceof AltResult)
1282 ex = ((AltResult)r).ex;
1283 else
1284 ex = null;
1285 dst.internalComplete(null, ex);
1286 }
1287 }
1288 private static final long serialVersionUID = 5232453952276885070L;
1289 }
1290
1291 static final class HandleCompletion<T,U> extends Completion {
1292 final CompletableFuture<? extends T> src;
1293 final BiFunction<? super T, Throwable, ? extends U> fn;
1294 final CompletableFuture<U> dst;
1295 final Executor executor;
1296 HandleCompletion(CompletableFuture<? extends T> src,
1297 BiFunction<? super T, Throwable, ? extends U> fn,
1298 CompletableFuture<U> dst,
1299 Executor executor) {
1300 this.src = src; this.fn = fn; this.dst = dst;
1301 this.executor = executor;
1302 }
1303 public final void run() {
1304 final CompletableFuture<? extends T> a;
1305 final BiFunction<? super T, Throwable, ? extends U> fn;
1306 final CompletableFuture<U> dst;
1307 Object r; T t; Throwable ex;
1308 if ((dst = this.dst) != null &&
1309 (fn = this.fn) != null &&
1310 (a = this.src) != null &&
1311 (r = a.result) != null &&
1312 compareAndSet(0, 1)) {
1313 if (r instanceof AltResult) {
1314 ex = ((AltResult)r).ex;
1315 t = null;
1316 }
1317 else {
1318 ex = null;
1319 @SuppressWarnings("unchecked") T tr = (T) r;
1320 t = tr;
1321 }
1322 Executor e = executor;
1323 U u = null;
1324 Throwable dx = null;
1325 try {
1326 if (e != null)
1327 e.execute(new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1328 else
1329 u = fn.apply(t, ex);
1330 } catch (Throwable rex) {
1331 dx = rex;
1332 }
1333 if (e == null || dx != null)
1334 dst.internalComplete(u, dx);
1335 dst.internalComplete(u, dx);
1336 }
1337 }
1338 private static final long serialVersionUID = 5232453952276885070L;
1339 }
1340
1341 static final class ThenCompose<T,U> extends Completion {
1342 final CompletableFuture<? extends T> src;
1343 final Function<? super T, ? extends CompletionStage<U>> fn;
1344 final CompletableFuture<U> dst;
1345 final Executor executor;
1346 ThenCompose(CompletableFuture<? extends T> src,
1347 Function<? super T, ? extends CompletionStage<U>> fn,
1348 CompletableFuture<U> dst,
1349 Executor executor) {
1350 this.src = src; this.fn = fn; this.dst = dst;
1351 this.executor = executor;
1352 }
1353 public final void run() {
1354 final CompletableFuture<? extends T> a;
1355 final Function<? super T, ? extends CompletionStage<U>> fn;
1356 final CompletableFuture<U> dst;
1357 Object r; T t; Throwable ex; Executor e;
1358 if ((dst = this.dst) != null &&
1359 (fn = this.fn) != null &&
1360 (a = this.src) != null &&
1361 (r = a.result) != null &&
1362 compareAndSet(0, 1)) {
1363 if (r instanceof AltResult) {
1364 ex = ((AltResult)r).ex;
1365 t = null;
1366 }
1367 else {
1368 ex = null;
1369 @SuppressWarnings("unchecked") T tr = (T) r;
1370 t = tr;
1371 }
1372 CompletableFuture<U> c = null;
1373 U u = null;
1374 boolean complete = false;
1375 if (ex == null) {
1376 if ((e = executor) != null)
1377 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1378 else {
1379 try {
1380 CompletionStage<U> cs = fn.apply(t);
1381 c = (cs == null) ? null : cs.toCompletableFuture();
1382 if (c == null)
1383 ex = new NullPointerException();
1384 } catch (Throwable rex) {
1385 ex = rex;
1386 }
1387 }
1388 }
1389 if (c != null) {
1390 ThenCopy<U> d = null;
1391 Object s;
1392 if ((s = c.result) == null) {
1393 CompletionNode p = new CompletionNode
1394 (d = new ThenCopy<U>(c, dst));
1395 while ((s = c.result) == null) {
1396 if (UNSAFE.compareAndSwapObject
1397 (c, COMPLETIONS, p.next = c.completions, p))
1398 break;
1399 }
1400 }
1401 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1402 complete = true;
1403 if (s instanceof AltResult) {
1404 ex = ((AltResult)s).ex; // no rewrap
1405 u = null;
1406 }
1407 else {
1408 @SuppressWarnings("unchecked") U us = (U) s;
1409 u = us;
1410 }
1411 }
1412 }
1413 if (complete || ex != null)
1414 dst.internalComplete(u, ex);
1415 if (c != null)
1416 c.helpPostComplete();
1417 }
1418 }
1419 private static final long serialVersionUID = 5232453952276885070L;
1420 }
1421
1422 // Implementations of stage methods with (plain, async, Executor) forms
1423
1424 private <U> CompletableFuture<U> doThenApply
1425 (Function<? super T,? extends U> fn,
1426 Executor e) {
1427 if (fn == null) throw new NullPointerException();
1428 CompletableFuture<U> dst = new CompletableFuture<U>();
1429 ThenApply<T,U> d = null;
1430 Object r;
1431 if ((r = result) == null) {
1432 CompletionNode p = new CompletionNode
1433 (d = new ThenApply<T,U>(this, fn, dst, e));
1434 while ((r = result) == null) {
1435 if (UNSAFE.compareAndSwapObject
1436 (this, COMPLETIONS, p.next = completions, p))
1437 break;
1438 }
1439 }
1440 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1441 T t; Throwable ex;
1442 if (r instanceof AltResult) {
1443 ex = ((AltResult)r).ex;
1444 t = null;
1445 }
1446 else {
1447 ex = null;
1448 @SuppressWarnings("unchecked") T tr = (T) r;
1449 t = tr;
1450 }
1451 U u = null;
1452 if (ex == null) {
1453 try {
1454 if (e != null)
1455 e.execute(new AsyncApply<T,U>(t, fn, dst));
1456 else
1457 u = fn.apply(t);
1458 } catch (Throwable rex) {
1459 ex = rex;
1460 }
1461 }
1462 if (e == null || ex != null)
1463 dst.internalComplete(u, ex);
1464 }
1465 helpPostComplete();
1466 return dst;
1467 }
1468
1469 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1470 Executor e) {
1471 if (fn == null) throw new NullPointerException();
1472 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1473 ThenAccept<T> d = null;
1474 Object r;
1475 if ((r = result) == null) {
1476 CompletionNode p = new CompletionNode
1477 (d = new ThenAccept<T>(this, fn, dst, e));
1478 while ((r = result) == null) {
1479 if (UNSAFE.compareAndSwapObject
1480 (this, COMPLETIONS, p.next = completions, p))
1481 break;
1482 }
1483 }
1484 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1485 T t; Throwable ex;
1486 if (r instanceof AltResult) {
1487 ex = ((AltResult)r).ex;
1488 t = null;
1489 }
1490 else {
1491 ex = null;
1492 @SuppressWarnings("unchecked") T tr = (T) r;
1493 t = tr;
1494 }
1495 if (ex == null) {
1496 try {
1497 if (e != null)
1498 e.execute(new AsyncAccept<T>(t, fn, dst));
1499 else
1500 fn.accept(t);
1501 } catch (Throwable rex) {
1502 ex = rex;
1503 }
1504 }
1505 if (e == null || ex != null)
1506 dst.internalComplete(null, ex);
1507 }
1508 helpPostComplete();
1509 return dst;
1510 }
1511
1512 private CompletableFuture<Void> doThenRun(Runnable action,
1513 Executor e) {
1514 if (action == null) throw new NullPointerException();
1515 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1516 ThenRun d = null;
1517 Object r;
1518 if ((r = result) == null) {
1519 CompletionNode p = new CompletionNode
1520 (d = new ThenRun(this, action, dst, e));
1521 while ((r = result) == null) {
1522 if (UNSAFE.compareAndSwapObject
1523 (this, COMPLETIONS, p.next = completions, p))
1524 break;
1525 }
1526 }
1527 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1528 Throwable ex;
1529 if (r instanceof AltResult)
1530 ex = ((AltResult)r).ex;
1531 else
1532 ex = null;
1533 if (ex == null) {
1534 try {
1535 if (e != null)
1536 e.execute(new AsyncRun(action, dst));
1537 else
1538 action.run();
1539 } catch (Throwable rex) {
1540 ex = rex;
1541 }
1542 }
1543 if (e == null || ex != null)
1544 dst.internalComplete(null, ex);
1545 }
1546 helpPostComplete();
1547 return dst;
1548 }
1549
1550 private <U,V> CompletableFuture<V> doThenCombine
1551 (CompletableFuture<? extends U> other,
1552 BiFunction<? super T,? super U,? extends V> fn,
1553 Executor e) {
1554 if (other == null || fn == null) throw new NullPointerException();
1555 CompletableFuture<V> dst = new CompletableFuture<V>();
1556 ThenCombine<T,U,V> d = null;
1557 Object r, s = null;
1558 if ((r = result) == null || (s = other.result) == null) {
1559 d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
1560 CompletionNode q = null, p = new CompletionNode(d);
1561 while ((r == null && (r = result) == null) ||
1562 (s == null && (s = other.result) == null)) {
1563 if (q != null) {
1564 if (s != null ||
1565 UNSAFE.compareAndSwapObject
1566 (other, COMPLETIONS, q.next = other.completions, q))
1567 break;
1568 }
1569 else if (r != null ||
1570 UNSAFE.compareAndSwapObject
1571 (this, COMPLETIONS, p.next = completions, p)) {
1572 if (s != null)
1573 break;
1574 q = new CompletionNode(d);
1575 }
1576 }
1577 }
1578 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1579 T t; U u; Throwable ex;
1580 if (r instanceof AltResult) {
1581 ex = ((AltResult)r).ex;
1582 t = null;
1583 }
1584 else {
1585 ex = null;
1586 @SuppressWarnings("unchecked") T tr = (T) r;
1587 t = tr;
1588 }
1589 if (ex != null)
1590 u = null;
1591 else if (s instanceof AltResult) {
1592 ex = ((AltResult)s).ex;
1593 u = null;
1594 }
1595 else {
1596 @SuppressWarnings("unchecked") U us = (U) s;
1597 u = us;
1598 }
1599 V v = null;
1600 if (ex == null) {
1601 try {
1602 if (e != null)
1603 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
1604 else
1605 v = fn.apply(t, u);
1606 } catch (Throwable rex) {
1607 ex = rex;
1608 }
1609 }
1610 if (e == null || ex != null)
1611 dst.internalComplete(v, ex);
1612 }
1613 helpPostComplete();
1614 other.helpPostComplete();
1615 return dst;
1616 }
1617
1618 private <U> CompletableFuture<Void> doThenAcceptBoth
1619 (CompletableFuture<? extends U> other,
1620 BiConsumer<? super T,? super U> fn,
1621 Executor e) {
1622 if (other == null || fn == null) throw new NullPointerException();
1623 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1624 ThenAcceptBoth<T,U> d = null;
1625 Object r, s = null;
1626 if ((r = result) == null || (s = other.result) == null) {
1627 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1628 CompletionNode q = null, p = new CompletionNode(d);
1629 while ((r == null && (r = result) == null) ||
1630 (s == null && (s = other.result) == null)) {
1631 if (q != null) {
1632 if (s != null ||
1633 UNSAFE.compareAndSwapObject
1634 (other, COMPLETIONS, q.next = other.completions, q))
1635 break;
1636 }
1637 else if (r != null ||
1638 UNSAFE.compareAndSwapObject
1639 (this, COMPLETIONS, p.next = completions, p)) {
1640 if (s != null)
1641 break;
1642 q = new CompletionNode(d);
1643 }
1644 }
1645 }
1646 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1647 T t; U u; Throwable ex;
1648 if (r instanceof AltResult) {
1649 ex = ((AltResult)r).ex;
1650 t = null;
1651 }
1652 else {
1653 ex = null;
1654 @SuppressWarnings("unchecked") T tr = (T) r;
1655 t = tr;
1656 }
1657 if (ex != null)
1658 u = null;
1659 else if (s instanceof AltResult) {
1660 ex = ((AltResult)s).ex;
1661 u = null;
1662 }
1663 else {
1664 @SuppressWarnings("unchecked") U us = (U) s;
1665 u = us;
1666 }
1667 if (ex == null) {
1668 try {
1669 if (e != null)
1670 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1671 else
1672 fn.accept(t, u);
1673 } catch (Throwable rex) {
1674 ex = rex;
1675 }
1676 }
1677 if (e == null || ex != null)
1678 dst.internalComplete(null, ex);
1679 }
1680 helpPostComplete();
1681 other.helpPostComplete();
1682 return dst;
1683 }
1684
1685 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1686 Runnable action,
1687 Executor e) {
1688 if (other == null || action == null) throw new NullPointerException();
1689 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1690 RunAfterBoth d = null;
1691 Object r, s = null;
1692 if ((r = result) == null || (s = other.result) == null) {
1693 d = new RunAfterBoth(this, other, action, dst, e);
1694 CompletionNode q = null, p = new CompletionNode(d);
1695 while ((r == null && (r = result) == null) ||
1696 (s == null && (s = other.result) == null)) {
1697 if (q != null) {
1698 if (s != null ||
1699 UNSAFE.compareAndSwapObject
1700 (other, COMPLETIONS, q.next = other.completions, q))
1701 break;
1702 }
1703 else if (r != null ||
1704 UNSAFE.compareAndSwapObject
1705 (this, COMPLETIONS, p.next = completions, p)) {
1706 if (s != null)
1707 break;
1708 q = new CompletionNode(d);
1709 }
1710 }
1711 }
1712 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1713 Throwable ex;
1714 if (r instanceof AltResult)
1715 ex = ((AltResult)r).ex;
1716 else
1717 ex = null;
1718 if (ex == null && (s instanceof AltResult))
1719 ex = ((AltResult)s).ex;
1720 if (ex == null) {
1721 try {
1722 if (e != null)
1723 e.execute(new AsyncRun(action, dst));
1724 else
1725 action.run();
1726 } catch (Throwable rex) {
1727 ex = rex;
1728 }
1729 }
1730 if (e == null || ex != null)
1731 dst.internalComplete(null, ex);
1732 }
1733 helpPostComplete();
1734 other.helpPostComplete();
1735 return dst;
1736 }
1737
1738 private <U> CompletableFuture<U> doApplyToEither
1739 (CompletableFuture<? extends T> other,
1740 Function<? super T, U> fn,
1741 Executor e) {
1742 if (other == null || fn == null) throw new NullPointerException();
1743 CompletableFuture<U> dst = new CompletableFuture<U>();
1744 ApplyToEither<T,U> d = null;
1745 Object r;
1746 if ((r = result) == null && (r = other.result) == null) {
1747 d = new ApplyToEither<T,U>(this, other, fn, dst, e);
1748 CompletionNode q = null, p = new CompletionNode(d);
1749 while ((r = result) == null && (r = other.result) == null) {
1750 if (q != null) {
1751 if (UNSAFE.compareAndSwapObject
1752 (other, COMPLETIONS, q.next = other.completions, q))
1753 break;
1754 }
1755 else if (UNSAFE.compareAndSwapObject
1756 (this, COMPLETIONS, p.next = completions, p))
1757 q = new CompletionNode(d);
1758 }
1759 }
1760 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1761 T t; Throwable ex;
1762 if (r instanceof AltResult) {
1763 ex = ((AltResult)r).ex;
1764 t = null;
1765 }
1766 else {
1767 ex = null;
1768 @SuppressWarnings("unchecked") T tr = (T) r;
1769 t = tr;
1770 }
1771 U u = null;
1772 if (ex == null) {
1773 try {
1774 if (e != null)
1775 e.execute(new AsyncApply<T,U>(t, fn, dst));
1776 else
1777 u = fn.apply(t);
1778 } catch (Throwable rex) {
1779 ex = rex;
1780 }
1781 }
1782 if (e == null || ex != null)
1783 dst.internalComplete(u, ex);
1784 }
1785 helpPostComplete();
1786 other.helpPostComplete();
1787 return dst;
1788 }
1789
1790 private CompletableFuture<Void> doAcceptEither
1791 (CompletableFuture<? extends T> other,
1792 Consumer<? super T> fn,
1793 Executor e) {
1794 if (other == null || fn == null) throw new NullPointerException();
1795 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1796 AcceptEither<T> d = null;
1797 Object r;
1798 if ((r = result) == null && (r = other.result) == null) {
1799 d = new AcceptEither<T>(this, other, fn, dst, e);
1800 CompletionNode q = null, p = new CompletionNode(d);
1801 while ((r = result) == null && (r = other.result) == null) {
1802 if (q != null) {
1803 if (UNSAFE.compareAndSwapObject
1804 (other, COMPLETIONS, q.next = other.completions, q))
1805 break;
1806 }
1807 else if (UNSAFE.compareAndSwapObject
1808 (this, COMPLETIONS, p.next = completions, p))
1809 q = new CompletionNode(d);
1810 }
1811 }
1812 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1813 T t; Throwable ex;
1814 if (r instanceof AltResult) {
1815 ex = ((AltResult)r).ex;
1816 t = null;
1817 }
1818 else {
1819 ex = null;
1820 @SuppressWarnings("unchecked") T tr = (T) r;
1821 t = tr;
1822 }
1823 if (ex == null) {
1824 try {
1825 if (e != null)
1826 e.execute(new AsyncAccept<T>(t, fn, dst));
1827 else
1828 fn.accept(t);
1829 } catch (Throwable rex) {
1830 ex = rex;
1831 }
1832 }
1833 if (e == null || ex != null)
1834 dst.internalComplete(null, ex);
1835 }
1836 helpPostComplete();
1837 other.helpPostComplete();
1838 return dst;
1839 }
1840
1841 private CompletableFuture<Void> doRunAfterEither
1842 (CompletableFuture<?> other,
1843 Runnable action,
1844 Executor e) {
1845 if (other == null || action == null) throw new NullPointerException();
1846 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1847 RunAfterEither d = null;
1848 Object r;
1849 if ((r = result) == null && (r = other.result) == null) {
1850 d = new RunAfterEither(this, other, action, dst, e);
1851 CompletionNode q = null, p = new CompletionNode(d);
1852 while ((r = result) == null && (r = other.result) == null) {
1853 if (q != null) {
1854 if (UNSAFE.compareAndSwapObject
1855 (other, COMPLETIONS, q.next = other.completions, q))
1856 break;
1857 }
1858 else if (UNSAFE.compareAndSwapObject
1859 (this, COMPLETIONS, p.next = completions, p))
1860 q = new CompletionNode(d);
1861 }
1862 }
1863 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1864 Throwable ex;
1865 if (r instanceof AltResult)
1866 ex = ((AltResult)r).ex;
1867 else
1868 ex = null;
1869 if (ex == null) {
1870 try {
1871 if (e != null)
1872 e.execute(new AsyncRun(action, dst));
1873 else
1874 action.run();
1875 } catch (Throwable rex) {
1876 ex = rex;
1877 }
1878 }
1879 if (e == null || ex != null)
1880 dst.internalComplete(null, ex);
1881 }
1882 helpPostComplete();
1883 other.helpPostComplete();
1884 return dst;
1885 }
1886
1887 private <U> CompletableFuture<U> doThenCompose
1888 (Function<? super T, ? extends CompletionStage<U>> fn,
1889 Executor e) {
1890 if (fn == null) throw new NullPointerException();
1891 CompletableFuture<U> dst = null;
1892 ThenCompose<T,U> d = null;
1893 Object r;
1894 if ((r = result) == null) {
1895 dst = new CompletableFuture<U>();
1896 CompletionNode p = new CompletionNode
1897 (d = new ThenCompose<T,U>(this, fn, dst, e));
1898 while ((r = result) == null) {
1899 if (UNSAFE.compareAndSwapObject
1900 (this, COMPLETIONS, p.next = completions, p))
1901 break;
1902 }
1903 }
1904 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1905 T t; Throwable ex;
1906 if (r instanceof AltResult) {
1907 ex = ((AltResult)r).ex;
1908 t = null;
1909 }
1910 else {
1911 ex = null;
1912 @SuppressWarnings("unchecked") T tr = (T) r;
1913 t = tr;
1914 }
1915 if (ex == null) {
1916 if (e != null) {
1917 if (dst == null)
1918 dst = new CompletableFuture<U>();
1919 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1920 }
1921 else {
1922 try {
1923 CompletionStage<U> cs = fn.apply(t);
1924 if (cs == null ||
1925 (dst = cs.toCompletableFuture()) == null)
1926 ex = new NullPointerException();
1927 } catch (Throwable rex) {
1928 ex = rex;
1929 }
1930 }
1931 }
1932 if (dst == null)
1933 dst = new CompletableFuture<U>();
1934 if (e == null || ex != null)
1935 dst.internalComplete(null, ex);
1936 }
1937 helpPostComplete();
1938 dst.helpPostComplete();
1939 return dst;
1940 }
1941
1942 private CompletableFuture<T> doWhenComplete
1943 (BiConsumer<? super T, ? super Throwable> fn,
1944 Executor e) {
1945 if (fn == null) throw new NullPointerException();
1946 CompletableFuture<T> dst = new CompletableFuture<T>();
1947 WhenCompleteCompletion<T> d = null;
1948 Object r;
1949 if ((r = result) == null) {
1950 CompletionNode p =
1951 new CompletionNode(d = new WhenCompleteCompletion<T>
1952 (this, fn, dst, e));
1953 while ((r = result) == null) {
1954 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1955 p.next = completions, p))
1956 break;
1957 }
1958 }
1959 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1960 T t; Throwable ex;
1961 if (r instanceof AltResult) {
1962 ex = ((AltResult)r).ex;
1963 t = null;
1964 }
1965 else {
1966 ex = null;
1967 @SuppressWarnings("unchecked") T tr = (T) r;
1968 t = tr;
1969 }
1970 Throwable dx = null;
1971 try {
1972 if (e != null)
1973 e.execute(new AsyncAcceptBoth<T,Throwable>(t, ex, fn, dst));
1974 else
1975 fn.accept(t, ex);
1976 } catch (Throwable rex) {
1977 dx = rex;
1978 }
1979 if (e == null || dx != null)
1980 dst.internalComplete(t, ex != null ? ex : dx);
1981 }
1982 helpPostComplete();
1983 return dst;
1984 }
1985
1986 private <U> CompletableFuture<U> doHandle
1987 (BiFunction<? super T, Throwable, ? extends U> fn,
1988 Executor e) {
1989 if (fn == null) throw new NullPointerException();
1990 CompletableFuture<U> dst = new CompletableFuture<U>();
1991 HandleCompletion<T,U> d = null;
1992 Object r;
1993 if ((r = result) == null) {
1994 CompletionNode p =
1995 new CompletionNode(d = new HandleCompletion<T,U>
1996 (this, fn, dst, e));
1997 while ((r = result) == null) {
1998 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1999 p.next = completions, p))
2000 break;
2001 }
2002 }
2003 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2004 T t; Throwable ex;
2005 if (r instanceof AltResult) {
2006 ex = ((AltResult)r).ex;
2007 t = null;
2008 }
2009 else {
2010 ex = null;
2011 @SuppressWarnings("unchecked") T tr = (T) r;
2012 t = tr;
2013 }
2014 U u = null;
2015 Throwable dx = null;
2016 try {
2017 if (e != null)
2018 e.execute(new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2019 else {
2020 u = fn.apply(t, ex);
2021 dx = null;
2022 }
2023 } catch (Throwable rex) {
2024 dx = rex;
2025 u = null;
2026 }
2027 if (e == null || dx != null)
2028 dst.internalComplete(u, dx);
2029 dst.internalComplete(u, dx);
2030 }
2031 helpPostComplete();
2032 return dst;
2033 }
2034
2035
2036 // public methods
2037
2038 /**
2039 * Creates a new incomplete CompletableFuture.
2040 */
2041 public CompletableFuture() {
2042 }
2043
2044 /**
2045 * Returns a new CompletableFuture that is asynchronously completed
2046 * by a task running in the {@link ForkJoinPool#commonPool()} with
2047 * the value obtained by calling the given Supplier.
2048 *
2049 * @param supplier a function returning the value to be used
2050 * to complete the returned CompletableFuture
2051 * @return the new CompletableFuture
2052 */
2053 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2054 if (supplier == null) throw new NullPointerException();
2055 CompletableFuture<U> f = new CompletableFuture<U>();
2056 ForkJoinPool.commonPool().
2057 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
2058 return f;
2059 }
2060
2061 /**
2062 * Returns a new CompletableFuture that is asynchronously completed
2063 * by a task running in the given executor with the value obtained
2064 * by calling the given Supplier.
2065 *
2066 * @param supplier a function returning the value to be used
2067 * to complete the returned CompletableFuture
2068 * @param executor the executor to use for asynchronous execution
2069 * @return the new CompletableFuture
2070 */
2071 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2072 Executor executor) {
2073 if (executor == null || supplier == null)
2074 throw new NullPointerException();
2075 CompletableFuture<U> f = new CompletableFuture<U>();
2076 executor.execute(new AsyncSupply<U>(supplier, f));
2077 return f;
2078 }
2079
2080 /**
2081 * Returns a new CompletableFuture that is asynchronously completed
2082 * by a task running in the {@link ForkJoinPool#commonPool()} after
2083 * it runs the given action.
2084 *
2085 * @param runnable the action to run before completing the
2086 * returned CompletableFuture
2087 * @return the new CompletableFuture
2088 */
2089 public static CompletableFuture<Void> runAsync(Runnable runnable) {
2090 if (runnable == null) throw new NullPointerException();
2091 CompletableFuture<Void> f = new CompletableFuture<Void>();
2092 ForkJoinPool.commonPool().
2093 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
2094 return f;
2095 }
2096
2097 /**
2098 * Returns a new CompletableFuture that is asynchronously completed
2099 * by a task running in the given executor after it runs the given
2100 * action.
2101 *
2102 * @param runnable the action to run before completing the
2103 * returned CompletableFuture
2104 * @param executor the executor to use for asynchronous execution
2105 * @return the new CompletableFuture
2106 */
2107 public static CompletableFuture<Void> runAsync(Runnable runnable,
2108 Executor executor) {
2109 if (executor == null || runnable == null)
2110 throw new NullPointerException();
2111 CompletableFuture<Void> f = new CompletableFuture<Void>();
2112 executor.execute(new AsyncRun(runnable, f));
2113 return f;
2114 }
2115
2116 /**
2117 * Returns a new CompletableFuture that is already completed with
2118 * the given value.
2119 *
2120 * @param value the value
2121 * @return the completed CompletableFuture
2122 */
2123 public static <U> CompletableFuture<U> completedFuture(U value) {
2124 CompletableFuture<U> f = new CompletableFuture<U>();
2125 f.result = (value == null) ? NIL : value;
2126 return f;
2127 }
2128
2129 /**
2130 * Returns {@code true} if completed in any fashion: normally,
2131 * exceptionally, or via cancellation.
2132 *
2133 * @return {@code true} if completed
2134 */
2135 public boolean isDone() {
2136 return result != null;
2137 }
2138
2139 /**
2140 * Waits if necessary for this future to complete, and then
2141 * returns its result.
2142 *
2143 * @return the result value
2144 * @throws CancellationException if this future was cancelled
2145 * @throws ExecutionException if this future completed exceptionally
2146 * @throws InterruptedException if the current thread was interrupted
2147 * while waiting
2148 */
2149 public T get() throws InterruptedException, ExecutionException {
2150 Object r; Throwable ex, cause;
2151 if ((r = result) == null && (r = waitingGet(true)) == null)
2152 throw new InterruptedException();
2153 if (!(r instanceof AltResult)) {
2154 @SuppressWarnings("unchecked") T tr = (T) r;
2155 return tr;
2156 }
2157 if ((ex = ((AltResult)r).ex) == null)
2158 return null;
2159 if (ex instanceof CancellationException)
2160 throw (CancellationException)ex;
2161 if ((ex instanceof CompletionException) &&
2162 (cause = ex.getCause()) != null)
2163 ex = cause;
2164 throw new ExecutionException(ex);
2165 }
2166
2167 /**
2168 * Waits if necessary for at most the given time for this future
2169 * to complete, and then returns its result, if available.
2170 *
2171 * @param timeout the maximum time to wait
2172 * @param unit the time unit of the timeout argument
2173 * @return the result value
2174 * @throws CancellationException if this future was cancelled
2175 * @throws ExecutionException if this future completed exceptionally
2176 * @throws InterruptedException if the current thread was interrupted
2177 * while waiting
2178 * @throws TimeoutException if the wait timed out
2179 */
2180 public T get(long timeout, TimeUnit unit)
2181 throws InterruptedException, ExecutionException, TimeoutException {
2182 Object r; Throwable ex, cause;
2183 long nanos = unit.toNanos(timeout);
2184 if (Thread.interrupted())
2185 throw new InterruptedException();
2186 if ((r = result) == null)
2187 r = timedAwaitDone(nanos);
2188 if (!(r instanceof AltResult)) {
2189 @SuppressWarnings("unchecked") T tr = (T) r;
2190 return tr;
2191 }
2192 if ((ex = ((AltResult)r).ex) == null)
2193 return null;
2194 if (ex instanceof CancellationException)
2195 throw (CancellationException)ex;
2196 if ((ex instanceof CompletionException) &&
2197 (cause = ex.getCause()) != null)
2198 ex = cause;
2199 throw new ExecutionException(ex);
2200 }
2201
2202 /**
2203 * Returns the result value when complete, or throws an
2204 * (unchecked) exception if completed exceptionally. To better
2205 * conform with the use of common functional forms, if a
2206 * computation involved in the completion of this
2207 * CompletableFuture threw an exception, this method throws an
2208 * (unchecked) {@link CompletionException} with the underlying
2209 * exception as its cause.
2210 *
2211 * @return the result value
2212 * @throws CancellationException if the computation was cancelled
2213 * @throws CompletionException if this future completed
2214 * exceptionally or a completion computation threw an exception
2215 */
2216 public T join() {
2217 Object r; Throwable ex;
2218 if ((r = result) == null)
2219 r = waitingGet(false);
2220 if (!(r instanceof AltResult)) {
2221 @SuppressWarnings("unchecked") T tr = (T) r;
2222 return tr;
2223 }
2224 if ((ex = ((AltResult)r).ex) == null)
2225 return null;
2226 if (ex instanceof CancellationException)
2227 throw (CancellationException)ex;
2228 if (ex instanceof CompletionException)
2229 throw (CompletionException)ex;
2230 throw new CompletionException(ex);
2231 }
2232
2233 /**
2234 * Returns the result value (or throws any encountered exception)
2235 * if completed, else returns the given valueIfAbsent.
2236 *
2237 * @param valueIfAbsent the value to return if not completed
2238 * @return the result value, if completed, else the given valueIfAbsent
2239 * @throws CancellationException if the computation was cancelled
2240 * @throws CompletionException if this future completed
2241 * exceptionally or a completion computation threw an exception
2242 */
2243 public T getNow(T valueIfAbsent) {
2244 Object r; Throwable ex;
2245 if ((r = result) == null)
2246 return valueIfAbsent;
2247 if (!(r instanceof AltResult)) {
2248 @SuppressWarnings("unchecked") T tr = (T) r;
2249 return tr;
2250 }
2251 if ((ex = ((AltResult)r).ex) == null)
2252 return null;
2253 if (ex instanceof CancellationException)
2254 throw (CancellationException)ex;
2255 if (ex instanceof CompletionException)
2256 throw (CompletionException)ex;
2257 throw new CompletionException(ex);
2258 }
2259
2260 /**
2261 * If not already completed, sets the value returned by {@link
2262 * #get()} and related methods to the given value.
2263 *
2264 * @param value the result value
2265 * @return {@code true} if this invocation caused this CompletableFuture
2266 * to transition to a completed state, else {@code false}
2267 */
2268 public boolean complete(T value) {
2269 boolean triggered = result == null &&
2270 UNSAFE.compareAndSwapObject(this, RESULT, null,
2271 value == null ? NIL : value);
2272 postComplete();
2273 return triggered;
2274 }
2275
2276 /**
2277 * If not already completed, causes invocations of {@link #get()}
2278 * and related methods to throw the given exception.
2279 *
2280 * @param ex the exception
2281 * @return {@code true} if this invocation caused this CompletableFuture
2282 * to transition to a completed state, else {@code false}
2283 */
2284 public boolean completeExceptionally(Throwable ex) {
2285 if (ex == null) throw new NullPointerException();
2286 boolean triggered = result == null &&
2287 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2288 postComplete();
2289 return triggered;
2290 }
2291
2292 // CompletionStage methods
2293
2294 public <U> CompletableFuture<U> thenApply
2295 (Function<? super T,? extends U> fn) {
2296 return doThenApply(fn, null);
2297 }
2298
2299 public <U> CompletableFuture<U> thenApplyAsync
2300 (Function<? super T,? extends U> fn) {
2301 return doThenApply(fn, ForkJoinPool.commonPool());
2302 }
2303
2304 public <U> CompletableFuture<U> thenApplyAsync
2305 (Function<? super T,? extends U> fn,
2306 Executor executor) {
2307 if (executor == null) throw new NullPointerException();
2308 return doThenApply(fn, executor);
2309 }
2310
2311 public CompletableFuture<Void> thenAccept
2312 (Consumer<? super T> action) {
2313 return doThenAccept(action, null);
2314 }
2315
2316 public CompletableFuture<Void> thenAcceptAsync
2317 (Consumer<? super T> action) {
2318 return doThenAccept(action, ForkJoinPool.commonPool());
2319 }
2320
2321 public CompletableFuture<Void> thenAcceptAsync
2322 (Consumer<? super T> action,
2323 Executor executor) {
2324 if (executor == null) throw new NullPointerException();
2325 return doThenAccept(action, executor);
2326 }
2327
2328 public CompletableFuture<Void> thenRun
2329 (Runnable action) {
2330 return doThenRun(action, null);
2331 }
2332
2333 public CompletableFuture<Void> thenRunAsync
2334 (Runnable action) {
2335 return doThenRun(action, ForkJoinPool.commonPool());
2336 }
2337
2338 public CompletableFuture<Void> thenRunAsync
2339 (Runnable action,
2340 Executor executor) {
2341 if (executor == null) throw new NullPointerException();
2342 return doThenRun(action, executor);
2343 }
2344
2345 public <U,V> CompletableFuture<V> thenCombine
2346 (CompletionStage<? extends U> other,
2347 BiFunction<? super T,? super U,? extends V> fn) {
2348 return doThenCombine(other.toCompletableFuture(), fn, null);
2349 }
2350
2351 public <U,V> CompletableFuture<V> thenCombineAsync
2352 (CompletionStage<? extends U> other,
2353 BiFunction<? super T,? super U,? extends V> fn) {
2354 return doThenCombine(other.toCompletableFuture(), fn,
2355 ForkJoinPool.commonPool());
2356 }
2357
2358 public <U,V> CompletableFuture<V> thenCombineAsync
2359 (CompletionStage<? extends U> other,
2360 BiFunction<? super T,? super U,? extends V> fn,
2361 Executor executor) {
2362 if (executor == null) throw new NullPointerException();
2363 return doThenCombine(other.toCompletableFuture(), fn, executor);
2364 }
2365
2366 public <U> CompletableFuture<Void> thenAcceptBoth
2367 (CompletionStage<? extends U> other,
2368 BiConsumer<? super T, ? super U> action) {
2369 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2370 }
2371
2372 public <U> CompletableFuture<Void> thenAcceptBothAsync
2373 (CompletionStage<? extends U> other,
2374 BiConsumer<? super T, ? super U> action) {
2375 return doThenAcceptBoth(other.toCompletableFuture(), action,
2376 ForkJoinPool.commonPool());
2377 }
2378
2379 public <U> CompletableFuture<Void> thenAcceptBothAsync
2380 (CompletionStage<? extends U> other,
2381 BiConsumer<? super T, ? super U> action,
2382 Executor executor) {
2383 if (executor == null) throw new NullPointerException();
2384 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2385 }
2386
2387 public CompletableFuture<Void> runAfterBoth
2388 (CompletionStage<?> other,
2389 Runnable action) {
2390 return doRunAfterBoth(other.toCompletableFuture(), action, null);
2391 }
2392
2393 public CompletableFuture<Void> runAfterBothAsync
2394 (CompletionStage<?> other,
2395 Runnable action) {
2396 return doRunAfterBoth(other.toCompletableFuture(), action,
2397 ForkJoinPool.commonPool());
2398 }
2399
2400 public CompletableFuture<Void> runAfterBothAsync
2401 (CompletionStage<?> other,
2402 Runnable action,
2403 Executor executor) {
2404 if (executor == null) throw new NullPointerException();
2405 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2406 }
2407
2408
2409 public <U> CompletableFuture<U> applyToEither
2410 (CompletionStage<? extends T> other,
2411 Function<? super T, U> fn) {
2412 return doApplyToEither(other.toCompletableFuture(), fn, null);
2413 }
2414
2415 public <U> CompletableFuture<U> applyToEitherAsync
2416 (CompletionStage<? extends T> other,
2417 Function<? super T, U> fn) {
2418 return doApplyToEither(other.toCompletableFuture(), fn,
2419 ForkJoinPool.commonPool());
2420 }
2421
2422 public <U> CompletableFuture<U> applyToEitherAsync
2423 (CompletionStage<? extends T> other,
2424 Function<? super T, U> fn,
2425 Executor executor) {
2426 if (executor == null) throw new NullPointerException();
2427 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2428 }
2429
2430 public CompletableFuture<Void> acceptEither
2431 (CompletionStage<? extends T> other,
2432 Consumer<? super T> action) {
2433 return doAcceptEither(other.toCompletableFuture(), action, null);
2434 }
2435
2436 public CompletableFuture<Void> acceptEitherAsync
2437 (CompletionStage<? extends T> other,
2438 Consumer<? super T> action) {
2439 return doAcceptEither(other.toCompletableFuture(), action,
2440 ForkJoinPool.commonPool());
2441 }
2442
2443 public CompletableFuture<Void> acceptEitherAsync
2444 (CompletionStage<? extends T> other,
2445 Consumer<? super T> action,
2446 Executor executor) {
2447 if (executor == null) throw new NullPointerException();
2448 return doAcceptEither(other.toCompletableFuture(), action, executor);
2449 }
2450
2451 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2452 Runnable action) {
2453 return doRunAfterEither(other.toCompletableFuture(), action, null);
2454 }
2455
2456 public CompletableFuture<Void> runAfterEitherAsync
2457 (CompletionStage<?> other,
2458 Runnable action) {
2459 return doRunAfterEither(other.toCompletableFuture(), action,
2460 ForkJoinPool.commonPool());
2461 }
2462
2463 public CompletableFuture<Void> runAfterEitherAsync
2464 (CompletionStage<?> other,
2465 Runnable action,
2466 Executor executor) {
2467 if (executor == null) throw new NullPointerException();
2468 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2469 }
2470
2471 public <U> CompletableFuture<U> thenCompose
2472 (Function<? super T, ? extends CompletionStage<U>> fn) {
2473 return doThenCompose(fn, null);
2474 }
2475
2476 public <U> CompletableFuture<U> thenComposeAsync
2477 (Function<? super T, ? extends CompletionStage<U>> fn) {
2478 return doThenCompose(fn, ForkJoinPool.commonPool());
2479 }
2480
2481 public <U> CompletableFuture<U> thenComposeAsync
2482 (Function<? super T, ? extends CompletionStage<U>> fn,
2483 Executor executor) {
2484 if (executor == null) throw new NullPointerException();
2485 return doThenCompose(fn, executor);
2486 }
2487
2488 public CompletableFuture<T> whenComplete
2489 (BiConsumer<? super T, ? super Throwable> action) {
2490 return doWhenComplete(action, null);
2491 }
2492
2493 public CompletableFuture<T> whenCompleteAsync
2494 (BiConsumer<? super T, ? super Throwable> action) {
2495 return doWhenComplete(action, ForkJoinPool.commonPool());
2496 }
2497
2498 public CompletableFuture<T> whenCompleteAsync
2499 (BiConsumer<? super T, ? super Throwable> action,
2500 Executor executor) {
2501 if (executor == null) throw new NullPointerException();
2502 return doWhenComplete(action, executor);
2503 }
2504
2505 public <U> CompletableFuture<U> handle
2506 (BiFunction<? super T, Throwable, ? extends U> fn) {
2507 return doHandle(fn, null);
2508 }
2509
2510 public <U> CompletableFuture<U> handleAsync
2511 (BiFunction<? super T, Throwable, ? extends U> fn) {
2512 return doHandle(fn, ForkJoinPool.commonPool());
2513 }
2514
2515 public <U> CompletableFuture<U> handleAsync
2516 (BiFunction<? super T, Throwable, ? extends U> fn,
2517 Executor executor) {
2518 if (executor == null) throw new NullPointerException();
2519 return doHandle(fn, executor);
2520 }
2521
2522 /**
2523 * Returns this CompletableFuture.
2524 *
2525 * @return this CompletableFuture
2526 */
2527 public CompletableFuture<T> toCompletableFuture() {
2528 return this;
2529 }
2530
2531 // not in interface CompletionStage
2532
2533 /**
2534 * Returns a new CompletableFuture that is completed when this
2535 * CompletableFuture completes, with the result of the given
2536 * function of the exception triggering this CompletableFuture's
2537 * completion when it completes exceptionally; otherwise, if this
2538 * CompletableFuture completes normally, then the returned
2539 * CompletableFuture also completes normally with the same value.
2540 * Note: More flexible versions of this functionality are
2541 * available using methods {@code whenComplete} and {@code handle}.
2542 *
2543 * @param fn the function to use to compute the value of the
2544 * returned CompletableFuture if this CompletableFuture completed
2545 * exceptionally
2546 * @return the new CompletableFuture
2547 */
// Builds and returns a new future (dst). When this future holds an
// AltResult with a non-null exception, dst is completed with fn applied to
// that exception (or with whatever fn throws); otherwise dst receives this
// future's value unchanged. Throws NullPointerException when fn is null.
2548 public CompletableFuture<T> exceptionally
2549 (Function<Throwable, ? extends T> fn) {
2550 if (fn == null) throw new NullPointerException();
2551 CompletableFuture<T> dst = new CompletableFuture<T>();
2552 ExceptionCompletion<T> d = null;
2553 Object r;
// Not completed yet: wrap an ExceptionCompletion in a CompletionNode and
// try to CAS it in as the new head of this future's completions list.
2554 if ((r = result) == null) {
2555 CompletionNode p =
2556 new CompletionNode(d = new ExceptionCompletion<T>
2557 (this, fn, dst));
// Retry until the CAS succeeds or a result shows up in the meantime.
2558 while ((r = result) == null) {
2559 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2560 p.next = completions, p))
2561 break;
2562 }
2563 }
// A result is available here. If a completion object was created, this
// thread proceeds only when its compareAndSet(0, 1) on d succeeds.
// NOTE(review): presumably that flag stops the registered completion from
// also completing dst — confirm against ExceptionCompletion.
2564 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2565 T t = null; Throwable ex, dx = null;
2566 if (r instanceof AltResult) {
// AltResult with a null ex leaves both t and dx null.
2567 if ((ex = ((AltResult)r).ex) != null) {
2568 try {
2569 t = fn.apply(ex);
2570 } catch (Throwable rex) {
// Anything thrown by fn becomes the exceptional outcome handed to dst.
2571 dx = rex;
2572 }
2573 }
2574 }
2575 else {
// Ordinary value: passed through to dst without calling fn.
2576 @SuppressWarnings("unchecked") T tr = (T) r;
2577 t = tr;
2578 }
2579 dst.internalComplete(t, dx);
2580 }
// Called on every path, whether or not dst was completed above.
2581 helpPostComplete();
2582 return dst;
2583 }
2584
2585 /* ------------- Arbitrary-arity constructions -------------- */
2586
2587 /*
2588 * The basic plan of attack is to recursively form binary
2589 * completion trees of elements. This can be overkill for small
2590 * sets, but scales nicely. The And/All vs Or/Any forms use the
2591 * same idea, but details differ.
2592 */
2593
2594 /**
2595 * Returns a new CompletableFuture that is completed when all of
2596 * the given CompletableFutures complete. If any of the given
2597 * CompletableFutures complete exceptionally, then the returned
2598 * CompletableFuture also does so, with a CompletionException
2599 * holding this exception as its cause. Otherwise, the results,
2600 * if any, of the given CompletableFutures are not reflected in
2601 * the returned CompletableFuture, but may be obtained by
2602 * inspecting them individually. If no CompletableFutures are
2603 * provided, returns a CompletableFuture completed with the value
2604 * {@code null}.
2605 *
2606 * <p>Among the applications of this method is to await completion
2607 * of a set of independent CompletableFutures before continuing a
2608 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2609 * c3).join();}.
2610 *
2611 * @param cfs the CompletableFutures
2612 * @return a new CompletableFuture that is completed when all of the
2613 * given CompletableFutures complete
2614 * @throws NullPointerException if the array or any of its elements are
2615 * {@code null}
2616 */
// Entry point for the "all of" combination. Arrays with two or more elements
// are delegated to allTree; the empty and single-element cases are handled
// inline. A null array fails on cfs.length; a null single element throws
// NullPointerException explicitly.
2617 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2618 int len = cfs.length; // Directly handle empty and singleton cases
2619 if (len > 1)
2620 return allTree(cfs, 0, len - 1);
2621 else {
2622 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2623 CompletableFuture<?> f;
// No inputs: dst is marked complete immediately by storing NIL directly.
2624 if (len == 0)
2625 dst.result = NIL;
2626 else if ((f = cfs[0]) == null)
2627 throw new NullPointerException();
2628 else {
2629 ThenPropagate d = null;
2630 CompletionNode p = null;
2631 Object r;
// While f has no result, each pass does one step: create the ThenPropagate,
// then its CompletionNode, then try to CAS the node onto f.completions.
// f.result is re-read on every pass, so the loop also ends if f completes.
2632 while ((r = f.result) == null) {
2633 if (d == null)
2634 d = new ThenPropagate(f, dst);
2635 else if (p == null)
2636 p = new CompletionNode(d);
2637 else if (UNSAFE.compareAndSwapObject
2638 (f, COMPLETIONS, p.next = f.completions, p))
2639 break;
2640 }
// f already has a result: complete dst here with a null value plus f's
// exception (if any), provided no completion object exists or this thread
// wins compareAndSet(0, 1) on it.
2641 if (r != null && (d == null || d.compareAndSet(0, 1)))
2642 dst.internalComplete(null, (r instanceof AltResult) ?
2643 ((AltResult)r).ex : null);
2644 f.helpPostComplete();
2645 }
2646 return dst;
2647 }
2648 }
2649
2650 /**
2651 * Recursively constructs an And'ed tree of CompletableFutures.
2652 * Called only when array known to have at least two elements.
2653 */
// Combines cfs[lo..hi] into one future (dst) by splitting the range at its
// midpoint and pairing the two halves with an AndCompletion. Throws
// NullPointerException when either half evaluates to null.
2654 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2655 int lo, int hi) {
2656 CompletableFuture<?> fst, snd;
2657 int mid = (lo + hi) >>> 1;
// Each half is a single array element when its sub-range has one entry,
// otherwise the result of a recursive call on that sub-range.
2658 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2659 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2660 throw new NullPointerException();
2661 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2662 AndCompletion d = null;
2663 CompletionNode p = null, q = null;
2664 Object r = null, s = null;
// Loop while at least one side lacks a result. Because of the short-circuit
// ||, snd.result is not read on passes where fst.result is still null.
// Steps, one per pass: create d, create node p, CAS p onto fst.completions
// (then create q), CAS q onto snd.completions and leave the loop.
2665 while ((r = fst.result) == null || (s = snd.result) == null) {
2666 if (d == null)
2667 d = new AndCompletion(fst, snd, dst);
2668 else if (p == null)
2669 p = new CompletionNode(d);
2670 else if (q == null) {
2671 if (UNSAFE.compareAndSwapObject
2672 (fst, COMPLETIONS, p.next = fst.completions, p))
2673 q = new CompletionNode(d);
2674 }
2675 else if (UNSAFE.compareAndSwapObject
2676 (snd, COMPLETIONS, q.next = snd.completions, q))
2677 break;
2678 }
// Re-read whichever result is still unknown; dst is completed here only when
// both are present and (if d exists) this thread wins d.compareAndSet(0, 1).
2679 if ((r != null || (r = fst.result) != null) &&
2680 (s != null || (s = snd.result) != null) &&
2681 (d == null || d.compareAndSet(0, 1))) {
2682 Throwable ex;
// The first side's exception takes precedence; the second side's is used
// only when the first has none.
2683 if (r instanceof AltResult)
2684 ex = ((AltResult)r).ex;
2685 else
2686 ex = null;
2687 if (ex == null && (s instanceof AltResult))
2688 ex = ((AltResult)s).ex;
2689 dst.internalComplete(null, ex);
2690 }
2691 fst.helpPostComplete();
2692 snd.helpPostComplete();
2693 return dst;
2694 }
2695
2696 /**
2697 * Returns a new CompletableFuture that is completed when any of
2698 * the given CompletableFutures complete, with the same result.
2699 * Otherwise, if it completed exceptionally, the returned
2700 * CompletableFuture also does so, with a CompletionException
2701 * holding this exception as its cause. If no CompletableFutures
2702 * are provided, returns an incomplete CompletableFuture.
2703 *
2704 * @param cfs the CompletableFutures
2705 * @return a new CompletableFuture that is completed with the
2706 * result or exception of any of the given CompletableFutures when
2707 * one completes
2708 * @throws NullPointerException if the array or any of its elements are
2709 * {@code null}
2710 */
// Entry point for the "any of" combination. Arrays with two or more elements
// are delegated to anyTree; the empty and single-element cases are handled
// inline. A null array fails on cfs.length; a null single element throws
// NullPointerException explicitly.
2711 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
2712 int len = cfs.length; // Same idea as allOf
2713 if (len > 1)
2714 return anyTree(cfs, 0, len - 1);
2715 else {
2716 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2717 CompletableFuture<?> f;
// No inputs: dst is returned without ever being completed here.
2718 if (len == 0)
2719 ; // skip
2720 else if ((f = cfs[0]) == null)
2721 throw new NullPointerException();
2722 else {
2723 ThenCopy<Object> d = null;
2724 CompletionNode p = null;
2725 Object r;
// While f has no result, each pass does one step: create the ThenCopy,
// then its CompletionNode, then try to CAS the node onto f.completions.
// f.result is re-read on every pass, so the loop also ends if f completes.
2726 while ((r = f.result) == null) {
2727 if (d == null)
2728 d = new ThenCopy<Object>(f, dst);
2729 else if (p == null)
2730 p = new CompletionNode(d);
2731 else if (UNSAFE.compareAndSwapObject
2732 (f, COMPLETIONS, p.next = f.completions, p))
2733 break;
2734 }
// f already has a result: copy it into dst here, provided no completion
// object exists or this thread wins compareAndSet(0, 1) on it.
2735 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2736 Throwable ex; Object t;
// An AltResult contributes only its ex (value null); any other object is
// forwarded as the value with no exception.
2737 if (r instanceof AltResult) {
2738 ex = ((AltResult)r).ex;
2739 t = null;
2740 }
2741 else {
2742 ex = null;
2743 t = r;
2744 }
2745 dst.internalComplete(t, ex);
2746 }
2747 f.helpPostComplete();
2748 }
2749 return dst;
2750 }
2751 }
2752
2753 /**
2754 * Recursively constructs an Or'ed tree of CompletableFutures.
2755 */
// Combines cfs[lo..hi] into one future (dst) by splitting the range at its
// midpoint and pairing the two halves with an OrCompletion. Throws
// NullPointerException when either half evaluates to null.
2756 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
2757 int lo, int hi) {
2758 CompletableFuture<?> fst, snd;
2759 int mid = (lo + hi) >>> 1;
// Each half is a single array element when its sub-range has one entry,
// otherwise the result of a recursive call on that sub-range.
2760 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2761 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2762 throw new NullPointerException();
2763 CompletableFuture<Object> dst = new CompletableFuture<Object>();
2764 OrCompletion d = null;
2765 CompletionNode p = null, q = null;
2766 Object r;
// Loop only while neither side has a result; r ends up holding the first
// non-null result seen, with fst checked before snd.
// Steps, one per pass: create d, create node p, CAS p onto fst.completions
// (then create q), CAS q onto snd.completions and leave the loop.
2767 while ((r = fst.result) == null && (r = snd.result) == null) {
2768 if (d == null)
2769 d = new OrCompletion(fst, snd, dst);
2770 else if (p == null)
2771 p = new CompletionNode(d);
2772 else if (q == null) {
2773 if (UNSAFE.compareAndSwapObject
2774 (fst, COMPLETIONS, p.next = fst.completions, p))
2775 q = new CompletionNode(d);
2776 }
2777 else if (UNSAFE.compareAndSwapObject
2778 (snd, COMPLETIONS, q.next = snd.completions, q))
2779 break;
2780 }
// After a break r is null, so both sides are re-read once more; dst is
// completed here only when a result is found and (if d exists) this thread
// wins d.compareAndSet(0, 1).
2781 if ((r != null || (r = fst.result) != null ||
2782 (r = snd.result) != null) &&
2783 (d == null || d.compareAndSet(0, 1))) {
2784 Throwable ex; Object t;
// An AltResult contributes only its ex (value null); any other object is
// forwarded as the value with no exception.
2785 if (r instanceof AltResult) {
2786 ex = ((AltResult)r).ex;
2787 t = null;
2788 }
2789 else {
2790 ex = null;
2791 t = r;
2792 }
2793 dst.internalComplete(t, ex);
2794 }
2795 fst.helpPostComplete();
2796 snd.helpPostComplete();
2797 return dst;
2798 }
2799
2800 /* ------------- Control and status methods -------------- */
2801
2802 /**
2803 * If not already completed, completes this CompletableFuture with
2804 * a {@link CancellationException}. Dependent CompletableFutures
2805 * that have not already completed will also complete
2806 * exceptionally, with a {@link CompletionException} caused by
2807 * this {@code CancellationException}.
2808 *
2809 * @param mayInterruptIfRunning this value has no effect in this
2810 * implementation because interrupts are not used to control
2811 * processing.
2812 *
2813 * @return {@code true} if this task is now cancelled
2814 */
2815 public boolean cancel(boolean mayInterruptIfRunning) {
2816 boolean cancelled = (result == null) &&
2817 UNSAFE.compareAndSwapObject
2818 (this, RESULT, null, new AltResult(new CancellationException()));
2819 postComplete();
2820 return cancelled || isCancelled();
2821 }
2822
2823 /**
2824 * Returns {@code true} if this CompletableFuture was cancelled
2825 * before it completed normally.
2826 *
2827 * @return {@code true} if this CompletableFuture was cancelled
2828 * before it completed normally
2829 */
2830 public boolean isCancelled() {
2831 Object r;
2832 return ((r = result) instanceof AltResult) &&
2833 (((AltResult)r).ex instanceof CancellationException);
2834 }
2835
2836 /**
2837 * Returns {@code true} if this CompletableFuture completed
2838 * exceptionally.
2839 *
2840 * @return {@code true} if this CompletableFuture completed
2841 * exceptionally
2842 */
2843 public boolean isCompletedExceptionally() {
2844 return (result instanceof AltResult);
2845 }
2846
2847 /**
2848 * Forcibly sets or resets the value subsequently returned by
2849 * method {@link #get()} and related methods, whether or not
2850 * already completed. This method is designed for use only in
2851 * error recovery actions, and even in such situations may result
2852 * in ongoing dependent completions using established versus
2853 * overwritten outcomes.
2854 *
2855 * @param value the completion value
2856 */
2857 public void obtrudeValue(T value) {
2858 result = (value == null) ? NIL : value;
2859 postComplete();
2860 }
2861
2862 /**
2863 * Forcibly causes subsequent invocations of method {@link #get()}
2864 * and related methods to throw the given exception, whether or
2865 * not already completed. This method is designed for use only in
2866 * recovery actions, and even in such situations may result in
2867 * ongoing dependent completions using established versus
2868 * overwritten outcomes.
2869 *
2870 * @param ex the exception
2871 */
2872 public void obtrudeException(Throwable ex) {
2873 if (ex == null) throw new NullPointerException();
2874 result = new AltResult(ex);
2875 postComplete();
2876 }
2877
2878 /**
2879 * Returns the estimated number of CompletableFutures whose
2880 * completions are awaiting completion of this CompletableFuture.
2881 * This method is designed for use in monitoring system state, not
2882 * for synchronization control.
2883 *
2884 * @return the number of dependent CompletableFutures
2885 */
2886 public int getNumberOfDependents() {
2887 int count = 0;
2888 for (CompletionNode p = completions; p != null; p = p.next)
2889 ++count;
2890 return count;
2891 }
2892
2893 /**
2894 * Returns a string identifying this CompletableFuture, as well as
2895 * its completion state. The state, in brackets, contains the
2896 * String {@code "Completed normally"} or the String {@code
2897 * "Completed exceptionally"}, or the String {@code "Not
2898 * completed"} followed by the number of CompletableFutures
2899 * dependent upon its completion, if any.
2900 *
2901 * @return a string identifying this CompletableFuture, as well as its state
2902 */
2903 public String toString() {
2904 Object r = result;
2905 int count;
2906 return super.toString() +
2907 ((r == null) ?
2908 (((count = getNumberOfDependents()) == 0) ?
2909 "[Not completed]" :
2910 "[Not completed, " + count + " dependents]") :
2911 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2912 "[Completed exceptionally]" :
2913 "[Completed normally]"));
2914 }
2915
2916 // Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    // Resolve the Unsafe instance and the offsets of the three fields that
    // are updated through compareAndSwapObject. Any failure aborts class
    // initialization with an Error wrapping the original exception.
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        final Class<?> owner = CompletableFuture.class;
        final java.lang.reflect.Field resultField =
            owner.getDeclaredField("result");
        RESULT = UNSAFE.objectFieldOffset(resultField);
        final java.lang.reflect.Field waitersField =
            owner.getDeclaredField("waiters");
        WAITERS = UNSAFE.objectFieldOffset(waitersField);
        final java.lang.reflect.Field completionsField =
            owner.getDeclaredField("completions");
        COMPLETIONS = UNSAFE.objectFieldOffset(completionsField);
    } catch (Exception failure) {
        throw new Error(failure);
    }
}
2935 }