ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.94
Committed: Thu Jul 18 01:36:08 2013 UTC (10 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.93: +3 -3 lines
Log Message:
doclint warning fixes

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.CompletionException;
23 import java.util.concurrent.CompletionStage;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.locks.LockSupport;
26
27 /**
28 * A {@link Future} that may be explicitly completed (setting its
29 * value and status), and may be used as a {@link CompletionStage},
30 * supporting dependent functions and actions that trigger upon its
31 * completion.
32 *
33 * <p>When two or more threads attempt to
34 * {@link #complete complete},
35 * {@link #completeExceptionally completeExceptionally}, or
36 * {@link #cancel cancel}
37 * a CompletableFuture, only one of them succeeds.
38 *
39 * <p>In addition to these and related methods for directly
40 * manipulating status and results, CompletableFuture implements
41 * interface {@link CompletionStage} with the following policies: <ul>
42 *
43 * <li>Actions supplied for dependent completions of
44 * <em>non-async</em> methods may be performed by the thread that
45 * completes the current CompletableFuture, or by any other caller of
46 * these methods.</li>
47 *
48 * <li>All <em>async</em> methods without an explicit Executor
49 * argument are performed using the {@link
50 * ForkJoinPool#commonPool()}. To simplify monitoring, debugging, and
51 * tracking, all generated asynchronous tasks are instances of the
52 * marker interface {@link AsynchronousCompletionTask}. </li>
53 *
54 * <li>All CompletionStage methods are implemented independently of
55 * other public methods, so the behavior of one method is not impacted
56 * by overrides of others in subclasses. </li> </ul>
57 *
58 * <p>CompletableFuture also implements {@link Future} with the following
59 * policies: <ul>
60 *
61 * <li>Since (unlike {@link FutureTask}) this class has no direct
62 * control over the computation that causes it to be completed,
63 * cancellation is treated as just another form of exceptional
64 * completion. Method {@link #cancel cancel} has the same effect as
65 * {@code completeExceptionally(new CancellationException())}. Method
66 * {@link #isCompletedExceptionally} can be used to determine if a
67 * CompletableFuture completed in any exceptional fashion.</li>
68 *
69 * <li>In case of exceptional completion with a CompletionException,
70 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
71 * {@link ExecutionException} with the same cause as held in the
72 * corresponding CompletionException. To simplify usage in most
73 * contexts, this class also defines methods {@link #join()} and
74 * {@link #getNow} that instead throw the CompletionException directly
75 * in these cases.</li> </ul>
76 *
77 * @author Doug Lea
78 * @since 1.8
79 */
80 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
81
82 /*
83 * Overview:
84 *
85 * 1. Non-nullness of field result (set via CAS) indicates done.
86 * An AltResult is used to box null as a result, as well as to
87 * hold exceptions. Using a single field makes completion fast
88 * and simple to detect and trigger, at the expense of a lot of
89 * encoding and decoding that infiltrates many methods. One minor
90 * simplification relies on the (static) NIL (to box null results)
91 * being the only AltResult with a null exception field, so we
92 * don't usually need explicit comparisons with NIL. The CF
93 * exception propagation mechanics surrounding decoding rely on
94 * unchecked casts of decoded results really being unchecked,
95 * where user type errors are caught at point of use, as is
96 * currently the case in Java. These are highlighted by using
97 * SuppressWarnings-annotated temporaries.
98 *
99 * 2. Waiters are held in a Treiber stack similar to the one used
100 * in FutureTask, Phaser, and SynchronousQueue. See their
101 * internal documentation for algorithmic details.
102 *
103 * 3. Completions are also kept in a list/stack, and pulled off
104 * and run when completion is triggered. (We could even use the
105 * same stack as for waiters, but would give up the potential
106 * parallelism obtained because woken waiters help release/run
107 * others -- see method postComplete). Because post-processing
108 * may race with direct calls, class Completion opportunistically
109 * extends AtomicInteger so callers can claim the action via
110 * compareAndSet(0, 1). The Completion.run methods are all
111 * written in a boringly similar uniform way (that sometimes includes
112 * unnecessary-looking checks, kept to maintain uniformity).
113 * There are enough dimensions upon which they differ that
114 * attempts to factor commonalities while maintaining efficiency
115 * require more lines of code than they would save.
116 *
117 * 4. The exported then/and/or methods do support a bit of
118 * factoring (see doThenApply etc). They must cope with the
119 * intrinsic races surrounding addition of a dependent action
120 * versus performing the action directly because the task is
121 * already complete. For example, a CF may not be complete upon
122 * entry, so a dependent completion is added, but by the time it
123 * is added, the target CF is complete, so must be directly
124 * executed. This is all done while avoiding unnecessary object
125 * construction in safe-bypass cases.
126 */
127
128 // preliminaries
129
130 static final class AltResult {
131 final Throwable ex; // null only for NIL
132 AltResult(Throwable ex) { this.ex = ex; }
133 }
134
135 static final AltResult NIL = new AltResult(null);
136
137 // Fields
138
139 volatile Object result; // Either the result or boxed AltResult
140 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
141 volatile CompletionNode completions; // list (Treiber stack) of completions
142
143 // Basic utilities for triggering and processing completions
144
145 /**
146 * Removes and signals all waiting threads and runs all completions.
147 */
148 final void postComplete() {
149 WaitNode q; Thread t;
150 while ((q = waiters) != null) {
151 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
152 (t = q.thread) != null) {
153 q.thread = null;
154 LockSupport.unpark(t);
155 }
156 }
157
158 CompletionNode h; Completion c;
159 while ((h = completions) != null) {
160 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
161 (c = h.completion) != null)
162 c.run();
163 }
164 }
165
166 /**
167 * Triggers completion with the encoding of the given arguments:
168 * if the exception is non-null, encodes it as a wrapped
169 * CompletionException unless it is one already. Otherwise uses
170 * the given result, boxed as NIL if null.
171 */
172 final void internalComplete(T v, Throwable ex) {
173 if (result == null)
174 UNSAFE.compareAndSwapObject
175 (this, RESULT, null,
176 (ex == null) ? (v == null) ? NIL : v :
177 new AltResult((ex instanceof CompletionException) ? ex :
178 new CompletionException(ex)));
179 postComplete(); // help out even if not triggered
180 }
181
182 /**
183 * If triggered, helps release and/or process completions.
184 */
185 final void helpPostComplete() {
186 if (result != null)
187 postComplete();
188 }
189
190 /* ------------- waiting for completions -------------- */
191
192 /** Number of processors, for spin control */
193 static final int NCPU = Runtime.getRuntime().availableProcessors();
194
195 /**
196 * Heuristic spin value for waitingGet() before blocking on
197 * multiprocessors
198 */
199 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
200
201 /**
202 * Linked nodes to record waiting threads in a Treiber stack. See
203 * other classes such as Phaser and SynchronousQueue for more
204 * detailed explanation. This class implements ManagedBlocker to
205 * avoid starvation when blocking actions pile up in
206 * ForkJoinPools.
207 */
208 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
209 long nanos; // wait time if timed
210 final long deadline; // non-zero if timed
211 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
212 volatile Thread thread;
213 volatile WaitNode next;
214 WaitNode(boolean interruptible, long nanos, long deadline) {
215 this.thread = Thread.currentThread();
216 this.interruptControl = interruptible ? 1 : 0;
217 this.nanos = nanos;
218 this.deadline = deadline;
219 }
220 public boolean isReleasable() {
221 if (thread == null)
222 return true;
223 if (Thread.interrupted()) {
224 int i = interruptControl;
225 interruptControl = -1;
226 if (i > 0)
227 return true;
228 }
229 if (deadline != 0L &&
230 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
231 thread = null;
232 return true;
233 }
234 return false;
235 }
236 public boolean block() {
237 if (isReleasable())
238 return true;
239 else if (deadline == 0L)
240 LockSupport.park(this);
241 else if (nanos > 0L)
242 LockSupport.parkNanos(this, nanos);
243 return isReleasable();
244 }
245 }
246
247 /**
248 * Returns raw result after waiting, or null if interruptible and
249 * interrupted.
250 */
251 private Object waitingGet(boolean interruptible) {
252 WaitNode q = null;
253 boolean queued = false;
254 int spins = SPINS;
255 for (Object r;;) {
256 if ((r = result) != null) {
257 if (q != null) { // suppress unpark
258 q.thread = null;
259 if (q.interruptControl < 0) {
260 if (interruptible) {
261 removeWaiter(q);
262 return null;
263 }
264 Thread.currentThread().interrupt();
265 }
266 }
267 postComplete(); // help release others
268 return r;
269 }
270 else if (spins > 0) {
271 int rnd = ThreadLocalRandom.nextSecondarySeed();
272 if (rnd == 0)
273 rnd = ThreadLocalRandom.current().nextInt();
274 if (rnd >= 0)
275 --spins;
276 }
277 else if (q == null)
278 q = new WaitNode(interruptible, 0L, 0L);
279 else if (!queued)
280 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
281 q.next = waiters, q);
282 else if (interruptible && q.interruptControl < 0) {
283 removeWaiter(q);
284 return null;
285 }
286 else if (q.thread != null && result == null) {
287 try {
288 ForkJoinPool.managedBlock(q);
289 } catch (InterruptedException ex) {
290 q.interruptControl = -1;
291 }
292 }
293 }
294 }
295
296 /**
297 * Awaits completion or aborts on interrupt or timeout.
298 *
299 * @param nanos time to wait
300 * @return raw result
301 */
302 private Object timedAwaitDone(long nanos)
303 throws InterruptedException, TimeoutException {
304 WaitNode q = null;
305 boolean queued = false;
306 for (Object r;;) {
307 if ((r = result) != null) {
308 if (q != null) {
309 q.thread = null;
310 if (q.interruptControl < 0) {
311 removeWaiter(q);
312 throw new InterruptedException();
313 }
314 }
315 postComplete();
316 return r;
317 }
318 else if (q == null) {
319 if (nanos <= 0L)
320 throw new TimeoutException();
321 long d = System.nanoTime() + nanos;
322 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
323 }
324 else if (!queued)
325 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
326 q.next = waiters, q);
327 else if (q.interruptControl < 0) {
328 removeWaiter(q);
329 throw new InterruptedException();
330 }
331 else if (q.nanos <= 0L) {
332 if (result == null) {
333 removeWaiter(q);
334 throw new TimeoutException();
335 }
336 }
337 else if (q.thread != null && result == null) {
338 try {
339 ForkJoinPool.managedBlock(q);
340 } catch (InterruptedException ex) {
341 q.interruptControl = -1;
342 }
343 }
344 }
345 }
346
347 /**
348 * Tries to unlink a timed-out or interrupted wait node to avoid
349 * accumulating garbage. Internal nodes are simply unspliced
350 * without CAS since it is harmless if they are traversed anyway
351 * by releasers. To avoid effects of unsplicing from already
352 * removed nodes, the list is retraversed in case of an apparent
353 * race. This is slow when there are a lot of nodes, but we don't
354 * expect lists to be long enough to outweigh higher-overhead
355 * schemes.
356 */
357 private void removeWaiter(WaitNode node) {
358 if (node != null) {
359 node.thread = null;
360 retry:
361 for (;;) { // restart on removeWaiter race
362 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
363 s = q.next;
364 if (q.thread != null)
365 pred = q;
366 else if (pred != null) {
367 pred.next = s;
368 if (pred.thread == null) // check for race
369 continue retry;
370 }
371 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
372 continue retry;
373 }
374 break;
375 }
376 }
377 }
378
379 /* ------------- Async tasks -------------- */
380
381 /**
382 * A marker interface identifying asynchronous tasks produced by
383 * {@code async} methods. This may be useful for monitoring,
384 * debugging, and tracking asynchronous activities.
385 *
386 * @since 1.8
387 */
388 public static interface AsynchronousCompletionTask {
389 }
390
391 /** Base class can act as either FJ or plain Runnable */
392 abstract static class Async extends ForkJoinTask<Void>
393 implements Runnable, AsynchronousCompletionTask {
394 public final Void getRawResult() { return null; }
395 public final void setRawResult(Void v) { }
396 public final void run() { exec(); }
397 }
398
399 static final class AsyncRun extends Async {
400 final Runnable fn;
401 final CompletableFuture<Void> dst;
402 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
403 this.fn = fn; this.dst = dst;
404 }
405 public final boolean exec() {
406 CompletableFuture<Void> d; Throwable ex;
407 if ((d = this.dst) != null && d.result == null) {
408 try {
409 fn.run();
410 ex = null;
411 } catch (Throwable rex) {
412 ex = rex;
413 }
414 d.internalComplete(null, ex);
415 }
416 return true;
417 }
418 private static final long serialVersionUID = 5232453952276885070L;
419 }
420
421 static final class AsyncSupply<U> extends Async {
422 final Supplier<U> fn;
423 final CompletableFuture<U> dst;
424 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
425 this.fn = fn; this.dst = dst;
426 }
427 public final boolean exec() {
428 CompletableFuture<U> d; U u; Throwable ex;
429 if ((d = this.dst) != null && d.result == null) {
430 try {
431 u = fn.get();
432 ex = null;
433 } catch (Throwable rex) {
434 ex = rex;
435 u = null;
436 }
437 d.internalComplete(u, ex);
438 }
439 return true;
440 }
441 private static final long serialVersionUID = 5232453952276885070L;
442 }
443
444 static final class AsyncApply<T,U> extends Async {
445 final T arg;
446 final Function<? super T,? extends U> fn;
447 final CompletableFuture<U> dst;
448 AsyncApply(T arg, Function<? super T,? extends U> fn,
449 CompletableFuture<U> dst) {
450 this.arg = arg; this.fn = fn; this.dst = dst;
451 }
452 public final boolean exec() {
453 CompletableFuture<U> d; U u; Throwable ex;
454 if ((d = this.dst) != null && d.result == null) {
455 try {
456 u = fn.apply(arg);
457 ex = null;
458 } catch (Throwable rex) {
459 ex = rex;
460 u = null;
461 }
462 d.internalComplete(u, ex);
463 }
464 return true;
465 }
466 private static final long serialVersionUID = 5232453952276885070L;
467 }
468
469 static final class AsyncCombine<T,U,V> extends Async {
470 final T arg1;
471 final U arg2;
472 final BiFunction<? super T,? super U,? extends V> fn;
473 final CompletableFuture<V> dst;
474 AsyncCombine(T arg1, U arg2,
475 BiFunction<? super T,? super U,? extends V> fn,
476 CompletableFuture<V> dst) {
477 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
478 }
479 public final boolean exec() {
480 CompletableFuture<V> d; V v; Throwable ex;
481 if ((d = this.dst) != null && d.result == null) {
482 try {
483 v = fn.apply(arg1, arg2);
484 ex = null;
485 } catch (Throwable rex) {
486 ex = rex;
487 v = null;
488 }
489 d.internalComplete(v, ex);
490 }
491 return true;
492 }
493 private static final long serialVersionUID = 5232453952276885070L;
494 }
495
496 static final class AsyncAccept<T> extends Async {
497 final T arg;
498 final Consumer<? super T> fn;
499 final CompletableFuture<?> dst;
500 AsyncAccept(T arg, Consumer<? super T> fn,
501 CompletableFuture<?> dst) {
502 this.arg = arg; this.fn = fn; this.dst = dst;
503 }
504 public final boolean exec() {
505 CompletableFuture<?> d; Throwable ex;
506 if ((d = this.dst) != null && d.result == null) {
507 try {
508 fn.accept(arg);
509 ex = null;
510 } catch (Throwable rex) {
511 ex = rex;
512 }
513 d.internalComplete(null, ex);
514 }
515 return true;
516 }
517 private static final long serialVersionUID = 5232453952276885070L;
518 }
519
520 static final class AsyncAcceptBoth<T,U> extends Async {
521 final T arg1;
522 final U arg2;
523 final BiConsumer<? super T,? super U> fn;
524 final CompletableFuture<?> dst;
525 AsyncAcceptBoth(T arg1, U arg2,
526 BiConsumer<? super T,? super U> fn,
527 CompletableFuture<?> dst) {
528 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
529 }
530 public final boolean exec() {
531 CompletableFuture<?> d; Throwable ex;
532 if ((d = this.dst) != null && d.result == null) {
533 try {
534 fn.accept(arg1, arg2);
535 ex = null;
536 } catch (Throwable rex) {
537 ex = rex;
538 }
539 d.internalComplete(null, ex);
540 }
541 return true;
542 }
543 private static final long serialVersionUID = 5232453952276885070L;
544 }
545
546 static final class AsyncCompose<T,U> extends Async {
547 final T arg;
548 final Function<? super T, ? extends CompletionStage<U>> fn;
549 final CompletableFuture<U> dst;
550 AsyncCompose(T arg,
551 Function<? super T, ? extends CompletionStage<U>> fn,
552 CompletableFuture<U> dst) {
553 this.arg = arg; this.fn = fn; this.dst = dst;
554 }
555 public final boolean exec() {
556 CompletableFuture<U> d, fr; U u; Throwable ex;
557 if ((d = this.dst) != null && d.result == null) {
558 try {
559 CompletionStage<U> cs = fn.apply(arg);
560 fr = (cs == null) ? null : cs.toCompletableFuture();
561 ex = (fr == null) ? new NullPointerException() : null;
562 } catch (Throwable rex) {
563 ex = rex;
564 fr = null;
565 }
566 if (ex != null)
567 u = null;
568 else {
569 Object r = fr.result;
570 if (r == null)
571 r = fr.waitingGet(false);
572 if (r instanceof AltResult) {
573 ex = ((AltResult)r).ex;
574 u = null;
575 }
576 else {
577 @SuppressWarnings("unchecked") U ur = (U) r;
578 u = ur;
579 }
580 }
581 d.internalComplete(u, ex);
582 }
583 return true;
584 }
585 private static final long serialVersionUID = 5232453952276885070L;
586 }
587
588 static final class AsyncWhenComplete<T> extends Async {
589 final T arg1;
590 final Throwable arg2;
591 final BiConsumer<? super T,? super Throwable> fn;
592 final CompletableFuture<T> dst;
593 AsyncWhenComplete(T arg1, Throwable arg2,
594 BiConsumer<? super T,? super Throwable> fn,
595 CompletableFuture<T> dst) {
596 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
597 }
598 public final boolean exec() {
599 CompletableFuture<T> d;
600 if ((d = this.dst) != null && d.result == null) {
601 Throwable ex = arg2;
602 try {
603 fn.accept(arg1, ex);
604 } catch (Throwable rex) {
605 if (ex == null)
606 ex = rex;
607 }
608 d.internalComplete(arg1, ex);
609 }
610 return true;
611 }
612 private static final long serialVersionUID = 5232453952276885070L;
613 }
614
615 /* ------------- Completions -------------- */
616
617 /**
618 * Simple linked list nodes to record completions, used in
619 * basically the same way as WaitNodes. (We separate nodes from
620 * the Completions themselves mainly because for the And and Or
621 * methods, the same Completion object resides in two lists.)
622 */
623 static final class CompletionNode {
624 final Completion completion;
625 volatile CompletionNode next;
626 CompletionNode(Completion completion) { this.completion = completion; }
627 }
628
629 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
630 abstract static class Completion extends AtomicInteger implements Runnable {
631 }
632
633 static final class ThenApply<T,U> extends Completion {
634 final CompletableFuture<? extends T> src;
635 final Function<? super T,? extends U> fn;
636 final CompletableFuture<U> dst;
637 final Executor executor;
638 ThenApply(CompletableFuture<? extends T> src,
639 Function<? super T,? extends U> fn,
640 CompletableFuture<U> dst,
641 Executor executor) {
642 this.src = src; this.fn = fn; this.dst = dst;
643 this.executor = executor;
644 }
645 public final void run() {
646 final CompletableFuture<? extends T> a;
647 final Function<? super T,? extends U> fn;
648 final CompletableFuture<U> dst;
649 Object r; T t; Throwable ex;
650 if ((dst = this.dst) != null &&
651 (fn = this.fn) != null &&
652 (a = this.src) != null &&
653 (r = a.result) != null &&
654 compareAndSet(0, 1)) {
655 if (r instanceof AltResult) {
656 ex = ((AltResult)r).ex;
657 t = null;
658 }
659 else {
660 ex = null;
661 @SuppressWarnings("unchecked") T tr = (T) r;
662 t = tr;
663 }
664 Executor e = executor;
665 U u = null;
666 if (ex == null) {
667 try {
668 if (e != null)
669 e.execute(new AsyncApply<T,U>(t, fn, dst));
670 else
671 u = fn.apply(t);
672 } catch (Throwable rex) {
673 ex = rex;
674 }
675 }
676 if (e == null || ex != null)
677 dst.internalComplete(u, ex);
678 }
679 }
680 private static final long serialVersionUID = 5232453952276885070L;
681 }
682
683 static final class ThenAccept<T> extends Completion {
684 final CompletableFuture<? extends T> src;
685 final Consumer<? super T> fn;
686 final CompletableFuture<?> dst;
687 final Executor executor;
688 ThenAccept(CompletableFuture<? extends T> src,
689 Consumer<? super T> fn,
690 CompletableFuture<?> dst,
691 Executor executor) {
692 this.src = src; this.fn = fn; this.dst = dst;
693 this.executor = executor;
694 }
695 public final void run() {
696 final CompletableFuture<? extends T> a;
697 final Consumer<? super T> fn;
698 final CompletableFuture<?> dst;
699 Object r; T t; Throwable ex;
700 if ((dst = this.dst) != null &&
701 (fn = this.fn) != null &&
702 (a = this.src) != null &&
703 (r = a.result) != null &&
704 compareAndSet(0, 1)) {
705 if (r instanceof AltResult) {
706 ex = ((AltResult)r).ex;
707 t = null;
708 }
709 else {
710 ex = null;
711 @SuppressWarnings("unchecked") T tr = (T) r;
712 t = tr;
713 }
714 Executor e = executor;
715 if (ex == null) {
716 try {
717 if (e != null)
718 e.execute(new AsyncAccept<T>(t, fn, dst));
719 else
720 fn.accept(t);
721 } catch (Throwable rex) {
722 ex = rex;
723 }
724 }
725 if (e == null || ex != null)
726 dst.internalComplete(null, ex);
727 }
728 }
729 private static final long serialVersionUID = 5232453952276885070L;
730 }
731
732 static final class ThenRun extends Completion {
733 final CompletableFuture<?> src;
734 final Runnable fn;
735 final CompletableFuture<Void> dst;
736 final Executor executor;
737 ThenRun(CompletableFuture<?> src,
738 Runnable fn,
739 CompletableFuture<Void> dst,
740 Executor executor) {
741 this.src = src; this.fn = fn; this.dst = dst;
742 this.executor = executor;
743 }
744 public final void run() {
745 final CompletableFuture<?> a;
746 final Runnable fn;
747 final CompletableFuture<Void> dst;
748 Object r; Throwable ex;
749 if ((dst = this.dst) != null &&
750 (fn = this.fn) != null &&
751 (a = this.src) != null &&
752 (r = a.result) != null &&
753 compareAndSet(0, 1)) {
754 if (r instanceof AltResult)
755 ex = ((AltResult)r).ex;
756 else
757 ex = null;
758 Executor e = executor;
759 if (ex == null) {
760 try {
761 if (e != null)
762 e.execute(new AsyncRun(fn, dst));
763 else
764 fn.run();
765 } catch (Throwable rex) {
766 ex = rex;
767 }
768 }
769 if (e == null || ex != null)
770 dst.internalComplete(null, ex);
771 }
772 }
773 private static final long serialVersionUID = 5232453952276885070L;
774 }
775
776 static final class ThenCombine<T,U,V> extends Completion {
777 final CompletableFuture<? extends T> src;
778 final CompletableFuture<? extends U> snd;
779 final BiFunction<? super T,? super U,? extends V> fn;
780 final CompletableFuture<V> dst;
781 final Executor executor;
782 ThenCombine(CompletableFuture<? extends T> src,
783 CompletableFuture<? extends U> snd,
784 BiFunction<? super T,? super U,? extends V> fn,
785 CompletableFuture<V> dst,
786 Executor executor) {
787 this.src = src; this.snd = snd;
788 this.fn = fn; this.dst = dst;
789 this.executor = executor;
790 }
791 public final void run() {
792 final CompletableFuture<? extends T> a;
793 final CompletableFuture<? extends U> b;
794 final BiFunction<? super T,? super U,? extends V> fn;
795 final CompletableFuture<V> dst;
796 Object r, s; T t; U u; Throwable ex;
797 if ((dst = this.dst) != null &&
798 (fn = this.fn) != null &&
799 (a = this.src) != null &&
800 (r = a.result) != null &&
801 (b = this.snd) != null &&
802 (s = b.result) != null &&
803 compareAndSet(0, 1)) {
804 if (r instanceof AltResult) {
805 ex = ((AltResult)r).ex;
806 t = null;
807 }
808 else {
809 ex = null;
810 @SuppressWarnings("unchecked") T tr = (T) r;
811 t = tr;
812 }
813 if (ex != null)
814 u = null;
815 else if (s instanceof AltResult) {
816 ex = ((AltResult)s).ex;
817 u = null;
818 }
819 else {
820 @SuppressWarnings("unchecked") U us = (U) s;
821 u = us;
822 }
823 Executor e = executor;
824 V v = null;
825 if (ex == null) {
826 try {
827 if (e != null)
828 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
829 else
830 v = fn.apply(t, u);
831 } catch (Throwable rex) {
832 ex = rex;
833 }
834 }
835 if (e == null || ex != null)
836 dst.internalComplete(v, ex);
837 }
838 }
839 private static final long serialVersionUID = 5232453952276885070L;
840 }
841
842 static final class ThenAcceptBoth<T,U> extends Completion {
843 final CompletableFuture<? extends T> src;
844 final CompletableFuture<? extends U> snd;
845 final BiConsumer<? super T,? super U> fn;
846 final CompletableFuture<Void> dst;
847 final Executor executor;
848 ThenAcceptBoth(CompletableFuture<? extends T> src,
849 CompletableFuture<? extends U> snd,
850 BiConsumer<? super T,? super U> fn,
851 CompletableFuture<Void> dst,
852 Executor executor) {
853 this.src = src; this.snd = snd;
854 this.fn = fn; this.dst = dst;
855 this.executor = executor;
856 }
857 public final void run() {
858 final CompletableFuture<? extends T> a;
859 final CompletableFuture<? extends U> b;
860 final BiConsumer<? super T,? super U> fn;
861 final CompletableFuture<Void> dst;
862 Object r, s; T t; U u; Throwable ex;
863 if ((dst = this.dst) != null &&
864 (fn = this.fn) != null &&
865 (a = this.src) != null &&
866 (r = a.result) != null &&
867 (b = this.snd) != null &&
868 (s = b.result) != null &&
869 compareAndSet(0, 1)) {
870 if (r instanceof AltResult) {
871 ex = ((AltResult)r).ex;
872 t = null;
873 }
874 else {
875 ex = null;
876 @SuppressWarnings("unchecked") T tr = (T) r;
877 t = tr;
878 }
879 if (ex != null)
880 u = null;
881 else if (s instanceof AltResult) {
882 ex = ((AltResult)s).ex;
883 u = null;
884 }
885 else {
886 @SuppressWarnings("unchecked") U us = (U) s;
887 u = us;
888 }
889 Executor e = executor;
890 if (ex == null) {
891 try {
892 if (e != null)
893 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
894 else
895 fn.accept(t, u);
896 } catch (Throwable rex) {
897 ex = rex;
898 }
899 }
900 if (e == null || ex != null)
901 dst.internalComplete(null, ex);
902 }
903 }
904 private static final long serialVersionUID = 5232453952276885070L;
905 }
906
907 static final class RunAfterBoth extends Completion {
908 final CompletableFuture<?> src;
909 final CompletableFuture<?> snd;
910 final Runnable fn;
911 final CompletableFuture<Void> dst;
912 final Executor executor;
913 RunAfterBoth(CompletableFuture<?> src,
914 CompletableFuture<?> snd,
915 Runnable fn,
916 CompletableFuture<Void> dst,
917 Executor executor) {
918 this.src = src; this.snd = snd;
919 this.fn = fn; this.dst = dst;
920 this.executor = executor;
921 }
922 public final void run() {
923 final CompletableFuture<?> a;
924 final CompletableFuture<?> b;
925 final Runnable fn;
926 final CompletableFuture<Void> dst;
927 Object r, s; Throwable ex;
928 if ((dst = this.dst) != null &&
929 (fn = this.fn) != null &&
930 (a = this.src) != null &&
931 (r = a.result) != null &&
932 (b = this.snd) != null &&
933 (s = b.result) != null &&
934 compareAndSet(0, 1)) {
935 if (r instanceof AltResult)
936 ex = ((AltResult)r).ex;
937 else
938 ex = null;
939 if (ex == null && (s instanceof AltResult))
940 ex = ((AltResult)s).ex;
941 Executor e = executor;
942 if (ex == null) {
943 try {
944 if (e != null)
945 e.execute(new AsyncRun(fn, dst));
946 else
947 fn.run();
948 } catch (Throwable rex) {
949 ex = rex;
950 }
951 }
952 if (e == null || ex != null)
953 dst.internalComplete(null, ex);
954 }
955 }
956 private static final long serialVersionUID = 5232453952276885070L;
957 }
958
959 static final class AndCompletion extends Completion {
960 final CompletableFuture<?> src;
961 final CompletableFuture<?> snd;
962 final CompletableFuture<Void> dst;
963 AndCompletion(CompletableFuture<?> src,
964 CompletableFuture<?> snd,
965 CompletableFuture<Void> dst) {
966 this.src = src; this.snd = snd; this.dst = dst;
967 }
968 public final void run() {
969 final CompletableFuture<?> a;
970 final CompletableFuture<?> b;
971 final CompletableFuture<Void> dst;
972 Object r, s; Throwable ex;
973 if ((dst = this.dst) != null &&
974 (a = this.src) != null &&
975 (r = a.result) != null &&
976 (b = this.snd) != null &&
977 (s = b.result) != null &&
978 compareAndSet(0, 1)) {
979 if (r instanceof AltResult)
980 ex = ((AltResult)r).ex;
981 else
982 ex = null;
983 if (ex == null && (s instanceof AltResult))
984 ex = ((AltResult)s).ex;
985 dst.internalComplete(null, ex);
986 }
987 }
988 private static final long serialVersionUID = 5232453952276885070L;
989 }
990
991 static final class ApplyToEither<T,U> extends Completion {
992 final CompletableFuture<? extends T> src;
993 final CompletableFuture<? extends T> snd;
994 final Function<? super T,? extends U> fn;
995 final CompletableFuture<U> dst;
996 final Executor executor;
997 ApplyToEither(CompletableFuture<? extends T> src,
998 CompletableFuture<? extends T> snd,
999 Function<? super T,? extends U> fn,
1000 CompletableFuture<U> dst,
1001 Executor executor) {
1002 this.src = src; this.snd = snd;
1003 this.fn = fn; this.dst = dst;
1004 this.executor = executor;
1005 }
1006 public final void run() {
1007 final CompletableFuture<? extends T> a;
1008 final CompletableFuture<? extends T> b;
1009 final Function<? super T,? extends U> fn;
1010 final CompletableFuture<U> dst;
1011 Object r; T t; Throwable ex;
1012 if ((dst = this.dst) != null &&
1013 (fn = this.fn) != null &&
1014 (((a = this.src) != null && (r = a.result) != null) ||
1015 ((b = this.snd) != null && (r = b.result) != null)) &&
1016 compareAndSet(0, 1)) {
1017 if (r instanceof AltResult) {
1018 ex = ((AltResult)r).ex;
1019 t = null;
1020 }
1021 else {
1022 ex = null;
1023 @SuppressWarnings("unchecked") T tr = (T) r;
1024 t = tr;
1025 }
1026 Executor e = executor;
1027 U u = null;
1028 if (ex == null) {
1029 try {
1030 if (e != null)
1031 e.execute(new AsyncApply<T,U>(t, fn, dst));
1032 else
1033 u = fn.apply(t);
1034 } catch (Throwable rex) {
1035 ex = rex;
1036 }
1037 }
1038 if (e == null || ex != null)
1039 dst.internalComplete(u, ex);
1040 }
1041 }
1042 private static final long serialVersionUID = 5232453952276885070L;
1043 }
1044
1045 static final class AcceptEither<T> extends Completion {
1046 final CompletableFuture<? extends T> src;
1047 final CompletableFuture<? extends T> snd;
1048 final Consumer<? super T> fn;
1049 final CompletableFuture<Void> dst;
1050 final Executor executor;
1051 AcceptEither(CompletableFuture<? extends T> src,
1052 CompletableFuture<? extends T> snd,
1053 Consumer<? super T> fn,
1054 CompletableFuture<Void> dst,
1055 Executor executor) {
1056 this.src = src; this.snd = snd;
1057 this.fn = fn; this.dst = dst;
1058 this.executor = executor;
1059 }
1060 public final void run() {
1061 final CompletableFuture<? extends T> a;
1062 final CompletableFuture<? extends T> b;
1063 final Consumer<? super T> fn;
1064 final CompletableFuture<Void> dst;
1065 Object r; T t; Throwable ex;
1066 if ((dst = this.dst) != null &&
1067 (fn = this.fn) != null &&
1068 (((a = this.src) != null && (r = a.result) != null) ||
1069 ((b = this.snd) != null && (r = b.result) != null)) &&
1070 compareAndSet(0, 1)) {
1071 if (r instanceof AltResult) {
1072 ex = ((AltResult)r).ex;
1073 t = null;
1074 }
1075 else {
1076 ex = null;
1077 @SuppressWarnings("unchecked") T tr = (T) r;
1078 t = tr;
1079 }
1080 Executor e = executor;
1081 if (ex == null) {
1082 try {
1083 if (e != null)
1084 e.execute(new AsyncAccept<T>(t, fn, dst));
1085 else
1086 fn.accept(t);
1087 } catch (Throwable rex) {
1088 ex = rex;
1089 }
1090 }
1091 if (e == null || ex != null)
1092 dst.internalComplete(null, ex);
1093 }
1094 }
1095 private static final long serialVersionUID = 5232453952276885070L;
1096 }
1097
1098 static final class RunAfterEither extends Completion {
1099 final CompletableFuture<?> src;
1100 final CompletableFuture<?> snd;
1101 final Runnable fn;
1102 final CompletableFuture<Void> dst;
1103 final Executor executor;
1104 RunAfterEither(CompletableFuture<?> src,
1105 CompletableFuture<?> snd,
1106 Runnable fn,
1107 CompletableFuture<Void> dst,
1108 Executor executor) {
1109 this.src = src; this.snd = snd;
1110 this.fn = fn; this.dst = dst;
1111 this.executor = executor;
1112 }
1113 public final void run() {
1114 final CompletableFuture<?> a;
1115 final CompletableFuture<?> b;
1116 final Runnable fn;
1117 final CompletableFuture<Void> dst;
1118 Object r; Throwable ex;
1119 if ((dst = this.dst) != null &&
1120 (fn = this.fn) != null &&
1121 (((a = this.src) != null && (r = a.result) != null) ||
1122 ((b = this.snd) != null && (r = b.result) != null)) &&
1123 compareAndSet(0, 1)) {
1124 if (r instanceof AltResult)
1125 ex = ((AltResult)r).ex;
1126 else
1127 ex = null;
1128 Executor e = executor;
1129 if (ex == null) {
1130 try {
1131 if (e != null)
1132 e.execute(new AsyncRun(fn, dst));
1133 else
1134 fn.run();
1135 } catch (Throwable rex) {
1136 ex = rex;
1137 }
1138 }
1139 if (e == null || ex != null)
1140 dst.internalComplete(null, ex);
1141 }
1142 }
1143 private static final long serialVersionUID = 5232453952276885070L;
1144 }
1145
1146 static final class OrCompletion extends Completion {
1147 final CompletableFuture<?> src;
1148 final CompletableFuture<?> snd;
1149 final CompletableFuture<Object> dst;
1150 OrCompletion(CompletableFuture<?> src,
1151 CompletableFuture<?> snd,
1152 CompletableFuture<Object> dst) {
1153 this.src = src; this.snd = snd; this.dst = dst;
1154 }
1155 public final void run() {
1156 final CompletableFuture<?> a;
1157 final CompletableFuture<?> b;
1158 final CompletableFuture<Object> dst;
1159 Object r, t; Throwable ex;
1160 if ((dst = this.dst) != null &&
1161 (((a = this.src) != null && (r = a.result) != null) ||
1162 ((b = this.snd) != null && (r = b.result) != null)) &&
1163 compareAndSet(0, 1)) {
1164 if (r instanceof AltResult) {
1165 ex = ((AltResult)r).ex;
1166 t = null;
1167 }
1168 else {
1169 ex = null;
1170 t = r;
1171 }
1172 dst.internalComplete(t, ex);
1173 }
1174 }
1175 private static final long serialVersionUID = 5232453952276885070L;
1176 }
1177
1178 static final class ExceptionCompletion<T> extends Completion {
1179 final CompletableFuture<? extends T> src;
1180 final Function<? super Throwable, ? extends T> fn;
1181 final CompletableFuture<T> dst;
1182 ExceptionCompletion(CompletableFuture<? extends T> src,
1183 Function<? super Throwable, ? extends T> fn,
1184 CompletableFuture<T> dst) {
1185 this.src = src; this.fn = fn; this.dst = dst;
1186 }
1187 public final void run() {
1188 final CompletableFuture<? extends T> a;
1189 final Function<? super Throwable, ? extends T> fn;
1190 final CompletableFuture<T> dst;
1191 Object r; T t = null; Throwable ex, dx = null;
1192 if ((dst = this.dst) != null &&
1193 (fn = this.fn) != null &&
1194 (a = this.src) != null &&
1195 (r = a.result) != null &&
1196 compareAndSet(0, 1)) {
1197 if ((r instanceof AltResult) &&
1198 (ex = ((AltResult)r).ex) != null) {
1199 try {
1200 t = fn.apply(ex);
1201 } catch (Throwable rex) {
1202 dx = rex;
1203 }
1204 }
1205 else {
1206 @SuppressWarnings("unchecked") T tr = (T) r;
1207 t = tr;
1208 }
1209 dst.internalComplete(t, dx);
1210 }
1211 }
1212 private static final long serialVersionUID = 5232453952276885070L;
1213 }
1214
1215 static final class WhenCompleteCompletion<T> extends Completion {
1216 final CompletableFuture<? extends T> src;
1217 final BiConsumer<? super T, ? super Throwable> fn;
1218 final CompletableFuture<T> dst;
1219 final Executor executor;
1220 WhenCompleteCompletion(CompletableFuture<? extends T> src,
1221 BiConsumer<? super T, ? super Throwable> fn,
1222 CompletableFuture<T> dst,
1223 Executor executor) {
1224 this.src = src; this.fn = fn; this.dst = dst;
1225 this.executor = executor;
1226 }
1227 public final void run() {
1228 final CompletableFuture<? extends T> a;
1229 final BiConsumer<? super T, ? super Throwable> fn;
1230 final CompletableFuture<T> dst;
1231 Object r; T t; Throwable ex;
1232 if ((dst = this.dst) != null &&
1233 (fn = this.fn) != null &&
1234 (a = this.src) != null &&
1235 (r = a.result) != null &&
1236 compareAndSet(0, 1)) {
1237 if (r instanceof AltResult) {
1238 ex = ((AltResult)r).ex;
1239 t = null;
1240 }
1241 else {
1242 ex = null;
1243 @SuppressWarnings("unchecked") T tr = (T) r;
1244 t = tr;
1245 }
1246 Executor e = executor;
1247 Throwable dx = null;
1248 try {
1249 if (e != null)
1250 e.execute(new AsyncWhenComplete<T>(t, ex, fn, dst));
1251 else
1252 fn.accept(t, ex);
1253 } catch (Throwable rex) {
1254 dx = rex;
1255 }
1256 if (e == null || dx != null)
1257 dst.internalComplete(t, ex != null ? ex : dx);
1258 }
1259 }
1260 private static final long serialVersionUID = 5232453952276885070L;
1261 }
1262
1263 static final class ThenCopy<T> extends Completion {
1264 final CompletableFuture<?> src;
1265 final CompletableFuture<T> dst;
1266 ThenCopy(CompletableFuture<?> src,
1267 CompletableFuture<T> dst) {
1268 this.src = src; this.dst = dst;
1269 }
1270 public final void run() {
1271 final CompletableFuture<?> a;
1272 final CompletableFuture<T> dst;
1273 Object r; T t; Throwable ex;
1274 if ((dst = this.dst) != null &&
1275 (a = this.src) != null &&
1276 (r = a.result) != null &&
1277 compareAndSet(0, 1)) {
1278 if (r instanceof AltResult) {
1279 ex = ((AltResult)r).ex;
1280 t = null;
1281 }
1282 else {
1283 ex = null;
1284 @SuppressWarnings("unchecked") T tr = (T) r;
1285 t = tr;
1286 }
1287 dst.internalComplete(t, ex);
1288 }
1289 }
1290 private static final long serialVersionUID = 5232453952276885070L;
1291 }
1292
1293 // version of ThenCopy for CompletableFuture<Void> dst
1294 static final class ThenPropagate extends Completion {
1295 final CompletableFuture<?> src;
1296 final CompletableFuture<Void> dst;
1297 ThenPropagate(CompletableFuture<?> src,
1298 CompletableFuture<Void> dst) {
1299 this.src = src; this.dst = dst;
1300 }
1301 public final void run() {
1302 final CompletableFuture<?> a;
1303 final CompletableFuture<Void> dst;
1304 Object r; Throwable ex;
1305 if ((dst = this.dst) != null &&
1306 (a = this.src) != null &&
1307 (r = a.result) != null &&
1308 compareAndSet(0, 1)) {
1309 if (r instanceof AltResult)
1310 ex = ((AltResult)r).ex;
1311 else
1312 ex = null;
1313 dst.internalComplete(null, ex);
1314 }
1315 }
1316 private static final long serialVersionUID = 5232453952276885070L;
1317 }
1318
1319 static final class HandleCompletion<T,U> extends Completion {
1320 final CompletableFuture<? extends T> src;
1321 final BiFunction<? super T, Throwable, ? extends U> fn;
1322 final CompletableFuture<U> dst;
1323 final Executor executor;
1324 HandleCompletion(CompletableFuture<? extends T> src,
1325 BiFunction<? super T, Throwable, ? extends U> fn,
1326 CompletableFuture<U> dst,
1327 Executor executor) {
1328 this.src = src; this.fn = fn; this.dst = dst;
1329 this.executor = executor;
1330 }
1331 public final void run() {
1332 final CompletableFuture<? extends T> a;
1333 final BiFunction<? super T, Throwable, ? extends U> fn;
1334 final CompletableFuture<U> dst;
1335 Object r; T t; Throwable ex;
1336 if ((dst = this.dst) != null &&
1337 (fn = this.fn) != null &&
1338 (a = this.src) != null &&
1339 (r = a.result) != null &&
1340 compareAndSet(0, 1)) {
1341 if (r instanceof AltResult) {
1342 ex = ((AltResult)r).ex;
1343 t = null;
1344 }
1345 else {
1346 ex = null;
1347 @SuppressWarnings("unchecked") T tr = (T) r;
1348 t = tr;
1349 }
1350 Executor e = executor;
1351 U u = null;
1352 Throwable dx = null;
1353 try {
1354 if (e != null)
1355 e.execute(new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
1356 else
1357 u = fn.apply(t, ex);
1358 } catch (Throwable rex) {
1359 dx = rex;
1360 }
1361 if (e == null || dx != null)
1362 dst.internalComplete(u, dx);
1363 }
1364 }
1365 private static final long serialVersionUID = 5232453952276885070L;
1366 }
1367
1368 static final class ThenCompose<T,U> extends Completion {
1369 final CompletableFuture<? extends T> src;
1370 final Function<? super T, ? extends CompletionStage<U>> fn;
1371 final CompletableFuture<U> dst;
1372 final Executor executor;
1373 ThenCompose(CompletableFuture<? extends T> src,
1374 Function<? super T, ? extends CompletionStage<U>> fn,
1375 CompletableFuture<U> dst,
1376 Executor executor) {
1377 this.src = src; this.fn = fn; this.dst = dst;
1378 this.executor = executor;
1379 }
1380 public final void run() {
1381 final CompletableFuture<? extends T> a;
1382 final Function<? super T, ? extends CompletionStage<U>> fn;
1383 final CompletableFuture<U> dst;
1384 Object r; T t; Throwable ex; Executor e;
1385 if ((dst = this.dst) != null &&
1386 (fn = this.fn) != null &&
1387 (a = this.src) != null &&
1388 (r = a.result) != null &&
1389 compareAndSet(0, 1)) {
1390 if (r instanceof AltResult) {
1391 ex = ((AltResult)r).ex;
1392 t = null;
1393 }
1394 else {
1395 ex = null;
1396 @SuppressWarnings("unchecked") T tr = (T) r;
1397 t = tr;
1398 }
1399 CompletableFuture<U> c = null;
1400 U u = null;
1401 boolean complete = false;
1402 if (ex == null) {
1403 if ((e = executor) != null)
1404 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1405 else {
1406 try {
1407 CompletionStage<U> cs = fn.apply(t);
1408 c = (cs == null) ? null : cs.toCompletableFuture();
1409 if (c == null)
1410 ex = new NullPointerException();
1411 } catch (Throwable rex) {
1412 ex = rex;
1413 }
1414 }
1415 }
1416 if (c != null) {
1417 ThenCopy<U> d = null;
1418 Object s;
1419 if ((s = c.result) == null) {
1420 CompletionNode p = new CompletionNode
1421 (d = new ThenCopy<U>(c, dst));
1422 while ((s = c.result) == null) {
1423 if (UNSAFE.compareAndSwapObject
1424 (c, COMPLETIONS, p.next = c.completions, p))
1425 break;
1426 }
1427 }
1428 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1429 complete = true;
1430 if (s instanceof AltResult) {
1431 ex = ((AltResult)s).ex; // no rewrap
1432 u = null;
1433 }
1434 else {
1435 @SuppressWarnings("unchecked") U us = (U) s;
1436 u = us;
1437 }
1438 }
1439 }
1440 if (complete || ex != null)
1441 dst.internalComplete(u, ex);
1442 if (c != null)
1443 c.helpPostComplete();
1444 }
1445 }
1446 private static final long serialVersionUID = 5232453952276885070L;
1447 }
1448
1449 // Implementations of stage methods with (plain, async, Executor) forms
1450
1451 private <U> CompletableFuture<U> doThenApply
1452 (Function<? super T,? extends U> fn,
1453 Executor e) {
1454 if (fn == null) throw new NullPointerException();
1455 CompletableFuture<U> dst = new CompletableFuture<U>();
1456 ThenApply<T,U> d = null;
1457 Object r;
1458 if ((r = result) == null) {
1459 CompletionNode p = new CompletionNode
1460 (d = new ThenApply<T,U>(this, fn, dst, e));
1461 while ((r = result) == null) {
1462 if (UNSAFE.compareAndSwapObject
1463 (this, COMPLETIONS, p.next = completions, p))
1464 break;
1465 }
1466 }
1467 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1468 T t; Throwable ex;
1469 if (r instanceof AltResult) {
1470 ex = ((AltResult)r).ex;
1471 t = null;
1472 }
1473 else {
1474 ex = null;
1475 @SuppressWarnings("unchecked") T tr = (T) r;
1476 t = tr;
1477 }
1478 U u = null;
1479 if (ex == null) {
1480 try {
1481 if (e != null)
1482 e.execute(new AsyncApply<T,U>(t, fn, dst));
1483 else
1484 u = fn.apply(t);
1485 } catch (Throwable rex) {
1486 ex = rex;
1487 }
1488 }
1489 if (e == null || ex != null)
1490 dst.internalComplete(u, ex);
1491 }
1492 helpPostComplete();
1493 return dst;
1494 }
1495
1496 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1497 Executor e) {
1498 if (fn == null) throw new NullPointerException();
1499 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1500 ThenAccept<T> d = null;
1501 Object r;
1502 if ((r = result) == null) {
1503 CompletionNode p = new CompletionNode
1504 (d = new ThenAccept<T>(this, fn, dst, e));
1505 while ((r = result) == null) {
1506 if (UNSAFE.compareAndSwapObject
1507 (this, COMPLETIONS, p.next = completions, p))
1508 break;
1509 }
1510 }
1511 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1512 T t; Throwable ex;
1513 if (r instanceof AltResult) {
1514 ex = ((AltResult)r).ex;
1515 t = null;
1516 }
1517 else {
1518 ex = null;
1519 @SuppressWarnings("unchecked") T tr = (T) r;
1520 t = tr;
1521 }
1522 if (ex == null) {
1523 try {
1524 if (e != null)
1525 e.execute(new AsyncAccept<T>(t, fn, dst));
1526 else
1527 fn.accept(t);
1528 } catch (Throwable rex) {
1529 ex = rex;
1530 }
1531 }
1532 if (e == null || ex != null)
1533 dst.internalComplete(null, ex);
1534 }
1535 helpPostComplete();
1536 return dst;
1537 }
1538
1539 private CompletableFuture<Void> doThenRun(Runnable action,
1540 Executor e) {
1541 if (action == null) throw new NullPointerException();
1542 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1543 ThenRun d = null;
1544 Object r;
1545 if ((r = result) == null) {
1546 CompletionNode p = new CompletionNode
1547 (d = new ThenRun(this, action, dst, e));
1548 while ((r = result) == null) {
1549 if (UNSAFE.compareAndSwapObject
1550 (this, COMPLETIONS, p.next = completions, p))
1551 break;
1552 }
1553 }
1554 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1555 Throwable ex;
1556 if (r instanceof AltResult)
1557 ex = ((AltResult)r).ex;
1558 else
1559 ex = null;
1560 if (ex == null) {
1561 try {
1562 if (e != null)
1563 e.execute(new AsyncRun(action, dst));
1564 else
1565 action.run();
1566 } catch (Throwable rex) {
1567 ex = rex;
1568 }
1569 }
1570 if (e == null || ex != null)
1571 dst.internalComplete(null, ex);
1572 }
1573 helpPostComplete();
1574 return dst;
1575 }
1576
1577 private <U,V> CompletableFuture<V> doThenCombine
1578 (CompletableFuture<? extends U> other,
1579 BiFunction<? super T,? super U,? extends V> fn,
1580 Executor e) {
1581 if (other == null || fn == null) throw new NullPointerException();
1582 CompletableFuture<V> dst = new CompletableFuture<V>();
1583 ThenCombine<T,U,V> d = null;
1584 Object r, s = null;
1585 if ((r = result) == null || (s = other.result) == null) {
1586 d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
1587 CompletionNode q = null, p = new CompletionNode(d);
1588 while ((r == null && (r = result) == null) ||
1589 (s == null && (s = other.result) == null)) {
1590 if (q != null) {
1591 if (s != null ||
1592 UNSAFE.compareAndSwapObject
1593 (other, COMPLETIONS, q.next = other.completions, q))
1594 break;
1595 }
1596 else if (r != null ||
1597 UNSAFE.compareAndSwapObject
1598 (this, COMPLETIONS, p.next = completions, p)) {
1599 if (s != null)
1600 break;
1601 q = new CompletionNode(d);
1602 }
1603 }
1604 }
1605 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1606 T t; U u; Throwable ex;
1607 if (r instanceof AltResult) {
1608 ex = ((AltResult)r).ex;
1609 t = null;
1610 }
1611 else {
1612 ex = null;
1613 @SuppressWarnings("unchecked") T tr = (T) r;
1614 t = tr;
1615 }
1616 if (ex != null)
1617 u = null;
1618 else if (s instanceof AltResult) {
1619 ex = ((AltResult)s).ex;
1620 u = null;
1621 }
1622 else {
1623 @SuppressWarnings("unchecked") U us = (U) s;
1624 u = us;
1625 }
1626 V v = null;
1627 if (ex == null) {
1628 try {
1629 if (e != null)
1630 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
1631 else
1632 v = fn.apply(t, u);
1633 } catch (Throwable rex) {
1634 ex = rex;
1635 }
1636 }
1637 if (e == null || ex != null)
1638 dst.internalComplete(v, ex);
1639 }
1640 helpPostComplete();
1641 other.helpPostComplete();
1642 return dst;
1643 }
1644
1645 private <U> CompletableFuture<Void> doThenAcceptBoth
1646 (CompletableFuture<? extends U> other,
1647 BiConsumer<? super T,? super U> fn,
1648 Executor e) {
1649 if (other == null || fn == null) throw new NullPointerException();
1650 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1651 ThenAcceptBoth<T,U> d = null;
1652 Object r, s = null;
1653 if ((r = result) == null || (s = other.result) == null) {
1654 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
1655 CompletionNode q = null, p = new CompletionNode(d);
1656 while ((r == null && (r = result) == null) ||
1657 (s == null && (s = other.result) == null)) {
1658 if (q != null) {
1659 if (s != null ||
1660 UNSAFE.compareAndSwapObject
1661 (other, COMPLETIONS, q.next = other.completions, q))
1662 break;
1663 }
1664 else if (r != null ||
1665 UNSAFE.compareAndSwapObject
1666 (this, COMPLETIONS, p.next = completions, p)) {
1667 if (s != null)
1668 break;
1669 q = new CompletionNode(d);
1670 }
1671 }
1672 }
1673 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1674 T t; U u; Throwable ex;
1675 if (r instanceof AltResult) {
1676 ex = ((AltResult)r).ex;
1677 t = null;
1678 }
1679 else {
1680 ex = null;
1681 @SuppressWarnings("unchecked") T tr = (T) r;
1682 t = tr;
1683 }
1684 if (ex != null)
1685 u = null;
1686 else if (s instanceof AltResult) {
1687 ex = ((AltResult)s).ex;
1688 u = null;
1689 }
1690 else {
1691 @SuppressWarnings("unchecked") U us = (U) s;
1692 u = us;
1693 }
1694 if (ex == null) {
1695 try {
1696 if (e != null)
1697 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
1698 else
1699 fn.accept(t, u);
1700 } catch (Throwable rex) {
1701 ex = rex;
1702 }
1703 }
1704 if (e == null || ex != null)
1705 dst.internalComplete(null, ex);
1706 }
1707 helpPostComplete();
1708 other.helpPostComplete();
1709 return dst;
1710 }
1711
1712 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
1713 Runnable action,
1714 Executor e) {
1715 if (other == null || action == null) throw new NullPointerException();
1716 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1717 RunAfterBoth d = null;
1718 Object r, s = null;
1719 if ((r = result) == null || (s = other.result) == null) {
1720 d = new RunAfterBoth(this, other, action, dst, e);
1721 CompletionNode q = null, p = new CompletionNode(d);
1722 while ((r == null && (r = result) == null) ||
1723 (s == null && (s = other.result) == null)) {
1724 if (q != null) {
1725 if (s != null ||
1726 UNSAFE.compareAndSwapObject
1727 (other, COMPLETIONS, q.next = other.completions, q))
1728 break;
1729 }
1730 else if (r != null ||
1731 UNSAFE.compareAndSwapObject
1732 (this, COMPLETIONS, p.next = completions, p)) {
1733 if (s != null)
1734 break;
1735 q = new CompletionNode(d);
1736 }
1737 }
1738 }
1739 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1740 Throwable ex;
1741 if (r instanceof AltResult)
1742 ex = ((AltResult)r).ex;
1743 else
1744 ex = null;
1745 if (ex == null && (s instanceof AltResult))
1746 ex = ((AltResult)s).ex;
1747 if (ex == null) {
1748 try {
1749 if (e != null)
1750 e.execute(new AsyncRun(action, dst));
1751 else
1752 action.run();
1753 } catch (Throwable rex) {
1754 ex = rex;
1755 }
1756 }
1757 if (e == null || ex != null)
1758 dst.internalComplete(null, ex);
1759 }
1760 helpPostComplete();
1761 other.helpPostComplete();
1762 return dst;
1763 }
1764
1765 private <U> CompletableFuture<U> doApplyToEither
1766 (CompletableFuture<? extends T> other,
1767 Function<? super T, U> fn,
1768 Executor e) {
1769 if (other == null || fn == null) throw new NullPointerException();
1770 CompletableFuture<U> dst = new CompletableFuture<U>();
1771 ApplyToEither<T,U> d = null;
1772 Object r;
1773 if ((r = result) == null && (r = other.result) == null) {
1774 d = new ApplyToEither<T,U>(this, other, fn, dst, e);
1775 CompletionNode q = null, p = new CompletionNode(d);
1776 while ((r = result) == null && (r = other.result) == null) {
1777 if (q != null) {
1778 if (UNSAFE.compareAndSwapObject
1779 (other, COMPLETIONS, q.next = other.completions, q))
1780 break;
1781 }
1782 else if (UNSAFE.compareAndSwapObject
1783 (this, COMPLETIONS, p.next = completions, p))
1784 q = new CompletionNode(d);
1785 }
1786 }
1787 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1788 T t; Throwable ex;
1789 if (r instanceof AltResult) {
1790 ex = ((AltResult)r).ex;
1791 t = null;
1792 }
1793 else {
1794 ex = null;
1795 @SuppressWarnings("unchecked") T tr = (T) r;
1796 t = tr;
1797 }
1798 U u = null;
1799 if (ex == null) {
1800 try {
1801 if (e != null)
1802 e.execute(new AsyncApply<T,U>(t, fn, dst));
1803 else
1804 u = fn.apply(t);
1805 } catch (Throwable rex) {
1806 ex = rex;
1807 }
1808 }
1809 if (e == null || ex != null)
1810 dst.internalComplete(u, ex);
1811 }
1812 helpPostComplete();
1813 other.helpPostComplete();
1814 return dst;
1815 }
1816
1817 private CompletableFuture<Void> doAcceptEither
1818 (CompletableFuture<? extends T> other,
1819 Consumer<? super T> fn,
1820 Executor e) {
1821 if (other == null || fn == null) throw new NullPointerException();
1822 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1823 AcceptEither<T> d = null;
1824 Object r;
1825 if ((r = result) == null && (r = other.result) == null) {
1826 d = new AcceptEither<T>(this, other, fn, dst, e);
1827 CompletionNode q = null, p = new CompletionNode(d);
1828 while ((r = result) == null && (r = other.result) == null) {
1829 if (q != null) {
1830 if (UNSAFE.compareAndSwapObject
1831 (other, COMPLETIONS, q.next = other.completions, q))
1832 break;
1833 }
1834 else if (UNSAFE.compareAndSwapObject
1835 (this, COMPLETIONS, p.next = completions, p))
1836 q = new CompletionNode(d);
1837 }
1838 }
1839 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1840 T t; Throwable ex;
1841 if (r instanceof AltResult) {
1842 ex = ((AltResult)r).ex;
1843 t = null;
1844 }
1845 else {
1846 ex = null;
1847 @SuppressWarnings("unchecked") T tr = (T) r;
1848 t = tr;
1849 }
1850 if (ex == null) {
1851 try {
1852 if (e != null)
1853 e.execute(new AsyncAccept<T>(t, fn, dst));
1854 else
1855 fn.accept(t);
1856 } catch (Throwable rex) {
1857 ex = rex;
1858 }
1859 }
1860 if (e == null || ex != null)
1861 dst.internalComplete(null, ex);
1862 }
1863 helpPostComplete();
1864 other.helpPostComplete();
1865 return dst;
1866 }
1867
1868 private CompletableFuture<Void> doRunAfterEither
1869 (CompletableFuture<?> other,
1870 Runnable action,
1871 Executor e) {
1872 if (other == null || action == null) throw new NullPointerException();
1873 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1874 RunAfterEither d = null;
1875 Object r;
1876 if ((r = result) == null && (r = other.result) == null) {
1877 d = new RunAfterEither(this, other, action, dst, e);
1878 CompletionNode q = null, p = new CompletionNode(d);
1879 while ((r = result) == null && (r = other.result) == null) {
1880 if (q != null) {
1881 if (UNSAFE.compareAndSwapObject
1882 (other, COMPLETIONS, q.next = other.completions, q))
1883 break;
1884 }
1885 else if (UNSAFE.compareAndSwapObject
1886 (this, COMPLETIONS, p.next = completions, p))
1887 q = new CompletionNode(d);
1888 }
1889 }
1890 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1891 Throwable ex;
1892 if (r instanceof AltResult)
1893 ex = ((AltResult)r).ex;
1894 else
1895 ex = null;
1896 if (ex == null) {
1897 try {
1898 if (e != null)
1899 e.execute(new AsyncRun(action, dst));
1900 else
1901 action.run();
1902 } catch (Throwable rex) {
1903 ex = rex;
1904 }
1905 }
1906 if (e == null || ex != null)
1907 dst.internalComplete(null, ex);
1908 }
1909 helpPostComplete();
1910 other.helpPostComplete();
1911 return dst;
1912 }
1913
1914 private <U> CompletableFuture<U> doThenCompose
1915 (Function<? super T, ? extends CompletionStage<U>> fn,
1916 Executor e) {
1917 if (fn == null) throw new NullPointerException();
1918 CompletableFuture<U> dst = null;
1919 ThenCompose<T,U> d = null;
1920 Object r;
1921 if ((r = result) == null) {
1922 dst = new CompletableFuture<U>();
1923 CompletionNode p = new CompletionNode
1924 (d = new ThenCompose<T,U>(this, fn, dst, e));
1925 while ((r = result) == null) {
1926 if (UNSAFE.compareAndSwapObject
1927 (this, COMPLETIONS, p.next = completions, p))
1928 break;
1929 }
1930 }
1931 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1932 T t; Throwable ex;
1933 if (r instanceof AltResult) {
1934 ex = ((AltResult)r).ex;
1935 t = null;
1936 }
1937 else {
1938 ex = null;
1939 @SuppressWarnings("unchecked") T tr = (T) r;
1940 t = tr;
1941 }
1942 if (ex == null) {
1943 if (e != null) {
1944 if (dst == null)
1945 dst = new CompletableFuture<U>();
1946 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1947 }
1948 else {
1949 try {
1950 CompletionStage<U> cs = fn.apply(t);
1951 if (cs == null ||
1952 (dst = cs.toCompletableFuture()) == null)
1953 ex = new NullPointerException();
1954 } catch (Throwable rex) {
1955 ex = rex;
1956 }
1957 }
1958 }
1959 if (dst == null)
1960 dst = new CompletableFuture<U>();
1961 if (e == null || ex != null)
1962 dst.internalComplete(null, ex);
1963 }
1964 helpPostComplete();
1965 dst.helpPostComplete();
1966 return dst;
1967 }
1968
1969 private CompletableFuture<T> doWhenComplete
1970 (BiConsumer<? super T, ? super Throwable> fn,
1971 Executor e) {
1972 if (fn == null) throw new NullPointerException();
1973 CompletableFuture<T> dst = new CompletableFuture<T>();
1974 WhenCompleteCompletion<T> d = null;
1975 Object r;
1976 if ((r = result) == null) {
1977 CompletionNode p =
1978 new CompletionNode(d = new WhenCompleteCompletion<T>
1979 (this, fn, dst, e));
1980 while ((r = result) == null) {
1981 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
1982 p.next = completions, p))
1983 break;
1984 }
1985 }
1986 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1987 T t; Throwable ex;
1988 if (r instanceof AltResult) {
1989 ex = ((AltResult)r).ex;
1990 t = null;
1991 }
1992 else {
1993 ex = null;
1994 @SuppressWarnings("unchecked") T tr = (T) r;
1995 t = tr;
1996 }
1997 Throwable dx = null;
1998 try {
1999 if (e != null)
2000 e.execute(new AsyncWhenComplete<T>(t, ex, fn, dst));
2001 else
2002 fn.accept(t, ex);
2003 } catch (Throwable rex) {
2004 dx = rex;
2005 }
2006 if (e == null || dx != null)
2007 dst.internalComplete(t, ex != null ? ex : dx);
2008 }
2009 helpPostComplete();
2010 return dst;
2011 }
2012
2013 private <U> CompletableFuture<U> doHandle
2014 (BiFunction<? super T, Throwable, ? extends U> fn,
2015 Executor e) {
2016 if (fn == null) throw new NullPointerException();
2017 CompletableFuture<U> dst = new CompletableFuture<U>();
2018 HandleCompletion<T,U> d = null;
2019 Object r;
2020 if ((r = result) == null) {
2021 CompletionNode p =
2022 new CompletionNode(d = new HandleCompletion<T,U>
2023 (this, fn, dst, e));
2024 while ((r = result) == null) {
2025 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2026 p.next = completions, p))
2027 break;
2028 }
2029 }
2030 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2031 T t; Throwable ex;
2032 if (r instanceof AltResult) {
2033 ex = ((AltResult)r).ex;
2034 t = null;
2035 }
2036 else {
2037 ex = null;
2038 @SuppressWarnings("unchecked") T tr = (T) r;
2039 t = tr;
2040 }
2041 U u = null;
2042 Throwable dx = null;
2043 try {
2044 if (e != null)
2045 e.execute(new AsyncCombine<T,Throwable,U>(t, ex, fn, dst));
2046 else {
2047 u = fn.apply(t, ex);
2048 dx = null;
2049 }
2050 } catch (Throwable rex) {
2051 dx = rex;
2052 u = null;
2053 }
2054 if (e == null || dx != null)
2055 dst.internalComplete(u, dx);
2056 }
2057 helpPostComplete();
2058 return dst;
2059 }
2060
2061
2062 // public methods
2063
2064 /**
2065 * Creates a new incomplete CompletableFuture.
2066 */
2067 public CompletableFuture() {
2068 }
2069
2070 /**
2071 * Returns a new CompletableFuture that is asynchronously completed
2072 * by a task running in the {@link ForkJoinPool#commonPool()} with
2073 * the value obtained by calling the given Supplier.
2074 *
2075 * @param supplier a function returning the value to be used
2076 * to complete the returned CompletableFuture
2077 * @param <U> the type of the returned future's value
2078 * @return the new CompletableFuture
2079 */
2080 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
2081 if (supplier == null) throw new NullPointerException();
2082 CompletableFuture<U> f = new CompletableFuture<U>();
2083 ForkJoinPool.commonPool().
2084 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
2085 return f;
2086 }
2087
2088 /**
2089 * Returns a new CompletableFuture that is asynchronously completed
2090 * by a task running in the given executor with the value obtained
2091 * by calling the given Supplier.
2092 *
2093 * @param supplier a function returning the value to be used
2094 * to complete the returned CompletableFuture
2095 * @param executor the executor to use for asynchronous execution
2096 * @param <U> the type of the returned future's value
2097 * @return the new CompletableFuture
2098 */
2099 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
2100 Executor executor) {
2101 if (executor == null || supplier == null)
2102 throw new NullPointerException();
2103 CompletableFuture<U> f = new CompletableFuture<U>();
2104 executor.execute(new AsyncSupply<U>(supplier, f));
2105 return f;
2106 }
2107
2108 /**
2109 * Returns a new CompletableFuture that is asynchronously completed
2110 * by a task running in the {@link ForkJoinPool#commonPool()} after
2111 * it runs the given action.
2112 *
2113 * @param runnable the action to run before completing the
2114 * returned CompletableFuture
2115 * @return the new CompletableFuture
2116 */
2117 public static CompletableFuture<Void> runAsync(Runnable runnable) {
2118 if (runnable == null) throw new NullPointerException();
2119 CompletableFuture<Void> f = new CompletableFuture<Void>();
2120 ForkJoinPool.commonPool().
2121 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
2122 return f;
2123 }
2124
2125 /**
2126 * Returns a new CompletableFuture that is asynchronously completed
2127 * by a task running in the given executor after it runs the given
2128 * action.
2129 *
2130 * @param runnable the action to run before completing the
2131 * returned CompletableFuture
2132 * @param executor the executor to use for asynchronous execution
2133 * @return the new CompletableFuture
2134 */
2135 public static CompletableFuture<Void> runAsync(Runnable runnable,
2136 Executor executor) {
2137 if (executor == null || runnable == null)
2138 throw new NullPointerException();
2139 CompletableFuture<Void> f = new CompletableFuture<Void>();
2140 executor.execute(new AsyncRun(runnable, f));
2141 return f;
2142 }
2143
2144 /**
2145 * Returns a new CompletableFuture that is already completed with
2146 * the given value.
2147 *
2148 * @param value the value
2149 * @param <U> the type of the returned future's value
2150 * @return the completed CompletableFuture
2151 */
2152 public static <U> CompletableFuture<U> completedFuture(U value) {
2153 CompletableFuture<U> f = new CompletableFuture<U>();
2154 f.result = (value == null) ? NIL : value;
2155 return f;
2156 }
2157
2158 /**
2159 * Returns {@code true} if completed in any fashion: normally,
2160 * exceptionally, or via cancellation.
2161 *
2162 * @return {@code true} if completed
2163 */
2164 public boolean isDone() {
2165 return result != null;
2166 }
2167
2168 /**
2169 * Waits if necessary for this future to complete, and then
2170 * returns its result.
2171 *
2172 * @return the result value
2173 * @throws CancellationException if this future was cancelled
2174 * @throws ExecutionException if this future completed exceptionally
2175 * @throws InterruptedException if the current thread was interrupted
2176 * while waiting
2177 */
2178 public T get() throws InterruptedException, ExecutionException {
2179 Object r; Throwable ex, cause;
2180 if ((r = result) == null && (r = waitingGet(true)) == null)
2181 throw new InterruptedException();
2182 if (!(r instanceof AltResult)) {
2183 @SuppressWarnings("unchecked") T tr = (T) r;
2184 return tr;
2185 }
2186 if ((ex = ((AltResult)r).ex) == null)
2187 return null;
2188 if (ex instanceof CancellationException)
2189 throw (CancellationException)ex;
2190 if ((ex instanceof CompletionException) &&
2191 (cause = ex.getCause()) != null)
2192 ex = cause;
2193 throw new ExecutionException(ex);
2194 }
2195
2196 /**
2197 * Waits if necessary for at most the given time for this future
2198 * to complete, and then returns its result, if available.
2199 *
2200 * @param timeout the maximum time to wait
2201 * @param unit the time unit of the timeout argument
2202 * @return the result value
2203 * @throws CancellationException if this future was cancelled
2204 * @throws ExecutionException if this future completed exceptionally
2205 * @throws InterruptedException if the current thread was interrupted
2206 * while waiting
2207 * @throws TimeoutException if the wait timed out
2208 */
2209 public T get(long timeout, TimeUnit unit)
2210 throws InterruptedException, ExecutionException, TimeoutException {
2211 Object r; Throwable ex, cause;
2212 long nanos = unit.toNanos(timeout);
2213 if (Thread.interrupted())
2214 throw new InterruptedException();
2215 if ((r = result) == null)
2216 r = timedAwaitDone(nanos);
2217 if (!(r instanceof AltResult)) {
2218 @SuppressWarnings("unchecked") T tr = (T) r;
2219 return tr;
2220 }
2221 if ((ex = ((AltResult)r).ex) == null)
2222 return null;
2223 if (ex instanceof CancellationException)
2224 throw (CancellationException)ex;
2225 if ((ex instanceof CompletionException) &&
2226 (cause = ex.getCause()) != null)
2227 ex = cause;
2228 throw new ExecutionException(ex);
2229 }
2230
2231 /**
2232 * Returns the result value when complete, or throws an
2233 * (unchecked) exception if completed exceptionally. To better
2234 * conform with the use of common functional forms, if a
2235 * computation involved in the completion of this
2236 * CompletableFuture threw an exception, this method throws an
2237 * (unchecked) {@link CompletionException} with the underlying
2238 * exception as its cause.
2239 *
2240 * @return the result value
2241 * @throws CancellationException if the computation was cancelled
2242 * @throws CompletionException if this future completed
2243 * exceptionally or a completion computation threw an exception
2244 */
2245 public T join() {
2246 Object r; Throwable ex;
2247 if ((r = result) == null)
2248 r = waitingGet(false);
2249 if (!(r instanceof AltResult)) {
2250 @SuppressWarnings("unchecked") T tr = (T) r;
2251 return tr;
2252 }
2253 if ((ex = ((AltResult)r).ex) == null)
2254 return null;
2255 if (ex instanceof CancellationException)
2256 throw (CancellationException)ex;
2257 if (ex instanceof CompletionException)
2258 throw (CompletionException)ex;
2259 throw new CompletionException(ex);
2260 }
2261
2262 /**
2263 * Returns the result value (or throws any encountered exception)
2264 * if completed, else returns the given valueIfAbsent.
2265 *
2266 * @param valueIfAbsent the value to return if not completed
2267 * @return the result value, if completed, else the given valueIfAbsent
2268 * @throws CancellationException if the computation was cancelled
2269 * @throws CompletionException if this future completed
2270 * exceptionally or a completion computation threw an exception
2271 */
2272 public T getNow(T valueIfAbsent) {
2273 Object r; Throwable ex;
2274 if ((r = result) == null)
2275 return valueIfAbsent;
2276 if (!(r instanceof AltResult)) {
2277 @SuppressWarnings("unchecked") T tr = (T) r;
2278 return tr;
2279 }
2280 if ((ex = ((AltResult)r).ex) == null)
2281 return null;
2282 if (ex instanceof CancellationException)
2283 throw (CancellationException)ex;
2284 if (ex instanceof CompletionException)
2285 throw (CompletionException)ex;
2286 throw new CompletionException(ex);
2287 }
2288
2289 /**
2290 * If not already completed, sets the value returned by {@link
2291 * #get()} and related methods to the given value.
2292 *
2293 * @param value the result value
2294 * @return {@code true} if this invocation caused this CompletableFuture
2295 * to transition to a completed state, else {@code false}
2296 */
2297 public boolean complete(T value) {
2298 boolean triggered = result == null &&
2299 UNSAFE.compareAndSwapObject(this, RESULT, null,
2300 value == null ? NIL : value);
2301 postComplete();
2302 return triggered;
2303 }
2304
2305 /**
2306 * If not already completed, causes invocations of {@link #get()}
2307 * and related methods to throw the given exception.
2308 *
2309 * @param ex the exception
2310 * @return {@code true} if this invocation caused this CompletableFuture
2311 * to transition to a completed state, else {@code false}
2312 */
2313 public boolean completeExceptionally(Throwable ex) {
2314 if (ex == null) throw new NullPointerException();
2315 boolean triggered = result == null &&
2316 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
2317 postComplete();
2318 return triggered;
2319 }
2320
2321 // CompletionStage methods
2322
2323 public <U> CompletableFuture<U> thenApply
2324 (Function<? super T,? extends U> fn) {
2325 return doThenApply(fn, null);
2326 }
2327
2328 public <U> CompletableFuture<U> thenApplyAsync
2329 (Function<? super T,? extends U> fn) {
2330 return doThenApply(fn, ForkJoinPool.commonPool());
2331 }
2332
2333 public <U> CompletableFuture<U> thenApplyAsync
2334 (Function<? super T,? extends U> fn,
2335 Executor executor) {
2336 if (executor == null) throw new NullPointerException();
2337 return doThenApply(fn, executor);
2338 }
2339
2340 public CompletableFuture<Void> thenAccept
2341 (Consumer<? super T> action) {
2342 return doThenAccept(action, null);
2343 }
2344
2345 public CompletableFuture<Void> thenAcceptAsync
2346 (Consumer<? super T> action) {
2347 return doThenAccept(action, ForkJoinPool.commonPool());
2348 }
2349
2350 public CompletableFuture<Void> thenAcceptAsync
2351 (Consumer<? super T> action,
2352 Executor executor) {
2353 if (executor == null) throw new NullPointerException();
2354 return doThenAccept(action, executor);
2355 }
2356
2357 public CompletableFuture<Void> thenRun
2358 (Runnable action) {
2359 return doThenRun(action, null);
2360 }
2361
2362 public CompletableFuture<Void> thenRunAsync
2363 (Runnable action) {
2364 return doThenRun(action, ForkJoinPool.commonPool());
2365 }
2366
2367 public CompletableFuture<Void> thenRunAsync
2368 (Runnable action,
2369 Executor executor) {
2370 if (executor == null) throw new NullPointerException();
2371 return doThenRun(action, executor);
2372 }
2373
2374 public <U,V> CompletableFuture<V> thenCombine
2375 (CompletionStage<? extends U> other,
2376 BiFunction<? super T,? super U,? extends V> fn) {
2377 return doThenCombine(other.toCompletableFuture(), fn, null);
2378 }
2379
2380 public <U,V> CompletableFuture<V> thenCombineAsync
2381 (CompletionStage<? extends U> other,
2382 BiFunction<? super T,? super U,? extends V> fn) {
2383 return doThenCombine(other.toCompletableFuture(), fn,
2384 ForkJoinPool.commonPool());
2385 }
2386
2387 public <U,V> CompletableFuture<V> thenCombineAsync
2388 (CompletionStage<? extends U> other,
2389 BiFunction<? super T,? super U,? extends V> fn,
2390 Executor executor) {
2391 if (executor == null) throw new NullPointerException();
2392 return doThenCombine(other.toCompletableFuture(), fn, executor);
2393 }
2394
2395 public <U> CompletableFuture<Void> thenAcceptBoth
2396 (CompletionStage<? extends U> other,
2397 BiConsumer<? super T, ? super U> action) {
2398 return doThenAcceptBoth(other.toCompletableFuture(), action, null);
2399 }
2400
2401 public <U> CompletableFuture<Void> thenAcceptBothAsync
2402 (CompletionStage<? extends U> other,
2403 BiConsumer<? super T, ? super U> action) {
2404 return doThenAcceptBoth(other.toCompletableFuture(), action,
2405 ForkJoinPool.commonPool());
2406 }
2407
2408 public <U> CompletableFuture<Void> thenAcceptBothAsync
2409 (CompletionStage<? extends U> other,
2410 BiConsumer<? super T, ? super U> action,
2411 Executor executor) {
2412 if (executor == null) throw new NullPointerException();
2413 return doThenAcceptBoth(other.toCompletableFuture(), action, executor);
2414 }
2415
2416 public CompletableFuture<Void> runAfterBoth
2417 (CompletionStage<?> other,
2418 Runnable action) {
2419 return doRunAfterBoth(other.toCompletableFuture(), action, null);
2420 }
2421
2422 public CompletableFuture<Void> runAfterBothAsync
2423 (CompletionStage<?> other,
2424 Runnable action) {
2425 return doRunAfterBoth(other.toCompletableFuture(), action,
2426 ForkJoinPool.commonPool());
2427 }
2428
2429 public CompletableFuture<Void> runAfterBothAsync
2430 (CompletionStage<?> other,
2431 Runnable action,
2432 Executor executor) {
2433 if (executor == null) throw new NullPointerException();
2434 return doRunAfterBoth(other.toCompletableFuture(), action, executor);
2435 }
2436
2437
2438 public <U> CompletableFuture<U> applyToEither
2439 (CompletionStage<? extends T> other,
2440 Function<? super T, U> fn) {
2441 return doApplyToEither(other.toCompletableFuture(), fn, null);
2442 }
2443
2444 public <U> CompletableFuture<U> applyToEitherAsync
2445 (CompletionStage<? extends T> other,
2446 Function<? super T, U> fn) {
2447 return doApplyToEither(other.toCompletableFuture(), fn,
2448 ForkJoinPool.commonPool());
2449 }
2450
2451 public <U> CompletableFuture<U> applyToEitherAsync
2452 (CompletionStage<? extends T> other,
2453 Function<? super T, U> fn,
2454 Executor executor) {
2455 if (executor == null) throw new NullPointerException();
2456 return doApplyToEither(other.toCompletableFuture(), fn, executor);
2457 }
2458
2459 public CompletableFuture<Void> acceptEither
2460 (CompletionStage<? extends T> other,
2461 Consumer<? super T> action) {
2462 return doAcceptEither(other.toCompletableFuture(), action, null);
2463 }
2464
2465 public CompletableFuture<Void> acceptEitherAsync
2466 (CompletionStage<? extends T> other,
2467 Consumer<? super T> action) {
2468 return doAcceptEither(other.toCompletableFuture(), action,
2469 ForkJoinPool.commonPool());
2470 }
2471
2472 public CompletableFuture<Void> acceptEitherAsync
2473 (CompletionStage<? extends T> other,
2474 Consumer<? super T> action,
2475 Executor executor) {
2476 if (executor == null) throw new NullPointerException();
2477 return doAcceptEither(other.toCompletableFuture(), action, executor);
2478 }
2479
2480 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
2481 Runnable action) {
2482 return doRunAfterEither(other.toCompletableFuture(), action, null);
2483 }
2484
2485 public CompletableFuture<Void> runAfterEitherAsync
2486 (CompletionStage<?> other,
2487 Runnable action) {
2488 return doRunAfterEither(other.toCompletableFuture(), action,
2489 ForkJoinPool.commonPool());
2490 }
2491
2492 public CompletableFuture<Void> runAfterEitherAsync
2493 (CompletionStage<?> other,
2494 Runnable action,
2495 Executor executor) {
2496 if (executor == null) throw new NullPointerException();
2497 return doRunAfterEither(other.toCompletableFuture(), action, executor);
2498 }
2499
2500 public <U> CompletableFuture<U> thenCompose
2501 (Function<? super T, ? extends CompletionStage<U>> fn) {
2502 return doThenCompose(fn, null);
2503 }
2504
2505 public <U> CompletableFuture<U> thenComposeAsync
2506 (Function<? super T, ? extends CompletionStage<U>> fn) {
2507 return doThenCompose(fn, ForkJoinPool.commonPool());
2508 }
2509
2510 public <U> CompletableFuture<U> thenComposeAsync
2511 (Function<? super T, ? extends CompletionStage<U>> fn,
2512 Executor executor) {
2513 if (executor == null) throw new NullPointerException();
2514 return doThenCompose(fn, executor);
2515 }
2516
2517 public CompletableFuture<T> whenComplete
2518 (BiConsumer<? super T, ? super Throwable> action) {
2519 return doWhenComplete(action, null);
2520 }
2521
2522 public CompletableFuture<T> whenCompleteAsync
2523 (BiConsumer<? super T, ? super Throwable> action) {
2524 return doWhenComplete(action, ForkJoinPool.commonPool());
2525 }
2526
2527 public CompletableFuture<T> whenCompleteAsync
2528 (BiConsumer<? super T, ? super Throwable> action,
2529 Executor executor) {
2530 if (executor == null) throw new NullPointerException();
2531 return doWhenComplete(action, executor);
2532 }
2533
2534 public <U> CompletableFuture<U> handle
2535 (BiFunction<? super T, Throwable, ? extends U> fn) {
2536 return doHandle(fn, null);
2537 }
2538
2539 public <U> CompletableFuture<U> handleAsync
2540 (BiFunction<? super T, Throwable, ? extends U> fn) {
2541 return doHandle(fn, ForkJoinPool.commonPool());
2542 }
2543
2544 public <U> CompletableFuture<U> handleAsync
2545 (BiFunction<? super T, Throwable, ? extends U> fn,
2546 Executor executor) {
2547 if (executor == null) throw new NullPointerException();
2548 return doHandle(fn, executor);
2549 }
2550
2551 /**
2552 * Returns this CompletableFuture
2553 *
2554 * @return this CompletableFuture
2555 */
2556 public CompletableFuture<T> toCompletableFuture() {
2557 return this;
2558 }
2559
2560 // not in interface CompletionStage
2561
/**
 * Returns a new CompletableFuture that is completed when this
 * CompletableFuture completes, with the result of the given
 * function of the exception triggering this CompletableFuture's
 * completion when it completes exceptionally; otherwise, if this
 * CompletableFuture completes normally, then the returned
 * CompletableFuture also completes normally with the same value.
 * Note: More flexible versions of this functionality are
 * available using methods {@code whenComplete} and {@code handle}.
 *
 * <p>If the function itself throws, the thrown exception is passed
 * on to the returned CompletableFuture's completion.
 *
 * @param fn the function to use to compute the value of the
 * returned CompletableFuture if this CompletableFuture completed
 * exceptionally
 * @return the new CompletableFuture
 * @throws NullPointerException if {@code fn} is {@code null}
 */
