root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.84
Committed: Fri Apr 5 14:21:16 2013 UTC by dl
Branch: MAIN
Changes since 1.83: +71 -68 lines
Log Message:
Symmetric exception checks

File Contents

1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to
31 * {@link #complete complete},
32 * {@link #completeExceptionally completeExceptionally}, or
33 * {@link #cancel cancel}
34 * a CompletableFuture, only one of them succeeds.
35 *
36 * <p>Methods are available for adding dependents based on
37 * user-provided Functions, Consumers, or Runnables. The appropriate
38 * form to use depends on whether actions require arguments and/or
39 * produce results. Completion of a dependent action will trigger the
40 * completion of another CompletableFuture. Actions may also be
41 * triggered after either or both the current and another
42 * CompletableFuture complete. Multiple CompletableFutures may also
43 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
44 * {@link #allOf(CompletableFuture...)}.
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, actions supplied for dependent completions of another
48 * CompletableFuture may do so, depending on whether they are provided
49 * via one of the <em>async</em> methods (that is, methods with names
50 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
51 * methods provide a way to commence asynchronous processing of an
52 * action using either a given {@link Executor} or by default the
53 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
54 * debugging, and tracking, all generated asynchronous tasks are
55 * instances of the marker interface {@link AsynchronousCompletionTask}.
56 *
57 * <p>Actions supplied for dependent completions of <em>non-async</em>
58 * methods may be performed by the thread that completes the current
59 * CompletableFuture, or by any other caller of these methods. There
60 * are no guarantees about the order of processing completions unless
61 * constrained by these methods.
62 *
63 * <p>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional completion.
66 * Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}.
68 *
69 * <p>Upon exceptional completion (including cancellation), or when a
70 * completion entails an additional computation which terminates
71 * abruptly with an (unchecked) exception or error, then all of their
72 * dependent completions (and their dependents in turn) generally act
73 * as {@code completeExceptionally} with a {@link CompletionException}
74 * holding that exception as its cause. However, the {@link
75 * #exceptionally exceptionally} and {@link #handle handle}
76 * completions <em>are</em> able to handle exceptional completions of
77 * the CompletableFutures they depend on.
78 *
79 * <p>In case of exceptional completion with a CompletionException,
80 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
81 * {@link ExecutionException} with the same cause as held in the
82 * corresponding CompletionException. However, in these cases,
83 * methods {@link #join()} and {@link #getNow} throw the
84 * CompletionException, which simplifies usage.
85 *
86 * <p>Arguments used to pass a completion result (that is, for parameters
87 * of type {@code T}) may be null, but passing a null value for any other
88 * parameter will result in a {@link NullPointerException} being thrown.
89 *
90 * @author Doug Lea
91 * @since 1.8
92 */
93 public class CompletableFuture<T> implements Future<T> {
94
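The public contract described in the class documentation above (one winner among racing completions, cancellation as a form of exceptional completion, and the different exceptions thrown by get() and join()) can be exercised with a small sketch. It assumes the released java.util.concurrent.CompletableFuture of Java 8 or later behaves as this revision documents; the class name CompletionSemanticsDemo and its method names are hypothetical and not part of the jsr166 sources.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; not part of the jsr166 sources.
final class CompletionSemanticsDemo {

    /** Only the first completion attempt succeeds; later attempts return false. */
    static String firstCompletionWins() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("a");
        boolean second = f.complete("b");
        boolean third = f.completeExceptionally(new IllegalStateException());
        return first + "," + second + "," + third + "," + f.join();
    }

    /** cancel() leaves the future exceptionally completed and cancelled. */
    static boolean cancelIsExceptionalCompletion() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.cancel(false);
        try {
            f.join();
            return false;
        } catch (CancellationException e) {
            return f.isCompletedExceptionally() && f.isCancelled();
        }
    }

    /** A failing dependent reports its cause through join() and through get(). */
    static String joinVersusGet() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dep = src.thenApply(x -> x / 0);
        src.complete(1);
        String viaJoin, viaGet;
        try {
            dep.join();
            viaJoin = "none";
        } catch (CompletionException e) {          // unchecked wrapper
            viaJoin = e.getCause().getClass().getSimpleName();
        }
        try {
            dep.get();
            viaGet = "none";
        } catch (ExecutionException e) {           // checked wrapper, same cause
            viaGet = e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            viaGet = "interrupted";
        }
        return viaJoin + "/" + viaGet;
    }

    public static void main(String[] args) {
        System.out.println(firstCompletionWins());
        System.out.println(cancelIsExceptionalCompletion());
        System.out.println(joinVersusGet());
    }
}
```

join() needs no checked-exception handling, which is the simplification the class documentation refers to.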
95 /*
96 * Overview:
97 *
98 * 1. Non-nullness of field result (set via CAS) indicates done.
99 * An AltResult is used to box null as a result, as well as to
100 * hold exceptions. Using a single field makes completion fast
101 * and simple to detect and trigger, at the expense of a lot of
102 * encoding and decoding that infiltrates many methods. One minor
103 * simplification relies on the (static) NIL (to box null results)
104 * being the only AltResult with a null exception field, so we
105 * don't usually need explicit comparisons with NIL. The CF
106 * exception propagation mechanics surrounding decoding rely on
107 * unchecked casts of decoded results really being unchecked,
108 * where user type errors are caught at point of use, as is
109 * currently the case in Java. These are highlighted by using
110 * SuppressWarnings-annotated temporaries.
111 *
112 * 2. Waiters are held in a Treiber stack similar to the one used
113 * in FutureTask, Phaser, and SynchronousQueue. See their
114 * internal documentation for algorithmic details.
115 *
116 * 3. Completions are also kept in a list/stack, and pulled off
117 * and run when completion is triggered. (We could even use the
118 * same stack as for waiters, but would give up the potential
119 * parallelism obtained because woken waiters help release/run
120 * others -- see method postComplete). Because post-processing
121 * may race with direct calls, class Completion opportunistically
122 * extends AtomicInteger so callers can claim the action via
123 * compareAndSet(0, 1). The Completion.run methods are all
124 * written in a boringly similar, uniform way (that sometimes includes
125 * unnecessary-looking checks, kept to maintain uniformity).
126 * There are enough dimensions upon which they differ that
127 * attempts to factor commonalities while maintaining efficiency
128 * require more lines of code than they would save.
129 *
130 * 4. The exported then/and/or methods do support a bit of
131 * factoring (see doThenApply etc). They must cope with the
132 * intrinsic races surrounding addition of a dependent action
133 * versus performing the action directly because the task is
134 * already complete. For example, a CF may not be complete upon
135 * entry, so a dependent completion is added, but by the time it
136 * is added, the target CF is complete, so the action must be
137 * executed directly. This is all done while avoiding unnecessary object
138 * construction in safe-bypass cases.
139 */
140
141 // preliminaries
142
143 static final class AltResult {
144 final Throwable ex; // null only for NIL
145 AltResult(Throwable ex) { this.ex = ex; }
146 }
147
148 static final AltResult NIL = new AltResult(null);
149
150 // Fields
151
152 volatile Object result; // Either the result or boxed AltResult
153 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
154 volatile CompletionNode completions; // list (Treiber stack) of completions
155
156 // Basic utilities for triggering and processing completions
157
158 /**
159 * Removes and signals all waiting threads and runs all completions.
160 */
161 final void postComplete() {
162 WaitNode q; Thread t;
163 while ((q = waiters) != null) {
164 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
165 (t = q.thread) != null) {
166 q.thread = null;
167 LockSupport.unpark(t);
168 }
169 }
170
171 CompletionNode h; Completion c;
172 while ((h = completions) != null) {
173 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
174 (c = h.completion) != null)
175 c.run();
176 }
177 }
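postComplete pops nodes by CASing the stack head to its successor, and the waiting methods push with `q.next = waiters` followed by a CAS. The same Treiber-stack pattern can be sketched with AtomicReference standing in for the Unsafe field CAS used in this file. That substitution, the class name TreiberStackSketch, and its methods are assumptions made for illustration only.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of a Treiber stack; AtomicReference replaces
// the Unsafe-based CAS on the waiters/completions fields.
final class TreiberStackSketch<E> {

    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    /** Links the new node to the current head, then CASes it in; retries on contention. */
    void push(E item) {
        Node<E> n = new Node<>(item);
        do {
            n.next = head.get();
        } while (!head.compareAndSet(n.next, n));
    }

    /** CASes the head to its successor; returns null when the stack is empty. */
    E pop() {
        for (;;) {
            Node<E> h = head.get();
            if (h == null)
                return null;
            if (head.compareAndSet(h, h.next))
                return h.item;
        }
    }
}
```

Because every thread that sees a non-null head may pop and process it, several threads can drain the stack cooperatively, which is the helping behaviour the overview comment describes.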
178
179 /**
180 * Triggers completion with the encoding of the given arguments:
181 * if the exception is non-null, encodes it as a wrapped
182 * CompletionException unless it is one already. Otherwise uses
183 * the given result, boxed as NIL if null.
184 */
185 final void internalComplete(T v, Throwable ex) {
186 if (result == null)
187 UNSAFE.compareAndSwapObject
188 (this, RESULT, null,
189 (ex == null) ? (v == null) ? NIL : v :
190 new AltResult((ex instanceof CompletionException) ? ex :
191 new CompletionException(ex)));
192 postComplete(); // help out even if not triggered
193 }
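The encoding performed by internalComplete (a single result field, NIL boxing a null value, and exceptions wrapped in CompletionException unless already one) can be sketched in isolation. This is a simplified illustration: it uses AtomicReference rather than Unsafe, omits postComplete, and the names ResultCellSketch, valueOrNull, and exceptionOrNull are hypothetical.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-field result cell mirroring the AltResult/NIL encoding.
final class ResultCellSketch<T> {

    static final class AltResult {
        final Throwable ex;               // null only for NIL
        AltResult(Throwable ex) { this.ex = ex; }
    }

    static final AltResult NIL = new AltResult(null);

    // Non-null means done; holds the value, NIL, or an AltResult with an exception.
    private final AtomicReference<Object> result = new AtomicReference<>();

    /** Encodes the outcome and installs it only if no result is set yet. */
    boolean complete(T v, Throwable ex) {
        Object encoded = (ex == null)
            ? (v == null ? NIL : v)
            : new AltResult(ex instanceof CompletionException
                            ? ex : new CompletionException(ex));
        return result.compareAndSet(null, encoded);
    }

    boolean isDone() { return result.get() != null; }

    /** Decodes the exception; NIL and plain values both yield null. */
    Throwable exceptionOrNull() {
        Object r = result.get();
        return (r instanceof AltResult) ? ((AltResult) r).ex : null;
    }

    /** Decodes the value; the unchecked cast mirrors the one used in this file. */
    @SuppressWarnings("unchecked")
    T valueOrNull() {
        Object r = result.get();
        return (r == null || r instanceof AltResult) ? null : (T) r;
    }
}
```

Since NIL is the only AltResult with a null exception field, decoding needs no explicit comparison against NIL, as the overview comment notes.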
194
195 /**
196 * If triggered, helps release and/or process completions.
197 */
198 final void helpPostComplete() {
199 if (result != null)
200 postComplete();
201 }
202
203 /* ------------- waiting for completions -------------- */
204
205 /** Number of processors, for spin control */
206 static final int NCPU = Runtime.getRuntime().availableProcessors();
207
208 /**
209 * Heuristic spin value for waitingGet() before blocking on
210 * multiprocessors
211 */
212 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
213
214 /**
215 * Linked nodes to record waiting threads in a Treiber stack. See
216 * other classes such as Phaser and SynchronousQueue for more
217 * detailed explanation. This class implements ManagedBlocker to
218 * avoid starvation when blocking actions pile up in
219 * ForkJoinPools.
220 */
221 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
222 long nanos; // wait time if timed
223 final long deadline; // non-zero if timed
224 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
225 volatile Thread thread;
226 volatile WaitNode next;
227 WaitNode(boolean interruptible, long nanos, long deadline) {
228 this.thread = Thread.currentThread();
229 this.interruptControl = interruptible ? 1 : 0;
230 this.nanos = nanos;
231 this.deadline = deadline;
232 }
233 public boolean isReleasable() {
234 if (thread == null)
235 return true;
236 if (Thread.interrupted()) {
237 int i = interruptControl;
238 interruptControl = -1;
239 if (i > 0)
240 return true;
241 }
242 if (deadline != 0L &&
243 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
244 thread = null;
245 return true;
246 }
247 return false;
248 }
249 public boolean block() {
250 if (isReleasable())
251 return true;
252 else if (deadline == 0L)
253 LockSupport.park(this);
254 else if (nanos > 0L)
255 LockSupport.parkNanos(this, nanos);
256 return isReleasable();
257 }
258 }
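WaitNode relies on the ForkJoinPool.ManagedBlocker protocol: isReleasable() reports whether blocking is unnecessary, and block() blocks and returns true once no further blocking is needed. A much smaller blocker shows the same protocol; it waits on a CountDownLatch instead of parking on a future, and the class name LatchBlocker and helper awaitManaged are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

// Hypothetical minimal ManagedBlocker; WaitNode follows the same protocol
// but additionally tracks interrupts and deadlines.
final class LatchBlocker implements ForkJoinPool.ManagedBlocker {

    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    /** True when no blocking is needed. */
    @Override public boolean isReleasable() {
        return latch.getCount() == 0;
    }

    /** Blocks until released; returning true means no further blocking is needed. */
    @Override public boolean block() throws InterruptedException {
        latch.await();
        return true;
    }

    /** Waits via managedBlock so a ForkJoinPool may compensate for the blocked worker. */
    static boolean awaitManaged(CountDownLatch latch) {
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

When called from a thread that is not a ForkJoinPool worker, managedBlock simply loops over isReleasable() and block(), so the sketch behaves the same on any thread.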
259
260 /**
261 * Returns raw result after waiting, or null if interruptible and
262 * interrupted.
263 */
264 private Object waitingGet(boolean interruptible) {
265 WaitNode q = null;
266 boolean queued = false;
267 int spins = SPINS;
268 for (Object r;;) {
269 if ((r = result) != null) {
270 if (q != null) { // suppress unpark
271 q.thread = null;
272 if (q.interruptControl < 0) {
273 if (interruptible) {
274 removeWaiter(q);
275 return null;
276 }
277 Thread.currentThread().interrupt();
278 }
279 }
280 postComplete(); // help release others
281 return r;
282 }
283 else if (spins > 0) {
284 int rnd = ThreadLocalRandom.nextSecondarySeed();
285 if (rnd == 0)
286 rnd = ThreadLocalRandom.current().nextInt();
287 if (rnd >= 0)
288 --spins;
289 }
290 else if (q == null)
291 q = new WaitNode(interruptible, 0L, 0L);
292 else if (!queued)
293 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
294 q.next = waiters, q);
295 else if (interruptible && q.interruptControl < 0) {
296 removeWaiter(q);
297 return null;
298 }
299 else if (q.thread != null && result == null) {
300 try {
301 ForkJoinPool.managedBlock(q);
302 } catch (InterruptedException ex) {
303 q.interruptControl = -1;
304 }
305 }
306 }
307 }
308
309 /**
310 * Awaits completion or aborts on interrupt or timeout.
311 *
312 * @param nanos time to wait
313 * @return raw result
314 */
315 private Object timedAwaitDone(long nanos)
316 throws InterruptedException, TimeoutException {
317 WaitNode q = null;
318 boolean queued = false;
319 for (Object r;;) {
320 if ((r = result) != null) {
321 if (q != null) {
322 q.thread = null;
323 if (q.interruptControl < 0) {
324 removeWaiter(q);
325 throw new InterruptedException();
326 }
327 }
328 postComplete();
329 return r;
330 }
331 else if (q == null) {
332 if (nanos <= 0L)
333 throw new TimeoutException();
334 long d = System.nanoTime() + nanos;
335 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
336 }
337 else if (!queued)
338 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
339 q.next = waiters, q);
340 else if (q.interruptControl < 0) {
341 removeWaiter(q);
342 throw new InterruptedException();
343 }
344 else if (q.nanos <= 0L) {
345 if (result == null) {
346 removeWaiter(q);
347 throw new TimeoutException();
348 }
349 }
350 else if (q.thread != null && result == null) {
351 try {
352 ForkJoinPool.managedBlock(q);
353 } catch (InterruptedException ex) {
354 q.interruptControl = -1;
355 }
356 }
357 }
358 }
359
360 /**
361 * Tries to unlink a timed-out or interrupted wait node to avoid
362 * accumulating garbage. Internal nodes are simply unspliced
363 * without CAS since it is harmless if they are traversed anyway
364 * by releasers. To avoid effects of unsplicing from already
365 * removed nodes, the list is retraversed in case of an apparent
366 * race. This is slow when there are a lot of nodes, but we don't
367 * expect lists to be long enough to outweigh higher-overhead
368 * schemes.
369 */
370 private void removeWaiter(WaitNode node) {
371 if (node != null) {
372 node.thread = null;
373 retry:
374 for (;;) { // restart on removeWaiter race
375 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
376 s = q.next;
377 if (q.thread != null)
378 pred = q;
379 else if (pred != null) {
380 pred.next = s;
381 if (pred.thread == null) // check for race
382 continue retry;
383 }
384 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
385 continue retry;
386 }
387 break;
388 }
389 }
390 }
391
392 /* ------------- Async tasks -------------- */
393
394 /**
395 * A marker interface identifying asynchronous tasks produced by
396 * {@code async} methods. This may be useful for monitoring,
397 * debugging, and tracking asynchronous activities.
398 *
399 * @since 1.8
400 */
401 public static interface AsynchronousCompletionTask {
402 }
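One way the marker interface can support monitoring is an Executor that inspects the tasks it receives. The sketch below assumes the released Java 8 or later class, where tasks submitted by the async methods implement CompletableFuture.AsynchronousCompletionTask; the class MarkerCountingExecutor and its method are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical executor that counts tasks carrying the marker interface.
final class MarkerCountingExecutor implements Executor {

    final AtomicInteger marked = new AtomicInteger();

    @Override public void execute(Runnable task) {
        if (task instanceof CompletableFuture.AsynchronousCompletionTask)
            marked.incrementAndGet();
        task.run(); // runs on the caller's thread to keep the sketch deterministic
    }

    /** Submits one supplier through supplyAsync and reports how many marked tasks arrived. */
    static int markedAfterSupplyAsync() {
        MarkerCountingExecutor ex = new MarkerCountingExecutor();
        CompletableFuture.supplyAsync(() -> 41, ex).join();
        return ex.marked.get();
    }
}
```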
403
404 /** Base class can act as either FJ or plain Runnable */
405 abstract static class Async extends ForkJoinTask<Void>
406 implements Runnable, AsynchronousCompletionTask {
407 public final Void getRawResult() { return null; }
408 public final void setRawResult(Void v) { }
409 public final void run() { exec(); }
410 }
411
412 static final class AsyncRun extends Async {
413 final Runnable fn;
414 final CompletableFuture<Void> dst;
415 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
416 this.fn = fn; this.dst = dst;
417 }
418 public final boolean exec() {
419 CompletableFuture<Void> d; Throwable ex;
420 if ((d = this.dst) != null && d.result == null) {
421 try {
422 fn.run();
423 ex = null;
424 } catch (Throwable rex) {
425 ex = rex;
426 }
427 d.internalComplete(null, ex);
428 }
429 return true;
430 }
431 private static final long serialVersionUID = 5232453952276885070L;
432 }
433
434 static final class AsyncSupply<U> extends Async {
435 final Supplier<U> fn;
436 final CompletableFuture<U> dst;
437 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
438 this.fn = fn; this.dst = dst;
439 }
440 public final boolean exec() {
441 CompletableFuture<U> d; U u; Throwable ex;
442 if ((d = this.dst) != null && d.result == null) {
443 try {
444 u = fn.get();
445 ex = null;
446 } catch (Throwable rex) {
447 ex = rex;
448 u = null;
449 }
450 d.internalComplete(u, ex);
451 }
452 return true;
453 }
454 private static final long serialVersionUID = 5232453952276885070L;
455 }
456
457 static final class AsyncApply<T,U> extends Async {
458 final T arg;
459 final Function<? super T,? extends U> fn;
460 final CompletableFuture<U> dst;
461 AsyncApply(T arg, Function<? super T,? extends U> fn,
462 CompletableFuture<U> dst) {
463 this.arg = arg; this.fn = fn; this.dst = dst;
464 }
465 public final boolean exec() {
466 CompletableFuture<U> d; U u; Throwable ex;
467 if ((d = this.dst) != null && d.result == null) {
468 try {
469 u = fn.apply(arg);
470 ex = null;
471 } catch (Throwable rex) {
472 ex = rex;
473 u = null;
474 }
475 d.internalComplete(u, ex);
476 }
477 return true;
478 }
479 private static final long serialVersionUID = 5232453952276885070L;
480 }
481
482 static final class AsyncCombine<T,U,V> extends Async {
483 final T arg1;
484 final U arg2;
485 final BiFunction<? super T,? super U,? extends V> fn;
486 final CompletableFuture<V> dst;
487 AsyncCombine(T arg1, U arg2,
488 BiFunction<? super T,? super U,? extends V> fn,
489 CompletableFuture<V> dst) {
490 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
491 }
492 public final boolean exec() {
493 CompletableFuture<V> d; V v; Throwable ex;
494 if ((d = this.dst) != null && d.result == null) {
495 try {
496 v = fn.apply(arg1, arg2);
497 ex = null;
498 } catch (Throwable rex) {
499 ex = rex;
500 v = null;
501 }
502 d.internalComplete(v, ex);
503 }
504 return true;
505 }
506 private static final long serialVersionUID = 5232453952276885070L;
507 }
508
509 static final class AsyncAccept<T> extends Async {
510 final T arg;
511 final Consumer<? super T> fn;
512 final CompletableFuture<Void> dst;
513 AsyncAccept(T arg, Consumer<? super T> fn,
514 CompletableFuture<Void> dst) {
515 this.arg = arg; this.fn = fn; this.dst = dst;
516 }
517 public final boolean exec() {
518 CompletableFuture<Void> d; Throwable ex;
519 if ((d = this.dst) != null && d.result == null) {
520 try {
521 fn.accept(arg);
522 ex = null;
523 } catch (Throwable rex) {
524 ex = rex;
525 }
526 d.internalComplete(null, ex);
527 }
528 return true;
529 }
530 private static final long serialVersionUID = 5232453952276885070L;
531 }
532
533 static final class AsyncAcceptBoth<T,U> extends Async {
534 final T arg1;
535 final U arg2;
536 final BiConsumer<? super T,? super U> fn;
537 final CompletableFuture<Void> dst;
538 AsyncAcceptBoth(T arg1, U arg2,
539 BiConsumer<? super T,? super U> fn,
540 CompletableFuture<Void> dst) {
541 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
542 }
543 public final boolean exec() {
544 CompletableFuture<Void> d; Throwable ex;
545 if ((d = this.dst) != null && d.result == null) {
546 try {
547 fn.accept(arg1, arg2);
548 ex = null;
549 } catch (Throwable rex) {
550 ex = rex;
551 }
552 d.internalComplete(null, ex);
553 }
554 return true;
555 }
556 private static final long serialVersionUID = 5232453952276885070L;
557 }
558
559 static final class AsyncCompose<T,U> extends Async {
560 final T arg;
561 final Function<? super T, CompletableFuture<U>> fn;
562 final CompletableFuture<U> dst;
563 AsyncCompose(T arg,
564 Function<? super T, CompletableFuture<U>> fn,
565 CompletableFuture<U> dst) {
566 this.arg = arg; this.fn = fn; this.dst = dst;
567 }
568 public final boolean exec() {
569 CompletableFuture<U> d, fr; U u; Throwable ex;
570 if ((d = this.dst) != null && d.result == null) {
571 try {
572 fr = fn.apply(arg);
573 ex = (fr == null) ? new NullPointerException() : null;
574 } catch (Throwable rex) {
575 ex = rex;
576 fr = null;
577 }
578 if (ex != null)
579 u = null;
580 else {
581 Object r = fr.result;
582 if (r == null)
583 r = fr.waitingGet(false);
584 if (r instanceof AltResult) {
585 ex = ((AltResult)r).ex;
586 u = null;
587 }
588 else {
589 @SuppressWarnings("unchecked") U ur = (U) r;
590 u = ur;
591 }
592 }
593 d.internalComplete(u, ex);
594 }
595 return true;
596 }
597 private static final long serialVersionUID = 5232453952276885070L;
598 }
599
600 /* ------------- Completions -------------- */
601
602 /**
603 * Simple linked list nodes to record completions, used in
604 * basically the same way as WaitNodes. (We separate nodes from
605 * the Completions themselves mainly because for the And and Or
606 * methods, the same Completion object resides in two lists.)
607 */
608 static final class CompletionNode {
609 final Completion completion;
610 volatile CompletionNode next;
611 CompletionNode(Completion completion) { this.completion = completion; }
612 }
613
614 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
615 abstract static class Completion extends AtomicInteger implements Runnable {
616 }
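The claiming idiom behind Completion can be shown on its own: because the object is itself an AtomicInteger, whichever caller wins compareAndSet(0, 1) runs the action, and every other caller does nothing. The class ClaimOnceSketch below is a hypothetical illustration, not one of the Completion subclasses in this file.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical run-at-most-once action using the same claim-by-CAS idiom.
final class ClaimOnceSketch extends AtomicInteger implements Runnable {

    private final Runnable action;

    ClaimOnceSketch(Runnable action) { this.action = action; }

    /** Runs the action only for the caller that moves the state from 0 to 1. */
    @Override public void run() {
        if (compareAndSet(0, 1))
            action.run();
    }

    private static final long serialVersionUID = 1L;
}
```

This is what lets the same Completion object sit in two completion lists (for the And and Or forms) without its action running twice.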
617
618 static final class ThenApply<T,U> extends Completion {
619 final CompletableFuture<? extends T> src;
620 final Function<? super T,? extends U> fn;
621 final CompletableFuture<U> dst;
622 final Executor executor;
623 ThenApply(CompletableFuture<? extends T> src,
624 Function<? super T,? extends U> fn,
625 CompletableFuture<U> dst,
626 Executor executor) {
627 this.src = src; this.fn = fn; this.dst = dst;
628 this.executor = executor;
629 }
630 public final void run() {
631 final CompletableFuture<? extends T> a;
632 final Function<? super T,? extends U> fn;
633 final CompletableFuture<U> dst;
634 Object r; T t; Throwable ex;
635 if ((dst = this.dst) != null &&
636 (fn = this.fn) != null &&
637 (a = this.src) != null &&
638 (r = a.result) != null &&
639 compareAndSet(0, 1)) {
640 if (r instanceof AltResult) {
641 ex = ((AltResult)r).ex;
642 t = null;
643 }
644 else {
645 ex = null;
646 @SuppressWarnings("unchecked") T tr = (T) r;
647 t = tr;
648 }
649 Executor e = executor;
650 U u = null;
651 if (ex == null) {
652 try {
653 if (e != null)
654 e.execute(new AsyncApply<T,U>(t, fn, dst));
655 else
656 u = fn.apply(t);
657 } catch (Throwable rex) {
658 ex = rex;
659 }
660 }
661 if (e == null || ex != null)
662 dst.internalComplete(u, ex);
663 }
664 }
665 private static final long serialVersionUID = 5232453952276885070L;
666 }
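ThenApply.run either applies the function directly or, when an executor is present, hands an async task to that executor. From the caller's side this is the difference between thenApply and thenApplyAsync. The sketch below assumes the released Java 8 or later API; the class ThenApplyExecutorDemo and its counting executor are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo contrasting thenApply with thenApplyAsync(fn, executor).
final class ThenApplyExecutorDemo {

    /** Returns {sync result, submissions before async, async result, submissions after}. */
    static int[] run() {
        AtomicInteger submissions = new AtomicInteger();
        // Counts each submitted task, then runs it on the calling thread.
        Executor counting = task -> { submissions.incrementAndGet(); task.run(); };

        CompletableFuture<Integer> src = CompletableFuture.completedFuture(2);
        int sync = src.thenApply(x -> x + 1).join();       // no executor involved
        int before = submissions.get();
        int async = src.thenApplyAsync(x -> x * 10, counting).join();
        return new int[] { sync, before, async, submissions.get() };
    }
}
```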
667
668 static final class ThenAccept<T> extends Completion {
669 final CompletableFuture<? extends T> src;
670 final Consumer<? super T> fn;
671 final CompletableFuture<Void> dst;
672 final Executor executor;
673 ThenAccept(CompletableFuture<? extends T> src,
674 Consumer<? super T> fn,
675 CompletableFuture<Void> dst,
676 Executor executor) {
677 this.src = src; this.fn = fn; this.dst = dst;
678 this.executor = executor;
679 }
680 public final void run() {
681 final CompletableFuture<? extends T> a;
682 final Consumer<? super T> fn;
683 final CompletableFuture<Void> dst;
684 Object r; T t; Throwable ex;
685 if ((dst = this.dst) != null &&
686 (fn = this.fn) != null &&
687 (a = this.src) != null &&
688 (r = a.result) != null &&
689 compareAndSet(0, 1)) {
690 if (r instanceof AltResult) {
691 ex = ((AltResult)r).ex;
692 t = null;
693 }
694 else {
695 ex = null;
696 @SuppressWarnings("unchecked") T tr = (T) r;
697 t = tr;
698 }
699 Executor e = executor;
700 if (ex == null) {
701 try {
702 if (e != null)
703 e.execute(new AsyncAccept<T>(t, fn, dst));
704 else
705 fn.accept(t);
706 } catch (Throwable rex) {
707 ex = rex;
708 }
709 }
710 if (e == null || ex != null)
711 dst.internalComplete(null, ex);
712 }
713 }
714 private static final long serialVersionUID = 5232453952276885070L;
715 }
716
717 static final class ThenRun extends Completion {
718 final CompletableFuture<?> src;
719 final Runnable fn;
720 final CompletableFuture<Void> dst;
721 final Executor executor;
722 ThenRun(CompletableFuture<?> src,
723 Runnable fn,
724 CompletableFuture<Void> dst,
725 Executor executor) {
726 this.src = src; this.fn = fn; this.dst = dst;
727 this.executor = executor;
728 }
729 public final void run() {
730 final CompletableFuture<?> a;
731 final Runnable fn;
732 final CompletableFuture<Void> dst;
733 Object r; Throwable ex;
734 if ((dst = this.dst) != null &&
735 (fn = this.fn) != null &&
736 (a = this.src) != null &&
737 (r = a.result) != null &&
738 compareAndSet(0, 1)) {
739 if (r instanceof AltResult)
740 ex = ((AltResult)r).ex;
741 else
742 ex = null;
743 Executor e = executor;
744 if (ex == null) {
745 try {
746 if (e != null)
747 e.execute(new AsyncRun(fn, dst));
748 else
749 fn.run();
750 } catch (Throwable rex) {
751 ex = rex;
752 }
753 }
754 if (e == null || ex != null)
755 dst.internalComplete(null, ex);
756 }
757 }
758 private static final long serialVersionUID = 5232453952276885070L;
759 }
760
761 static final class ThenCombine<T,U,V> extends Completion {
762 final CompletableFuture<? extends T> src;
763 final CompletableFuture<? extends U> snd;
764 final BiFunction<? super T,? super U,? extends V> fn;
765 final CompletableFuture<V> dst;
766 final Executor executor;
767 ThenCombine(CompletableFuture<? extends T> src,
768 CompletableFuture<? extends U> snd,
769 BiFunction<? super T,? super U,? extends V> fn,
770 CompletableFuture<V> dst,
771 Executor executor) {
772 this.src = src; this.snd = snd;
773 this.fn = fn; this.dst = dst;
774 this.executor = executor;
775 }
776
777 public final void run() {
778 final CompletableFuture<? extends T> a;
779 final CompletableFuture<? extends U> b;
780 final BiFunction<? super T,? super U,? extends V> fn;
781 final CompletableFuture<V> dst;
782 if ((dst = this.dst) == null ||
783 (fn = this.fn) == null ||
784 (a = this.src) == null ||
785 (b = this.snd) == null)
786 return;
787 final Object r = a.result, s = b.result;
788 final T t; final U u; Throwable ex;
789 if (r instanceof AltResult) {
790 ex = ((AltResult)r).ex;
791 t = null;
792 } else {
793 ex = null;
794 @SuppressWarnings("unchecked") T tr = (T) r;
795 t = tr;
796 }
797 if (ex != null)
798 u = null;
799 else if (s instanceof AltResult) {
800 ex = ((AltResult)s).ex;
801 u = null;
802 } else {
803 ex = null;
804 @SuppressWarnings("unchecked") U us = (U) s;
805 u = us;
806 }
807
808 if ((ex != null || (r != null && s != null))
809 && compareAndSet(0, 1)) {
810 Executor e = executor;
811 V v = null;
812 if (ex == null) {
813 try {
814 if (e != null)
815 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
816 else
817 v = fn.apply(t, u);
818 } catch (Throwable rex) {
819 ex = rex;
820 }
821 }
822 if (e == null || ex != null)
823 dst.internalComplete(v, ex);
824 }
825 }
826 private static final long serialVersionUID = 5232453952276885070L;
827 }
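ThenCombine.run applies the function only when both sources completed normally; an exception from either source completes the destination exceptionally without calling the function. A usage-level sketch against the released Java 8 or later API follows; the class ThenCombineDemo and its methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo of thenCombine with normal and exceptional sources.
final class ThenCombineDemo {

    /** Both sources complete normally, so the function result is returned. */
    static int bothNormal() {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Integer> sum = a.thenCombine(b, Integer::sum);
        a.complete(1);
        b.complete(2);
        return sum.join();
    }

    /** One source fails: the dependent fails too and the function is never called. */
    static String oneFails() {
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        AtomicBoolean called = new AtomicBoolean();
        CompletableFuture<Integer> sum = a.thenCombine(b, (x, y) -> {
            called.set(true);
            return x + y;
        });
        a.complete(1);
        b.completeExceptionally(new IllegalStateException("boom"));
        try {
            sum.join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getMessage() + ", fn called: " + called.get();
        }
    }
}
```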
828
829 static final class ThenAcceptBoth<T,U> extends Completion {
830 final CompletableFuture<? extends T> src;
831 final CompletableFuture<? extends U> snd;
832 final BiConsumer<? super T,? super U> fn;
833 final CompletableFuture<Void> dst;
834 final Executor executor;
835 ThenAcceptBoth(CompletableFuture<? extends T> src,
836 CompletableFuture<? extends U> snd,
837 BiConsumer<? super T,? super U> fn,
838 CompletableFuture<Void> dst,
839 Executor executor) {
840 this.src = src; this.snd = snd;
841 this.fn = fn; this.dst = dst;
842 this.executor = executor;
843 }
844 public final void run() {
845 final CompletableFuture<? extends T> a;
846 final CompletableFuture<? extends U> b;
847 final BiConsumer<? super T,? super U> fn;
848 final CompletableFuture<Void> dst;
849 if ((dst = this.dst) == null ||
850 (fn = this.fn) == null ||
851 (a = this.src) == null ||
852 (b = this.snd) == null)
853 return;
854 final Object r = a.result, s = b.result;
855 final T t; final U u; Throwable ex;
856 if (r instanceof AltResult) {
857 ex = ((AltResult)r).ex;
858 t = null;
859 } else {
860 ex = null;
861 @SuppressWarnings("unchecked") T tr = (T) r;
862 t = tr;
863 }
864 if (ex != null)
865 u = null;
866 else if (s instanceof AltResult) {
867 ex = ((AltResult)s).ex;
868 u = null;
869 } else {
870 ex = null;
871 @SuppressWarnings("unchecked") U us = (U) s;
872 u = us;
873 }
874 if ((ex != null || (r != null && s != null))
875 && compareAndSet(0, 1)) {
876 Executor e = executor;
877 if (ex == null) {
878 try {
879 if (e != null)
880 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
881 else
882 fn.accept(t, u);
883 } catch (Throwable rex) {
884 ex = rex;
885 }
886 }
887 if (e == null || ex != null)
888 dst.internalComplete(null, ex);
889 }
890 }
891 private static final long serialVersionUID = 5232453952276885070L;
892 }
893
894 static final class RunAfterBoth extends Completion {
895 final CompletableFuture<?> src;
896 final CompletableFuture<?> snd;
897 final Runnable fn;
898 final CompletableFuture<Void> dst;
899 final Executor executor;
900 RunAfterBoth(CompletableFuture<?> src,
901 CompletableFuture<?> snd,
902 Runnable fn,
903 CompletableFuture<Void> dst,
904 Executor executor) {
905 this.src = src; this.snd = snd;
906 this.fn = fn; this.dst = dst;
907 this.executor = executor;
908 }
909 public final void run() {
910 final CompletableFuture<?> a;
911 final CompletableFuture<?> b;
912 final Runnable fn;
913 final CompletableFuture<Void> dst;
914 if ((dst = this.dst) == null ||
915 (fn = this.fn) == null ||
916 (a = this.src) == null ||
917 (b = this.snd) == null)
918 return;
919 final Object r = a.result, s = b.result;
920 Throwable ex;
921 if (r instanceof AltResult)
922 ex = ((AltResult)r).ex;
923 else
924 ex = null;
925 if (ex == null && (s instanceof AltResult))
926 ex = ((AltResult)s).ex;
927 if ((ex != null || (r != null && s != null))
928 && compareAndSet(0, 1)) {
929 Executor e = executor;
930 if (ex == null) {
931 try {
932 if (e != null)
933 e.execute(new AsyncRun(fn, dst));
934 else
935 fn.run();
936 } catch (Throwable rex) {
937 ex = rex;
938 }
939 }
940 if (e == null || ex != null)
941 dst.internalComplete(null, ex);
942 }
943 }
944 private static final long serialVersionUID = 5232453952276885070L;
945 }
946
947 static final class AndCompletion extends Completion {
948 final CompletableFuture<?> src;
949 final CompletableFuture<?> snd;
950 final CompletableFuture<Void> dst;
951 AndCompletion(CompletableFuture<?> src,
952 CompletableFuture<?> snd,
953 CompletableFuture<Void> dst) {
954 this.src = src; this.snd = snd; this.dst = dst;
955 }
956 public final void run() {
957 final CompletableFuture<?> a;
958 final CompletableFuture<?> b;
959 final CompletableFuture<Void> dst;
960 Object r, s; Throwable ex;
961 if ((dst = this.dst) != null &&
962 (a = this.src) != null &&
963 (r = a.result) != null &&
964 (b = this.snd) != null &&
965 (s = b.result) != null &&
966 compareAndSet(0, 1)) {
967 if (r instanceof AltResult)
968 ex = ((AltResult)r).ex;
969 else
970 ex = null;
971 if (ex == null && (s instanceof AltResult))
972 ex = ((AltResult)s).ex;
973 dst.internalComplete(null, ex);
974 }
975 }
976 private static final long serialVersionUID = 5232453952276885070L;
977 }
978
979 static final class ApplyToEither<T,U> extends Completion {
980 final CompletableFuture<? extends T> src;
981 final CompletableFuture<? extends T> snd;
982 final Function<? super T,? extends U> fn;
983 final CompletableFuture<U> dst;
984 final Executor executor;
985 ApplyToEither(CompletableFuture<? extends T> src,
986 CompletableFuture<? extends T> snd,
987 Function<? super T,? extends U> fn,
988 CompletableFuture<U> dst,
989 Executor executor) {
990 this.src = src; this.snd = snd;
991 this.fn = fn; this.dst = dst;
992 this.executor = executor;
993 }
994 public final void run() {
995 final CompletableFuture<? extends T> a;
996 final CompletableFuture<? extends T> b;
997 final Function<? super T,? extends U> fn;
998 final CompletableFuture<U> dst;
999 Object r; T t; Throwable ex;
1000 if ((dst = this.dst) != null &&
1001 (fn = this.fn) != null &&
1002 (((a = this.src) != null && (r = a.result) != null) ||
1003 ((b = this.snd) != null && (r = b.result) != null)) &&
1004 compareAndSet(0, 1)) {
1005 if (r instanceof AltResult) {
1006 ex = ((AltResult)r).ex;
1007 t = null;
1008 }
1009 else {
1010 ex = null;
1011 @SuppressWarnings("unchecked") T tr = (T) r;
1012 t = tr;
1013 }
1014 Executor e = executor;
1015 U u = null;
1016 if (ex == null) {
1017 try {
1018 if (e != null)
1019 e.execute(new AsyncApply<T,U>(t, fn, dst));
1020 else
1021 u = fn.apply(t);
1022 } catch (Throwable rex) {
1023 ex = rex;
1024 }
1025 }
1026 if (e == null || ex != null)
1027 dst.internalComplete(u, ex);
1028 }
1029 }
1030 private static final long serialVersionUID = 5232453952276885070L;
1031 }
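A minimal sketch of the either-completes behaviour that a completion like ApplyToEither supports, using the public `applyToEither` method as released in JDK 8 and later (this revision may differ in detail). The class name `ApplyToEitherSketch` and the values are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

final class ApplyToEitherSketch {
    // One source never completes here, so the function sees the other one's value.
    static String firstOf() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        return slow.applyToEither(fast, String::toUpperCase).join();
    }

    public static void main(String[] args) {
        System.out.println(firstOf()); // prints FAST
    }
}
```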
1032 // Passes the result of whichever of src or snd completes first to fn, then completes dst.
1033 static final class AcceptEither<T> extends Completion {
1034 final CompletableFuture<? extends T> src;
1035 final CompletableFuture<? extends T> snd;
1036 final Consumer<? super T> fn;
1037 final CompletableFuture<Void> dst;
1038 final Executor executor;
1039 AcceptEither(CompletableFuture<? extends T> src,
1040 CompletableFuture<? extends T> snd,
1041 Consumer<? super T> fn,
1042 CompletableFuture<Void> dst,
1043 Executor executor) {
1044 this.src = src; this.snd = snd;
1045 this.fn = fn; this.dst = dst;
1046 this.executor = executor;
1047 }
1048 public final void run() {
1049 final CompletableFuture<? extends T> a;
1050 final CompletableFuture<? extends T> b;
1051 final Consumer<? super T> fn;
1052 final CompletableFuture<Void> dst;
1053 Object r; T t; Throwable ex;
1054 if ((dst = this.dst) != null &&
1055 (fn = this.fn) != null &&
1056 (((a = this.src) != null && (r = a.result) != null) ||
1057 ((b = this.snd) != null && (r = b.result) != null)) &&
1058 compareAndSet(0, 1)) {
1059 if (r instanceof AltResult) {
1060 ex = ((AltResult)r).ex;
1061 t = null;
1062 }
1063 else {
1064 ex = null;
1065 @SuppressWarnings("unchecked") T tr = (T) r;
1066 t = tr;
1067 }
1068 Executor e = executor;
1069 if (ex == null) {
1070 try {
1071 if (e != null)
1072 e.execute(new AsyncAccept<T>(t, fn, dst));
1073 else
1074 fn.accept(t);
1075 } catch (Throwable rex) {
1076 ex = rex;
1077 }
1078 }
1079 if (e == null || ex != null)
1080 dst.internalComplete(null, ex);
1081 }
1082 }
1083 private static final long serialVersionUID = 5232453952276885070L;
1084 }
1085 // Runs fn once either src or snd has completed normally, then completes dst.
1086 static final class RunAfterEither extends Completion {
1087 final CompletableFuture<?> src;
1088 final CompletableFuture<?> snd;
1089 final Runnable fn;
1090 final CompletableFuture<Void> dst;
1091 final Executor executor;
1092 RunAfterEither(CompletableFuture<?> src,
1093 CompletableFuture<?> snd,
1094 Runnable fn,
1095 CompletableFuture<Void> dst,
1096 Executor executor) {
1097 this.src = src; this.snd = snd;
1098 this.fn = fn; this.dst = dst;
1099 this.executor = executor;
1100 }
1101 public final void run() {
1102 final CompletableFuture<?> a;
1103 final CompletableFuture<?> b;
1104 final Runnable fn;
1105 final CompletableFuture<Void> dst;
1106 Object r; Throwable ex;
1107 if ((dst = this.dst) != null &&
1108 (fn = this.fn) != null &&
1109 (((a = this.src) != null && (r = a.result) != null) ||
1110 ((b = this.snd) != null && (r = b.result) != null)) &&
1111 compareAndSet(0, 1)) {
1112 if (r instanceof AltResult)
1113 ex = ((AltResult)r).ex;
1114 else
1115 ex = null;
1116 Executor e = executor;
1117 if (ex == null) {
1118 try {
1119 if (e != null)
1120 e.execute(new AsyncRun(fn, dst));
1121 else
1122 fn.run();
1123 } catch (Throwable rex) {
1124 ex = rex;
1125 }
1126 }
1127 if (e == null || ex != null)
1128 dst.internalComplete(null, ex);
1129 }
1130 }
1131 private static final long serialVersionUID = 5232453952276885070L;
1132 }
1133 // Completes dst with the result or exception of whichever of src or snd completes first.
1134 static final class OrCompletion extends Completion {
1135 final CompletableFuture<?> src;
1136 final CompletableFuture<?> snd;
1137 final CompletableFuture<Object> dst;
1138 OrCompletion(CompletableFuture<?> src,
1139 CompletableFuture<?> snd,
1140 CompletableFuture<Object> dst) {
1141 this.src = src; this.snd = snd; this.dst = dst;
1142 }
1143 public final void run() {
1144 final CompletableFuture<?> a;
1145 final CompletableFuture<?> b;
1146 final CompletableFuture<Object> dst;
1147 Object r, t; Throwable ex;
1148 if ((dst = this.dst) != null &&
1149 (((a = this.src) != null && (r = a.result) != null) ||
1150 ((b = this.snd) != null && (r = b.result) != null)) &&
1151 compareAndSet(0, 1)) {
1152 if (r instanceof AltResult) {
1153 ex = ((AltResult)r).ex;
1154 t = null;
1155 }
1156 else {
1157 ex = null;
1158 t = r;
1159 }
1160 dst.internalComplete(t, ex);
1161 }
1162 }
1163 private static final long serialVersionUID = 5232453952276885070L;
1164 }
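A small sketch of first-to-complete semantics from the caller's side, using the public `anyOf` method of JDK 8 and later. That `anyOf` is built on OrCompletion in this revision is an assumption; the class name `AnyOfSketch` and the values are illustrative.

```java
import java.util.concurrent.CompletableFuture;

final class AnyOfSketch {
    // The pending future is never completed, so the result comes from the ready one.
    static Object first() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<Integer> ready = CompletableFuture.completedFuture(7);
        return CompletableFuture.anyOf(pending, ready).join();
    }

    public static void main(String[] args) {
        System.out.println(first()); // prints 7
    }
}
```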
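1165 // Completes dst with fn applied to src's exception, or with src's value if src completed normally.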
1166 static final class ExceptionCompletion<T> extends Completion {
1167 final CompletableFuture<? extends T> src;
1168 final Function<? super Throwable, ? extends T> fn;
1169 final CompletableFuture<T> dst;
1170 ExceptionCompletion(CompletableFuture<? extends T> src,
1171 Function<? super Throwable, ? extends T> fn,
1172 CompletableFuture<T> dst) {
1173 this.src = src; this.fn = fn; this.dst = dst;
1174 }
1175 public final void run() {
1176 final CompletableFuture<? extends T> a;
1177 final Function<? super Throwable, ? extends T> fn;
1178 final CompletableFuture<T> dst;
1179 Object r; T t = null; Throwable ex, dx = null;
1180 if ((dst = this.dst) != null &&
1181 (fn = this.fn) != null &&
1182 (a = this.src) != null &&
1183 (r = a.result) != null &&
1184 compareAndSet(0, 1)) {
1185 if ((r instanceof AltResult) &&
1186 (ex = ((AltResult)r).ex) != null) {
1187 try {
1188 t = fn.apply(ex);
1189 } catch (Throwable rex) {
1190 dx = rex;
1191 }
1192 }
1193 else {
1194 @SuppressWarnings("unchecked") T tr = (T) r;
1195 t = tr;
1196 }
1197 dst.internalComplete(t, dx);
1198 }
1199 }
1200 private static final long serialVersionUID = 5232453952276885070L;
1201 }
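A minimal sketch of the recovery pattern that ExceptionCompletion implements, shown through the public `exceptionally` method of JDK 8 and later. The class name `ExceptionallySketch` and the messages are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

final class ExceptionallySketch {
    // The source fails, so the recovery function supplies the value.
    static String recovered() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return f.exceptionally(ex -> "fallback: " + ex.getMessage()).join();
    }

    // The source succeeds, so its value passes through and the function is not called.
    static String passedThrough() {
        return CompletableFuture.completedFuture("value")
                .exceptionally(ex -> "fallback")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(recovered());     // prints fallback: boom
        System.out.println(passedThrough()); // prints value
    }
}
```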
1202 // Copies src's result or exception to dst.
1203 static final class ThenCopy<T> extends Completion {
1204 final CompletableFuture<?> src;
1205 final CompletableFuture<T> dst;
1206 ThenCopy(CompletableFuture<?> src,
1207 CompletableFuture<T> dst) {
1208 this.src = src; this.dst = dst;
1209 }
1210 public final void run() {
1211 final CompletableFuture<?> a;
1212 final CompletableFuture<T> dst;
1213 Object r; T t; Throwable ex;
1214 if ((dst = this.dst) != null &&
1215 (a = this.src) != null &&
1216 (r = a.result) != null &&
1217 compareAndSet(0, 1)) {
1218 if (r instanceof AltResult) {
1219 ex = ((AltResult)r).ex;
1220 t = null;
1221 }
1222 else {
1223 ex = null;
1224 @SuppressWarnings("unchecked") T tr = (T) r;
1225 t = tr;
1226 }
1227 dst.internalComplete(t, ex);
1228 }
1229 }
1230 private static final long serialVersionUID = 5232453952276885070L;
1231 }
1232
1233 // Version of ThenCopy for a CompletableFuture<Void> dst: relays completion and any exception, dropping the value.
1234 static final class ThenPropagate extends Completion {
1235 final CompletableFuture<?> src;
1236 final CompletableFuture<Void> dst;
1237 ThenPropagate(CompletableFuture<?> src,
1238 CompletableFuture<Void> dst) {
1239 this.src = src; this.dst = dst;
1240 }
1241 public final void run() {
1242 final CompletableFuture<?> a;
1243 final CompletableFuture<Void> dst;
1244 Object r; Throwable ex;
1245 if ((dst = this.dst) != null &&
1246 (a = this.src) != null &&
1247 (r = a.result) != null &&
1248 compareAndSet(0, 1)) {
1249 if (r instanceof AltResult)
1250 ex = ((AltResult)r).ex;
1251 else
1252 ex = null;
1253 dst.internalComplete(null, ex);
1254 }
1255 }
1256 private static final long serialVersionUID = 5232453952276885070L;
1257 }
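1258 // Completes dst with fn applied to src's value and exception (the exception is null on normal completion).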
1259 static final class HandleCompletion<T,U> extends Completion {
1260 final CompletableFuture<? extends T> src;
1261 final BiFunction<? super T, Throwable, ? extends U> fn;
1262 final CompletableFuture<U> dst;
1263 HandleCompletion(CompletableFuture<? extends T> src,
1264 BiFunction<? super T, Throwable, ? extends U> fn,
1265 CompletableFuture<U> dst) {
1266 this.src = src; this.fn = fn; this.dst = dst;
1267 }
1268 public final void run() {
1269 final CompletableFuture<? extends T> a;
1270 final BiFunction<? super T, Throwable, ? extends U> fn;
1271 final CompletableFuture<U> dst;
1272 Object r; T t; Throwable ex;
1273 if ((dst = this.dst) != null &&
1274 (fn = this.fn) != null &&
1275 (a = this.src) != null &&
1276 (r = a.result) != null &&
1277 compareAndSet(0, 1)) {
1278 if (r instanceof AltResult) {
1279 ex = ((AltResult)r).ex;
1280 t = null;
1281 }
1282 else {
1283 ex = null;
1284 @SuppressWarnings("unchecked") T tr = (T) r;
1285 t = tr;
1286 }
1287 U u = null; Throwable dx = null;
1288 try {
1289 u = fn.apply(t, ex);
1290 } catch (Throwable rex) {
1291 dx = rex;
1292 }
1293 dst.internalComplete(u, dx);
1294 }
1295 }
1296 private static final long serialVersionUID = 5232453952276885070L;
1297 }
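A minimal sketch of the two-argument handler that HandleCompletion runs, shown through the public `handle` method of JDK 8 and later. The class name `HandleSketch`, the helper `describe` and the strings are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

final class HandleSketch {
    // The handler always runs; exactly which argument is meaningful depends on the outcome.
    static String describe(CompletableFuture<Integer> f) {
        return f.handle((v, ex) -> ex == null ? "ok:" + v : "failed:" + ex.getMessage())
                .join();
    }

    static String okCase() {
        return describe(CompletableFuture.completedFuture(1));
    }

    static String failedCase() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return describe(f);
    }

    public static void main(String[] args) {
        System.out.println(okCase());     // prints ok:1
        System.out.println(failedCase()); // prints failed:boom
    }
}
```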
1298 // Applies fn to src's value, then relays the outcome of the future that fn returns to dst.
1299 static final class ThenCompose<T,U> extends Completion {
1300 final CompletableFuture<? extends T> src;
1301 final Function<? super T, CompletableFuture<U>> fn;
1302 final CompletableFuture<U> dst;
1303 final Executor executor;
1304 ThenCompose(CompletableFuture<? extends T> src,
1305 Function<? super T, CompletableFuture<U>> fn,
1306 CompletableFuture<U> dst,
1307 Executor executor) {
1308 this.src = src; this.fn = fn; this.dst = dst;
1309 this.executor = executor;
1310 }
1311 public final void run() {
1312 final CompletableFuture<? extends T> a;
1313 final Function<? super T, CompletableFuture<U>> fn;
1314 final CompletableFuture<U> dst;
1315 Object r; T t; Throwable ex; Executor e;
1316 if ((dst = this.dst) != null &&
1317 (fn = this.fn) != null &&
1318 (a = this.src) != null &&
1319 (r = a.result) != null &&
1320 compareAndSet(0, 1)) {
1321 if (r instanceof AltResult) {
1322 ex = ((AltResult)r).ex;
1323 t = null;
1324 }
1325 else {
1326 ex = null;
1327 @SuppressWarnings("unchecked") T tr = (T) r;
1328 t = tr;
1329 }
1330 CompletableFuture<U> c = null;
1331 U u = null;
1332 boolean complete = false;
1333 if (ex == null) {
1334 // Keep execute() inside the try, as the other completions do,
1335 // so a rejected task still completes dst exceptionally.
1336 try {
1337 if ((e = executor) != null)
1338 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1339 else if ((c = fn.apply(t)) == null)
1340 ex = new NullPointerException();
1341 } catch (Throwable rex) {
1342 ex = rex;
1343 }
1344 }
1345 if (c != null) {
1346 ThenCopy<U> d = null;
1347 Object s;
1348 if ((s = c.result) == null) {
1349 CompletionNode p = new CompletionNode
1350 (d = new ThenCopy<U>(c, dst));
1351 while ((s = c.result) == null) {
1352 if (UNSAFE.compareAndSwapObject
1353 (c, COMPLETIONS, p.next = c.completions, p))
1354 break;
1355 }
1356 }
1357 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1358 complete = true;
1359 if (s instanceof AltResult) {
1360 ex = ((AltResult)s).ex; // no rewrap
1361 u = null;
1362 }
1363 else {
1364 @SuppressWarnings("unchecked") U us = (U) s;
1365 u = us;
1366 }
1367 }
1368 }
1369 if (complete || ex != null)
1370 dst.internalComplete(u, ex);
1371 if (c != null)
1372 c.helpPostComplete();
1373 }
1374 }
1375 private static final long serialVersionUID = 5232453952276885070L;
1376 }
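A minimal sketch of what ThenCompose achieves from the caller's side: the function returns another future, and the dependent future takes that inner future's outcome. It uses `thenCompose` as released in JDK 8 and later; the class name `ThenComposeSketch` and the helper `lengthOf` are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

final class ThenComposeSketch {
    // Hypothetical second asynchronous step that depends on the first result.
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    // thenCompose flattens the nested future into a CompletableFuture<Integer>.
    static int composed() {
        return CompletableFuture.supplyAsync(() -> "hello")
                .thenCompose(ThenComposeSketch::lengthOf)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(composed()); // prints 5
    }
}
```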
1377
1378 // public methods
1379
1380 /**
1381 * Creates a new incomplete CompletableFuture.
1382 */
1383 public CompletableFuture() {
1384 }
1385
1386 /**
1387 * Returns a new CompletableFuture that is asynchronously completed
1388 * by a task running in the {@link ForkJoinPool#commonPool()} with
1389 * the value obtained by calling the given Supplier.
1390 *
1391 * @param supplier a function returning the value to be used
1392 * to complete the returned CompletableFuture
1393 * @return the new CompletableFuture
1394 */
1395 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1396 if (supplier == null) throw new NullPointerException();
1397 CompletableFuture<U> f = new CompletableFuture<U>();
1398 ForkJoinPool.commonPool().
1399 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1400 return f;
1401 }
1402
1403 /**
1404 * Returns a new CompletableFuture that is asynchronously completed
1405 * by a task running in the given executor with the value obtained
1406 * by calling the given Supplier.
1407 *
1408 * @param supplier a function returning the value to be used
1409 * to complete the returned CompletableFuture
1410 * @param executor the executor to use for asynchronous execution
1411 * @return the new CompletableFuture
1412 */
1413 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1414 Executor executor) {
1415 if (executor == null || supplier == null)
1416 throw new NullPointerException();
1417 CompletableFuture<U> f = new CompletableFuture<U>();
1418 executor.execute(new AsyncSupply<U>(supplier, f));
1419 return f;
1420 }
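A minimal usage sketch of the two `supplyAsync` overloads above, written against the API as released in JDK 8 and later. The class name `SupplyAsyncSketch`, the pool size and the supplied values are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SupplyAsyncSketch {
    // Runs the supplier in ForkJoinPool.commonPool() and waits for the value.
    static int onCommonPool() {
        CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> 6 * 7);
        return f.join();
    }

    // Runs the supplier in a caller-provided executor, which the caller shuts down.
    static String onExecutor() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> f =
                CompletableFuture.supplyAsync(() -> "done", pool);
            return f.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(onCommonPool()); // prints 42
        System.out.println(onExecutor());   // prints done
    }
}
```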
1421
1422 /**
1423 * Returns a new CompletableFuture that is asynchronously completed
1424 * by a task running in the {@link ForkJoinPool#commonPool()} after
1425 * it runs the given action.
1426 *
1427 * @param runnable the action to run before completing the
1428 * returned CompletableFuture
1429 * @return the new CompletableFuture
1430 */
1431 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1432 if (runnable == null) throw new NullPointerException();
1433 CompletableFuture<Void> f = new CompletableFuture<Void>();
1434 ForkJoinPool.commonPool().
1435 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1436 return f;
1437 }
1438
1439 /**
1440 * Returns a new CompletableFuture that is asynchronously completed
1441 * by a task running in the given executor after it runs the given
1442 * action.
1443 *
1444 * @param runnable the action to run before completing the
1445 * returned CompletableFuture
1446 * @param executor the executor to use for asynchronous execution
1447 * @return the new CompletableFuture
1448 */
1449 public static CompletableFuture<Void> runAsync(Runnable runnable,
1450 Executor executor) {
1451 if (executor == null || runnable == null)
1452 throw new NullPointerException();
1453 CompletableFuture<Void> f = new CompletableFuture<Void>();
1454 executor.execute(new AsyncRun(runnable, f));
1455 return f;
1456 }
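A short sketch of `runAsync`, assuming the JDK 8+ API: the returned future carries no value, so it is useful only as a completion signal. The class name `RunAsyncSketch` and the flag are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class RunAsyncSketch {
    // The action has finished by the time join() returns.
    static boolean ranBeforeCompletion() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> f = CompletableFuture.runAsync(() -> ran.set(true));
        f.join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(ranBeforeCompletion()); // prints true
    }
}
```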
1457
1458 /**
1459 * Returns a new CompletableFuture that is already completed with
1460 * the given value.
1461 *
1462 * @param value the value
1463 * @return the completed CompletableFuture
1464 */
1465 public static <U> CompletableFuture<U> completedFuture(U value) {
1466 CompletableFuture<U> f = new CompletableFuture<U>();
1467 f.result = (value == null) ? NIL : value;
1468 return f;
1469 }
1470
1471 /**
1472 * Returns {@code true} if completed in any fashion: normally,
1473 * exceptionally, or via cancellation.
1474 *
1475 * @return {@code true} if completed
1476 */
1477 public boolean isDone() {
1478 return result != null;
1479 }
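A small sketch of `completedFuture` and `isDone`, assuming the JDK 8+ API. It also illustrates why the code above substitutes a marker for a null value: a future completed with null must still report as done. The class name `CompletedFutureSketch` is illustrative.

```java
import java.util.concurrent.CompletableFuture;

final class CompletedFutureSketch {
    static boolean alreadyDone() {
        return CompletableFuture.completedFuture("x").isDone();
    }

    static boolean freshIsDone() {
        return new CompletableFuture<String>().isDone();
    }

    // A null value is a valid result: the future is done and join() returns null.
    static boolean nullCountsAsDone() {
        CompletableFuture<String> f = CompletableFuture.completedFuture(null);
        return f.isDone() && f.join() == null;
    }

    public static void main(String[] args) {
        System.out.println(alreadyDone());      // prints true
        System.out.println(freshIsDone());      // prints false
        System.out.println(nullCountsAsDone()); // prints true
    }
}
```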
1480
1481 /**
1482 * Waits if necessary for this future to complete, and then
1483 * returns its result.
1484 *
1485 * @return the result value
1486 * @throws CancellationException if this future was cancelled
1487 * @throws ExecutionException if this future completed exceptionally
1488 * @throws InterruptedException if the current thread was interrupted
1489 * while waiting
1490 */
1491 public T get() throws InterruptedException, ExecutionException {
1492 Object r; Throwable ex, cause;
1493 if ((r = result) == null && (r = waitingGet(true)) == null)
1494 throw new InterruptedException();
1495 if (!(r instanceof AltResult)) {
1496 @SuppressWarnings("unchecked") T tr = (T) r;
1497 return tr;
1498 }
1499 if ((ex = ((AltResult)r).ex) == null)
1500 return null;
1501 if (ex instanceof CancellationException)
1502 throw (CancellationException)ex;
1503 if ((ex instanceof CompletionException) &&
1504 (cause = ex.getCause()) != null)
1505 ex = cause;
1506 throw new ExecutionException(ex);
1507 }
1508
1509 /**
1510 * Waits if necessary for at most the given time for this future
1511 * to complete, and then returns its result, if available.
1512 *
1513 * @param timeout the maximum time to wait
1514 * @param unit the time unit of the timeout argument
1515 * @return the result value
1516 * @throws CancellationException if this future was cancelled
1517 * @throws ExecutionException if this future completed exceptionally
1518 * @throws InterruptedException if the current thread was interrupted
1519 * while waiting
1520 * @throws TimeoutException if the wait timed out
1521 */
1522 public T get(long timeout, TimeUnit unit)
1523 throws InterruptedException, ExecutionException, TimeoutException {
1524 Object r; Throwable ex, cause;
1525 long nanos = unit.toNanos(timeout);
1526 if (Thread.interrupted())
1527 throw new InterruptedException();
1528 if ((r = result) == null)
1529 r = timedAwaitDone(nanos);
1530 if (!(r instanceof AltResult)) {
1531 @SuppressWarnings("unchecked") T tr = (T) r;
1532 return tr;
1533 }
1534 if ((ex = ((AltResult)r).ex) == null)
1535 return null;
1536 if (ex instanceof CancellationException)
1537 throw (CancellationException)ex;
1538 if ((ex instanceof CompletionException) &&
1539 (cause = ex.getCause()) != null)
1540 ex = cause;
1541 throw new ExecutionException(ex);
1542 }
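A minimal sketch of how callers see the two `get` variants above, assuming the JDK 8+ API: failures arrive as a checked ExecutionException whose cause is the original exception, and the timed form throws TimeoutException if the future is still incomplete. The class name `GetSketch` and the 10 ms timeout are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class GetSketch {
    // Returns the cause reported by get() for an exceptionally completed future.
    static Throwable causeFromGet() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        try {
            f.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt status
            return e;
        }
    }

    // A future that nobody completes makes the timed get() give up.
    static boolean timesOut() {
        CompletableFuture<String> never = new CompletableFuture<>();
        try {
            never.get(10, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(causeFromGet());
        System.out.println(timesOut()); // prints true
    }
}
```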
1543
1544 /**
1545 * Returns the result value when complete, or throws an
1546 * (unchecked) exception if completed exceptionally. To better
1547 * conform with the use of common functional forms, if a
1548 * computation involved in the completion of this
1549 * CompletableFuture threw an exception, this method throws an
1550 * (unchecked) {@link CompletionException} with the underlying
1551 * exception as its cause.
1552 *
1553 * @return the result value
1554 * @throws CancellationException if the computation was cancelled
1555 * @throws CompletionException if this future completed
1556 * exceptionally or a completion computation threw an exception
1557 */
1558 public T join() {
1559 Object r; Throwable ex;
1560 if ((r = result) == null)
1561 r = waitingGet(false);
1562 if (!(r instanceof AltResult)) {
1563 @SuppressWarnings("unchecked") T tr = (T) r;
1564 return tr;
1565 }
1566 if ((ex = ((AltResult)r).ex) == null)
1567 return null;
1568 if (ex instanceof CancellationException)
1569 throw (CancellationException)ex;
1570 if (ex instanceof CompletionException)
1571 throw (CompletionException)ex;
1572 throw new CompletionException(ex);
1573 }
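A minimal sketch of why `join` suits functional code, assuming the JDK 8+ API: it throws only unchecked exceptions, so it can be used as a method reference in a stream. The class name `JoinSketch` and the values are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class JoinSketch {
    // No try/catch is needed around join(), unlike get().
    static int sumOfThree() {
        List<CompletableFuture<Integer>> parts = Arrays.asList(
            CompletableFuture.completedFuture(1),
            CompletableFuture.completedFuture(2),
            CompletableFuture.completedFuture(3));
        return parts.stream().mapToInt(CompletableFuture::join).sum();
    }

    // A failure surfaces as CompletionException with the original exception as cause.
    static Throwable causeFromJoin() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        try {
            f.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfThree()); // prints 6
        System.out.println(causeFromJoin());
    }
}
```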
1574
1575 /**
1576 * Returns the result value (or throws any encountered exception)
1577 * if completed, else returns the given valueIfAbsent.
1578 *
1579 * @param valueIfAbsent the value to return if not completed
1580 * @return the result value, if completed, else the given valueIfAbsent
1581 * @throws CancellationException if the computation was cancelled
1582 * @throws CompletionException if this future completed
1583 * exceptionally or a completion computation threw an exception
1584 */
1585 public T getNow(T valueIfAbsent) {
1586 Object r; Throwable ex;
1587 if ((r = result) == null)
1588 return valueIfAbsent;
1589 if (!(r instanceof AltResult)) {
1590 @SuppressWarnings("unchecked") T tr = (T) r;
1591 return tr;
1592 }
1593 if ((ex = ((AltResult)r).ex) == null)
1594 return null;
1595 if (ex instanceof CancellationException)
1596 throw (CancellationException)ex;
1597 if (ex instanceof CompletionException)
1598 throw (CompletionException)ex;
1599 throw new CompletionException(ex);
1600 }
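A short sketch of `getNow`, assuming the JDK 8+ API: it never blocks, returning the fallback while the future is incomplete and the real value afterwards. The class name `GetNowSketch` and the strings are illustrative.

```java
import java.util.concurrent.CompletableFuture;

final class GetNowSketch {
    static String beforeAndAfter() {
        CompletableFuture<String> f = new CompletableFuture<>();
        String before = f.getNow("fallback"); // not completed yet
        f.complete("value");
        String after = f.getNow("fallback");  // completed, fallback ignored
        return before + "/" + after;
    }

    public static void main(String[] args) {
        System.out.println(beforeAndAfter()); // prints fallback/value
    }
}
```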
1601
1602 /**
1603 * If not already completed, sets the value returned by {@link
1604 * #get()} and related methods to the given value.
1605 *
1606 * @param value the result value
1607 * @return {@code true} if this invocation caused this CompletableFuture
1608 * to transition to a completed state, else {@code false}
1609 */
1610 public boolean complete(T value) {
1611 boolean triggered = result == null &&
1612 UNSAFE.compareAndSwapObject(this, RESULT, null,
1613 value == null ? NIL : value);
1614 postComplete();
1615 return triggered;
1616 }
1617
1618 /**
1619 * If not already completed, causes invocations of {@link #get()}
1620 * and related methods to throw the given exception.
1621 *
1622 * @param ex the exception
1623 * @return {@code true} if this invocation caused this CompletableFuture
1624 * to transition to a completed state, else {@code false}
1625 */
1626 public boolean completeExceptionally(Throwable ex) {
1627 if (ex == null) throw new NullPointerException();
1628 boolean triggered = result == null &&
1629 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1630 postComplete();
1631 return triggered;
1632 }
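A minimal sketch of the first-completion-wins rule enforced by the compare-and-swap in `complete` and `completeExceptionally`, assuming the JDK 8+ API. The class name `CompleteOnceSketch` and the values are illustrative.

```java
import java.util.concurrent.CompletableFuture;

final class CompleteOnceSketch {
    // Only the first attempt returns true; later attempts leave the result unchanged.
    static String attempts() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("first");
        boolean second = f.complete("second");
        boolean third = f.completeExceptionally(new RuntimeException("late"));
        return first + "," + second + "," + third + "," + f.join();
    }

    public static void main(String[] args) {
        System.out.println(attempts()); // prints true,false,false,first
    }
}
```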
1633
1634 /**
1635 * Returns a new CompletableFuture that is completed
1636 * when this CompletableFuture completes, with the result of
1637 * applying the given function to this CompletableFuture's result.
1638 *
1639 * <p>If this CompletableFuture completes exceptionally, or the
1640 * supplied function throws an exception, then the returned
1641 * CompletableFuture completes exceptionally with a
1642 * CompletionException holding the exception as its cause.
1643 *
1644 * @param fn the function to use to compute the value of
1645 * the returned CompletableFuture
1646 * @return the new CompletableFuture
1647 */
1648 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1649 return doThenApply(fn, null);
1650 }
1651
1652 /**
1653 * Returns a new CompletableFuture that is asynchronously completed
1654 * when this CompletableFuture completes, with the result of
1655 * applying the given function to this CompletableFuture's result
1656 * in a task running in the {@link ForkJoinPool#commonPool()}.
1657 *
1658 * <p>If this CompletableFuture completes exceptionally, or the
1659 * supplied function throws an exception, then the returned
1660 * CompletableFuture completes exceptionally with a
1661 * CompletionException holding the exception as its cause.
1662 *
1663 * @param fn the function to use to compute the value of
1664 * the returned CompletableFuture
1665 * @return the new CompletableFuture
1666 */
1667 public <U> CompletableFuture<U> thenApplyAsync
1668 (Function<? super T,? extends U> fn) {
1669 return doThenApply(fn, ForkJoinPool.commonPool());
1670 }
1671
1672 /**
1673 * Returns a new CompletableFuture that is asynchronously completed
1674 * when this CompletableFuture completes, with the result of
1675 * applying the given function to this CompletableFuture's result
1676 * in a task running in the given executor.
1677 *
1678 * <p>If this CompletableFuture completes exceptionally, or the
1679 * supplied function throws an exception, then the returned
1680 * CompletableFuture completes exceptionally with a
1681 * CompletionException holding the exception as its cause.
1682 *
1683 * @param fn the function to use to compute the value of
1684 * the returned CompletableFuture
1685 * @param executor the executor to use for asynchronous execution
1686 * @return the new CompletableFuture
1687 */
1688 public <U> CompletableFuture<U> thenApplyAsync
1689 (Function<? super T,? extends U> fn,
1690 Executor executor) {
1691 if (executor == null) throw new NullPointerException();
1692 return doThenApply(fn, executor);
1693 }
1694
1695 private <U> CompletableFuture<U> doThenApply
1696 (Function<? super T,? extends U> fn,
1697 Executor e) {
1698 if (fn == null) throw new NullPointerException();
1699 CompletableFuture<U> dst = new CompletableFuture<U>();
1700 ThenApply<T,U> d = null;
1701 Object r;
1702 if ((r = result) == null) {
1703 CompletionNode p = new CompletionNode
1704 (d = new ThenApply<T,U>(this, fn, dst, e));
1705 while ((r = result) == null) {
1706 if (UNSAFE.compareAndSwapObject
1707 (this, COMPLETIONS, p.next = completions, p))
1708 break;
1709 }
1710 }
1711 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1712 T t; Throwable ex;
1713 if (r instanceof AltResult) {
1714 ex = ((AltResult)r).ex;
1715 t = null;
1716 }
1717 else {
1718 ex = null;
1719 @SuppressWarnings("unchecked") T tr = (T) r;
1720 t = tr;
1721 }
1722 U u = null;
1723 if (ex == null) {
1724 try {
1725 if (e != null)
1726 e.execute(new AsyncApply<T,U>(t, fn, dst));
1727 else
1728 u = fn.apply(t);
1729 } catch (Throwable rex) {
1730 ex = rex;
1731 }
1732 }
1733 if (e == null || ex != null)
1734 dst.internalComplete(u, ex);
1735 }
1736 helpPostComplete();
1737 return dst;
1738 }
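A minimal sketch of `thenApply` chaining and failure propagation, assuming the JDK 8+ API. As the Javadoc above states, a failed source makes the dependent future fail with a CompletionException holding the original exception. The class name `ThenApplySketch` and the values are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ThenApplySketch {
    // Each stage transforms the previous stage's value.
    static int chained() {
        return CompletableFuture.completedFuture(20)
                .thenApply(x -> x + 1)
                .thenApply(x -> x * 2)
                .join();
    }

    // The function is skipped when the source fails; the failure is passed on.
    static Throwable failureCause() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dst = src.thenApply(x -> x + 1);
        src.completeExceptionally(new IllegalArgumentException("bad input"));
        try {
            dst.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(chained()); // prints 42
        System.out.println(failureCause());
    }
}
```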
1739
1740 /**
1741 * Returns a new CompletableFuture that is completed
1742 * when this CompletableFuture completes, after performing the given
1743 * action with this CompletableFuture's result.
1744 *
1745 * <p>If this CompletableFuture completes exceptionally, or the
1746 * supplied action throws an exception, then the returned
1747 * CompletableFuture completes exceptionally with a
1748 * CompletionException holding the exception as its cause.
1749 *
1750 * @param block the action to perform before completing the
1751 * returned CompletableFuture
1752 * @return the new CompletableFuture
1753 */
1754 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1755 return doThenAccept(block, null);
1756 }
1757
1758 /**
1759 * Returns a new CompletableFuture that is asynchronously completed
1760 * when this CompletableFuture completes, after performing the given
1761 * action with this CompletableFuture's result from a task running
1762 * in the {@link ForkJoinPool#commonPool()}.
1763 *
1764 * <p>If this CompletableFuture completes exceptionally, or the
1765 * supplied action throws an exception, then the returned
1766 * CompletableFuture completes exceptionally with a
1767 * CompletionException holding the exception as its cause.
1768 *
1769 * @param block the action to perform before completing the
1770 * returned CompletableFuture
1771 * @return the new CompletableFuture
1772 */
1773 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1774 return doThenAccept(block, ForkJoinPool.commonPool());
1775 }
1776
1777 /**
1778 * Returns a new CompletableFuture that is asynchronously completed
1779 * when this CompletableFuture completes, after performing the given
1780 * action with this CompletableFuture's result from a task running
1781 * in the given executor.
1782 *
1783 * <p>If this CompletableFuture completes exceptionally, or the
1784 * supplied action throws an exception, then the returned
1785 * CompletableFuture completes exceptionally with a
1786 * CompletionException holding the exception as its cause.
1787 *
1788 * @param block the action to perform before completing the
1789 * returned CompletableFuture
1790 * @param executor the executor to use for asynchronous execution
1791 * @return the new CompletableFuture
1792 */
1793 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1794 Executor executor) {
1795 if (executor == null) throw new NullPointerException();
1796 return doThenAccept(block, executor);
1797 }
1798
1799 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1800 Executor e) {
1801 if (fn == null) throw new NullPointerException();
1802 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1803 ThenAccept<T> d = null;
1804 Object r;
1805 if ((r = result) == null) {
1806 CompletionNode p = new CompletionNode
1807 (d = new ThenAccept<T>(this, fn, dst, e));
1808 while ((r = result) == null) {
1809 if (UNSAFE.compareAndSwapObject
1810 (this, COMPLETIONS, p.next = completions, p))
1811 break;
1812 }
1813 }
1814 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1815 T t; Throwable ex;
1816 if (r instanceof AltResult) {
1817 ex = ((AltResult)r).ex;
1818 t = null;
1819 }
1820 else {
1821 ex = null;
1822 @SuppressWarnings("unchecked") T tr = (T) r;
1823 t = tr;
1824 }
1825 if (ex == null) {
1826 try {
1827 if (e != null)
1828 e.execute(new AsyncAccept<T>(t, fn, dst));
1829 else
1830 fn.accept(t);
1831 } catch (Throwable rex) {
1832 ex = rex;
1833 }
1834 }
1835 if (e == null || ex != null)
1836 dst.internalComplete(null, ex);
1837 }
1838 helpPostComplete();
1839 return dst;
1840 }
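A short sketch of `thenAccept`, assuming the JDK 8+ API: the consumer is registered while the source is incomplete and runs when the source is completed. The class name `ThenAcceptSketch` and the list are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ThenAcceptSketch {
    static List<String> collect() {
        List<String> seen = new ArrayList<>(); // touched only by the calling thread here
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Void> done = src.thenAccept(seen::add);
        src.complete("hello"); // triggers the registered consumer
        done.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect()); // prints [hello]
    }
}
```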
1841
1842 /**
1843 * Returns a new CompletableFuture that is completed
1844 * when this CompletableFuture completes, after performing the given
1845 * action.
1846 *
1847 * <p>If this CompletableFuture completes exceptionally, or the
1848 * supplied action throws an exception, then the returned
1849 * CompletableFuture completes exceptionally with a
1850 * CompletionException holding the exception as its cause.
1851 *
1852 * @param action the action to perform before completing the
1853 * returned CompletableFuture
1854 * @return the new CompletableFuture
1855 */
1856 public CompletableFuture<Void> thenRun(Runnable action) {
1857 return doThenRun(action, null);
1858 }
1859
1860 /**
1861 * Returns a new CompletableFuture that is asynchronously completed
1862 * when this CompletableFuture completes, after performing the given
1863 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1864 *
1865 * <p>If this CompletableFuture completes exceptionally, or the
1866 * supplied action throws an exception, then the returned
1867 * CompletableFuture completes exceptionally with a
1868 * CompletionException holding the exception as its cause.
1869 *
1870 * @param action the action to perform before completing the
1871 * returned CompletableFuture
1872 * @return the new CompletableFuture
1873 */
1874 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1875 return doThenRun(action, ForkJoinPool.commonPool());
1876 }
1877
1878 /**
1879 * Returns a new CompletableFuture that is asynchronously completed
1880 * when this CompletableFuture completes, after performing the given
1881 * action from a task running in the given executor.
1882 *
1883 * <p>If this CompletableFuture completes exceptionally, or the
1884 * supplied action throws an exception, then the returned
1885 * CompletableFuture completes exceptionally with a
1886 * CompletionException holding the exception as its cause.
1887 *
1888 * @param action the action to perform before completing the
1889 * returned CompletableFuture
1890 * @param executor the executor to use for asynchronous execution
1891 * @return the new CompletableFuture
1892 */
1893 public CompletableFuture<Void> thenRunAsync(Runnable action,
1894 Executor executor) {
1895 if (executor == null) throw new NullPointerException();
1896 return doThenRun(action, executor);
1897 }
1898
1899 private CompletableFuture<Void> doThenRun(Runnable action,
1900 Executor e) {
1901 if (action == null) throw new NullPointerException();
1902 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1903 ThenRun d = null;
1904 Object r;
1905 if ((r = result) == null) {
1906 CompletionNode p = new CompletionNode
1907 (d = new ThenRun(this, action, dst, e));
1908 while ((r = result) == null) {
1909 if (UNSAFE.compareAndSwapObject
1910 (this, COMPLETIONS, p.next = completions, p))
1911 break;
1912 }
1913 }
1914 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1915 Throwable ex;
1916 if (r instanceof AltResult)
1917 ex = ((AltResult)r).ex;
1918 else
1919 ex = null;
1920 if (ex == null) {
1921 try {
1922 if (e != null)
1923 e.execute(new AsyncRun(action, dst));
1924 else
1925 action.run();
1926 } catch (Throwable rex) {
1927 ex = rex;
1928 }
1929 }
1930 if (e == null || ex != null)
1931 dst.internalComplete(null, ex);
1932 }
1933 helpPostComplete();
1934 return dst;
1935 }
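A short sketch of `thenRun`, assuming the JDK 8+ API: the action ignores the source value, and it is skipped entirely when the source completed exceptionally. The class name `ThenRunSketch` and the counter are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class ThenRunSketch {
    // The action runs once after a normal completion.
    static int runsAfterSuccess() {
        AtomicInteger counter = new AtomicInteger();
        CompletableFuture.completedFuture("ignored")
                .thenRun(counter::incrementAndGet)
                .join();
        return counter.get();
    }

    // After a failure the action does not run and the dependent future fails too.
    static boolean skippedOnFailure() {
        AtomicInteger counter = new AtomicInteger();
        CompletableFuture<String> src = new CompletableFuture<>();
        src.completeExceptionally(new RuntimeException("x"));
        CompletableFuture<Void> dst = src.thenRun(counter::incrementAndGet);
        return dst.isCompletedExceptionally() && counter.get() == 0;
    }

    public static void main(String[] args) {
        System.out.println(runsAfterSuccess()); // prints 1
        System.out.println(skippedOnFailure()); // prints true
    }
}
```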
1936
1937 /**
1938 * Returns a new CompletableFuture that is completed
1939 * when both this and the other given CompletableFuture complete,
1940 * with the result of applying the given function to the results
1941 * of the two CompletableFutures.
1942 *
1943 * <p>If this and/or the other CompletableFuture complete
1944 * exceptionally, or the supplied function throws an exception,
1945 * then the returned CompletableFuture completes exceptionally
1946 * with a CompletionException holding the exception as its cause.
1947 *
1948 * @param other the other CompletableFuture
1949 * @param fn the function to use to compute the value of
1950 * the returned CompletableFuture
1951 * @return the new CompletableFuture
1952 */
1953 public <U,V> CompletableFuture<V> thenCombine
1954 (CompletableFuture<? extends U> other,
1955 BiFunction<? super T,? super U,? extends V> fn) {
1956 return doThenCombine(other, fn, null);
1957 }
1958
1959 /**
1960 * Returns a new CompletableFuture that is asynchronously completed
1961 * when both this and the other given CompletableFuture complete,
1962 * with the result of the given function of the results of the two
1963 * CompletableFutures from a task running in the
1964 * {@link ForkJoinPool#commonPool()}.
1965 *
1966 * <p>If this and/or the other CompletableFuture complete
1967 * exceptionally, or the supplied function throws an exception,
1968 * then the returned CompletableFuture completes exceptionally
1969 * with a CompletionException holding the exception as its cause.
1970 *
1971 * @param other the other CompletableFuture
1972 * @param fn the function to use to compute the value of
1973 * the returned CompletableFuture
1974 * @return the new CompletableFuture
1975 */
1976 public <U,V> CompletableFuture<V> thenCombineAsync
1977 (CompletableFuture<? extends U> other,
1978 BiFunction<? super T,? super U,? extends V> fn) {
1979 return doThenCombine(other, fn, ForkJoinPool.commonPool());
1980 }
1981
1982 /**
1983 * Returns a new CompletableFuture that is asynchronously completed
1984 * when both this and the other given CompletableFuture complete,
1985 * with the result of the given function of the results of the two
1986 * CompletableFutures from a task running in the given executor.
1987 *
1988 * <p>If this and/or the other CompletableFuture complete
1989 * exceptionally, or the supplied function throws an exception,
1990 * then the returned CompletableFuture completes exceptionally
1991 * with a CompletionException holding the exception as its cause.
1992 *
1993 * @param other the other CompletableFuture
1994 * @param fn the function to use to compute the value of
1995 * the returned CompletableFuture
1996 * @param executor the executor to use for asynchronous execution
1997 * @return the new CompletableFuture
1998 */
1999 public <U,V> CompletableFuture<V> thenCombineAsync
2000 (CompletableFuture<? extends U> other,
2001 BiFunction<? super T,? super U,? extends V> fn,
2002 Executor executor) {
2003 if (executor == null) throw new NullPointerException();
2004 return doThenCombine(other, fn, executor);
2005 }
2006
2007 private <U,V> CompletableFuture<V> doThenCombine
2008 (CompletableFuture<? extends U> other,
2009 BiFunction<? super T,? super U,? extends V> fn,
2010 Executor e) {
2011 if (other == null || fn == null) throw new NullPointerException();
2012 CompletableFuture<V> dst = new CompletableFuture<V>();
2013 ThenCombine<T,U,V> d = null;
2014 Object r, s = null;
2015 if ((r = result) == null || (s = other.result) == null) {
2016 d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
2017 CompletionNode q = null, p = new CompletionNode(d);
2018 while ((r == null && (r = result) == null) ||
2019 (s == null && (s = other.result) == null)) {
2020 if (q != null) {
2021 if (s != null ||
2022 UNSAFE.compareAndSwapObject
2023 (other, COMPLETIONS, q.next = other.completions, q))
2024 break;
2025 }
2026 else if (r != null ||
2027 UNSAFE.compareAndSwapObject
2028 (this, COMPLETIONS, p.next = completions, p)) {
2029 if (s != null)
2030 break;
2031 q = new CompletionNode(d);
2032 }
2033 }
2034 }
2035 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2036 T t; U u; Throwable ex;
2037 if (r instanceof AltResult) {
2038 ex = ((AltResult)r).ex;
2039 t = null;
2040 }
2041 else {
2042 ex = null;
2043 @SuppressWarnings("unchecked") T tr = (T) r;
2044 t = tr;
2045 }
2046 if (ex != null)
2047 u = null;
2048 else if (s instanceof AltResult) {
2049 ex = ((AltResult)s).ex;
2050 u = null;
2051 }
2052 else {
2053 @SuppressWarnings("unchecked") U us = (U) s;
2054 u = us;
2055 }
2056 V v = null;
2057 if (ex == null) {
2058 try {
2059 if (e != null)
2060 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
2061 else
2062 v = fn.apply(t, u);
2063 } catch (Throwable rex) {
2064 ex = rex;
2065 }
2066 }
2067 if (e == null || ex != null)
2068 dst.internalComplete(v, ex);
2069 }
2070 helpPostComplete();
2071 other.helpPostComplete();
2072 return dst;
2073 }
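
A minimal usage sketch for the thenCombine family documented above. It is written against the CompletableFuture API as released in JDK 8, which may differ in detail from this revision; the class and method names (ThenCombineSketch, combineNormally, combineWithFailure) are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ThenCombineSketch {
    // Completes both inputs normally and returns the combined value.
    static int combineNormally() {
        CompletableFuture<Integer> width = new CompletableFuture<>();
        CompletableFuture<Integer> height = new CompletableFuture<>();
        CompletableFuture<Integer> area = width.thenCombine(height, (w, h) -> w * h);
        width.complete(6);
        boolean pending = !area.isDone();   // only one input has completed so far
        height.complete(7);                 // the second completion triggers the function
        return pending ? area.join() : -1;
    }

    // Fails one input and returns the cause carried by the CompletionException.
    static Throwable combineWithFailure() {
        CompletableFuture<Integer> width = new CompletableFuture<>();
        CompletableFuture<Integer> height = new CompletableFuture<>();
        CompletableFuture<Integer> area = width.thenCombine(height, (w, h) -> w * h);
        width.completeExceptionally(new IllegalStateException("no width"));
        height.complete(7);
        try {
            area.join();
            return null;                    // not expected: area should have failed
        } catch (CompletionException ce) {
            return ce.getCause();           // the original IllegalStateException
        }
    }

    public static void main(String[] args) {
        System.out.println(combineNormally());
        System.out.println(combineWithFailure());
    }
}
```

The inputs are completed by hand rather than with asynchronous suppliers so that the order of events is deterministic.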
2074
2075 /**
2076 * Returns a new CompletableFuture that is completed
2077 * when both this and the other given CompletableFuture complete,
2078 * after performing the given action with the results of the two
2079 * CompletableFutures.
2080 *
2081 * <p>If this and/or the other CompletableFuture complete
2082 * exceptionally, or the supplied action throws an exception,
2083 * then the returned CompletableFuture completes exceptionally
2084 * with a CompletionException holding the exception as its cause.
2085 *
2086 * @param other the other CompletableFuture
2087 * @param block the action to perform before completing the
2088 * returned CompletableFuture
2089 * @return the new CompletableFuture
2090 */
2091 public <U> CompletableFuture<Void> thenAcceptBoth
2092 (CompletableFuture<? extends U> other,
2093 BiConsumer<? super T, ? super U> block) {
2094 return doThenAcceptBoth(other, block, null);
2095 }
2096
2097 /**
2098 * Returns a new CompletableFuture that is asynchronously completed
2099 * when both this and the other given CompletableFuture complete,
2100 * after performing the given action with the results of the two
2101 * CompletableFutures from a task running in the {@link
2102 * ForkJoinPool#commonPool()}.
2103 *
2104 * <p>If this and/or the other CompletableFuture complete
2105 * exceptionally, or the supplied action throws an exception,
2106 * then the returned CompletableFuture completes exceptionally
2107 * with a CompletionException holding the exception as its cause.
2108 *
2109 * @param other the other CompletableFuture
2110 * @param block the action to perform before completing the
2111 * returned CompletableFuture
2112 * @return the new CompletableFuture
2113 */
2114 public <U> CompletableFuture<Void> thenAcceptBothAsync
2115 (CompletableFuture<? extends U> other,
2116 BiConsumer<? super T, ? super U> block) {
2117 return doThenAcceptBoth(other, block, ForkJoinPool.commonPool());
2118 }
2119
2120 /**
2121 * Returns a new CompletableFuture that is asynchronously completed
2122 * when both this and the other given CompletableFuture complete,
2123 * after performing the given action with the results of the two
2124 * CompletableFutures from a task running in the given executor.
2125 *
2126 * <p>If this and/or the other CompletableFuture complete
2127 * exceptionally, or the supplied action throws an exception,
2128 * then the returned CompletableFuture completes exceptionally
2129 * with a CompletionException holding the exception as its cause.
2130 *
2131 * @param other the other CompletableFuture
2132 * @param block the action to perform before completing the
2133 * returned CompletableFuture
2134 * @param executor the executor to use for asynchronous execution
2135 * @return the new CompletableFuture
2136 */
2137 public <U> CompletableFuture<Void> thenAcceptBothAsync
2138 (CompletableFuture<? extends U> other,
2139 BiConsumer<? super T, ? super U> block,
2140 Executor executor) {
2141 if (executor == null) throw new NullPointerException();
2142 return doThenAcceptBoth(other, block, executor);
2143 }
2144
2145 private <U> CompletableFuture<Void> doThenAcceptBoth
2146 (CompletableFuture<? extends U> other,
2147 BiConsumer<? super T,? super U> fn,
2148 Executor e) {
2149 if (other == null || fn == null) throw new NullPointerException();
2150 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2151 ThenAcceptBoth<T,U> d = null;
2152 Object r, s = null;
2153 if ((r = result) == null || (s = other.result) == null) {
2154 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
2155 CompletionNode q = null, p = new CompletionNode(d);
2156 while ((r == null && (r = result) == null) ||
2157 (s == null && (s = other.result) == null)) {
2158 if (q != null) {
2159 if (s != null ||
2160 UNSAFE.compareAndSwapObject
2161 (other, COMPLETIONS, q.next = other.completions, q))
2162 break;
2163 }
2164 else if (r != null ||
2165 UNSAFE.compareAndSwapObject
2166 (this, COMPLETIONS, p.next = completions, p)) {
2167 if (s != null)
2168 break;
2169 q = new CompletionNode(d);
2170 }
2171 }
2172 }
2173 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2174 T t; U u; Throwable ex;
2175 if (r instanceof AltResult) {
2176 ex = ((AltResult)r).ex;
2177 t = null;
2178 }
2179 else {
2180 ex = null;
2181 @SuppressWarnings("unchecked") T tr = (T) r;
2182 t = tr;
2183 }
2184 if (ex != null)
2185 u = null;
2186 else if (s instanceof AltResult) {
2187 ex = ((AltResult)s).ex;
2188 u = null;
2189 }
2190 else {
2191 @SuppressWarnings("unchecked") U us = (U) s;
2192 u = us;
2193 }
2194 if (ex == null) {
2195 try {
2196 if (e != null)
2197 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
2198 else
2199 fn.accept(t, u);
2200 } catch (Throwable rex) {
2201 ex = rex;
2202 }
2203 }
2204 if (e == null || ex != null)
2205 dst.internalComplete(null, ex);
2206 }
2207 helpPostComplete();
2208 other.helpPostComplete();
2209 return dst;
2210 }
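
The thenAcceptBoth variants above can be exercised as follows. This is a sketch against the JDK 8 API, not this exact revision; ThenAcceptBothSketch and its log list are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ThenAcceptBothSketch {
    // Records the order in which things happen on the calling thread.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> user = new CompletableFuture<>();
        CompletableFuture<Integer> score = new CompletableFuture<>();
        CompletableFuture<Void> done =
            user.thenAcceptBoth(score, (u, s) -> log.add(u + "=" + s));
        score.complete(10);
        log.add("score ready");   // the action has not run yet: user is still pending
        user.complete("ada");     // both inputs are now complete, so the action runs
        done.join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```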
2211
2212 /**
2213 * Returns a new CompletableFuture that is completed
2214 * when both this and the other given CompletableFuture complete,
2215 * after performing the given action.
2216 *
2217 * <p>If this and/or the other CompletableFuture complete
2218 * exceptionally, or the supplied action throws an exception,
2219 * then the returned CompletableFuture completes exceptionally
2220 * with a CompletionException holding the exception as its cause.
2221 *
2222 * @param other the other CompletableFuture
2223 * @param action the action to perform before completing the
2224 * returned CompletableFuture
2225 * @return the new CompletableFuture
2226 */
2227 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2228 Runnable action) {
2229 return doRunAfterBoth(other, action, null);
2230 }
2231
2232 /**
2233 * Returns a new CompletableFuture that is asynchronously completed
2234 * when both this and the other given CompletableFuture complete,
2235 * after performing the given action from a task running in the
2236 * {@link ForkJoinPool#commonPool()}.
2237 *
2238 * <p>If this and/or the other CompletableFuture complete
2239 * exceptionally, or the supplied action throws an exception,
2240 * then the returned CompletableFuture completes exceptionally
2241 * with a CompletionException holding the exception as its cause.
2242 *
2243 * @param other the other CompletableFuture
2244 * @param action the action to perform before completing the
2245 * returned CompletableFuture
2246 * @return the new CompletableFuture
2247 */
2248 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2249 Runnable action) {
2250 return doRunAfterBoth(other, action, ForkJoinPool.commonPool());
2251 }
2252
2253 /**
2254 * Returns a new CompletableFuture that is asynchronously completed
2255 * when both this and the other given CompletableFuture complete,
2256 * after performing the given action from a task running in the
2257 * given executor.
2258 *
2259 * <p>If this and/or the other CompletableFuture complete
2260 * exceptionally, or the supplied action throws an exception,
2261 * then the returned CompletableFuture completes exceptionally
2262 * with a CompletionException holding the exception as its cause.
2263 *
2264 * @param other the other CompletableFuture
2265 * @param action the action to perform before completing the
2266 * returned CompletableFuture
2267 * @param executor the executor to use for asynchronous execution
2268 * @return the new CompletableFuture
2269 */
2270 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2271 Runnable action,
2272 Executor executor) {
2273 if (executor == null) throw new NullPointerException();
2274 return doRunAfterBoth(other, action, executor);
2275 }
2276
2277 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
2278 Runnable action,
2279 Executor e) {
2280 if (other == null || action == null) throw new NullPointerException();
2281 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2282 RunAfterBoth d = null;
2283 Object r, s = null;
2284 if ((r = result) == null || (s = other.result) == null) {
2285 d = new RunAfterBoth(this, other, action, dst, e);
2286 CompletionNode q = null, p = new CompletionNode(d);
2287 while ((r == null && (r = result) == null) ||
2288 (s == null && (s = other.result) == null)) {
2289 if (q != null) {
2290 if (s != null ||
2291 UNSAFE.compareAndSwapObject
2292 (other, COMPLETIONS, q.next = other.completions, q))
2293 break;
2294 }
2295 else if (r != null ||
2296 UNSAFE.compareAndSwapObject
2297 (this, COMPLETIONS, p.next = completions, p)) {
2298 if (s != null)
2299 break;
2300 q = new CompletionNode(d);
2301 }
2302 }
2303 }
2304 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2305 Throwable ex;
2306 if (r instanceof AltResult)
2307 ex = ((AltResult)r).ex;
2308 else
2309 ex = null;
2310 if (ex == null && (s instanceof AltResult))
2311 ex = ((AltResult)s).ex;
2312 if (ex == null) {
2313 try {
2314 if (e != null)
2315 e.execute(new AsyncRun(action, dst));
2316 else
2317 action.run();
2318 } catch (Throwable rex) {
2319 ex = rex;
2320 }
2321 }
2322 if (e == null || ex != null)
2323 dst.internalComplete(null, ex);
2324 }
2325 helpPostComplete();
2326 other.helpPostComplete();
2327 return dst;
2328 }
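
As a rough illustration of runAfterBoth, the sketch below counts how often the action runs. It assumes the JDK 8 API; RunAfterBothSketch and the counter are illustrative and not part of this file.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class RunAfterBothSketch {
    // Returns {runs after the first completion, runs after the second completion}.
    static int[] run() {
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Void> done = a.runAfterBoth(b, () -> runs.incrementAndGet());
        a.complete("x");
        int afterFirst = runs.get();   // b is still pending, so nothing has run
        b.complete(1);
        done.join();
        return new int[] { afterFirst, runs.get() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " then " + counts[1]);
    }
}
```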
2329
2330 /**
2331 * Returns a new CompletableFuture that is completed
2332 * when either this or the other given CompletableFuture completes,
2333 * with the result of the given function of either this or the other
2334 * CompletableFuture's result.
2335 *
2336 * <p>If this and/or the other CompletableFuture complete
2337 * exceptionally, then the returned CompletableFuture may also do so,
2338 * with a CompletionException holding one of these exceptions as its
2339 * cause. No guarantees are made about which result or exception is
2340 * used in the returned CompletableFuture. If the supplied function
2341 * throws an exception, then the returned CompletableFuture completes
2342 * exceptionally with a CompletionException holding the exception as
2343 * its cause.
2344 *
2345 * @param other the other CompletableFuture
2346 * @param fn the function to use to compute the value of
2347 * the returned CompletableFuture
2348 * @return the new CompletableFuture
2349 */
2350 public <U> CompletableFuture<U> applyToEither
2351 (CompletableFuture<? extends T> other,
2352 Function<? super T, U> fn) {
2353 return doApplyToEither(other, fn, null);
2354 }
2355
2356 /**
2357 * Returns a new CompletableFuture that is asynchronously completed
2358 * when either this or the other given CompletableFuture completes,
2359 * with the result of the given function of either this or the other
2360 * CompletableFuture's result from a task running in the
2361 * {@link ForkJoinPool#commonPool()}.
2362 *
2363 * <p>If this and/or the other CompletableFuture complete
2364 * exceptionally, then the returned CompletableFuture may also do so,
2365 * with a CompletionException holding one of these exceptions as its
2366 * cause. No guarantees are made about which result or exception is
2367 * used in the returned CompletableFuture. If the supplied function
2368 * throws an exception, then the returned CompletableFuture completes
2369 * exceptionally with a CompletionException holding the exception as
2370 * its cause.
2371 *
2372 * @param other the other CompletableFuture
2373 * @param fn the function to use to compute the value of
2374 * the returned CompletableFuture
2375 * @return the new CompletableFuture
2376 */
2377 public <U> CompletableFuture<U> applyToEitherAsync
2378 (CompletableFuture<? extends T> other,
2379 Function<? super T, U> fn) {
2380 return doApplyToEither(other, fn, ForkJoinPool.commonPool());
2381 }
2382
2383 /**
2384 * Returns a new CompletableFuture that is asynchronously completed
2385 * when either this or the other given CompletableFuture completes,
2386 * with the result of the given function of either this or the other
2387 * CompletableFuture's result from a task running in the
2388 * given executor.
2389 *
2390 * <p>If this and/or the other CompletableFuture complete
2391 * exceptionally, then the returned CompletableFuture may also do so,
2392 * with a CompletionException holding one of these exceptions as its
2393 * cause. No guarantees are made about which result or exception is
2394 * used in the returned CompletableFuture. If the supplied function
2395 * throws an exception, then the returned CompletableFuture completes
2396 * exceptionally with a CompletionException holding the exception as
2397 * its cause.
2398 *
2399 * @param other the other CompletableFuture
2400 * @param fn the function to use to compute the value of
2401 * the returned CompletableFuture
2402 * @param executor the executor to use for asynchronous execution
2403 * @return the new CompletableFuture
2404 */
2405 public <U> CompletableFuture<U> applyToEitherAsync
2406 (CompletableFuture<? extends T> other,
2407 Function<? super T, U> fn,
2408 Executor executor) {
2409 if (executor == null) throw new NullPointerException();
2410 return doApplyToEither(other, fn, executor);
2411 }
2412
2413 private <U> CompletableFuture<U> doApplyToEither
2414 (CompletableFuture<? extends T> other,
2415 Function<? super T, U> fn,
2416 Executor e) {
2417 if (other == null || fn == null) throw new NullPointerException();
2418 CompletableFuture<U> dst = new CompletableFuture<U>();
2419 ApplyToEither<T,U> d = null;
2420 Object r;
2421 if ((r = result) == null && (r = other.result) == null) {
2422 d = new ApplyToEither<T,U>(this, other, fn, dst, e);
2423 CompletionNode q = null, p = new CompletionNode(d);
2424 while ((r = result) == null && (r = other.result) == null) {
2425 if (q != null) {
2426 if (UNSAFE.compareAndSwapObject
2427 (other, COMPLETIONS, q.next = other.completions, q))
2428 break;
2429 }
2430 else if (UNSAFE.compareAndSwapObject
2431 (this, COMPLETIONS, p.next = completions, p))
2432 q = new CompletionNode(d);
2433 }
2434 }
2435 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2436 T t; Throwable ex;
2437 if (r instanceof AltResult) {
2438 ex = ((AltResult)r).ex;
2439 t = null;
2440 }
2441 else {
2442 ex = null;
2443 @SuppressWarnings("unchecked") T tr = (T) r;
2444 t = tr;
2445 }
2446 U u = null;
2447 if (ex == null) {
2448 try {
2449 if (e != null)
2450 e.execute(new AsyncApply<T,U>(t, fn, dst));
2451 else
2452 u = fn.apply(t);
2453 } catch (Throwable rex) {
2454 ex = rex;
2455 }
2456 }
2457 if (e == null || ex != null)
2458 dst.internalComplete(u, ex);
2459 }
2460 helpPostComplete();
2461 other.helpPostComplete();
2462 return dst;
2463 }
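
A small sketch of applyToEither, assuming the JDK 8 API. Only one source is complete when the function fires, so the outcome here is deterministic; when both sources are already complete, the Javadoc above makes no guarantee about which result is used. ApplyToEitherSketch is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

final class ApplyToEitherSketch {
    // Applies the function to whichever source completes first.
    static String firstOf() {
        CompletableFuture<String> primary = new CompletableFuture<>();
        CompletableFuture<String> mirror = new CompletableFuture<>();
        CompletableFuture<String> first =
            primary.applyToEither(mirror, String::toUpperCase);
        mirror.complete("mirror");     // triggers the function with "mirror"
        primary.complete("primary");   // too late: the result is already set
        return first.join();
    }

    public static void main(String[] args) {
        System.out.println(firstOf());
    }
}
```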
2464
2465 /**
2466 * Returns a new CompletableFuture that is completed
2467 * when either this or the other given CompletableFuture completes,
2468 * after performing the given action with the result of either this
2469 * or the other CompletableFuture.
2470 *
2471 * <p>If this and/or the other CompletableFuture complete
2472 * exceptionally, then the returned CompletableFuture may also do so,
2473 * with a CompletionException holding one of these exceptions as its
2474 * cause. No guarantees are made about which result or exception is
2475 * used in the returned CompletableFuture. If the supplied action
2476 * throws an exception, then the returned CompletableFuture completes
2477 * exceptionally with a CompletionException holding the exception as
2478 * its cause.
2479 *
2480 * @param other the other CompletableFuture
2481 * @param block the action to perform before completing the
2482 * returned CompletableFuture
2483 * @return the new CompletableFuture
2484 */
2485 public CompletableFuture<Void> acceptEither
2486 (CompletableFuture<? extends T> other,
2487 Consumer<? super T> block) {
2488 return doAcceptEither(other, block, null);
2489 }
2490
2491 /**
2492 * Returns a new CompletableFuture that is asynchronously completed
2493 * when either this or the other given CompletableFuture completes,
2494 * after performing the given action with the result of either this
2495 * or the other CompletableFuture from a task running in
2496 * the {@link ForkJoinPool#commonPool()}.
2497 *
2498 * <p>If this and/or the other CompletableFuture complete
2499 * exceptionally, then the returned CompletableFuture may also do so,
2500 * with a CompletionException holding one of these exceptions as its
2501 * cause. No guarantees are made about which result or exception is
2502 * used in the returned CompletableFuture. If the supplied action
2503 * throws an exception, then the returned CompletableFuture completes
2504 * exceptionally with a CompletionException holding the exception as
2505 * its cause.
2506 *
2507 * @param other the other CompletableFuture
2508 * @param block the action to perform before completing the
2509 * returned CompletableFuture
2510 * @return the new CompletableFuture
2511 */
2512 public CompletableFuture<Void> acceptEitherAsync
2513 (CompletableFuture<? extends T> other,
2514 Consumer<? super T> block) {
2515 return doAcceptEither(other, block, ForkJoinPool.commonPool());
2516 }
2517
2518 /**
2519 * Returns a new CompletableFuture that is asynchronously completed
2520 * when either this or the other given CompletableFuture completes,
2521 * after performing the given action with the result of either this
2522 * or the other CompletableFuture from a task running in
2523 * the given executor.
2524 *
2525 * <p>If this and/or the other CompletableFuture complete
2526 * exceptionally, then the returned CompletableFuture may also do so,
2527 * with a CompletionException holding one of these exceptions as its
2528 * cause. No guarantees are made about which result or exception is
2529 * used in the returned CompletableFuture. If the supplied action
2530 * throws an exception, then the returned CompletableFuture completes
2531 * exceptionally with a CompletionException holding the exception as
2532 * its cause.
2533 *
2534 * @param other the other CompletableFuture
2535 * @param block the action to perform before completing the
2536 * returned CompletableFuture
2537 * @param executor the executor to use for asynchronous execution
2538 * @return the new CompletableFuture
2539 */
2540 public CompletableFuture<Void> acceptEitherAsync
2541 (CompletableFuture<? extends T> other,
2542 Consumer<? super T> block,
2543 Executor executor) {
2544 if (executor == null) throw new NullPointerException();
2545 return doAcceptEither(other, block, executor);
2546 }
2547
2548 private CompletableFuture<Void> doAcceptEither
2549 (CompletableFuture<? extends T> other,
2550 Consumer<? super T> fn,
2551 Executor e) {
2552 if (other == null || fn == null) throw new NullPointerException();
2553 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2554 AcceptEither<T> d = null;
2555 Object r;
2556 if ((r = result) == null && (r = other.result) == null) {
2557 d = new AcceptEither<T>(this, other, fn, dst, e);
2558 CompletionNode q = null, p = new CompletionNode(d);
2559 while ((r = result) == null && (r = other.result) == null) {
2560 if (q != null) {
2561 if (UNSAFE.compareAndSwapObject
2562 (other, COMPLETIONS, q.next = other.completions, q))
2563 break;
2564 }
2565 else if (UNSAFE.compareAndSwapObject
2566 (this, COMPLETIONS, p.next = completions, p))
2567 q = new CompletionNode(d);
2568 }
2569 }
2570 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2571 T t; Throwable ex;
2572 if (r instanceof AltResult) {
2573 ex = ((AltResult)r).ex;
2574 t = null;
2575 }
2576 else {
2577 ex = null;
2578 @SuppressWarnings("unchecked") T tr = (T) r;
2579 t = tr;
2580 }
2581 if (ex == null) {
2582 try {
2583 if (e != null)
2584 e.execute(new AsyncAccept<T>(t, fn, dst));
2585 else
2586 fn.accept(t);
2587 } catch (Throwable rex) {
2588 ex = rex;
2589 }
2590 }
2591 if (e == null || ex != null)
2592 dst.internalComplete(null, ex);
2593 }
2594 helpPostComplete();
2595 other.helpPostComplete();
2596 return dst;
2597 }
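
The consumer form, acceptEither, behaves analogously. The sketch below assumes the JDK 8 API and uses illustrative names (AcceptEitherSketch, seen).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AcceptEitherSketch {
    // Collects every value the consumer is handed.
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> cache = new CompletableFuture<>();
        CompletableFuture<String> database = new CompletableFuture<>();
        CompletableFuture<Void> done = cache.acceptEither(database, v -> seen.add(v));
        cache.complete("cached");     // the consumer runs with this value
        database.complete("fresh");   // the consumer is not invoked a second time
        done.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```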
2598
2599 /**
2600 * Returns a new CompletableFuture that is completed
2601 * when either this or the other given CompletableFuture completes,
2602 * after performing the given action.
2603 *
2604 * <p>If this and/or the other CompletableFuture complete
2605 * exceptionally, then the returned CompletableFuture may also do so,
2606 * with a CompletionException holding one of these exceptions as its
2607 * cause. No guarantees are made about which result or exception is
2608 * used in the returned CompletableFuture. If the supplied action
2609 * throws an exception, then the returned CompletableFuture completes
2610 * exceptionally with a CompletionException holding the exception as
2611 * its cause.
2612 *
2613 * @param other the other CompletableFuture
2614 * @param action the action to perform before completing the
2615 * returned CompletableFuture
2616 * @return the new CompletableFuture
2617 */
2618 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2619 Runnable action) {
2620 return doRunAfterEither(other, action, null);
2621 }
2622
2623 /**
2624 * Returns a new CompletableFuture that is asynchronously completed
2625 * when either this or the other given CompletableFuture completes,
2626 * after performing the given action from a task running in the
2627 * {@link ForkJoinPool#commonPool()}.
2628 *
2629 * <p>If this and/or the other CompletableFuture complete
2630 * exceptionally, then the returned CompletableFuture may also do so,
2631 * with a CompletionException holding one of these exceptions as its
2632 * cause. No guarantees are made about which result or exception is
2633 * used in the returned CompletableFuture. If the supplied action
2634 * throws an exception, then the returned CompletableFuture completes
2635 * exceptionally with a CompletionException holding the exception as
2636 * its cause.
2637 *
2638 * @param other the other CompletableFuture
2639 * @param action the action to perform before completing the
2640 * returned CompletableFuture
2641 * @return the new CompletableFuture
2642 */
2643 public CompletableFuture<Void> runAfterEitherAsync
2644 (CompletableFuture<?> other,
2645 Runnable action) {
2646 return doRunAfterEither(other, action, ForkJoinPool.commonPool());
2647 }
2648
2649 /**
2650 * Returns a new CompletableFuture that is asynchronously completed
2651 * when either this or the other given CompletableFuture completes,
2652 * after performing the given action from a task running in the
2653 * given executor.
2654 *
2655 * <p>If this and/or the other CompletableFuture complete
2656 * exceptionally, then the returned CompletableFuture may also do so,
2657 * with a CompletionException holding one of these exceptions as its
2658 * cause. No guarantees are made about which result or exception is
2659 * used in the returned CompletableFuture. If the supplied action
2660 * throws an exception, then the returned CompletableFuture completes
2661 * exceptionally with a CompletionException holding the exception as
2662 * its cause.
2663 *
2664 * @param other the other CompletableFuture
2665 * @param action the action to perform before completing the
2666 * returned CompletableFuture
2667 * @param executor the executor to use for asynchronous execution
2668 * @return the new CompletableFuture
2669 */
2670 public CompletableFuture<Void> runAfterEitherAsync
2671 (CompletableFuture<?> other,
2672 Runnable action,
2673 Executor executor) {
2674 if (executor == null) throw new NullPointerException();
2675 return doRunAfterEither(other, action, executor);
2676 }
2677
2678 private CompletableFuture<Void> doRunAfterEither
2679 (CompletableFuture<?> other,
2680 Runnable action,
2681 Executor e) {
2682 if (other == null || action == null) throw new NullPointerException();
2683 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2684 RunAfterEither d = null;
2685 Object r;
2686 if ((r = result) == null && (r = other.result) == null) {
2687 d = new RunAfterEither(this, other, action, dst, e);
2688 CompletionNode q = null, p = new CompletionNode(d);
2689 while ((r = result) == null && (r = other.result) == null) {
2690 if (q != null) {
2691 if (UNSAFE.compareAndSwapObject
2692 (other, COMPLETIONS, q.next = other.completions, q))
2693 break;
2694 }
2695 else if (UNSAFE.compareAndSwapObject
2696 (this, COMPLETIONS, p.next = completions, p))
2697 q = new CompletionNode(d);
2698 }
2699 }
2700 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2701 Throwable ex;
2702 if (r instanceof AltResult)
2703 ex = ((AltResult)r).ex;
2704 else
2705 ex = null;
2706 if (ex == null) {
2707 try {
2708 if (e != null)
2709 e.execute(new AsyncRun(action, dst));
2710 else
2711 action.run();
2712 } catch (Throwable rex) {
2713 ex = rex;
2714 }
2715 }
2716 if (e == null || ex != null)
2717 dst.internalComplete(null, ex);
2718 }
2719 helpPostComplete();
2720 other.helpPostComplete();
2721 return dst;
2722 }
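
In contrast to runAfterBoth, runAfterEither fires as soon as one source completes, and only once. A sketch assuming the JDK 8 API, with hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class RunAfterEitherSketch {
    // Returns {runs after the first completion, runs after the second completion}.
    static int[] run() {
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Void> done = a.runAfterEither(b, () -> runs.incrementAndGet());
        b.complete(1);
        int afterFirst = runs.get();   // one completion is enough to run the action
        a.complete("x");               // the second completion does not run it again
        done.join();
        return new int[] { afterFirst, runs.get() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " then " + counts[1]);
    }
}
```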
2723
2724 /**
2725 * Returns a CompletableFuture that, upon completion, has the same
2726 * value as produced by the given function of the result of this
2727 * CompletableFuture.
2728 *
2729 * <p>If this CompletableFuture completes exceptionally, then the
2730 * returned CompletableFuture also does so, with a
2731 * CompletionException holding this exception as its cause.
2732 * Similarly, if the computed CompletableFuture completes
2733 * exceptionally, then so does the returned CompletableFuture.
2734 *
2735 * @param fn the function returning a new CompletableFuture
2736 * @return the CompletableFuture
2737 */
2738 public <U> CompletableFuture<U> thenCompose
2739 (Function<? super T, CompletableFuture<U>> fn) {
2740 return doThenCompose(fn, null);
2741 }
2742
2743 /**
2744 * Returns a CompletableFuture that, upon completion, has the same
2745 * value as that produced asynchronously using the {@link
2746 * ForkJoinPool#commonPool()} by the given function of the result
2747 * of this CompletableFuture.
2748 *
2749 * <p>If this CompletableFuture completes exceptionally, then the
2750 * returned CompletableFuture also does so, with a
2751 * CompletionException holding this exception as its cause.
2752 * Similarly, if the computed CompletableFuture completes
2753 * exceptionally, then so does the returned CompletableFuture.
2754 *
2755 * @param fn the function returning a new CompletableFuture
2756 * @return the CompletableFuture
2757 */
2758 public <U> CompletableFuture<U> thenComposeAsync
2759 (Function<? super T, CompletableFuture<U>> fn) {
2760 return doThenCompose(fn, ForkJoinPool.commonPool());
2761 }
2762
2763 /**
2764 * Returns a CompletableFuture that, upon completion, has the same
2765 * value as that produced asynchronously using the given executor
2766 * by the given function of the result of this CompletableFuture.
2767 *
2768 * <p>If this CompletableFuture completes exceptionally, then the
2769 * returned CompletableFuture also does so, with a
2770 * CompletionException holding this exception as its cause.
2771 * Similarly, if the computed CompletableFuture completes
2772 * exceptionally, then so does the returned CompletableFuture.
2773 *
2774 * @param fn the function returning a new CompletableFuture
2775 * @param executor the executor to use for asynchronous execution
2776 * @return the CompletableFuture
2777 */
2778 public <U> CompletableFuture<U> thenComposeAsync
2779 (Function<? super T, CompletableFuture<U>> fn,
2780 Executor executor) {
2781 if (executor == null) throw new NullPointerException();
2782 return doThenCompose(fn, executor);
2783 }

    private <U> CompletableFuture<U> doThenCompose
        (Function<? super T, CompletableFuture<U>> fn,
         Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = null;
        ThenCompose<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            dst = new CompletableFuture<U>();
            CompletionNode p = new CompletionNode
                (d = new ThenCompose<T,U>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                if (e != null) {
                    if (dst == null)
                        dst = new CompletableFuture<U>();
                    e.execute(new AsyncCompose<T,U>(t, fn, dst));
                }
                else {
                    try {
                        if ((dst = fn.apply(t)) == null)
                            ex = new NullPointerException();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
            }
            if (dst == null)
                dst = new CompletableFuture<U>();
            // Propagate any failure, whether or not an executor was given;
            // otherwise an async compose on an exceptionally completed
            // source would leave dst incomplete.
            if (ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        dst.helpPostComplete();
        return dst;
    }
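The composition semantics documented above can be exercised from client code. The following is a minimal sketch, not part of this file: the class `ComposeSketch` and the helper `lengthOf` are hypothetical names, and it assumes a JDK whose `supplyAsync`, `thenCompose`, and `join` behave as their Javadoc describes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ComposeSketch {
    // Hypothetical second stage: an asynchronous step that needs the first result.
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.supplyAsync(() -> s.length());
    }

    // thenCompose flattens the nested future: the caller sees a
    // CompletableFuture<Integer>, not a future of a future.
    static int composed() {
        CompletableFuture<String> name =
            CompletableFuture.supplyAsync(() -> "jsr166");
        return name.thenCompose(ComposeSketch::lengthOf).join();
    }

    // If the source completed exceptionally, the function is never applied
    // and the returned future fails with a CompletionException.
    static boolean failurePropagates() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            failed.thenCompose(ComposeSketch::lengthOf).join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(composed());
        System.out.println(failurePropagates());
    }
}
```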

    /**
     * Returns a new CompletableFuture that is completed when this
     * CompletableFuture completes, with the result of the given
     * function of the exception triggering this CompletableFuture's
     * completion when it completes exceptionally; otherwise, if this
     * CompletableFuture completes normally, then the returned
     * CompletableFuture also completes normally with the same value.
     *
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture if this CompletableFuture completed
     * exceptionally
     * @return the new CompletableFuture
     */
    public CompletableFuture<T> exceptionally
        (Function<Throwable, ? extends T> fn) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<T> dst = new CompletableFuture<T>();
        ExceptionCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p =
                new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
                                                p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t = null; Throwable ex, dx = null;
            if (r instanceof AltResult) {
                if ((ex = ((AltResult)r).ex) != null) {
                    try {
                        t = fn.apply(ex);
                    } catch (Throwable rex) {
                        dx = rex;
                    }
                }
            }
            else {
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            dst.internalComplete(t, dx);
        }
        helpPostComplete();
        return dst;
    }
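As an illustration of the two paths through `exceptionally`, here is a small client-side sketch. It is not part of this file; `ExceptionallySketch` and `recover` are hypothetical names, and the fallback string format is chosen only for the example.

```java
import java.util.concurrent.CompletableFuture;

final class ExceptionallySketch {
    // Maps a failure to a fallback value; a normal result passes through unchanged.
    static String recover(CompletableFuture<String> source) {
        return source.exceptionally(ex -> "fallback: " + ex.getMessage()).join();
    }

    static String failedCase() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return recover(f);
    }

    static String normalCase() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("ok");
        return recover(f);
    }

    public static void main(String[] args) {
        System.out.println(failedCase());
        System.out.println(normalCase());
    }
}
```

Because `exceptionally` is applied directly to the future that was completed with `completeExceptionally`, the function receives that exception itself rather than a wrapping `CompletionException`.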

    /**
     * Returns a new CompletableFuture that is completed when this
     * CompletableFuture completes, with the result of the given
     * function of the result and exception of this CompletableFuture's
     * completion. The given function is invoked with the result (or
     * {@code null} if none) and the exception (or {@code null} if none)
     * of this CompletableFuture when complete.
     *
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> handle
        (BiFunction<? super T, Throwable, ? extends U> fn) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        HandleCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p =
                new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
                                                p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u; Throwable dx;
            try {
                u = fn.apply(t, ex);
                dx = null;
            } catch (Throwable rex) {
                dx = rex;
                u = null;
            }
            dst.internalComplete(u, dx);
        }
        helpPostComplete();
        return dst;
    }
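Unlike `exceptionally`, the function given to `handle` runs on both outcomes. The sketch below is illustrative only and not part of this file; `HandleSketch` and `describe` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

final class HandleSketch {
    // The function sees (value, null) on success and (null, exception) on failure.
    static String describe(CompletableFuture<Integer> source) {
        return source.handle((value, ex) ->
            ex == null ? "value=" + value
                       : "error=" + ex.getClass().getSimpleName()).join();
    }

    static String normalCase() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.complete(42);
        return describe(f);
    }

    static String failedCase() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.completeExceptionally(new ArithmeticException("divide by zero"));
        return describe(f);
    }

    public static void main(String[] args) {
        System.out.println(normalCase());
        System.out.println(failedCase());
    }
}
```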


    /* ------------- Arbitrary-arity constructions -------------- */

    /*
     * The basic plan of attack is to recursively form binary
     * completion trees of elements. This can be overkill for small
     * sets, but scales nicely. The And/All vs Or/Any forms use the
     * same idea, but details differ.
     */

    /**
     * Returns a new CompletableFuture that is completed when all of
     * the given CompletableFutures complete. If any of the given
     * CompletableFutures complete exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException
     * holding this exception as its cause. Otherwise, the results,
     * if any, of the given CompletableFutures are not reflected in
     * the returned CompletableFuture, but may be obtained by
     * inspecting them individually. If no CompletableFutures are
     * provided, returns a CompletableFuture completed with the value
     * {@code null}.
     *
     * <p>Among the applications of this method is to await completion
     * of a set of independent CompletableFutures before continuing a
     * program, as in: {@code CompletableFuture.allOf(c1, c2,
     * c3).join();}.
     *
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed when all of the
     * given CompletableFutures complete
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        int len = cfs.length; // Directly handle empty and singleton cases
        if (len > 1)
            return allTree(cfs, 0, len - 1);
        else {
            CompletableFuture<Void> dst = new CompletableFuture<Void>();
            CompletableFuture<?> f;
            if (len == 0)
                dst.result = NIL;
            else if ((f = cfs[0]) == null)
                throw new NullPointerException();
            else {
                ThenPropagate d = null;
                CompletionNode p = null;
                Object r;
                while ((r = f.result) == null) {
                    if (d == null)
                        d = new ThenPropagate(f, dst);
                    else if (p == null)
                        p = new CompletionNode(d);
                    else if (UNSAFE.compareAndSwapObject
                             (f, COMPLETIONS, p.next = f.completions, p))
                        break;
                }
                if (r != null && (d == null || d.compareAndSet(0, 1)))
                    dst.internalComplete(null, (r instanceof AltResult) ?
                                         ((AltResult)r).ex : null);
                f.helpPostComplete();
            }
            return dst;
        }
    }
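The Javadoc for `allOf` notes that individual results must be read from the inputs themselves. A minimal client-side sketch of that pattern follows; it is not part of this file, and `AllOfSketch` and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllOfSketch {
    // Wait for every input, then read each result from its own future.
    static List<Integer> gather() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture.allOf(a, b, c).join();   // carries no value itself
        return Arrays.asList(a.join(), b.join(), c.join());
    }

    // With no arguments the returned future is already complete.
    static boolean emptyIsDone() {
        return CompletableFuture.allOf().isDone();
    }

    public static void main(String[] args) {
        System.out.println(gather());
        System.out.println(emptyIsDone());
    }
}
```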

    /**
     * Recursively constructs an And'ed tree of CompletableFutures.
     * Called only when array known to have at least two elements.
     */
    private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                                   int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid   ? cfs[lo] : allTree(cfs, lo,    mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AndCompletion d = null;
        CompletionNode p = null, q = null;
        Object r = null, s = null;
        while ((r = fst.result) == null || (s = snd.result) == null) {
            if (d == null)
                d = new AndCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null) &&
            (s != null || (s = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null && (s instanceof AltResult))
                ex = ((AltResult)s).ex;
            dst.internalComplete(null, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }
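The recursive halving used by `allTree` can be imitated with public API only. The sketch below is not the implementation above: it swaps the internal `AndCompletion` nodes for the public `runAfterBoth` method, and `AndTreeSketch` is a hypothetical name. It keeps the same index arithmetic and, like `allTree`, assumes at least two elements.

```java
import java.util.concurrent.CompletableFuture;

final class AndTreeSketch {
    // Builds a binary tree of "both done" futures over cfs[lo..hi], lo < hi.
    static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        int mid = (lo + hi) >>> 1;
        CompletableFuture<?> fst =
            (lo == mid) ? cfs[lo] : allTree(cfs, lo, mid);
        CompletableFuture<?> snd =
            (hi == mid + 1) ? cfs[hi] : allTree(cfs, mid + 1, hi);
        // Public-API stand-in for an AndCompletion node.
        return fst.runAfterBoth(snd, () -> { });
    }

    // The root completes only once every leaf has completed.
    static boolean demo() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        CompletableFuture<Void> all =
            allTree(new CompletableFuture<?>[] { a, b, c }, 0, 2);
        a.complete("a");
        b.complete(2);
        boolean doneEarly = all.isDone();
        c.complete("c");
        return !doneEarly && all.isDone();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

For n inputs the tree has depth of roughly log2(n), so no single future accumulates a long chain of dependents.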

    /**
     * Returns a new CompletableFuture that is completed when any of
     * the given CompletableFutures completes, with the same result.
     * If that CompletableFuture completed exceptionally, the returned
     * CompletableFuture also does so, with a CompletionException
     * holding this exception as its cause. If no CompletableFutures
     * are provided, returns an incomplete CompletableFuture.
     *
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed with the
     * result or exception of any of the given CompletableFutures when
     * one completes
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        int len = cfs.length; // Same idea as allOf
        if (len > 1)
            return anyTree(cfs, 0, len - 1);
        else {
            CompletableFuture<Object> dst = new CompletableFuture<Object>();
            CompletableFuture<?> f;
            if (len == 0)
                ; // skip
            else if ((f = cfs[0]) == null)
                throw new NullPointerException();
            else {
                ThenCopy<Object> d = null;
                CompletionNode p = null;
                Object r;
                while ((r = f.result) == null) {
                    if (d == null)
                        d = new ThenCopy<Object>(f, dst);
                    else if (p == null)
                        p = new CompletionNode(d);
                    else if (UNSAFE.compareAndSwapObject
                             (f, COMPLETIONS, p.next = f.completions, p))
                        break;
                }
                if (r != null && (d == null || d.compareAndSet(0, 1))) {
                    Throwable ex; Object t;
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        t = null;
                    }
                    else {
                        ex = null;
                        t = r;
                    }
                    dst.internalComplete(t, ex);
                }
                f.helpPostComplete();
            }
            return dst;
        }
    }
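A short client-side sketch of `anyOf` follows. It is illustrative only and not part of this file; `AnyOfSketch` and its method names are hypothetical. One input is deliberately never completed so that the outcome does not depend on timing.

```java
import java.util.concurrent.CompletableFuture;

final class AnyOfSketch {
    // The returned future takes the result of whichever input is complete.
    static Object firstOf() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> ready = new CompletableFuture<>();
        ready.complete("ready");
        return CompletableFuture.anyOf(pending, ready).join();
    }

    // With no arguments there is nothing to wait for, so it never completes.
    static boolean emptyStaysIncomplete() {
        return !CompletableFuture.anyOf().isDone();
    }

    public static void main(String[] args) {
        System.out.println(firstOf());
        System.out.println(emptyStaysIncomplete());
    }
}
```

The result type is `Object` because the inputs may have unrelated value types; callers typically cast after `join()`.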

    /**
     * Recursively constructs an Or'ed tree of CompletableFutures.
     */
    private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
                                                     int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid   ? cfs[lo] : anyTree(cfs, lo,    mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<Object> dst = new CompletableFuture<Object>();
        OrCompletion d = null;
        CompletionNode p = null, q = null;
        Object r;
        while ((r = fst.result) == null && (r = snd.result) == null) {
            if (d == null)
                d = new OrCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null ||
             (r = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex; Object t;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                t = r;
            }
            dst.internalComplete(t, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }

    /* ------------- Control and status methods -------------- */

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = (result == null) &&
            UNSAFE.compareAndSwapObject
            (this, RESULT, null, new AltResult(new CancellationException()));
        postComplete();
        return cancelled || isCancelled();
    }
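The cancellation rules documented for `cancel` can be checked from client code. This is a minimal sketch, not part of this file; `CancelSketch` and its method names are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class CancelSketch {
    // Cancelling a pending future succeeds and is visible via isCancelled().
    static boolean cancelPending() {
        CompletableFuture<String> f = new CompletableFuture<>();
        return f.cancel(false) && f.isCancelled();
    }

    // Cancelling a future that already completed normally has no effect.
    static boolean cancelCompleted() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("done");
        return f.cancel(false);
    }

    // A dependent fails with a CompletionException caused by the
    // CancellationException; it is not itself reported as cancelled.
    static boolean dependentSeesCancellation() {
        CompletableFuture<String> f = new CompletableFuture<>();
        CompletableFuture<Integer> dep = f.thenApply(String::length);
        f.cancel(true);   // the flag is ignored by this class
        try {
            dep.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof CancellationException
                && !dep.isCancelled();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelPending());
        System.out.println(cancelCompleted());
        System.out.println(dependentSeesCancellation());
    }
}
```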

    /**
     * Returns {@code true} if this CompletableFuture was cancelled
     * before it completed normally.
     *
     * @return {@code true} if this CompletableFuture was cancelled
     * before it completed normally
     */
    public boolean isCancelled() {
        Object r;
        return ((r = result) instanceof AltResult) &&
            (((AltResult)r).ex instanceof CancellationException);
    }

    /**
     * Forcibly sets or resets the value subsequently returned by
     * method {@link #get()} and related methods, whether or not
     * already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param value the completion value
     */
    public void obtrudeValue(T value) {
        result = (value == null) ? NIL : value;
        postComplete();
    }

    /**
     * Forcibly causes subsequent invocations of method {@link #get()}
     * and related methods to throw the given exception, whether or
     * not already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param ex the exception
     */
    public void obtrudeException(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        result = new AltResult(ex);
        postComplete();
    }
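The phrase "established versus overwritten outcomes" is easiest to see with a dependent that ran before the overwrite. The following sketch is illustrative only and not part of this file; `ObtrudeSketch` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

final class ObtrudeSketch {
    // Returns { value now reported by the source, value held by the dependent }.
    static String[] demo() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("old");
        // The dependent runs immediately, using the established value.
        CompletableFuture<String> dep = f.thenApply(s -> s + "!");
        // Overwrite the source; already-computed dependents are not redone.
        f.obtrudeValue("new");
        return new String[] { f.join(), dep.join() };
    }

    public static void main(String[] args) {
        String[] out = demo();
        System.out.println(out[0] + " " + out[1]);
    }
}
```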

    /**
     * Returns the estimated number of CompletableFutures whose
     * completions are awaiting completion of this CompletableFuture.
     * This method is designed for use in monitoring system state, not
     * for synchronization control.
     *
     * @return the number of dependent CompletableFutures
     */
    public int getNumberOfDependents() {
        int count = 0;
        for (CompletionNode p = completions; p != null; p = p.next)
            ++count;
        return count;
    }

    /**
     * Returns a string identifying this CompletableFuture, as well as
     * its completion state. The state, in brackets, contains the
     * String {@code "Completed normally"} or the String {@code
     * "Completed exceptionally"}, or the String {@code "Not
     * completed"} followed by the number of CompletableFutures
     * dependent upon its completion, if any.
     *
     * @return a string identifying this CompletableFuture, as well as its state
     */
    public String toString() {
        Object r = result;
        int count;
        return super.toString() +
            ((r == null) ?
             (((count = getNumberOfDependents()) == 0) ?
              "[Not completed]" :
              "[Not completed, " + count + " dependents]") :
             (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
              "[Completed exceptionally]" :
              "[Completed normally]"));
    }
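The state suffix produced by `toString` can be observed as follows. This is a sketch for monitoring-style use, not part of this file; `ToStringSketch` and the helper `state` are hypothetical names. The dependent count is an estimate, so the sketch does not rely on its exact value.

```java
import java.util.concurrent.CompletableFuture;

final class ToStringSketch {
    // Extracts the bracketed state, dropping the identity prefix.
    static String state(CompletableFuture<?> f) {
        String s = f.toString();
        return s.substring(s.indexOf('['));
    }

    static String pendingWithDependent() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.thenRun(() -> { });          // registers one dependent
        return state(f);
    }

    static String completed() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete("done");
        return state(f);
    }

    public static void main(String[] args) {
        System.out.println(pendingWithDependent());
        System.out.println(completed());
    }
}
```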

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long RESULT;
    private static final long WAITERS;
    private static final long COMPLETIONS;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = CompletableFuture.class;
            RESULT = UNSAFE.objectFieldOffset
                (k.getDeclaredField("result"));
            WAITERS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
            COMPLETIONS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("completions"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}