root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.83
Committed: Thu Apr 4 18:52:03 2013 UTC by dl
Branch: MAIN
Changes since 1.82: +2 -2 lines
Log Message:
Ensure nonnull return in thenCompose
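
The log message concerns `thenCompose`, whose function is expected to return a non-null future. The following is a minimal sketch of the observable behaviour, assuming a JDK 8 or later runtime rather than this exact revision: when the supplied function returns null, the dependent future completes exceptionally with a `NullPointerException` as the cause. The class name `ThenComposeNullDemo` and its helper method are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ThenComposeNullDemo {
    /** Returns the cause reported when the composing function yields null. */
    public static Throwable causeOfNullCompose() {
        CompletableFuture<String> src = CompletableFuture.completedFuture("x");
        // The function deliberately returns null instead of a future.
        CompletableFuture<String> dep =
            src.thenCompose(s -> (CompletableFuture<String>) null);
        try {
            dep.join();
            return null; // not expected: join() should throw
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfNullCompose());
    }
}
```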

File Contents

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.function.Supplier;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/**
 * A {@link Future} that may be explicitly completed (setting its
 * value and status), and may include dependent functions and actions
 * that trigger upon its completion.
 *
 * <p>When two or more threads attempt to
 * {@link #complete complete},
 * {@link #completeExceptionally completeExceptionally}, or
 * {@link #cancel cancel}
 * a CompletableFuture, only one of them succeeds.
 *
 * <p>Methods are available for adding dependents based on
 * user-provided Functions, Consumers, or Runnables. The appropriate
 * form to use depends on whether actions require arguments and/or
 * produce results. Completion of a dependent action will trigger the
 * completion of another CompletableFuture. Actions may also be
 * triggered after either or both the current and another
 * CompletableFuture complete. Multiple CompletableFutures may also
 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
 * {@link #allOf(CompletableFuture...)}.
 *
 * <p>CompletableFutures themselves do not execute asynchronously.
 * However, actions supplied for dependent completions of another
 * CompletableFuture may do so, depending on whether they are provided
 * via one of the <em>async</em> methods (that is, methods with names
 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
 * methods provide a way to commence asynchronous processing of an
 * action using either a given {@link Executor} or by default the
 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
 * debugging, and tracking, all generated asynchronous tasks are
 * instances of the marker interface {@link AsynchronousCompletionTask}.
 *
 * <p>Actions supplied for dependent completions of <em>non-async</em>
 * methods may be performed by the thread that completes the current
 * CompletableFuture, or by any other caller of these methods. There
 * are no guarantees about the order of processing completions unless
 * constrained by these methods.
 *
 * <p>Since (unlike {@link FutureTask}) this class has no direct
 * control over the computation that causes it to be completed,
 * cancellation is treated as just another form of exceptional completion.
 * Method {@link #cancel cancel} has the same effect as
 * {@code completeExceptionally(new CancellationException())}.
 *
 * <p>Upon exceptional completion (including cancellation), or when a
 * completion entails an additional computation which terminates
 * abruptly with an (unchecked) exception or error, then all of their
 * dependent completions (and their dependents in turn) generally act
 * as {@code completeExceptionally} with a {@link CompletionException}
 * holding that exception as its cause. However, the {@link
 * #exceptionally exceptionally} and {@link #handle handle}
 * completions <em>are</em> able to handle exceptional completions of
 * the CompletableFutures they depend on.
 *
 * <p>In case of exceptional completion with a CompletionException,
 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
 * {@link ExecutionException} with the same cause as held in the
 * corresponding CompletionException. However, in these cases,
 * methods {@link #join()} and {@link #getNow} throw the
 * CompletionException, which simplifies usage.
 *
 * <p>Arguments used to pass a completion result (that is, for parameters
 * of type {@code T}) may be null, but passing a null value for any other
 * parameter will result in a {@link NullPointerException} being thrown.
 *
 * @author Doug Lea
 * @since 1.8
 */
public class CompletableFuture<T> implements Future<T> {
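
The class documentation above makes several behavioural claims: only one completion attempt succeeds, `get()` and `join()` report failures through different exception types, `exceptionally` can recover, and `cancel` acts like an exceptional completion. A small usage sketch of those claims follows, assuming the public API as shipped in JDK 8 or later (not necessarily this revision). The class `CompletionSemanticsDemo` and its method names are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class CompletionSemanticsDemo {
    /** Only the first completion attempt wins; later ones report false. */
    public static boolean[] firstWins() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("a");
        boolean second = f.completeExceptionally(new IllegalStateException("late"));
        boolean third = f.cancel(true);
        return new boolean[] { first, second, third, "a".equals(f.join()) };
    }

    /** get() wraps the cause in ExecutionException, join() in CompletionException. */
    public static String[] getVersusJoin() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<Integer> dep = f.thenApply(String::length);
        String viaGet = "none", viaJoin = "none";
        try {
            dep.get();
        } catch (ExecutionException e) {
            viaGet = e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            dep.join();
        } catch (CompletionException e) {
            viaJoin = e.getCause().getMessage();
        }
        return new String[] { viaGet, viaJoin };
    }

    /** exceptionally() turns an exceptional completion into a value. */
    public static int recover() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f.exceptionally(ex -> -1).join();
    }

    /** cancel() is observable as an exceptional completion. */
    public static boolean cancelIsExceptional() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.cancel(false);
        try {
            f.join();
            return false;
        } catch (CancellationException e) {
            return f.isCancelled() && f.isCompletedExceptionally();
        }
    }
}
```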

    /*
     * Overview:
     *
     * 1. Non-nullness of field result (set via CAS) indicates done.
     * An AltResult is used to box null as a result, as well as to
     * hold exceptions.  Using a single field makes completion fast
     * and simple to detect and trigger, at the expense of a lot of
     * encoding and decoding that infiltrates many methods. One minor
     * simplification relies on the (static) NIL (to box null results)
     * being the only AltResult with a null exception field, so we
     * don't usually need explicit comparisons with NIL. The CF
     * exception propagation mechanics surrounding decoding rely on
     * unchecked casts of decoded results really being unchecked,
     * where user type errors are caught at point of use, as is
     * currently the case in Java. These are highlighted by using
     * SuppressWarnings-annotated temporaries.
     *
     * 2. Waiters are held in a Treiber stack similar to the one used
     * in FutureTask, Phaser, and SynchronousQueue. See their
     * internal documentation for algorithmic details.
     *
     * 3. Completions are also kept in a list/stack, and pulled off
     * and run when completion is triggered. (We could even use the
     * same stack as for waiters, but would give up the potential
     * parallelism obtained because woken waiters help release/run
     * others -- see method postComplete).  Because post-processing
     * may race with direct calls, class Completion opportunistically
     * extends AtomicInteger so callers can claim the action via
     * compareAndSet(0, 1).  The Completion.run methods are all
     * written in a boringly similar, uniform way (that sometimes
     * includes unnecessary-looking checks, kept to maintain
     * uniformity). There are enough dimensions upon which they
     * differ that attempts to factor commonalities while maintaining
     * efficiency require more lines of code than they would save.
     *
     * 4. The exported then/and/or methods do support a bit of
     * factoring (see doThenApply etc). They must cope with the
     * intrinsic races surrounding addition of a dependent action
     * versus performing the action directly because the task is
     * already complete.  For example, a CF may not be complete upon
     * entry, so a dependent completion is added, but by the time it
     * is added, the target CF is complete, so must be directly
     * executed. This is all done while avoiding unnecessary object
     * construction in safe-bypass cases.
     */
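
Point 1 of the overview describes encoding "done", null results, and exceptions in a single CAS-updated field. The idea can be sketched in isolation as below. This is a simplified illustration and not the class's implementation: it substitutes `AtomicReference` for the `UNSAFE` field CAS, and the names `SingleFieldResult`, `Alt`, `fail`, and `getNow` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SingleFieldResult<T> {
    /** Boxes an exception; the shared NIL instance (ex == null) stands for a null value. */
    static final class Alt {
        final Throwable ex;
        Alt(Throwable ex) { this.ex = ex; }
    }
    static final Alt NIL = new Alt(null);

    // null means "not done"; anything else is the value itself or an Alt box
    private final AtomicReference<Object> result = new AtomicReference<>();

    /** Completes normally; a null value is stored as NIL so the field stays non-null. */
    public boolean complete(T value) {
        return result.compareAndSet(null, value == null ? NIL : value);
    }

    /** Completes exceptionally; ex must be non-null so that NIL stays unique. */
    public boolean fail(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        return result.compareAndSet(null, new Alt(ex));
    }

    public boolean isDone() { return result.get() != null; }

    /** Decodes the field; only meaningful once isDone() is true. */
    @SuppressWarnings("unchecked")
    public T getNow() {
        Object r = result.get();
        if (r == null) throw new IllegalStateException("not done");
        if (r instanceof Alt) {
            Throwable ex = ((Alt) r).ex;
            if (ex != null) throw new RuntimeException(ex);
            return null; // NIL decodes back to a null value
        }
        return (T) r;
    }
}
```

Because NIL is the only box with a null `ex`, decoding needs just one `instanceof` test plus a null check, which mirrors the simplification the overview mentions.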

    // preliminaries

    static final class AltResult {
        final Throwable ex; // null only for NIL
        AltResult(Throwable ex) { this.ex = ex; }
    }

    static final AltResult NIL = new AltResult(null);

    // Fields

    volatile Object result;    // Either the result or boxed AltResult
    volatile WaitNode waiters; // Treiber stack of threads blocked on get()
    volatile CompletionNode completions; // list (Treiber stack) of completions

    // Basic utilities for triggering and processing completions

    /**
     * Removes and signals all waiting threads and runs all completions.
     */
    final void postComplete() {
        WaitNode q; Thread t;
        while ((q = waiters) != null) {
            if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }

        CompletionNode h; Completion c;
        while ((h = completions) != null) {
            if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
                (c = h.completion) != null)
                c.run();
        }
    }

    /**
     * Triggers completion with the encoding of the given arguments:
     * if the exception is non-null, encodes it as a wrapped
     * CompletionException unless it is one already.  Otherwise uses
     * the given result, boxed as NIL if null.
     */
    final void internalComplete(T v, Throwable ex) {
        if (result == null)
            UNSAFE.compareAndSwapObject
                (this, RESULT, null,
                 (ex == null) ? (v == null) ? NIL : v :
                 new AltResult((ex instanceof CompletionException) ? ex :
                               new CompletionException(ex)));
        postComplete(); // help out even if not triggered
    }

    /**
     * If triggered, helps release and/or process completions.
     */
    final void helpPostComplete() {
        if (result != null)
            postComplete();
    }

    /* ------------- waiting for completions -------------- */

    /** Number of processors, for spin control */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Heuristic spin value for waitingGet() before blocking on
     * multiprocessors
     */
    static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;

    /**
     * Linked nodes to record waiting threads in a Treiber stack.  See
     * other classes such as Phaser and SynchronousQueue for more
     * detailed explanation. This class implements ManagedBlocker to
     * avoid starvation when blocking actions pile up in
     * ForkJoinPools.
     */
    static final class WaitNode implements ForkJoinPool.ManagedBlocker {
        long nanos;          // wait time if timed
        final long deadline; // non-zero if timed
        volatile int interruptControl; // > 0: interruptible, < 0: interrupted
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode(boolean interruptible, long nanos, long deadline) {
            this.thread = Thread.currentThread();
            this.interruptControl = interruptible ? 1 : 0;
            this.nanos = nanos;
            this.deadline = deadline;
        }
        public boolean isReleasable() {
            if (thread == null)
                return true;
            if (Thread.interrupted()) {
                int i = interruptControl;
                interruptControl = -1;
                if (i > 0)
                    return true;
            }
            if (deadline != 0L &&
                (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
                thread = null;
                return true;
            }
            return false;
        }
        public boolean block() {
            if (isReleasable())
                return true;
            else if (deadline == 0L)
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }
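
The waiter and completion lists above are both described as Treiber stacks: a singly linked list whose head is replaced with compare-and-set. A generic sketch of that structure follows, using `AtomicReference` in place of the `UNSAFE` calls in the listing. The class `TreiberStack` and its `Node` type are hypothetical names for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

public class TreiberStack<E> {
    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    /** Links the new node to the current head, retrying if the head moved. */
    public void push(E item) {
        Node<E> n = new Node<>(item);
        do {
            n.next = head.get();
        } while (!head.compareAndSet(n.next, n));
    }

    /** Swings the head to its successor; returns null when the stack is empty. */
    public E pop() {
        for (;;) {
            Node<E> h = head.get();
            if (h == null)
                return null;
            if (head.compareAndSet(h, h.next))
                return h.item;
        }
    }
}
```

The loops in `postComplete` follow the same pop pattern: read the head, try to CAS it to `next`, and act on the node only if the CAS succeeded.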

    /**
     * Returns raw result after waiting, or null if interruptible and
     * interrupted.
     */
    private Object waitingGet(boolean interruptible) {
        WaitNode q = null;
        boolean queued = false;
        int spins = SPINS;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) { // suppress unpark
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        if (interruptible) {
                            removeWaiter(q);
                            return null;
                        }
                        Thread.currentThread().interrupt();
                    }
                }
                postComplete(); // help release others
                return r;
            }
            else if (spins > 0) {
                int rnd = ThreadLocalRandom.nextSecondarySeed();
                if (rnd == 0)
                    rnd = ThreadLocalRandom.current().nextInt();
                if (rnd >= 0)
                    --spins;
            }
            else if (q == null)
                q = new WaitNode(interruptible, 0L, 0L);
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (interruptible && q.interruptControl < 0) {
                removeWaiter(q);
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param nanos time to wait
     * @return raw result
     */
    private Object timedAwaitDone(long nanos)
        throws InterruptedException, TimeoutException {
        WaitNode q = null;
        boolean queued = false;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) {
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        removeWaiter(q);
                        throw new InterruptedException();
                    }
                }
                postComplete();
                return r;
            }
            else if (q == null) {
                if (nanos <= 0L)
                    throw new TimeoutException();
                long d = System.nanoTime() + nanos;
                q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
            }
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (q.interruptControl < 0) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q.nanos <= 0L) {
                if (result == null) {
                    removeWaiter(q);
                    throw new TimeoutException();
                }
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /* ------------- Async tasks -------------- */

    /**
     * A marker interface identifying asynchronous tasks produced by
     * {@code async} methods. This may be useful for monitoring,
     * debugging, and tracking asynchronous activities.
     *
     * @since 1.8
     */
    public static interface AsynchronousCompletionTask {
    }
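
As an illustration of how the marker interface might be used for monitoring, the sketch below passes a custom `Executor` to `supplyAsync` and counts the submitted tasks that carry the marker. It assumes a JDK 8 or later runtime, where the task handed to the executor implements `AsynchronousCompletionTask`. The class `MarkerTaskDemo` and the method `runAndCount` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class MarkerTaskDemo {
    /** Runs supplyAsync on a counting executor; returns {value, markedTaskCount}. */
    public static int[] runAndCount() {
        AtomicInteger marked = new AtomicInteger();
        Executor counting = task -> {
            if (task instanceof CompletableFuture.AsynchronousCompletionTask)
                marked.incrementAndGet();
            task.run(); // run inline to keep the sketch deterministic
        };
        int value = CompletableFuture.supplyAsync(() -> 42, counting).join();
        return new int[] { value, marked.get() };
    }

    public static void main(String[] args) {
        int[] r = runAndCount();
        System.out.println(r[0] + " " + r[1]);
    }
}
```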

    /** Base class can act as either FJ or plain Runnable */
    abstract static class Async extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) { }
        public final void run() { exec(); }
    }

    static final class AsyncRun extends Async {
        final Runnable fn;
        final CompletableFuture<Void> dst;
        AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.run();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncSupply<U> extends Async {
        final Supplier<U> fn;
        final CompletableFuture<U> dst;
        AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.get();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncApply<T,U> extends Async {
        final T arg;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        AsyncApply(T arg, Function<? super T,? extends U> fn,
                   CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.apply(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncCombine<T,U,V> extends Async {
        final T arg1;
        final U arg2;
        final BiFunction<? super T,? super U,? extends V> fn;
        final CompletableFuture<V> dst;
        AsyncCombine(T arg1, U arg2,
                     BiFunction<? super T,? super U,? extends V> fn,
                     CompletableFuture<V> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<V> d; V v; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    v = fn.apply(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    v = null;
                }
                d.internalComplete(v, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncAccept<T> extends Async {
        final T arg;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        AsyncAccept(T arg, Consumer<? super T> fn,
                    CompletableFuture<Void> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.accept(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncAcceptBoth<T,U> extends Async {
        final T arg1;
        final U arg2;
        final BiConsumer<? super T,? super U> fn;
        final CompletableFuture<Void> dst;
        AsyncAcceptBoth(T arg1, U arg2,
                        BiConsumer<? super T,? super U> fn,
                        CompletableFuture<Void> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.accept(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncCompose<T,U> extends Async {
        final T arg;
        final Function<? super T, CompletableFuture<U>> fn;
        final CompletableFuture<U> dst;
        AsyncCompose(T arg,
                     Function<? super T, CompletableFuture<U>> fn,
                     CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d, fr; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fr = fn.apply(arg);
                    ex = (fr == null) ? new NullPointerException() : null;
                } catch (Throwable rex) {
                    ex = rex;
                    fr = null;
                }
                if (ex != null)
                    u = null;
                else {
                    Object r = fr.result;
                    if (r == null)
                        r = fr.waitingGet(false);
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        u = null;
                    }
                    else {
                        @SuppressWarnings("unchecked") U ur = (U) r;
                        u = ur;
                    }
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /* ------------- Completions -------------- */

    /**
     * Simple linked list nodes to record completions, used in
     * basically the same way as WaitNodes. (We separate nodes from
     * the Completions themselves mainly because for the And and Or
     * methods, the same Completion object resides in two lists.)
     */
    static final class CompletionNode {
        final Completion completion;
        volatile CompletionNode next;
        CompletionNode(Completion completion) { this.completion = completion; }
    }

    // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
    abstract static class Completion extends AtomicInteger implements Runnable {
    }
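
The claim idiom mentioned in the comment above (subclassing `AtomicInteger` and calling `compareAndSet(0, 1)`) guarantees that an action runs at most once even when several threads call `run()` concurrently. A standalone sketch of that idiom follows. The class `ClaimOnce`, its `runs` counter, and the `race` helper are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ClaimOnce extends AtomicInteger implements Runnable {
    private static final long serialVersionUID = 1L;
    private final AtomicInteger runs = new AtomicInteger();

    @Override
    public void run() {
        if (compareAndSet(0, 1))   // only one caller wins the claim
            runs.incrementAndGet(); // stands in for the real action
    }

    public int runCount() { return runs.get(); }

    /** Lets nThreads race on one instance and reports how often the action ran. */
    public static int race(int nThreads) {
        ClaimOnce c = new ClaimOnce();
        Thread[] ts = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++)
            (ts[i] = new Thread(c)).start();
        try {
            for (Thread t : ts)
                t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return c.runCount();
    }
}
```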

    static final class ThenApply<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        ThenApply(CompletableFuture<? extends T> src,
                  Function<? super T,? extends U> fn,
                  CompletableFuture<U> dst,
                  Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T,? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                U u = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncApply<T,U>(t, fn, dst));
                        else
                            u = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(u, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ThenAccept<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        ThenAccept(CompletableFuture<? extends T> src,
                   Consumer<? super T> fn,
                   CompletableFuture<Void> dst,
                   Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ThenRun extends Completion {
        final CompletableFuture<?> src;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        ThenRun(CompletableFuture<?> src,
                Runnable fn,
                CompletableFuture<Void> dst,
                Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ThenCombine<T,U,V> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiFunction<? super T,? super U,? extends V> fn;
        final CompletableFuture<V> dst;
        final Executor executor;
        ThenCombine(CompletableFuture<? extends T> src,
                    CompletableFuture<? extends U> snd,
                    BiFunction<? super T,? super U,? extends V> fn,
                    CompletableFuture<V> dst,
                    Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiFunction<? super T,? super U,? extends V> fn;
            final CompletableFuture<V> dst;
            Object r, s; T t; U u; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                V v = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
                        else
                            v = fn.apply(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(v, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ThenAcceptBoth<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiConsumer<? super T,? super U> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        ThenAcceptBoth(CompletableFuture<? extends T> src,
                       CompletableFuture<? extends U> snd,
                       BiConsumer<? super T,? super U> fn,
                       CompletableFuture<Void> dst,
                       Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiConsumer<? super T,? super U> fn;
            final CompletableFuture<Void> dst;
            Object r, s; T t; U u; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
                        else
                            fn.accept(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class RunAfterBoth extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        RunAfterBoth(CompletableFuture<?> src,
                     CompletableFuture<?> snd,
                     Runnable fn,
                     CompletableFuture<Void> dst,
                     Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AndCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<Void> dst;
        AndCompletion(CompletableFuture<?> src,
                      CompletableFuture<?> snd,
                      CompletableFuture<Void> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }
975
976 static final class ApplyToEither<T,U> extends Completion {
977 final CompletableFuture<? extends T> src;
978 final CompletableFuture<? extends T> snd;
979 final Function<? super T,? extends U> fn;
980 final CompletableFuture<U> dst;
981 final Executor executor;
982 ApplyToEither(CompletableFuture<? extends T> src,
983 CompletableFuture<? extends T> snd,
984 Function<? super T,? extends U> fn,
985 CompletableFuture<U> dst,
986 Executor executor) {
987 this.src = src; this.snd = snd;
988 this.fn = fn; this.dst = dst;
989 this.executor = executor;
990 }
991 public final void run() {
992 final CompletableFuture<? extends T> a;
993 final CompletableFuture<? extends T> b;
994 final Function<? super T,? extends U> fn;
995 final CompletableFuture<U> dst;
996 Object r; T t; Throwable ex;
997 if ((dst = this.dst) != null &&
998 (fn = this.fn) != null &&
999 (((a = this.src) != null && (r = a.result) != null) ||
1000 ((b = this.snd) != null && (r = b.result) != null)) &&
1001 compareAndSet(0, 1)) {
1002 if (r instanceof AltResult) {
1003 ex = ((AltResult)r).ex;
1004 t = null;
1005 }
1006 else {
1007 ex = null;
1008 @SuppressWarnings("unchecked") T tr = (T) r;
1009 t = tr;
1010 }
1011 Executor e = executor;
1012 U u = null;
1013 if (ex == null) {
1014 try {
1015 if (e != null)
1016 e.execute(new AsyncApply<T,U>(t, fn, dst));
1017 else
1018 u = fn.apply(t);
1019 } catch (Throwable rex) {
1020 ex = rex;
1021 }
1022 }
1023 if (e == null || ex != null)
1024 dst.internalComplete(u, ex);
1025 }
1026 }
1027 private static final long serialVersionUID = 5232453952276885070L;
1028 }
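// Usage sketch (not part of the original source): the either-style completion
// above fires as soon as one of its two sources has a result. The example uses
// the released java.util.concurrent.CompletableFuture (Java 8+), whose
// signatures may differ slightly from this revision; the class name
// ApplyToEitherSketch is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class ApplyToEitherSketch {
    // One source never completes in this method; the other is already done,
    // so the function is applied to the completed source's value.
    static String firstResultUppercased() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> ready = CompletableFuture.completedFuture("fast");
        return pending.applyToEither(ready, s -> s.toUpperCase()).join();
    }

    public static void main(String[] args) {
        System.out.println(firstResultUppercased());
    }
}
```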
1029
1030 static final class AcceptEither<T> extends Completion {
1031 final CompletableFuture<? extends T> src;
1032 final CompletableFuture<? extends T> snd;
1033 final Consumer<? super T> fn;
1034 final CompletableFuture<Void> dst;
1035 final Executor executor;
1036 AcceptEither(CompletableFuture<? extends T> src,
1037 CompletableFuture<? extends T> snd,
1038 Consumer<? super T> fn,
1039 CompletableFuture<Void> dst,
1040 Executor executor) {
1041 this.src = src; this.snd = snd;
1042 this.fn = fn; this.dst = dst;
1043 this.executor = executor;
1044 }
1045 public final void run() {
1046 final CompletableFuture<? extends T> a;
1047 final CompletableFuture<? extends T> b;
1048 final Consumer<? super T> fn;
1049 final CompletableFuture<Void> dst;
1050 Object r; T t; Throwable ex;
1051 if ((dst = this.dst) != null &&
1052 (fn = this.fn) != null &&
1053 (((a = this.src) != null && (r = a.result) != null) ||
1054 ((b = this.snd) != null && (r = b.result) != null)) &&
1055 compareAndSet(0, 1)) {
1056 if (r instanceof AltResult) {
1057 ex = ((AltResult)r).ex;
1058 t = null;
1059 }
1060 else {
1061 ex = null;
1062 @SuppressWarnings("unchecked") T tr = (T) r;
1063 t = tr;
1064 }
1065 Executor e = executor;
1066 if (ex == null) {
1067 try {
1068 if (e != null)
1069 e.execute(new AsyncAccept<T>(t, fn, dst));
1070 else
1071 fn.accept(t);
1072 } catch (Throwable rex) {
1073 ex = rex;
1074 }
1075 }
1076 if (e == null || ex != null)
1077 dst.internalComplete(null, ex);
1078 }
1079 }
1080 private static final long serialVersionUID = 5232453952276885070L;
1081 }
1082
1083 static final class RunAfterEither extends Completion {
1084 final CompletableFuture<?> src;
1085 final CompletableFuture<?> snd;
1086 final Runnable fn;
1087 final CompletableFuture<Void> dst;
1088 final Executor executor;
1089 RunAfterEither(CompletableFuture<?> src,
1090 CompletableFuture<?> snd,
1091 Runnable fn,
1092 CompletableFuture<Void> dst,
1093 Executor executor) {
1094 this.src = src; this.snd = snd;
1095 this.fn = fn; this.dst = dst;
1096 this.executor = executor;
1097 }
1098 public final void run() {
1099 final CompletableFuture<?> a;
1100 final CompletableFuture<?> b;
1101 final Runnable fn;
1102 final CompletableFuture<Void> dst;
1103 Object r; Throwable ex;
1104 if ((dst = this.dst) != null &&
1105 (fn = this.fn) != null &&
1106 (((a = this.src) != null && (r = a.result) != null) ||
1107 ((b = this.snd) != null && (r = b.result) != null)) &&
1108 compareAndSet(0, 1)) {
1109 if (r instanceof AltResult)
1110 ex = ((AltResult)r).ex;
1111 else
1112 ex = null;
1113 Executor e = executor;
1114 if (ex == null) {
1115 try {
1116 if (e != null)
1117 e.execute(new AsyncRun(fn, dst));
1118 else
1119 fn.run();
1120 } catch (Throwable rex) {
1121 ex = rex;
1122 }
1123 }
1124 if (e == null || ex != null)
1125 dst.internalComplete(null, ex);
1126 }
1127 }
1128 private static final long serialVersionUID = 5232453952276885070L;
1129 }
1130
1131 static final class OrCompletion extends Completion {
1132 final CompletableFuture<?> src;
1133 final CompletableFuture<?> snd;
1134 final CompletableFuture<Object> dst;
1135 OrCompletion(CompletableFuture<?> src,
1136 CompletableFuture<?> snd,
1137 CompletableFuture<Object> dst) {
1138 this.src = src; this.snd = snd; this.dst = dst;
1139 }
1140 public final void run() {
1141 final CompletableFuture<?> a;
1142 final CompletableFuture<?> b;
1143 final CompletableFuture<Object> dst;
1144 Object r, t; Throwable ex;
1145 if ((dst = this.dst) != null &&
1146 (((a = this.src) != null && (r = a.result) != null) ||
1147 ((b = this.snd) != null && (r = b.result) != null)) &&
1148 compareAndSet(0, 1)) {
1149 if (r instanceof AltResult) {
1150 ex = ((AltResult)r).ex;
1151 t = null;
1152 }
1153 else {
1154 ex = null;
1155 t = r;
1156 }
1157 dst.internalComplete(t, ex);
1158 }
1159 }
1160 private static final long serialVersionUID = 5232453952276885070L;
1161 }
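// Usage sketch (not part of the original source): OrCompletion relays the
// result of whichever source finishes first into a CompletableFuture<Object>.
// Assuming it backs the public anyOf method (not shown in this section), the
// observable behavior can be tried with the released Java 8+ class. The class
// name AnyOfSketch is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class AnyOfSketch {
    // anyOf yields an untyped result, so the caller sees Object.
    static Object firstAvailable() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> ready = CompletableFuture.completedFuture("fast");
        return CompletableFuture.anyOf(pending, ready).join();
    }

    public static void main(String[] args) {
        System.out.println(firstAvailable());
    }
}
```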
1162
1163 static final class ExceptionCompletion<T> extends Completion {
1164 final CompletableFuture<? extends T> src;
1165 final Function<? super Throwable, ? extends T> fn;
1166 final CompletableFuture<T> dst;
1167 ExceptionCompletion(CompletableFuture<? extends T> src,
1168 Function<? super Throwable, ? extends T> fn,
1169 CompletableFuture<T> dst) {
1170 this.src = src; this.fn = fn; this.dst = dst;
1171 }
1172 public final void run() {
1173 final CompletableFuture<? extends T> a;
1174 final Function<? super Throwable, ? extends T> fn;
1175 final CompletableFuture<T> dst;
1176 Object r; T t = null; Throwable ex, dx = null;
1177 if ((dst = this.dst) != null &&
1178 (fn = this.fn) != null &&
1179 (a = this.src) != null &&
1180 (r = a.result) != null &&
1181 compareAndSet(0, 1)) {
1182 if ((r instanceof AltResult) &&
1183 (ex = ((AltResult)r).ex) != null) {
1184 try {
1185 t = fn.apply(ex);
1186 } catch (Throwable rex) {
1187 dx = rex;
1188 }
1189 }
1190 else {
1191 @SuppressWarnings("unchecked") T tr = (T) r;
1192 t = tr;
1193 }
1194 dst.internalComplete(t, dx);
1195 }
1196 }
1197 private static final long serialVersionUID = 5232453952276885070L;
1198 }
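// Usage sketch (not part of the original source): ExceptionCompletion applies
// its function only when the source holds an exception and otherwise copies
// the value through. Assuming it backs the public exceptionally method (not
// shown in this section), the released Java 8+ class behaves as below. The
// class name ExceptionallySketch is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class ExceptionallySketch {
    // The source failed, so the recovery function supplies the value.
    static String recover() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return failed.exceptionally(ex -> "recovered from " + ex.getMessage()).join();
    }

    // The source succeeded, so the recovery function is never called.
    static String passThrough() {
        return CompletableFuture.completedFuture("ok")
                .exceptionally(ex -> "unused")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(recover() + " / " + passThrough());
    }
}
```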
1199
1200 static final class ThenCopy<T> extends Completion {
1201 final CompletableFuture<?> src;
1202 final CompletableFuture<T> dst;
1203 ThenCopy(CompletableFuture<?> src,
1204 CompletableFuture<T> dst) {
1205 this.src = src; this.dst = dst;
1206 }
1207 public final void run() {
1208 final CompletableFuture<?> a;
1209 final CompletableFuture<T> dst;
1210 Object r; T t; Throwable ex;
1211 if ((dst = this.dst) != null &&
1212 (a = this.src) != null &&
1213 (r = a.result) != null &&
1214 compareAndSet(0, 1)) {
1215 if (r instanceof AltResult) {
1216 ex = ((AltResult)r).ex;
1217 t = null;
1218 }
1219 else {
1220 ex = null;
1221 @SuppressWarnings("unchecked") T tr = (T) r;
1222 t = tr;
1223 }
1224 dst.internalComplete(t, ex);
1225 }
1226 }
1227 private static final long serialVersionUID = 5232453952276885070L;
1228 }
1229
1230 // Version of ThenCopy for a CompletableFuture<Void> dst: relays completion and any exception, but no value
1231 static final class ThenPropagate extends Completion {
1232 final CompletableFuture<?> src;
1233 final CompletableFuture<Void> dst;
1234 ThenPropagate(CompletableFuture<?> src,
1235 CompletableFuture<Void> dst) {
1236 this.src = src; this.dst = dst;
1237 }
1238 public final void run() {
1239 final CompletableFuture<?> a;
1240 final CompletableFuture<Void> dst;
1241 Object r; Throwable ex;
1242 if ((dst = this.dst) != null &&
1243 (a = this.src) != null &&
1244 (r = a.result) != null &&
1245 compareAndSet(0, 1)) {
1246 if (r instanceof AltResult)
1247 ex = ((AltResult)r).ex;
1248 else
1249 ex = null;
1250 dst.internalComplete(null, ex);
1251 }
1252 }
1253 private static final long serialVersionUID = 5232453952276885070L;
1254 }
1255
1256 static final class HandleCompletion<T,U> extends Completion {
1257 final CompletableFuture<? extends T> src;
1258 final BiFunction<? super T, Throwable, ? extends U> fn;
1259 final CompletableFuture<U> dst;
1260 HandleCompletion(CompletableFuture<? extends T> src,
1261 BiFunction<? super T, Throwable, ? extends U> fn,
1262 CompletableFuture<U> dst) {
1263 this.src = src; this.fn = fn; this.dst = dst;
1264 }
1265 public final void run() {
1266 final CompletableFuture<? extends T> a;
1267 final BiFunction<? super T, Throwable, ? extends U> fn;
1268 final CompletableFuture<U> dst;
1269 Object r; T t; Throwable ex;
1270 if ((dst = this.dst) != null &&
1271 (fn = this.fn) != null &&
1272 (a = this.src) != null &&
1273 (r = a.result) != null &&
1274 compareAndSet(0, 1)) {
1275 if (r instanceof AltResult) {
1276 ex = ((AltResult)r).ex;
1277 t = null;
1278 }
1279 else {
1280 ex = null;
1281 @SuppressWarnings("unchecked") T tr = (T) r;
1282 t = tr;
1283 }
1284 U u = null; Throwable dx = null;
1285 try {
1286 u = fn.apply(t, ex);
1287 } catch (Throwable rex) {
1288 dx = rex;
1289 }
1290 dst.internalComplete(u, dx);
1291 }
1292 }
1293 private static final long serialVersionUID = 5232453952276885070L;
1294 }
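// Usage sketch (not part of the original source): HandleCompletion always
// calls its BiFunction, passing either the value or the exception (the other
// argument is null). Assuming it backs the public handle method (not shown in
// this section), the released Java 8+ class behaves as below. The class name
// HandleSketch is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class HandleSketch {
    // Maps both outcomes of the source to a description.
    static String describe(CompletableFuture<Integer> source) {
        return source
                .handle((value, ex) -> ex == null ? "value " + value
                                                  : "error " + ex.getMessage())
                .join();
    }

    static CompletableFuture<Integer> failed() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture(7)));
        System.out.println(describe(failed()));
    }
}
```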
1295
1296 static final class ThenCompose<T,U> extends Completion {
1297 final CompletableFuture<? extends T> src;
1298 final Function<? super T, CompletableFuture<U>> fn;
1299 final CompletableFuture<U> dst;
1300 final Executor executor;
1301 ThenCompose(CompletableFuture<? extends T> src,
1302 Function<? super T, CompletableFuture<U>> fn,
1303 CompletableFuture<U> dst,
1304 Executor executor) {
1305 this.src = src; this.fn = fn; this.dst = dst;
1306 this.executor = executor;
1307 }
1308 public final void run() {
1309 final CompletableFuture<? extends T> a;
1310 final Function<? super T, CompletableFuture<U>> fn;
1311 final CompletableFuture<U> dst;
1312 Object r; T t; Throwable ex; Executor e;
1313 if ((dst = this.dst) != null &&
1314 (fn = this.fn) != null &&
1315 (a = this.src) != null &&
1316 (r = a.result) != null &&
1317 compareAndSet(0, 1)) {
1318 if (r instanceof AltResult) {
1319 ex = ((AltResult)r).ex;
1320 t = null;
1321 }
1322 else {
1323 ex = null;
1324 @SuppressWarnings("unchecked") T tr = (T) r;
1325 t = tr;
1326 }
1327 CompletableFuture<U> c = null;
1328 U u = null;
1329 boolean complete = false;
1330 if (ex == null) {
1331 if ((e = executor) != null)
1332 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1333 else {
1334 try {
1335 if ((c = fn.apply(t)) == null)
1336 ex = new NullPointerException();
1337 } catch (Throwable rex) {
1338 ex = rex;
1339 }
1340 }
1341 }
1342 if (c != null) {
1343 ThenCopy<U> d = null;
1344 Object s;
1345 if ((s = c.result) == null) {
1346 CompletionNode p = new CompletionNode
1347 (d = new ThenCopy<U>(c, dst));
1348 while ((s = c.result) == null) {
1349 if (UNSAFE.compareAndSwapObject
1350 (c, COMPLETIONS, p.next = c.completions, p))
1351 break;
1352 }
1353 }
1354 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1355 complete = true;
1356 if (s instanceof AltResult) {
1357 ex = ((AltResult)s).ex; // no rewrap
1358 u = null;
1359 }
1360 else {
1361 @SuppressWarnings("unchecked") U us = (U) s;
1362 u = us;
1363 }
1364 }
1365 }
1366 if (complete || ex != null)
1367 dst.internalComplete(u, ex);
1368 if (c != null)
1369 c.helpPostComplete();
1370 }
1371 }
1372 private static final long serialVersionUID = 5232453952276885070L;
1373 }
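// Usage sketch (not part of the original source): ThenCompose flattens a
// function that itself returns a CompletableFuture, and treats a null return
// as a NullPointerException. The example uses the released Java 8+ class,
// which is assumed to keep that behavior; the class name ThenComposeSketch is
// hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

final class ThenComposeSketch {
    // The second stage starts only after the first value is available.
    static int composed() {
        return CompletableFuture.supplyAsync(() -> 21)
                .thenCompose(x -> CompletableFuture.supplyAsync(() -> x * 2))
                .join();
    }

    // A function that returns null makes the dependent future fail.
    static String causeOfNullReturn() {
        Function<Integer, CompletableFuture<String>> toNull = x -> null;
        try {
            CompletableFuture.completedFuture(1).thenCompose(toNull).join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(composed() + " " + causeOfNullReturn());
    }
}
```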
1374
1375 // public methods
1376
1377 /**
1378 * Creates a new incomplete CompletableFuture.
1379 */
1380 public CompletableFuture() {
1381 }
1382
1383 /**
1384 * Returns a new CompletableFuture that is asynchronously completed
1385 * by a task running in the {@link ForkJoinPool#commonPool()} with
1386 * the value obtained by calling the given Supplier.
1387 *
1388 * @param supplier a function returning the value to be used
1389 * to complete the returned CompletableFuture
1390 * @return the new CompletableFuture
1391 */
1392 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1393 if (supplier == null) throw new NullPointerException();
1394 CompletableFuture<U> f = new CompletableFuture<U>();
1395 ForkJoinPool.commonPool().
1396 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1397 return f;
1398 }
1399
1400 /**
1401 * Returns a new CompletableFuture that is asynchronously completed
1402 * by a task running in the given executor with the value obtained
1403 * by calling the given Supplier.
1404 *
1405 * @param supplier a function returning the value to be used
1406 * to complete the returned CompletableFuture
1407 * @param executor the executor to use for asynchronous execution
1408 * @return the new CompletableFuture
1409 */
1410 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1411 Executor executor) {
1412 if (executor == null || supplier == null)
1413 throw new NullPointerException();
1414 CompletableFuture<U> f = new CompletableFuture<U>();
1415 executor.execute(new AsyncSupply<U>(supplier, f));
1416 return f;
1417 }
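// Usage sketch (not part of the original source): the two supplyAsync
// overloads above differ only in where the supplier runs. The example uses
// the released Java 8+ class; the class name SupplyAsyncSketch and the
// single-thread executor are illustrative choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SupplyAsyncSketch {
    // Runs the supplier in the common pool and waits for its value.
    static int viaCommonPool() {
        return CompletableFuture.supplyAsync(() -> 6 * 7).join();
    }

    // Runs the supplier on a caller-supplied executor, then shuts it down.
    static String viaExecutor() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> "done", pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaCommonPool() + " " + viaExecutor());
    }
}
```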
1418
1419 /**
1420 * Returns a new CompletableFuture that is asynchronously completed
1421 * by a task running in the {@link ForkJoinPool#commonPool()} after
1422 * it runs the given action.
1423 *
1424 * @param runnable the action to run before completing the
1425 * returned CompletableFuture
1426 * @return the new CompletableFuture
1427 */
1428 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1429 if (runnable == null) throw new NullPointerException();
1430 CompletableFuture<Void> f = new CompletableFuture<Void>();
1431 ForkJoinPool.commonPool().
1432 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1433 return f;
1434 }
1435
1436 /**
1437 * Returns a new CompletableFuture that is asynchronously completed
1438 * by a task running in the given executor after it runs the given
1439 * action.
1440 *
1441 * @param runnable the action to run before completing the
1442 * returned CompletableFuture
1443 * @param executor the executor to use for asynchronous execution
1444 * @return the new CompletableFuture
1445 */
1446 public static CompletableFuture<Void> runAsync(Runnable runnable,
1447 Executor executor) {
1448 if (executor == null || runnable == null)
1449 throw new NullPointerException();
1450 CompletableFuture<Void> f = new CompletableFuture<Void>();
1451 executor.execute(new AsyncRun(runnable, f));
1452 return f;
1453 }
1454
1455 /**
1456 * Returns a new CompletableFuture that is already completed with
1457 * the given value.
1458 *
1459 * @param value the value
1460 * @return the completed CompletableFuture
1461 */
1462 public static <U> CompletableFuture<U> completedFuture(U value) {
1463 CompletableFuture<U> f = new CompletableFuture<U>();
1464 f.result = (value == null) ? NIL : value;
1465 return f;
1466 }
1467
1468 /**
1469 * Returns {@code true} if completed in any fashion: normally,
1470 * exceptionally, or via cancellation.
1471 *
1472 * @return {@code true} if completed
1473 */
1474 public boolean isDone() {
1475 return result != null;
1476 }
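// Usage sketch (not part of the original source): completedFuture stores NIL
// in place of a null value, so isDone (result != null) still reports true for
// a future completed with null. The example uses the released Java 8+ class;
// the class name CompletedFutureSketch is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class CompletedFutureSketch {
    // A future created by completedFuture is done immediately.
    static boolean doneWithValue() {
        return CompletableFuture.completedFuture("x").isDone();
    }

    // A null value still counts as completed, and join returns null.
    static boolean doneWithNull() {
        CompletableFuture<String> f = CompletableFuture.completedFuture(null);
        return f.isDone() && f.join() == null;
    }

    public static void main(String[] args) {
        System.out.println(doneWithValue() + " " + doneWithNull());
    }
}
```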
1477
1478 /**
1479 * Waits if necessary for this future to complete, and then
1480 * returns its result.
1481 *
1482 * @return the result value
1483 * @throws CancellationException if this future was cancelled
1484 * @throws ExecutionException if this future completed exceptionally
1485 * @throws InterruptedException if the current thread was interrupted
1486 * while waiting
1487 */
1488 public T get() throws InterruptedException, ExecutionException {
1489 Object r; Throwable ex, cause;
1490 if ((r = result) == null && (r = waitingGet(true)) == null)
1491 throw new InterruptedException();
1492 if (!(r instanceof AltResult)) {
1493 @SuppressWarnings("unchecked") T tr = (T) r;
1494 return tr;
1495 }
1496 if ((ex = ((AltResult)r).ex) == null)
1497 return null;
1498 if (ex instanceof CancellationException)
1499 throw (CancellationException)ex;
1500 if ((ex instanceof CompletionException) &&
1501 (cause = ex.getCause()) != null)
1502 ex = cause;
1503 throw new ExecutionException(ex);
1504 }
1505
1506 /**
1507 * Waits if necessary for at most the given time for this future
1508 * to complete, and then returns its result, if available.
1509 *
1510 * @param timeout the maximum time to wait
1511 * @param unit the time unit of the timeout argument
1512 * @return the result value
1513 * @throws CancellationException if this future was cancelled
1514 * @throws ExecutionException if this future completed exceptionally
1515 * @throws InterruptedException if the current thread was interrupted
1516 * while waiting
1517 * @throws TimeoutException if the wait timed out
1518 */
1519 public T get(long timeout, TimeUnit unit)
1520 throws InterruptedException, ExecutionException, TimeoutException {
1521 Object r; Throwable ex, cause;
1522 long nanos = unit.toNanos(timeout);
1523 if (Thread.interrupted())
1524 throw new InterruptedException();
1525 if ((r = result) == null)
1526 r = timedAwaitDone(nanos);
1527 if (!(r instanceof AltResult)) {
1528 @SuppressWarnings("unchecked") T tr = (T) r;
1529 return tr;
1530 }
1531 if ((ex = ((AltResult)r).ex) == null)
1532 return null;
1533 if (ex instanceof CancellationException)
1534 throw (CancellationException)ex;
1535 if ((ex instanceof CompletionException) &&
1536 (cause = ex.getCause()) != null)
1537 ex = cause;
1538 throw new ExecutionException(ex);
1539 }
1540
1541 /**
1542 * Returns the result value when complete, or throws an
1543 * (unchecked) exception if completed exceptionally. To better
1544 * conform with the use of common functional forms, if a
1545 * computation involved in the completion of this
1546 * CompletableFuture threw an exception, this method throws an
1547 * (unchecked) {@link CompletionException} with the underlying
1548 * exception as its cause.
1549 *
1550 * @return the result value
1551 * @throws CancellationException if the computation was cancelled
1552 * @throws CompletionException if this future completed
1553 * exceptionally or a completion computation threw an exception
1554 */
1555 public T join() {
1556 Object r; Throwable ex;
1557 if ((r = result) == null)
1558 r = waitingGet(false);
1559 if (!(r instanceof AltResult)) {
1560 @SuppressWarnings("unchecked") T tr = (T) r;
1561 return tr;
1562 }
1563 if ((ex = ((AltResult)r).ex) == null)
1564 return null;
1565 if (ex instanceof CancellationException)
1566 throw (CancellationException)ex;
1567 if (ex instanceof CompletionException)
1568 throw (CompletionException)ex;
1569 throw new CompletionException(ex);
1570 }
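// Usage sketch (not part of the original source): get() reports failure with
// the checked ExecutionException, while join() uses the unchecked
// CompletionException; both carry the original exception as the cause. The
// example uses the released Java 8+ class; the class name GetVersusJoinSketch
// is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class GetVersusJoinSketch {
    static CompletableFuture<String> failed() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f;
    }

    // Names the wrapper thrown by get() and the cause it carries.
    static String viaGet(CompletableFuture<?> f) {
        try {
            f.get();
            return "no exception";
        } catch (ExecutionException e) {
            return "ExecutionException <- " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return "interrupted";
        }
    }

    // Names the wrapper thrown by join() and the cause it carries.
    static String viaJoin(CompletableFuture<?> f) {
        try {
            f.join();
            return "no exception";
        } catch (CompletionException e) {
            return "CompletionException <- " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet(failed()));
        System.out.println(viaJoin(failed()));
    }
}
```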
1571
1572 /**
1573 * Returns the result value (or throws any encountered exception)
1574 * if completed, else returns the given valueIfAbsent.
1575 *
1576 * @param valueIfAbsent the value to return if not completed
1577 * @return the result value, if completed, else the given valueIfAbsent
1578 * @throws CancellationException if the computation was cancelled
1579 * @throws CompletionException if this future completed
1580 * exceptionally or a completion computation threw an exception
1581 */
1582 public T getNow(T valueIfAbsent) {
1583 Object r; Throwable ex;
1584 if ((r = result) == null)
1585 return valueIfAbsent;
1586 if (!(r instanceof AltResult)) {
1587 @SuppressWarnings("unchecked") T tr = (T) r;
1588 return tr;
1589 }
1590 if ((ex = ((AltResult)r).ex) == null)
1591 return null;
1592 if (ex instanceof CancellationException)
1593 throw (CancellationException)ex;
1594 if (ex instanceof CompletionException)
1595 throw (CompletionException)ex;
1596 throw new CompletionException(ex);
1597 }
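// Usage sketch (not part of the original source): getNow never blocks; it
// returns the fallback only while the future is still incomplete. The example
// uses the released Java 8+ class; the class name GetNowSketch is
// hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class GetNowSketch {
    // Reads the future once before and once after completing it.
    static String beforeAndAfter() {
        CompletableFuture<String> f = new CompletableFuture<>();
        String before = f.getNow("fallback");
        f.complete("value");
        String after = f.getNow("fallback");
        return before + "," + after;
    }

    public static void main(String[] args) {
        System.out.println(beforeAndAfter());
    }
}
```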
1598
1599 /**
1600 * If not already completed, sets the value returned by {@link
1601 * #get()} and related methods to the given value.
1602 *
1603 * @param value the result value
1604 * @return {@code true} if this invocation caused this CompletableFuture
1605 * to transition to a completed state, else {@code false}
1606 */
1607 public boolean complete(T value) {
1608 boolean triggered = result == null &&
1609 UNSAFE.compareAndSwapObject(this, RESULT, null,
1610 value == null ? NIL : value);
1611 postComplete();
1612 return triggered;
1613 }
1614
1615 /**
1616 * If not already completed, causes invocations of {@link #get()}
1617 * and related methods to throw the given exception.
1618 *
1619 * @param ex the exception
1620 * @return {@code true} if this invocation caused this CompletableFuture
1621 * to transition to a completed state, else {@code false}
1622 */
1623 public boolean completeExceptionally(Throwable ex) {
1624 if (ex == null) throw new NullPointerException();
1625 boolean triggered = result == null &&
1626 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1627 postComplete();
1628 return triggered;
1629 }
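// Usage sketch (not part of the original source): complete and
// completeExceptionally both use a compare-and-swap on the result field, so
// only the first attempt returns true and later attempts leave the result
// unchanged. The example uses the released Java 8+ class; the class name
// FirstCompletionWinsSketch is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class FirstCompletionWinsSketch {
    // Records the return value of each attempt plus the final result.
    static String attempts() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("first");
        boolean second = f.complete("second");
        boolean third = f.completeExceptionally(new RuntimeException("late"));
        return first + "," + second + "," + third + "," + f.join();
    }

    public static void main(String[] args) {
        System.out.println(attempts());
    }
}
```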
1630
1631 /**
1632 * Returns a new CompletableFuture that is completed
1633 * when this CompletableFuture completes, with the result of the
1634 * given function of this CompletableFuture's result.
1635 *
1636 * <p>If this CompletableFuture completes exceptionally, or the
1637 * supplied function throws an exception, then the returned
1638 * CompletableFuture completes exceptionally with a
1639 * CompletionException holding the exception as its cause.
1640 *
1641 * @param fn the function to use to compute the value of
1642 * the returned CompletableFuture
1643 * @return the new CompletableFuture
1644 */
1645 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1646 return doThenApply(fn, null);
1647 }
1648
1649 /**
1650 * Returns a new CompletableFuture that is asynchronously completed
1651 * when this CompletableFuture completes, with the result of the
1652 * given function of this CompletableFuture's result from a
1653 * task running in the {@link ForkJoinPool#commonPool()}.
1654 *
1655 * <p>If this CompletableFuture completes exceptionally, or the
1656 * supplied function throws an exception, then the returned
1657 * CompletableFuture completes exceptionally with a
1658 * CompletionException holding the exception as its cause.
1659 *
1660 * @param fn the function to use to compute the value of
1661 * the returned CompletableFuture
1662 * @return the new CompletableFuture
1663 */
1664 public <U> CompletableFuture<U> thenApplyAsync
1665 (Function<? super T,? extends U> fn) {
1666 return doThenApply(fn, ForkJoinPool.commonPool());
1667 }
1668
1669 /**
1670 * Returns a new CompletableFuture that is asynchronously completed
1671 * when this CompletableFuture completes, with the result of the
1672 * given function of this CompletableFuture's result from a
1673 * task running in the given executor.
1674 *
1675 * <p>If this CompletableFuture completes exceptionally, or the
1676 * supplied function throws an exception, then the returned
1677 * CompletableFuture completes exceptionally with a
1678 * CompletionException holding the exception as its cause.
1679 *
1680 * @param fn the function to use to compute the value of
1681 * the returned CompletableFuture
1682 * @param executor the executor to use for asynchronous execution
1683 * @return the new CompletableFuture
1684 */
1685 public <U> CompletableFuture<U> thenApplyAsync
1686 (Function<? super T,? extends U> fn,
1687 Executor executor) {
1688 if (executor == null) throw new NullPointerException();
1689 return doThenApply(fn, executor);
1690 }
1691
1692 private <U> CompletableFuture<U> doThenApply
1693 (Function<? super T,? extends U> fn,
1694 Executor e) {
1695 if (fn == null) throw new NullPointerException();
1696 CompletableFuture<U> dst = new CompletableFuture<U>();
1697 ThenApply<T,U> d = null;
1698 Object r;
1699 if ((r = result) == null) {
1700 CompletionNode p = new CompletionNode
1701 (d = new ThenApply<T,U>(this, fn, dst, e));
1702 while ((r = result) == null) {
1703 if (UNSAFE.compareAndSwapObject
1704 (this, COMPLETIONS, p.next = completions, p))
1705 break;
1706 }
1707 }
1708 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1709 T t; Throwable ex;
1710 if (r instanceof AltResult) {
1711 ex = ((AltResult)r).ex;
1712 t = null;
1713 }
1714 else {
1715 ex = null;
1716 @SuppressWarnings("unchecked") T tr = (T) r;
1717 t = tr;
1718 }
1719 U u = null;
1720 if (ex == null) {
1721 try {
1722 if (e != null)
1723 e.execute(new AsyncApply<T,U>(t, fn, dst));
1724 else
1725 u = fn.apply(t);
1726 } catch (Throwable rex) {
1727 ex = rex;
1728 }
1729 }
1730 if (e == null || ex != null)
1731 dst.internalComplete(u, ex);
1732 }
1733 helpPostComplete();
1734 return dst;
1735 }
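// Usage sketch (not part of the original source): thenApply chains
// transformations, and an exception thrown by the function reaches callers of
// join() wrapped in a CompletionException, as the Javadoc above states. The
// example uses the released Java 8+ class; the class name ThenApplySketch is
// hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

final class ThenApplySketch {
    // Each stage receives the previous stage's result.
    static int chained() {
        return CompletableFuture.completedFuture(20)
                .thenApply(x -> x + 1)
                .thenApply(x -> x * 2)
                .join();
    }

    // A throwing function fails the dependent future, not the source.
    static String failureCause() {
        Function<Integer, Integer> failing = x -> {
            throw new IllegalArgumentException("bad input");
        };
        try {
            CompletableFuture.completedFuture(1).thenApply(failing).join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(chained() + " " + failureCause());
    }
}
```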
1736
1737 /**
1738 * Returns a new CompletableFuture that is completed
1739 * when this CompletableFuture completes, after performing the given
1740 * action with this CompletableFuture's result.
1741 *
1742 * <p>If this CompletableFuture completes exceptionally, or the
1743 * supplied action throws an exception, then the returned
1744 * CompletableFuture completes exceptionally with a
1745 * CompletionException holding the exception as its cause.
1746 *
1747 * @param block the action to perform before completing the
1748 * returned CompletableFuture
1749 * @return the new CompletableFuture
1750 */
1751 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1752 return doThenAccept(block, null);
1753 }
1754
1755 /**
1756 * Returns a new CompletableFuture that is asynchronously completed
1757 * when this CompletableFuture completes, after performing the given
1758 * action with this CompletableFuture's result from a task running
1759 * in the {@link ForkJoinPool#commonPool()}.
1760 *
1761 * <p>If this CompletableFuture completes exceptionally, or the
1762 * supplied action throws an exception, then the returned
1763 * CompletableFuture completes exceptionally with a
1764 * CompletionException holding the exception as its cause.
1765 *
1766 * @param block the action to perform before completing the
1767 * returned CompletableFuture
1768 * @return the new CompletableFuture
1769 */
1770 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1771 return doThenAccept(block, ForkJoinPool.commonPool());
1772 }
1773
1774 /**
1775 * Returns a new CompletableFuture that is asynchronously completed
1776 * when this CompletableFuture completes, after performing the given
1777 * action with this CompletableFuture's result from a task running
1778 * in the given executor.
1779 *
1780 * <p>If this CompletableFuture completes exceptionally, or the
1781 * supplied action throws an exception, then the returned
1782 * CompletableFuture completes exceptionally with a
1783 * CompletionException holding the exception as its cause.
1784 *
1785 * @param block the action to perform before completing the
1786 * returned CompletableFuture
1787 * @param executor the executor to use for asynchronous execution
1788 * @return the new CompletableFuture
1789 */
1790 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1791 Executor executor) {
1792 if (executor == null) throw new NullPointerException();
1793 return doThenAccept(block, executor);
1794 }
1795
1796 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1797 Executor e) {
1798 if (fn == null) throw new NullPointerException();
1799 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1800 ThenAccept<T> d = null;
1801 Object r;
1802 if ((r = result) == null) {
1803 CompletionNode p = new CompletionNode
1804 (d = new ThenAccept<T>(this, fn, dst, e));
1805 while ((r = result) == null) {
1806 if (UNSAFE.compareAndSwapObject
1807 (this, COMPLETIONS, p.next = completions, p))
1808 break;
1809 }
1810 }
1811 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1812 T t; Throwable ex;
1813 if (r instanceof AltResult) {
1814 ex = ((AltResult)r).ex;
1815 t = null;
1816 }
1817 else {
1818 ex = null;
1819 @SuppressWarnings("unchecked") T tr = (T) r;
1820 t = tr;
1821 }
1822 if (ex == null) {
1823 try {
1824 if (e != null)
1825 e.execute(new AsyncAccept<T>(t, fn, dst));
1826 else
1827 fn.accept(t);
1828 } catch (Throwable rex) {
1829 ex = rex;
1830 }
1831 }
1832 if (e == null || ex != null)
1833 dst.internalComplete(null, ex);
1834 }
1835 helpPostComplete();
1836 return dst;
1837 }
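// Usage sketch (not part of the original source): thenAccept hands the result
// to a Consumer and yields a CompletableFuture<Void> that completes once the
// consumer has run. The example uses the released Java 8+ class; the class
// name ThenAcceptSketch is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class ThenAcceptSketch {
    // Collects what the consumer saw; join() waits for the consumer to finish.
    static List<String> collect() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done =
                CompletableFuture.supplyAsync(() -> "payload")
                        .thenAccept(value -> seen.add(value));
        done.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```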
1838
1839 /**
1840 * Returns a new CompletableFuture that is completed
1841 * when this CompletableFuture completes, after performing the given
1842 * action.
1843 *
1844 * <p>If this CompletableFuture completes exceptionally, or the
1845 * supplied action throws an exception, then the returned
1846 * CompletableFuture completes exceptionally with a
1847 * CompletionException holding the exception as its cause.
1848 *
1849 * @param action the action to perform before completing the
1850 * returned CompletableFuture
1851 * @return the new CompletableFuture
1852 */
1853 public CompletableFuture<Void> thenRun(Runnable action) {
1854 return doThenRun(action, null);
1855 }
1856
1857 /**
1858 * Returns a new CompletableFuture that is asynchronously completed
1859 * when this CompletableFuture completes, after performing the given
1860 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1861 *
1862 * <p>If this CompletableFuture completes exceptionally, or the
1863 * supplied action throws an exception, then the returned
1864 * CompletableFuture completes exceptionally with a
1865 * CompletionException holding the exception as its cause.
1866 *
1867 * @param action the action to perform before completing the
1868 * returned CompletableFuture
1869 * @return the new CompletableFuture
1870 */
1871 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1872 return doThenRun(action, ForkJoinPool.commonPool());
1873 }
1874
1875 /**
1876 * Returns a new CompletableFuture that is asynchronously completed
1877 * when this CompletableFuture completes, after performing the given
1878 * action from a task running in the given executor.
1879 *
1880 * <p>If this CompletableFuture completes exceptionally, or the
1881 * supplied action throws an exception, then the returned
1882 * CompletableFuture completes exceptionally with a
1883 * CompletionException holding the exception as its cause.
1884 *
1885 * @param action the action to perform before completing the
1886 * returned CompletableFuture
1887 * @param executor the executor to use for asynchronous execution
1888 * @return the new CompletableFuture
1889 */
1890 public CompletableFuture<Void> thenRunAsync(Runnable action,
1891 Executor executor) {
1892 if (executor == null) throw new NullPointerException();
1893 return doThenRun(action, executor);
1894 }
1895
1896 private CompletableFuture<Void> doThenRun(Runnable action,
1897 Executor e) {
1898 if (action == null) throw new NullPointerException();
1899 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1900 ThenRun d = null;
1901 Object r;
1902 if ((r = result) == null) {
1903 CompletionNode p = new CompletionNode
1904 (d = new ThenRun(this, action, dst, e));
1905 while ((r = result) == null) {
1906 if (UNSAFE.compareAndSwapObject
1907 (this, COMPLETIONS, p.next = completions, p))
1908 break;
1909 }
1910 }
1911 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1912 Throwable ex;
1913 if (r instanceof AltResult)
1914 ex = ((AltResult)r).ex;
1915 else
1916 ex = null;
1917 if (ex == null) {
1918 try {
1919 if (e != null)
1920 e.execute(new AsyncRun(action, dst));
1921 else
1922 action.run();
1923 } catch (Throwable rex) {
1924 ex = rex;
1925 }
1926 }
1927 if (e == null || ex != null)
1928 dst.internalComplete(null, ex);
1929 }
1930 helpPostComplete();
1931 return dst;
1932 }
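// Usage sketch (not part of the original source): thenRun ignores the source
// value, and doThenRun skips the action entirely when the source completed
// exceptionally (the ex == null check above). The example uses the released
// Java 8+ class; the class name ThenRunSketch is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class ThenRunSketch {
    // The action runs after a normal completion.
    static boolean ranAfterSuccess() {
        AtomicBoolean ran = new AtomicBoolean();
        CompletableFuture.completedFuture("ignored")
                .thenRun(() -> ran.set(true))
                .join();
        return ran.get();
    }

    // After a failure the action is skipped and the dependent future fails too.
    static boolean skippedAfterFailure() {
        AtomicBoolean ran = new AtomicBoolean();
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));
        CompletableFuture<Void> next = failed.thenRun(() -> ran.set(true));
        return !ran.get() && next.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(ranAfterSuccess() + " " + skippedAfterFailure());
    }
}
```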

    /**
     * Returns a new CompletableFuture that is completed
     * when both this and the other given CompletableFuture complete,
     * with the result of the given function of the results of the two
     * CompletableFutures.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied function throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombine
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn) {
        return doThenCombine(other, fn, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * with the result of the given function of the results of the two
     * CompletableFutures from a task running in the
     * {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied function throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn) {
        return doThenCombine(other, fn, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * with the result of the given function of the results of the two
     * CompletableFutures from a task running in the given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied function throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenCombine(other, fn, executor);
    }

    private <U,V> CompletableFuture<V> doThenCombine
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<V> dst = new CompletableFuture<V>();
        ThenCombine<T,U,V> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            T t; U u; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex != null)
                u = null;
            else if (s instanceof AltResult) {
                ex = ((AltResult)s).ex;
                u = null;
            }
            else {
                @SuppressWarnings("unchecked") U us = (U) s;
                u = us;
            }
            V v = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
                    else
                        v = fn.apply(t, u);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(v, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
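The following usage sketch is not part of the original source. It illustrates the documented contract of thenCombine (the function runs only after both inputs complete, and a failed input surfaces as a CompletionException carrying the original cause). It is written against the released JDK 8+ java.util.concurrent.CompletableFuture, whose signatures may differ slightly from this revision; the class name ThenCombineSketch and its helper methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ThenCombineSketch {
    /** Returns a future holding the sum of the two inputs once both complete. */
    static CompletableFuture<Integer> sum(CompletableFuture<Integer> a,
                                          CompletableFuture<Integer> b) {
        return a.thenCombine(b, (x, y) -> x + y);
    }

    /** Completes both inputs normally and returns the combined value. */
    static int normalCase() {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> s = sum(a, b);
        a.complete(40);   // s is still pending: b has not completed
        b.complete(2);    // both inputs are done, so the function runs
        return s.join();
    }

    /** Fails one input and returns the cause seen through the dependent future. */
    static Throwable failureCase() {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = CompletableFuture.completedFuture(2);
        CompletableFuture<Integer> s = sum(a, b);
        a.completeExceptionally(new IllegalStateException("boom"));
        try {
            s.join();
            return null;              // not reached when a has failed
        } catch (CompletionException ce) {
            return ce.getCause();     // the original IllegalStateException
        }
    }

    public static void main(String[] args) {
        System.out.println(normalCase());
        System.out.println(failureCase());
    }
}
```

The Async variants take the same arguments; they differ only in running the function as a task in the common pool or in the supplied executor.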

    /**
     * Returns a new CompletableFuture that is completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action with the results of the two
     * CompletableFutures.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBoth
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block) {
        return doThenAcceptBoth(other, block, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action with the results of the two
     * CompletableFutures from a task running in the {@link
     * ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBothAsync
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block) {
        return doThenAcceptBoth(other, block, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action with the results of the two
     * CompletableFutures from a task running in the given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBothAsync
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenAcceptBoth(other, block, executor);
    }

    private <U> CompletableFuture<Void> doThenAcceptBoth
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T,? super U> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        ThenAcceptBoth<T,U> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            T t; U u; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex != null)
                u = null;
            else if (s instanceof AltResult) {
                ex = ((AltResult)s).ex;
                u = null;
            }
            else {
                @SuppressWarnings("unchecked") U us = (U) s;
                u = us;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
                    else
                        fn.accept(t, u);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
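The following usage sketch is not part of the original source. It shows thenAcceptBoth consuming both results and yielding a CompletableFuture<Void>. It assumes the released JDK 8+ API rather than this exact revision, and the class name ThenAcceptBothSketch and the method record are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ThenAcceptBothSketch {
    /** Records "name=score" once both futures complete and returns the log. */
    static List<String> record() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> name = new CompletableFuture<>();
        CompletableFuture<Integer> score = new CompletableFuture<>();
        CompletableFuture<Void> done =
            name.thenAcceptBoth(score, (n, s) -> log.add(n + "=" + s));
        name.complete("alice");   // action not run yet: score is pending
        score.complete(7);        // non-async form: action runs in this thread
        done.join();              // the returned future carries no value
        return log;
    }

    public static void main(String[] args) {
        System.out.println(record());
    }
}
```

A plain ArrayList is enough here only because everything happens on one thread; with the Async variants the action may run elsewhere, and shared state would need to be thread-safe.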

    /**
     * Returns a new CompletableFuture that is completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
                                                Runnable action) {
        return doRunAfterBoth(other, action, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action from a task running in the
     * {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
                                                     Runnable action) {
        return doRunAfterBoth(other, action, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action from a task running in the
     * given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
                                                     Runnable action,
                                                     Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doRunAfterBoth(other, action, executor);
    }

    private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
                                                   Runnable action,
                                                   Executor e) {
        if (other == null || action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        RunAfterBoth d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new RunAfterBoth(this, other, action, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null && (s instanceof AltResult))
                ex = ((AltResult)s).ex;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
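The following usage sketch is not part of the original source. It illustrates that the runAfterBoth action ignores both results, and that it is skipped when either input completes exceptionally. It assumes the released JDK 8+ API; the class name RunAfterBothSketch and the method runs are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class RunAfterBothSketch {
    /** Returns how many times the action ran once both inputs were complete. */
    static int runs(boolean failFirst) {
        AtomicInteger count = new AtomicInteger();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        // The two inputs may have unrelated types; the Runnable sees neither.
        CompletableFuture<Void> done = a.runAfterBoth(b, count::incrementAndGet);
        if (failFirst)
            a.completeExceptionally(new IllegalStateException("boom"));
        else
            a.complete("ok");
        b.complete(1);
        // done is complete either way; on failure the action was not run
        // and done itself completed exceptionally.
        return done.isCompletedExceptionally() == failFirst ? count.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(runs(false));
        System.out.println(runs(true));
    }
}
```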

    /**
     * Returns a new CompletableFuture that is completed
     * when either this or the other given CompletableFuture completes,
     * with the result of the given function of either this or the other
     * CompletableFuture's result.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied function
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEither
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn) {
        return doApplyToEither(other, fn, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * with the result of the given function of either this or the other
     * CompletableFuture's result from a task running in the
     * {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied function
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEitherAsync
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn) {
        return doApplyToEither(other, fn, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * with the result of the given function of either this or the other
     * CompletableFuture's result from a task running in the
     * given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied function
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEitherAsync
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doApplyToEither(other, fn, executor);
    }

    private <U> CompletableFuture<U> doApplyToEither
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        ApplyToEither<T,U> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new ApplyToEither<T,U>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncApply<T,U>(t, fn, dst));
                    else
                        u = fn.apply(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(u, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
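The following usage sketch is not part of the original source. It shows applyToEither using whichever input is available; to keep the outcome deterministic, one input is deliberately left incomplete, since the documentation makes no guarantee about which result is used when both are done. It assumes the released JDK 8+ API; the class name ApplyToEitherSketch and the method firstUpper are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class ApplyToEitherSketch {
    /** Applies the function to the only input that has completed. */
    static String firstUpper() {
        // Never completed in this sketch, so it cannot supply the value.
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> first =
            slow.applyToEither(fast, String::toUpperCase);
        return first.join();   // does not wait for slow
    }

    public static void main(String[] args) {
        System.out.println(firstUpper());
    }
}
```

Both inputs must share a result type compatible with the function's argument, which is why other is declared as CompletableFuture<? extends T>.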

    /**
     * Returns a new CompletableFuture that is completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action with the result of either this
     * or the other CompletableFuture.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEither
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block) {
        return doAcceptEither(other, block, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action with the result of either this
     * or the other CompletableFuture, from a task running in
     * the {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEitherAsync
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block) {
        return doAcceptEither(other, block, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action with the result of either this
     * or the other CompletableFuture, from a task running in
     * the given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEitherAsync
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doAcceptEither(other, block, executor);
    }

    private CompletableFuture<Void> doAcceptEither
        (CompletableFuture<? extends T> other,
         Consumer<? super T> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AcceptEither<T> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new AcceptEither<T>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncAccept<T>(t, fn, dst));
                    else
                        fn.accept(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
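The following usage sketch is not part of the original source. It shows acceptEither handing the first available result to a Consumer. One input is left incomplete so the outcome is deterministic. It assumes the released JDK 8+ API; the class name AcceptEitherSketch and the method firstSeen are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AcceptEitherSketch {
    /** Returns the value that the consumer received. */
    static String firstSeen() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> primary = new CompletableFuture<>();
        CompletableFuture<String> backup = new CompletableFuture<>();
        CompletableFuture<Void> done = primary.acceptEither(backup, seen::set);
        backup.complete("from-backup");   // primary stays pending
        done.join();                      // already complete at this point
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(firstSeen());
    }
}
```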

    /**
     * Returns a new CompletableFuture that is completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
                                                  Runnable action) {
        return doRunAfterEither(other, action, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action from a task running in the
     * {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterEitherAsync
        (CompletableFuture<?> other,
         Runnable action) {
        return doRunAfterEither(other, action, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action from a task running in the
     * given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause.  No guarantees are made about which result or exception is
     * used in the returned CompletableFuture.  If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterEitherAsync
        (CompletableFuture<?> other,
         Runnable action,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doRunAfterEither(other, action, executor);
    }

    private CompletableFuture<Void> doRunAfterEither
        (CompletableFuture<?> other,
         Runnable action,
         Executor e) {
        if (other == null || action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        RunAfterEither d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new RunAfterEither(this, other, action, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
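The following usage sketch is not part of the original source. It shows runAfterEither firing as soon as one input completes, while the other is still pending. It assumes the released JDK 8+ API; the class name RunAfterEitherSketch and the method firedWhileOtherPending are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAfterEitherSketch {
    /** Returns true if the action ran although one input never completed. */
    static boolean firedWhileOtherPending() {
        AtomicBoolean fired = new AtomicBoolean();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Void> done =
            a.runAfterEither(b, () -> fired.set(true));
        b.complete(1);     // a is never completed in this sketch
        done.join();
        return fired.get() && !a.isDone();
    }

    public static void main(String[] args) {
        System.out.println(firedWhileOtherPending());
    }
}
```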

    /**
     * Returns a CompletableFuture that, upon completion, has the same
     * value as that produced by the given function of the result of
     * this CompletableFuture.
     *
     * <p>If this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     * Similarly, if the computed CompletableFuture completes
     * exceptionally, then so does the returned CompletableFuture.
     *
     * @param fn the function returning a new CompletableFuture
     * @return the CompletableFuture
     */
    public <U> CompletableFuture<U> thenCompose
        (Function<? super T, CompletableFuture<U>> fn) {
        return doThenCompose(fn, null);
    }

    /**
     * Returns a CompletableFuture that, upon completion, has the same
     * value as that produced asynchronously using the {@link
     * ForkJoinPool#commonPool()} by the given function of the result
     * of this CompletableFuture.
     *
     * <p>If this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     * Similarly, if the computed CompletableFuture completes
     * exceptionally, then so does the returned CompletableFuture.
     *
     * @param fn the function returning a new CompletableFuture
     * @return the CompletableFuture
     */
    public <U> CompletableFuture<U> thenComposeAsync
        (Function<? super T, CompletableFuture<U>> fn) {
        return doThenCompose(fn, ForkJoinPool.commonPool());
    }

    /**
     * Returns a CompletableFuture that, upon completion, has the same
     * value as that produced asynchronously using the given executor
     * by the given function of the result of this CompletableFuture.
     *
     * <p>If this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     * Similarly, if the computed CompletableFuture completes
     * exceptionally, then so does the returned CompletableFuture.
     *
     * @param fn the function returning a new CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the CompletableFuture
     */
    public <U> CompletableFuture<U> thenComposeAsync
        (Function<? super T, CompletableFuture<U>> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenCompose(fn, executor);
    }

    private <U> CompletableFuture<U> doThenCompose
        (Function<? super T, CompletableFuture<U>> fn,
         Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = null;
        ThenCompose<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            dst = new CompletableFuture<U>();
            CompletionNode p = new CompletionNode
                (d = new ThenCompose<T,U>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                if (e != null) {
                    if (dst == null)
                        dst = new CompletableFuture<U>();
                    e.execute(new AsyncCompose<T,U>(t, fn, dst));
                }
                else {
                    try {
                        if ((dst = fn.apply(t)) == null)
                            ex = new NullPointerException();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
            }
            if (dst == null)
                dst = new CompletableFuture<U>();
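            // Propagate a failure in both modes: in async mode no task is
            // started for an exceptionally completed source, so dst would
            // otherwise never complete.
            if (ex != null)
                dst.internalComplete(null, ex);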
        }
        helpPostComplete();
        dst.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed when this
     * CompletableFuture completes, with the result of the given
     * function of the exception triggering this CompletableFuture's
     * completion when it completes exceptionally; otherwise, if this
     * CompletableFuture completes normally, then the returned
     * CompletableFuture also completes normally with the same value.
     *
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture if this CompletableFuture completed
     * exceptionally
     * @return the new CompletableFuture
     */
    public CompletableFuture<T> exceptionally
        (Function<Throwable, ? extends T> fn) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<T> dst = new CompletableFuture<T>();
        ExceptionCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p =
                new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
                                                p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t = null; Throwable ex, dx = null;
            if (r instanceof AltResult) {
                if ((ex = ((AltResult)r).ex) != null) {
                    try {
                        t = fn.apply(ex);
                    } catch (Throwable rex) {
                        dx = rex;
                    }
                }
            }
            else {
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            dst.internalComplete(t, dx);
        }
        helpPostComplete();
        return dst;
    }
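
    // Usage sketch (editorial addition): the two paths described in the
    // Javadoc above, a recovery value on failure and a pass-through on
    // normal completion. Assumes the CompletableFuture API of JDK 8 or
    // later; the fallback value -1 is an arbitrary choice for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class ExceptionallySketch {
    // Returns the value seen by a dependent created with exceptionally().
    static int recover(boolean fail) {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        // The function runs only if source completes exceptionally.
        CompletableFuture<Integer> recovered = source.exceptionally(ex -> -1);
        if (fail)
            source.completeExceptionally(new IllegalStateException("boom"));
        else
            source.complete(42);
        return recovered.join();
    }
}
```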

    /**
     * Returns a new CompletableFuture that is completed when this
     * CompletableFuture completes, with the result of the given
     * function of the result and exception of this CompletableFuture's
     * completion. The given function is invoked with the result (or
     * {@code null} if none) and the exception (or {@code null} if none)
     * of this CompletableFuture when complete.
     *
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> handle
        (BiFunction<? super T, Throwable, ? extends U> fn) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        HandleCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p =
                new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
                                                p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u; Throwable dx;
            try {
                u = fn.apply(t, ex);
                dx = null;
            } catch (Throwable rex) {
                dx = rex;
                u = null;
            }
            dst.internalComplete(u, dx);
        }
        helpPostComplete();
        return dst;
    }
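
    // Usage sketch (editorial addition): handle() sees both outcomes, so one
    // function can map a value or an exception to a result. Assumes the
    // CompletableFuture API of JDK 8 or later; the message strings are
    // made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

final class HandleSketch {
    // Maps either outcome of the source future to a description.
    static String describe(boolean fail) {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        // Exactly one of (value, ex) is meaningful; the other is null.
        CompletableFuture<String> described = source.handle(
            (value, ex) -> (ex == null)
                ? "value " + value
                : "failed: " + ex.getMessage());
        if (fail)
            source.completeExceptionally(new IllegalStateException("boom"));
        else
            source.complete(42);
        return described.join();
    }
}
```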


    /* ------------- Arbitrary-arity constructions -------------- */

    /*
     * The basic plan of attack is to recursively form binary
     * completion trees of elements. This can be overkill for small
     * sets, but scales nicely. The And/All vs Or/Any forms use the
     * same idea, but details differ.
     */

    /**
     * Returns a new CompletableFuture that is completed when all of
     * the given CompletableFutures complete. If any of the given
     * CompletableFutures complete exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException
     * holding this exception as its cause. Otherwise, the results,
     * if any, of the given CompletableFutures are not reflected in
     * the returned CompletableFuture, but may be obtained by
     * inspecting them individually. If no CompletableFutures are
     * provided, returns a CompletableFuture completed with the value
     * {@code null}.
     *
     * <p>Among the applications of this method is to await completion
     * of a set of independent CompletableFutures before continuing a
     * program, as in: {@code CompletableFuture.allOf(c1, c2,
     * c3).join();}.
     *
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed when all of the
     * given CompletableFutures complete
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        int len = cfs.length; // Directly handle empty and singleton cases
        if (len > 1)
            return allTree(cfs, 0, len - 1);
        else {
            CompletableFuture<Void> dst = new CompletableFuture<Void>();
            CompletableFuture<?> f;
            if (len == 0)
                dst.result = NIL;
            else if ((f = cfs[0]) == null)
                throw new NullPointerException();
            else {
                ThenPropagate d = null;
                CompletionNode p = null;
                Object r;
                while ((r = f.result) == null) {
                    if (d == null)
                        d = new ThenPropagate(f, dst);
                    else if (p == null)
                        p = new CompletionNode(d);
                    else if (UNSAFE.compareAndSwapObject
                             (f, COMPLETIONS, p.next = f.completions, p))
                        break;
                }
                if (r != null && (d == null || d.compareAndSet(0, 1)))
                    dst.internalComplete(null, (r instanceof AltResult) ?
                                         ((AltResult)r).ex : null);
                f.helpPostComplete();
            }
            return dst;
        }
    }
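
    // Usage sketch (editorial addition): because allOf() yields no value of
    // its own, results are read back from the individual futures once the
    // combined future completes. Assumes the CompletableFuture API of JDK 8
    // or later; the three supplier tasks are placeholders.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllOfSketch {
    // Waits for three independent tasks, then collects their results.
    static List<Integer> gather() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture.allOf(a, b, c).join();   // blocks until all are done
        // Each join() now returns immediately.
        return Arrays.asList(a.join(), b.join(), c.join());
    }

    // With no arguments the returned future is already complete.
    static boolean emptyIsComplete() {
        return CompletableFuture.allOf().isDone();
    }
}
```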

    /**
     * Recursively constructs an And'ed tree of CompletableFutures.
     * Called only when array known to have at least two elements.
     */
    private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                                   int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid   ? cfs[lo] : allTree(cfs, lo,    mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AndCompletion d = null;
        CompletionNode p = null, q = null;
        Object r = null, s = null;
        while ((r = fst.result) == null || (s = snd.result) == null) {
            if (d == null)
                d = new AndCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null) &&
            (s != null || (s = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null && (s instanceof AltResult))
                ex = ((AltResult)s).ex;
            dst.internalComplete(null, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }
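
    // Illustrative sketch (editorial addition): the same binary-tree shape
    // rebuilt from public methods only. This is not the internal algorithm
    // above, which links AndCompletion nodes directly and uses the array
    // elements themselves as leaves; here thenRun() and runAfterBoth() from
    // the JDK 8+ API stand in for those internals.

```java
import java.util.concurrent.CompletableFuture;

final class AllTreeSketch {
    // Splits [lo, hi] in half and joins the two halves with runAfterBoth,
    // giving a tree of depth O(log n) rather than a chain of depth n.
    static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        if (lo == hi)
            return cfs[lo].thenRun(() -> { });   // leaf: discard the value
        int mid = (lo + hi) >>> 1;
        return allTree(cfs, lo, mid)
            .runAfterBoth(allTree(cfs, mid + 1, hi), () -> { });
    }

    // Reports whether the tree is done before and after the last input.
    static boolean[] demo() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Long> c = new CompletableFuture<>();
        CompletableFuture<?>[] cfs = { a, b, c };
        CompletableFuture<Void> all = allTree(cfs, 0, cfs.length - 1);
        a.complete("x");
        b.complete(1);
        boolean doneEarly = all.isDone();
        c.complete(2L);
        boolean doneLate = all.isDone();
        return new boolean[] { doneEarly, doneLate };
    }
}
```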

    /**
     * Returns a new CompletableFuture that is completed when any of
     * the given CompletableFutures complete, with the same result.
     * If that CompletableFuture completed exceptionally, the returned
     * CompletableFuture also does so, with a CompletionException
     * holding this exception as its cause. If no CompletableFutures
     * are provided, returns an incomplete CompletableFuture.
     *
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed with the
     * result or exception of any of the given CompletableFutures when
     * one completes
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        int len = cfs.length; // Same idea as allOf
        if (len > 1)
            return anyTree(cfs, 0, len - 1);
        else {
            CompletableFuture<Object> dst = new CompletableFuture<Object>();
            CompletableFuture<?> f;
            if (len == 0)
                ; // skip
            else if ((f = cfs[0]) == null)
                throw new NullPointerException();
            else {
                ThenCopy<Object> d = null;
                CompletionNode p = null;
                Object r;
                while ((r = f.result) == null) {
                    if (d == null)
                        d = new ThenCopy<Object>(f, dst);
                    else if (p == null)
                        p = new CompletionNode(d);
                    else if (UNSAFE.compareAndSwapObject
                             (f, COMPLETIONS, p.next = f.completions, p))
                        break;
                }
                if (r != null && (d == null || d.compareAndSet(0, 1))) {
                    Throwable ex; Object t;
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        t = null;
                    }
                    else {
                        ex = null;
                        t = r;
                    }
                    dst.internalComplete(t, ex);
                }
                f.helpPostComplete();
            }
            return dst;
        }
    }
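
    // Usage sketch (editorial addition): anyOf() adopts the outcome of
    // whichever input completes first, and later completions are ignored.
    // Assumes the CompletableFuture API of JDK 8 or later; the futures are
    // completed by hand so the ordering is deterministic.

```java
import java.util.concurrent.CompletableFuture;

final class AnyOfSketch {
    // Returns the value adopted from the first input to complete.
    static Object first() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<Object> any = CompletableFuture.anyOf(slow, fast);
        fast.complete("fast");   // completes `any`
        slow.complete("slow");   // too late to affect `any`
        return any.join();
    }

    // With no arguments the returned future never completes on its own.
    static boolean emptyStaysIncomplete() {
        return !CompletableFuture.anyOf().isDone();
    }
}
```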

    /**
     * Recursively constructs an Or'ed tree of CompletableFutures.
     */
    private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
                                                     int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid   ? cfs[lo] : anyTree(cfs, lo,    mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<Object> dst = new CompletableFuture<Object>();
        OrCompletion d = null;
        CompletionNode p = null, q = null;
        Object r;
        while ((r = fst.result) == null && (r = snd.result) == null) {
            if (d == null)
                d = new OrCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null ||
             (r = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex; Object t;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                t = r;
            }
            dst.internalComplete(t, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }

    /* ------------- Control and status methods -------------- */

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = (result == null) &&
            UNSAFE.compareAndSwapObject
            (this, RESULT, null, new AltResult(new CancellationException()));
        postComplete();
        return cancelled || isCancelled();
    }

    /**
     * Returns {@code true} if this CompletableFuture was cancelled
     * before it completed normally.
     *
     * @return {@code true} if this CompletableFuture was cancelled
     * before it completed normally
     */
    public boolean isCancelled() {
        Object r;
        return ((r = result) instanceof AltResult) &&
            (((AltResult)r).ex instanceof CancellationException);
    }
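
    // Usage sketch (editorial addition): cancelling a source and observing
    // the effect on a dependent. The dependent fails with a
    // CompletionException wrapping the CancellationException, so it does not
    // itself report isCancelled(). Assumes the CompletableFuture API of
    // JDK 8 or later.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CancelSketch {
    // Returns { cancel() result, source.isCancelled(),
    //           dependent.isCancelled(), dependent failed with wrapped cause }.
    static boolean[] outcome() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);
        boolean cancelled = source.cancel(true);   // the flag has no effect
        boolean wrapped;
        try {
            dependent.join();
            wrapped = false;
        } catch (CompletionException ce) {
            wrapped = ce.getCause() instanceof CancellationException;
        }
        return new boolean[] {
            cancelled, source.isCancelled(), dependent.isCancelled(), wrapped
        };
    }

    // Cancelling after normal completion changes nothing and returns false.
    static boolean cancelAfterCompletion() {
        CompletableFuture<String> done = new CompletableFuture<>();
        done.complete("x");
        return done.cancel(false);
    }
}
```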

    /**
     * Forcibly sets or resets the value subsequently returned by
     * method {@link #get()} and related methods, whether or not
     * already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param value the completion value
     */
    public void obtrudeValue(T value) {
        result = (value == null) ? NIL : value;
        postComplete();
    }

    /**
     * Forcibly causes subsequent invocations of method {@link #get()}
     * and related methods to throw the given exception, whether or
     * not already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param ex the exception
     */
    public void obtrudeException(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        result = new AltResult(ex);
        postComplete();
    }
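
    // Usage sketch (editorial addition): the "established versus overwritten
    // outcomes" caveat in practice. A dependent computed before the obtrude
    // call keeps the old outcome, while one created afterwards sees the new
    // one. Assumes the CompletableFuture API of JDK 8 or later, including
    // completedFuture and isCompletedExceptionally.

```java
import java.util.concurrent.CompletableFuture;

final class ObtrudeSketch {
    // Shows which readers observe the overwritten value.
    static String overwrite() {
        CompletableFuture<String> f = CompletableFuture.completedFuture("old");
        CompletableFuture<String> before = f.thenApply(s -> s + "!"); // uses "old"
        f.obtrudeValue("new");
        CompletableFuture<String> after = f.thenApply(s -> s + "!");  // uses "new"
        return f.join() + "," + before.join() + "," + after.join();
    }

    // A normally completed future can be forced into the failed state.
    static boolean forcedFailure() {
        CompletableFuture<String> f = CompletableFuture.completedFuture("ok");
        f.obtrudeException(new IllegalStateException("forced"));
        return f.isCompletedExceptionally();
    }
}
```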

    /**
     * Returns the estimated number of CompletableFutures whose
     * completions are awaiting completion of this CompletableFuture.
     * This method is designed for use in monitoring system state, not
     * for synchronization control.
     *
     * @return the number of dependent CompletableFutures
     */
    public int getNumberOfDependents() {
        int count = 0;
        for (CompletionNode p = completions; p != null; p = p.next)
            ++count;
        return count;
    }

    /**
     * Returns a string identifying this CompletableFuture, as well as
     * its completion state. The state, in brackets, contains the
     * String {@code "Completed normally"} or the String {@code
     * "Completed exceptionally"}, or the String {@code "Not
     * completed"} followed by the number of CompletableFutures
     * dependent upon its completion, if any.
     *
     * @return a string identifying this CompletableFuture, as well as its state
     */
    public String toString() {
        Object r = result;
        int count;
        return super.toString() +
            ((r == null) ?
             (((count = getNumberOfDependents()) == 0) ?
              "[Not completed]" :
              "[Not completed, " + count + " dependents]") :
             (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
              "[Completed exceptionally]" :
              "[Completed normally]"));
    }
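
    // Usage sketch (editorial addition): the bracketed state reported by
    // toString() as a future gains a dependent and then completes, alongside
    // getNumberOfDependents(). Assumes the CompletableFuture API of JDK 8 or
    // later. Later JDK releases append the exception to the "Completed
    // exceptionally" text, so that state is not shown here. The state()
    // helper is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class ToStringSketch {
    // Hypothetical helper: extracts the bracketed suffix of toString().
    static String state(CompletableFuture<?> f) {
        String s = f.toString();
        return s.substring(s.indexOf('['));
    }

    // Captures the state before and after registering a dependent,
    // and again after normal completion.
    static String[] states() {
        CompletableFuture<String> f = new CompletableFuture<>();
        String fresh = state(f);
        f.thenApply(String::length);               // registers one dependent
        String waiting = state(f) + "/" + f.getNumberOfDependents();
        f.complete("x");
        String done = state(f);
        return new String[] { fresh, waiting, done };
    }
}
```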

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long RESULT;
    private static final long WAITERS;
    private static final long COMPLETIONS;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = CompletableFuture.class;
            RESULT = UNSAFE.objectFieldOffset
                (k.getDeclaredField("result"));
            WAITERS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
            COMPLETIONS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("completions"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}