public CompletableFuture<T> exceptionally
    (Function<Throwable, ? extends T> fn) {
    if (fn == null) throw new NullPointerException();
    CompletableFuture<T> dst = new CompletableFuture<T>();
    ExceptionCompletion<T> d = null;
    Object r;
    // Source not complete yet: create a completion object and try to
    // link a node for it at the head of this future's completions list.
    if ((r = result) == null) {
        CompletionNode p =
            new CompletionNode(d = new ExceptionCompletion<T>
                               (this, fn, dst));
        // Retry the CAS until the node is linked, or stop as soon as a
        // result shows up (r then holds it).
        while ((r = result) == null) {
            if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
                                            p.next = completions, p))
                break;
        }
    }
    // A result is available. Handle it here if no completion object was
    // created, or if this thread wins the 0 -> 1 compareAndSet on it
    // (presumably the claim that keeps fn from running twice; confirm
    // against ExceptionCompletion).
    if (r != null && (d == null || d.compareAndSet(0, 1))) {
        T t = null; Throwable ex, dx = null;
        if (r instanceof AltResult) {
            // An AltResult with a null ex leaves both t and dx null.
            if ((ex = ((AltResult)r).ex) != null) {
                try {
                    t = fn.apply(ex);
                } catch (Throwable rex) {
                    // fn failed: hand its exception to dst instead.
                    dx = rex;
                }
            }
        }
        else {
            // Normal completion: pass the value through unchanged.
            @SuppressWarnings("unchecked") T tr = (T) r;
            t = tr;
        }
        dst.internalComplete(t, dx);
    }
    helpPostComplete();
    return dst;
}
2613
2614 /* ------------- Arbitrary-arity constructions -------------- */
2615
2616 /*
2617 * The basic plan of attack is to recursively form binary
2618 * completion trees of elements. This can be overkill for small
2619 * sets, but scales nicely. The And/All vs Or/Any forms use the
2620 * same idea, but details differ.
2621 */
2622
/**
 * Returns a new CompletableFuture that is completed when all of
 * the given CompletableFutures complete. If any of the given
 * CompletableFutures complete exceptionally, then the returned
 * CompletableFuture also does so, with a CompletionException
 * holding this exception as its cause. Otherwise, the results,
 * if any, of the given CompletableFutures are not reflected in
 * the returned CompletableFuture, but may be obtained by
 * inspecting them individually. If no CompletableFutures are
 * provided, returns a CompletableFuture completed with the value
 * {@code null}.
 *
 * <p>Among the applications of this method is to await completion
 * of a set of independent CompletableFutures before continuing a
 * program, as in: {@code CompletableFuture.allOf(c1, c2,
 * c3).join();}.
 *
 * @param cfs the CompletableFutures
 * @return a new CompletableFuture that is completed when all of the
 * given CompletableFutures complete
 * @throws NullPointerException if the array or any of its elements are
 * {@code null}
 */
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    int len = cfs.length; // Directly handle empty and singleton cases
    if (len > 1)
        return allTree(cfs, 0, len - 1);
    else {
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        CompletableFuture<?> f;
        if (len == 0)
            dst.result = NIL; // no inputs: already completed with null
        else if ((f = cfs[0]) == null)
            throw new NullPointerException();
        else {
            // Singleton: dst follows the completion of f.
            ThenPropagate d = null;
            CompletionNode p = null;
            Object r;
            // While f is incomplete, each pass performs one step:
            // create the completion, create its node, then CAS the node
            // onto the head of f's completions list. The loop ends once
            // the CAS succeeds or f.result becomes non-null.
            while ((r = f.result) == null) {
                if (d == null)
                    d = new ThenPropagate(f, dst);
                else if (p == null)
                    p = new CompletionNode(d);
                else if (UNSAFE.compareAndSwapObject
                         (f, COMPLETIONS, p.next = f.completions, p))
                    break;
            }
            // f has a result: complete dst here if no completion object
            // exists or this thread wins the 0 -> 1 compareAndSet on it.
            // Only an exception, if any, is carried over; the value is
            // always null.
            if (r != null && (d == null || d.compareAndSet(0, 1)))
                dst.internalComplete(null, (r instanceof AltResult) ?
                                     ((AltResult)r).ex : null);
            f.helpPostComplete();
        }
        return dst;
    }
}
2678
/**
 * Recursively constructs an And'ed tree of CompletableFutures.
 * Called only when array known to have at least two elements.
 *
 * <p>The range {@code [lo, hi]} is split at its midpoint; a half
 * holding a single element uses that element directly, otherwise
 * the half is built by a recursive call. The returned future
 * completes once both halves have results, taking the exception
 * of the first half if it has one, else that of the second half.
 *
 * @param cfs the source array
 * @param lo index of the first element of the range (inclusive)
 * @param hi index of the last element of the range (inclusive)
 * @return a future completed when both halves of the range complete
 * @throws NullPointerException if an element used as a leaf is
 * {@code null}
 */
private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                               int lo, int hi) {
    CompletableFuture<?> fst, snd;
    int mid = (lo + hi) >>> 1;
    if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
        (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
        throw new NullPointerException();
    CompletableFuture<Void> dst = new CompletableFuture<Void>();
    AndCompletion d = null;
    CompletionNode p = null, q = null;
    Object r = null, s = null;
    // While at least one side is incomplete, each pass performs one
    // step: create the shared completion, create node p, link p onto
    // fst (creating q only after that CAS succeeds), then link q onto
    // snd. The loop ends when both results are seen or q is linked.
    while ((r = fst.result) == null || (s = snd.result) == null) {
        if (d == null)
            d = new AndCompletion(fst, snd, dst);
        else if (p == null)
            p = new CompletionNode(d);
        else if (q == null) {
            if (UNSAFE.compareAndSwapObject
                (fst, COMPLETIONS, p.next = fst.completions, p))
                q = new CompletionNode(d);
        }
        else if (UNSAFE.compareAndSwapObject
                 (snd, COMPLETIONS, q.next = snd.completions, q))
            break;
    }
    // The loop condition short-circuits, so r or s may still be null
    // here; re-read whichever is missing. Complete dst only when both
    // results exist and no completion object exists or this thread wins
    // the 0 -> 1 compareAndSet on it.
    if ((r != null || (r = fst.result) != null) &&
        (s != null || (s = snd.result) != null) &&
        (d == null || d.compareAndSet(0, 1))) {
        Throwable ex;
        if (r instanceof AltResult)
            ex = ((AltResult)r).ex;
        else
            ex = null;
        // fst's exception takes precedence over snd's.
        if (ex == null && (s instanceof AltResult))
            ex = ((AltResult)s).ex;
        dst.internalComplete(null, ex);
    }
    fst.helpPostComplete();
    snd.helpPostComplete();
    return dst;
}
2724
/**
 * Returns a new CompletableFuture that is completed when any of
 * the given CompletableFutures complete, with the same result.
 * Otherwise, if it completed exceptionally, the returned
 * CompletableFuture also does so, with a CompletionException
 * holding this exception as its cause. If no CompletableFutures
 * are provided, returns an incomplete CompletableFuture.
 *
 * @param cfs the CompletableFutures
 * @return a new CompletableFuture that is completed with the
 * result or exception of any of the given CompletableFutures when
 * one completes
 * @throws NullPointerException if the array or any of its elements are
 * {@code null}
 */
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    int len = cfs.length; // Same idea as allOf
    if (len > 1)
        return anyTree(cfs, 0, len - 1);
    else {
        CompletableFuture<Object> dst = new CompletableFuture<Object>();
        CompletableFuture<?> f;
        if (len == 0)
            ; // skip
        else if ((f = cfs[0]) == null)
            throw new NullPointerException();
        else {
            // Singleton: dst copies the outcome of f.
            ThenCopy<Object> d = null;
            CompletionNode p = null;
            Object r;
            // While f is incomplete, each pass performs one step:
            // create the completion, create its node, then CAS the node
            // onto the head of f's completions list. The loop ends once
            // the CAS succeeds or f.result becomes non-null.
            while ((r = f.result) == null) {
                if (d == null)
                    d = new ThenCopy<Object>(f, dst);
                else if (p == null)
                    p = new CompletionNode(d);
                else if (UNSAFE.compareAndSwapObject
                         (f, COMPLETIONS, p.next = f.completions, p))
                    break;
            }
            // f has a result: complete dst here if no completion object
            // exists or this thread wins the 0 -> 1 compareAndSet on it.
            if (r != null && (d == null || d.compareAndSet(0, 1))) {
                Throwable ex; Object t;
                if (r instanceof AltResult) {
                    // Alternate result: carry over its exception (which
                    // may be null) with a null value.
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    t = r;
                }
                dst.internalComplete(t, ex);
            }
            f.helpPostComplete();
        }
        return dst;
    }
}
2781
/**
 * Recursively constructs an Or'ed tree of CompletableFutures.
 *
 * <p>The range {@code [lo, hi]} is split at its midpoint; a half
 * holding a single element uses that element directly, otherwise
 * the half is built by a recursive call. The returned future is
 * completed with the outcome of whichever half is seen to have a
 * result, the first half being checked before the second.
 *
 * @param cfs the source array
 * @param lo index of the first element of the range (inclusive)
 * @param hi index of the last element of the range (inclusive)
 * @return a future completed when either half of the range completes
 * @throws NullPointerException if an element used as a leaf is
 * {@code null}
 */
private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
                                                 int lo, int hi) {
    CompletableFuture<?> fst, snd;
    int mid = (lo + hi) >>> 1;
    if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
        (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
        throw new NullPointerException();
    CompletableFuture<Object> dst = new CompletableFuture<Object>();
    OrCompletion d = null;
    CompletionNode p = null, q = null;
    Object r;
    // While neither side has a result, each pass performs one step:
    // create the shared completion, create node p, link p onto fst
    // (creating q only after that CAS succeeds), then link q onto snd.
    // The loop ends when a result is seen or q is linked.
    while ((r = fst.result) == null && (r = snd.result) == null) {
        if (d == null)
            d = new OrCompletion(fst, snd, dst);
        else if (p == null)
            p = new CompletionNode(d);
        else if (q == null) {
            if (UNSAFE.compareAndSwapObject
                (fst, COMPLETIONS, p.next = fst.completions, p))
                q = new CompletionNode(d);
        }
        else if (UNSAFE.compareAndSwapObject
                 (snd, COMPLETIONS, q.next = snd.completions, q))
            break;
    }
    // After a break r is null, so re-read both sides. Complete dst only
    // when some result exists and no completion object exists or this
    // thread wins the 0 -> 1 compareAndSet on it.
    if ((r != null || (r = fst.result) != null ||
         (r = snd.result) != null) &&
        (d == null || d.compareAndSet(0, 1))) {
        Throwable ex; Object t;
        if (r instanceof AltResult) {
            // Alternate result: carry over its exception (which may be
            // null) with a null value.
            ex = ((AltResult)r).ex;
            t = null;
        }
        else {
            ex = null;
            t = r;
        }
        dst.internalComplete(t, ex);
    }
    fst.helpPostComplete();
    snd.helpPostComplete();
    return dst;
}
2828
2829 /* ------------- Control and status methods -------------- */
2830
2831 /**
2832 * If not already completed, completes this CompletableFuture with
2833 * a {@link CancellationException}. Dependent CompletableFutures
2834 * that have not already completed will also complete
2835 * exceptionally, with a {@link CompletionException} caused by
2836 * this {@code CancellationException}.
2837 *
2838 * @param mayInterruptIfRunning this value has no effect in this
2839 * implementation because interrupts are not used to control
2840 * processing.
2841 *
2842 * @return {@code true} if this task is now cancelled
2843 */
2844 public boolean cancel(boolean mayInterruptIfRunning) {
2845 boolean cancelled = (result == null) &&
2846 UNSAFE.compareAndSwapObject
2847 (this, RESULT, null, new AltResult(new CancellationException()));
2848 postComplete();
2849 return cancelled || isCancelled();
2850 }
2851
2852 /**
2853 * Returns {@code true} if this CompletableFuture was cancelled
2854 * before it completed normally.
2855 *
2856 * @return {@code true} if this CompletableFuture was cancelled
2857 * before it completed normally
2858 */
2859 public boolean isCancelled() {
2860 Object r;
2861 return ((r = result) instanceof AltResult) &&
2862 (((AltResult)r).ex instanceof CancellationException);
2863 }
2864
2865 /**
2866 * Returns {@code true} if this CompletableFuture completed
2867 * exceptionally, in any way. Possible causes include
2868 * cancellation, explicit invocation of {@code
2869 * completeExceptionally}, and abrupt termination of a
2870 * CompletionStage action.
2871 *
2872 * @return {@code true} if this CompletableFuture completed
2873 * exceptionally
2874 */
2875 public boolean isCompletedExceptionally() {
2876 Object r;
2877 return ((r = result) instanceof AltResult) && r != NIL;
2878 }
2879
2880 /**
2881 * Forcibly sets or resets the value subsequently returned by
2882 * method {@link #get()} and related methods, whether or not
2883 * already completed. This method is designed for use only in
2884 * error recovery actions, and even in such situations may result
2885 * in ongoing dependent completions using established versus
2886 * overwritten outcomes.
2887 *
2888 * @param value the completion value
2889 */
2890 public void obtrudeValue(T value) {
2891 result = (value == null) ? NIL : value;
2892 postComplete();
2893 }
2894
2895 /**
2896 * Forcibly causes subsequent invocations of method {@link #get()}
2897 * and related methods to throw the given exception, whether or
2898 * not already completed. This method is designed for use only in
2899 * recovery actions, and even in such situations may result in
2900 * ongoing dependent completions using established versus
2901 * overwritten outcomes.
2902 *
2903 * @param ex the exception
2904 */
2905 public void obtrudeException(Throwable ex) {
2906 if (ex == null) throw new NullPointerException();
2907 result = new AltResult(ex);
2908 postComplete();
2909 }
2910
2911 /**
2912 * Returns the estimated number of CompletableFutures whose
2913 * completions are awaiting completion of this CompletableFuture.
2914 * This method is designed for use in monitoring system state, not
2915 * for synchronization control.
2916 *
2917 * @return the number of dependent CompletableFutures
2918 */
2919 public int getNumberOfDependents() {
2920 int count = 0;
2921 for (CompletionNode p = completions; p != null; p = p.next)
2922 ++count;
2923 return count;
2924 }
2925
2926 /**
2927 * Returns a string identifying this CompletableFuture, as well as
2928 * its completion state. The state, in brackets, contains the
2929 * String {@code "Completed Normally"} or the String {@code
2930 * "Completed Exceptionally"}, or the String {@code "Not
2931 * completed"} followed by the number of CompletableFutures
2932 * dependent upon its completion, if any.
2933 *
2934 * @return a string identifying this CompletableFuture, as well as its state
2935 */
2936 public String toString() {
2937 Object r = result;
2938 int count;
2939 return super.toString() +
2940 ((r == null) ?
2941 (((count = getNumberOfDependents()) == 0) ?
2942 "[Not completed]" :
2943 "[Not completed, " + count + " dependents]") :
2944 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
2945 "[Completed exceptionally]" :
2946 "[Completed normally]"));
2947 }
2948
// Unsafe mechanics
// Handle used for the compareAndSwapObject calls in this class.
private static final sun.misc.Unsafe UNSAFE;
// Field offsets of "result", "waiters" and "completions" within
// CompletableFuture, looked up once in the static initializer below.
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = CompletableFuture.class;
        RESULT = UNSAFE.objectFieldOffset
            (k.getDeclaredField("result"));
        WAITERS = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
        COMPLETIONS = UNSAFE.objectFieldOffset
            (k.getDeclaredField("completions"));
    } catch (Exception e) {
        // Any failure during the lookup is rethrown as an Error,
        // keeping the original exception as its cause.
        throw new Error(e);
    }
}
2968 }