ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.80
Committed: Mon Apr 1 20:16:05 2013 UTC (11 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.79: +4 -0 lines
Log Message:
Specify class-level NPE policy

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to
31 * {@link #complete complete},
32 * {@link #completeExceptionally completeExceptionally}, or
33 * {@link #cancel cancel}
34 * a CompletableFuture, only one of them succeeds.
35 *
36 * <p>Methods are available for adding dependents based on
37 * user-provided Functions, Consumers, or Runnables. The appropriate
38 * form to use depends on whether actions require arguments and/or
39 * produce results. Completion of a dependent action will trigger the
40 * completion of another CompletableFuture. Actions may also be
41 * triggered after either or both the current and another
42 * CompletableFuture complete. Multiple CompletableFutures may also
43 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
44 * {@link #allOf(CompletableFuture...)}.
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, actions supplied for dependent completions of another
48 * CompletableFuture may do so, depending on whether they are provided
49 * via one of the <em>async</em> methods (that is, methods with names
50 * of the form <code><var>xxx</var>Async</code>). The <em>async</em>
51 * methods provide a way to commence asynchronous processing of an
52 * action using either a given {@link Executor} or by default the
53 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
54 * debugging, and tracking, all generated asynchronous tasks are
55 * instances of the marker interface {@link AsynchronousCompletionTask}.
56 *
57 * <p>Actions supplied for dependent completions of <em>non-async</em>
58 * methods may be performed by the thread that completes the current
59 * CompletableFuture, or by any other caller of these methods. There
60 * are no guarantees about the order of processing completions unless
61 * constrained by these methods.
62 *
63 * <p>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional completion.
66 * Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}.
68 *
69 * <p>Upon exceptional completion (including cancellation), or when a
70 * completion entails an additional computation which terminates
71 * abruptly with an (unchecked) exception or error, then all of their
72 * dependent completions (and their dependents in turn) generally act
73 * as {@code completeExceptionally} with a {@link CompletionException}
74 * holding that exception as its cause. However, the {@link
75 * #exceptionally exceptionally} and {@link #handle handle}
76 * completions <em>are</em> able to handle exceptional completions of
77 * the CompletableFutures they depend on.
78 *
79 * <p>In case of exceptional completion with a CompletionException,
80 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
81 * {@link ExecutionException} with the same cause as held in the
82 * corresponding CompletionException. However, in these cases,
83 * methods {@link #join()} and {@link #getNow} throw the
84 * CompletionException, which simplifies usage.
85 *
86 * <p>Arguments used to pass a completion result (that is, for parameters
87 * of type {@code T}) may be null, but passing a null value for any other
88 * parameter will result in a {@link NullPointerException} being thrown.
89 *
90 * @author Doug Lea
91 * @since 1.8
92 */
93 public class CompletableFuture<T> implements Future<T> {
94
95 /*
96 * Overview:
97 *
98 * 1. Non-nullness of field result (set via CAS) indicates done.
99 * An AltResult is used to box null as a result, as well as to
100 * hold exceptions. Using a single field makes completion fast
101 * and simple to detect and trigger, at the expense of a lot of
102 * encoding and decoding that infiltrates many methods. One minor
103 * simplification relies on the (static) NIL (to box null results)
104 * being the only AltResult with a null exception field, so we
105 * don't usually need explicit comparisons with NIL. The CF
106 * exception propagation mechanics surrounding decoding rely on
107 * unchecked casts of decoded results really being unchecked,
108 * where user type errors are caught at point of use, as is
109 * currently the case in Java. These are highlighted by using
110 * SuppressWarnings-annotated temporaries.
111 *
112 * 2. Waiters are held in a Treiber stack similar to the one used
113 * in FutureTask, Phaser, and SynchronousQueue. See their
114 * internal documentation for algorithmic details.
115 *
116 * 3. Completions are also kept in a list/stack, and pulled off
117 * and run when completion is triggered. (We could even use the
118 * same stack as for waiters, but would give up the potential
119 * parallelism obtained because woken waiters help release/run
120 * others -- see method postComplete). Because post-processing
121 * may race with direct calls, class Completion opportunistically
122 * extends AtomicInteger so callers can claim the action via
123 * compareAndSet(0, 1). The Completion.run methods are all
124 * written in a boringly similar uniform way (that sometimes includes
125 * unnecessary-looking checks, kept to maintain uniformity).
126 * There are enough dimensions upon which they differ that
127 * attempts to factor commonalities while maintaining efficiency
128 * require more lines of code than they would save.
129 *
130 * 4. The exported then/and/or methods do support a bit of
131 * factoring (see doThenApply etc). They must cope with the
132 * intrinsic races surrounding addition of a dependent action
133 * versus performing the action directly because the task is
134 * already complete. For example, a CF may not be complete upon
135 * entry, so a dependent completion is added, but by the time it
136 * is added, the target CF is complete, so must be directly
137 * executed. This is all done while avoiding unnecessary object
138 * construction in safe-bypass cases.
139 */
140
141 // preliminaries
142
143 static final class AltResult {
144 final Throwable ex; // null only for NIL
145 AltResult(Throwable ex) { this.ex = ex; }
146 }
147
// Shared AltResult constructed with a null exception; used to box a null result.
148 static final AltResult NIL = new AltResult(null);
149
150 // Fields
151
// All three fields are volatile. postComplete() pops the two stacks with
// UNSAFE.compareAndSwapObject; internalComplete() installs result with a
// CAS from null, so a non-null result means the future is done.
152 volatile Object result; // Either the result or boxed AltResult
153 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
154 volatile CompletionNode completions; // list (Treiber stack) of completions
155
156 // Basic utilities for triggering and processing completions
157
158 /**
159 * Removes and signals all waiting threads and runs all completions.
160 */
161 final void postComplete() {
162 WaitNode q; Thread t;
163 while ((q = waiters) != null) {
164 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
165 (t = q.thread) != null) {
166 q.thread = null;
167 LockSupport.unpark(t);
168 }
169 }
170
171 CompletionNode h; Completion c;
172 while ((h = completions) != null) {
173 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
174 (c = h.completion) != null)
175 c.run();
176 }
177 }
178
179 /**
180 * Triggers completion with the encoding of the given arguments:
181 * if the exception is non-null, encodes it as a wrapped
182 * CompletionException unless it is one already. Otherwise uses
183 * the given result, boxed as NIL if null.
184 */
185 final void internalComplete(T v, Throwable ex) {
186 if (result == null)
187 UNSAFE.compareAndSwapObject
188 (this, RESULT, null,
189 (ex == null) ? (v == null) ? NIL : v :
190 new AltResult((ex instanceof CompletionException) ? ex :
191 new CompletionException(ex)));
192 postComplete(); // help out even if not triggered
193 }
194
195 /**
196 * If triggered, helps release and/or process completions.
197 */
198 final void helpPostComplete() {
199 if (result != null)
200 postComplete();
201 }
202
203 /* ------------- waiting for completions -------------- */
204
205 /** Number of processors, for spin control */
206 static final int NCPU = Runtime.getRuntime().availableProcessors();
207
208 /**
209 * Heuristic spin value for waitingGet() before blocking on
210 * multiprocessors
 * (evaluates to 1 << 8 when NCPU > 1, and to 0 otherwise).
211 */
212 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
213
214 /**
215 * Linked nodes to record waiting threads in a Treiber stack. See
216 * other classes such as Phaser and SynchronousQueue for more
217 * detailed explanation. This class implements ManagedBlocker to
218 * avoid starvation when blocking actions pile up in
219 * ForkJoinPools.
220 */
221 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
222 long nanos; // wait time if timed
223 final long deadline; // non-zero if timed
224 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
225 volatile Thread thread;
226 volatile WaitNode next;
227 WaitNode(boolean interruptible, long nanos, long deadline) {
228 this.thread = Thread.currentThread();
229 this.interruptControl = interruptible ? 1 : 0;
230 this.nanos = nanos;
231 this.deadline = deadline;
232 }
233 public boolean isReleasable() {
234 if (thread == null)
235 return true;
236 if (Thread.interrupted()) {
237 int i = interruptControl;
238 interruptControl = -1;
239 if (i > 0)
240 return true;
241 }
242 if (deadline != 0L &&
243 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
244 thread = null;
245 return true;
246 }
247 return false;
248 }
249 public boolean block() {
250 if (isReleasable())
251 return true;
252 else if (deadline == 0L)
253 LockSupport.park(this);
254 else if (nanos > 0L)
255 LockSupport.parkNanos(this, nanos);
256 return isReleasable();
257 }
258 }
259
260 /**
261 * Returns raw result after waiting, or null if interruptible and
262 * interrupted.
263 */
264 private Object waitingGet(boolean interruptible) {
265 WaitNode q = null;
266 boolean queued = false;
267 int spins = SPINS;
268 for (Object r;;) {
269 if ((r = result) != null) {
270 if (q != null) { // suppress unpark
271 q.thread = null;
272 if (q.interruptControl < 0) {
273 if (interruptible) {
274 removeWaiter(q);
275 return null;
276 }
277 Thread.currentThread().interrupt();
278 }
279 }
280 postComplete(); // help release others
281 return r;
282 }
283 else if (spins > 0) {
284 int rnd = ThreadLocalRandom.nextSecondarySeed();
285 if (rnd == 0)
286 rnd = ThreadLocalRandom.current().nextInt();
287 if (rnd >= 0)
288 --spins;
289 }
290 else if (q == null)
291 q = new WaitNode(interruptible, 0L, 0L);
292 else if (!queued)
293 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
294 q.next = waiters, q);
295 else if (interruptible && q.interruptControl < 0) {
296 removeWaiter(q);
297 return null;
298 }
299 else if (q.thread != null && result == null) {
300 try {
301 ForkJoinPool.managedBlock(q);
302 } catch (InterruptedException ex) {
303 q.interruptControl = -1;
304 }
305 }
306 }
307 }
308
309 /**
310 * Awaits completion or aborts on interrupt or timeout.
311 *
312 * @param nanos time to wait
313 * @return raw result
314 */
315 private Object timedAwaitDone(long nanos)
316 throws InterruptedException, TimeoutException {
317 WaitNode q = null;
318 boolean queued = false;
319 for (Object r;;) {
320 if ((r = result) != null) {
321 if (q != null) {
322 q.thread = null;
323 if (q.interruptControl < 0) {
324 removeWaiter(q);
325 throw new InterruptedException();
326 }
327 }
328 postComplete();
329 return r;
330 }
331 else if (q == null) {
332 if (nanos <= 0L)
333 throw new TimeoutException();
334 long d = System.nanoTime() + nanos;
335 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
336 }
337 else if (!queued)
338 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
339 q.next = waiters, q);
340 else if (q.interruptControl < 0) {
341 removeWaiter(q);
342 throw new InterruptedException();
343 }
344 else if (q.nanos <= 0L) {
345 if (result == null) {
346 removeWaiter(q);
347 throw new TimeoutException();
348 }
349 }
350 else if (q.thread != null && result == null) {
351 try {
352 ForkJoinPool.managedBlock(q);
353 } catch (InterruptedException ex) {
354 q.interruptControl = -1;
355 }
356 }
357 }
358 }
359
360 /**
361 * Tries to unlink a timed-out or interrupted wait node to avoid
362 * accumulating garbage. Internal nodes are simply unspliced
363 * without CAS since it is harmless if they are traversed anyway
364 * by releasers. To avoid effects of unsplicing from already
365 * removed nodes, the list is retraversed in case of an apparent
366 * race. This is slow when there are a lot of nodes, but we don't
367 * expect lists to be long enough to outweigh higher-overhead
368 * schemes.
369 */
370 private void removeWaiter(WaitNode node) {
371 if (node != null) {
372 node.thread = null;
373 retry:
374 for (;;) { // restart on removeWaiter race
375 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
376 s = q.next;
377 if (q.thread != null)
378 pred = q;
379 else if (pred != null) {
380 pred.next = s;
381 if (pred.thread == null) // check for race
382 continue retry;
383 }
384 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
385 continue retry;
386 }
387 break;
388 }
389 }
390 }
391
392 /* ------------- Async tasks -------------- */
393
394 /**
395 * A marker interface identifying asynchronous tasks produced by
396 * {@code async} methods. This may be useful for monitoring,
397 * debugging, and tracking asynchronous activities.
398 *
399 * @since 1.8
400 */
401 public static interface AsynchronousCompletionTask {
402 }
403
404 /** Base class can act as either FJ or plain Runnable */
405 abstract static class Async extends ForkJoinTask<Void>
406 implements Runnable, AsynchronousCompletionTask {
407 public final Void getRawResult() { return null; }
408 public final void setRawResult(Void v) { }
409 public final void run() { exec(); }
410 }
411
412 static final class AsyncRun extends Async {
413 final Runnable fn;
414 final CompletableFuture<Void> dst;
415 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
416 this.fn = fn; this.dst = dst;
417 }
418 public final boolean exec() {
419 CompletableFuture<Void> d; Throwable ex;
420 if ((d = this.dst) != null && d.result == null) {
421 try {
422 fn.run();
423 ex = null;
424 } catch (Throwable rex) {
425 ex = rex;
426 }
427 d.internalComplete(null, ex);
428 }
429 return true;
430 }
431 private static final long serialVersionUID = 5232453952276885070L;
432 }
433
434 static final class AsyncSupply<U> extends Async {
435 final Supplier<U> fn;
436 final CompletableFuture<U> dst;
437 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
438 this.fn = fn; this.dst = dst;
439 }
440 public final boolean exec() {
441 CompletableFuture<U> d; U u; Throwable ex;
442 if ((d = this.dst) != null && d.result == null) {
443 try {
444 u = fn.get();
445 ex = null;
446 } catch (Throwable rex) {
447 ex = rex;
448 u = null;
449 }
450 d.internalComplete(u, ex);
451 }
452 return true;
453 }
454 private static final long serialVersionUID = 5232453952276885070L;
455 }
456
457 static final class AsyncApply<T,U> extends Async {
458 final T arg;
459 final Function<? super T,? extends U> fn;
460 final CompletableFuture<U> dst;
461 AsyncApply(T arg, Function<? super T,? extends U> fn,
462 CompletableFuture<U> dst) {
463 this.arg = arg; this.fn = fn; this.dst = dst;
464 }
465 public final boolean exec() {
466 CompletableFuture<U> d; U u; Throwable ex;
467 if ((d = this.dst) != null && d.result == null) {
468 try {
469 u = fn.apply(arg);
470 ex = null;
471 } catch (Throwable rex) {
472 ex = rex;
473 u = null;
474 }
475 d.internalComplete(u, ex);
476 }
477 return true;
478 }
479 private static final long serialVersionUID = 5232453952276885070L;
480 }
481
482 static final class AsyncBiApply<T,U,V> extends Async {
483 final T arg1;
484 final U arg2;
485 final BiFunction<? super T,? super U,? extends V> fn;
486 final CompletableFuture<V> dst;
487 AsyncBiApply(T arg1, U arg2,
488 BiFunction<? super T,? super U,? extends V> fn,
489 CompletableFuture<V> dst) {
490 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
491 }
492 public final boolean exec() {
493 CompletableFuture<V> d; V v; Throwable ex;
494 if ((d = this.dst) != null && d.result == null) {
495 try {
496 v = fn.apply(arg1, arg2);
497 ex = null;
498 } catch (Throwable rex) {
499 ex = rex;
500 v = null;
501 }
502 d.internalComplete(v, ex);
503 }
504 return true;
505 }
506 private static final long serialVersionUID = 5232453952276885070L;
507 }
508
509 static final class AsyncAccept<T> extends Async {
510 final T arg;
511 final Consumer<? super T> fn;
512 final CompletableFuture<Void> dst;
513 AsyncAccept(T arg, Consumer<? super T> fn,
514 CompletableFuture<Void> dst) {
515 this.arg = arg; this.fn = fn; this.dst = dst;
516 }
517 public final boolean exec() {
518 CompletableFuture<Void> d; Throwable ex;
519 if ((d = this.dst) != null && d.result == null) {
520 try {
521 fn.accept(arg);
522 ex = null;
523 } catch (Throwable rex) {
524 ex = rex;
525 }
526 d.internalComplete(null, ex);
527 }
528 return true;
529 }
530 private static final long serialVersionUID = 5232453952276885070L;
531 }
532
533 static final class AsyncBiAccept<T,U> extends Async {
534 final T arg1;
535 final U arg2;
536 final BiConsumer<? super T,? super U> fn;
537 final CompletableFuture<Void> dst;
538 AsyncBiAccept(T arg1, U arg2,
539 BiConsumer<? super T,? super U> fn,
540 CompletableFuture<Void> dst) {
541 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
542 }
543 public final boolean exec() {
544 CompletableFuture<Void> d; Throwable ex;
545 if ((d = this.dst) != null && d.result == null) {
546 try {
547 fn.accept(arg1, arg2);
548 ex = null;
549 } catch (Throwable rex) {
550 ex = rex;
551 }
552 d.internalComplete(null, ex);
553 }
554 return true;
555 }
556 private static final long serialVersionUID = 5232453952276885070L;
557 }
558
559 static final class AsyncCompose<T,U> extends Async {
560 final T arg;
561 final Function<? super T, CompletableFuture<U>> fn;
562 final CompletableFuture<U> dst;
563 AsyncCompose(T arg,
564 Function<? super T, CompletableFuture<U>> fn,
565 CompletableFuture<U> dst) {
566 this.arg = arg; this.fn = fn; this.dst = dst;
567 }
568 public final boolean exec() {
569 CompletableFuture<U> d, fr; U u; Throwable ex;
570 if ((d = this.dst) != null && d.result == null) {
571 try {
572 fr = fn.apply(arg);
573 ex = (fr == null) ? new NullPointerException() : null;
574 } catch (Throwable rex) {
575 ex = rex;
576 fr = null;
577 }
578 if (ex != null)
579 u = null;
580 else {
581 Object r = fr.result;
582 if (r == null)
583 r = fr.waitingGet(false);
584 if (r instanceof AltResult) {
585 ex = ((AltResult)r).ex;
586 u = null;
587 }
588 else {
589 @SuppressWarnings("unchecked") U ur = (U) r;
590 u = ur;
591 }
592 }
593 d.internalComplete(u, ex);
594 }
595 return true;
596 }
597 private static final long serialVersionUID = 5232453952276885070L;
598 }
599
600 /* ------------- Completions -------------- */
601
602 /**
603 * Simple linked list nodes to record completions, used in
604 * basically the same way as WaitNodes. (We separate nodes from
605 * the Completions themselves mainly because for the And and Or
606 * methods, the same Completion object resides in two lists.)
607 */
608 static final class CompletionNode {
609 final Completion completion;
610 volatile CompletionNode next;
611 CompletionNode(Completion completion) { this.completion = completion; }
612 }
613
614 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
615 abstract static class Completion extends AtomicInteger implements Runnable {
616 }
617
618 static final class ApplyCompletion<T,U> extends Completion {
619 final CompletableFuture<? extends T> src;
620 final Function<? super T,? extends U> fn;
621 final CompletableFuture<U> dst;
622 final Executor executor;
623 ApplyCompletion(CompletableFuture<? extends T> src,
624 Function<? super T,? extends U> fn,
625 CompletableFuture<U> dst, Executor executor) {
626 this.src = src; this.fn = fn; this.dst = dst;
627 this.executor = executor;
628 }
629 public final void run() {
630 final CompletableFuture<? extends T> a;
631 final Function<? super T,? extends U> fn;
632 final CompletableFuture<U> dst;
633 Object r; T t; Throwable ex;
634 if ((dst = this.dst) != null &&
635 (fn = this.fn) != null &&
636 (a = this.src) != null &&
637 (r = a.result) != null &&
638 compareAndSet(0, 1)) {
639 if (r instanceof AltResult) {
640 ex = ((AltResult)r).ex;
641 t = null;
642 }
643 else {
644 ex = null;
645 @SuppressWarnings("unchecked") T tr = (T) r;
646 t = tr;
647 }
648 Executor e = executor;
649 U u = null;
650 if (ex == null) {
651 try {
652 if (e != null)
653 e.execute(new AsyncApply<T,U>(t, fn, dst));
654 else
655 u = fn.apply(t);
656 } catch (Throwable rex) {
657 ex = rex;
658 }
659 }
660 if (e == null || ex != null)
661 dst.internalComplete(u, ex);
662 }
663 }
664 private static final long serialVersionUID = 5232453952276885070L;
665 }
666
667 static final class AcceptCompletion<T> extends Completion {
668 final CompletableFuture<? extends T> src;
669 final Consumer<? super T> fn;
670 final CompletableFuture<Void> dst;
671 final Executor executor;
672 AcceptCompletion(CompletableFuture<? extends T> src,
673 Consumer<? super T> fn,
674 CompletableFuture<Void> dst, Executor executor) {
675 this.src = src; this.fn = fn; this.dst = dst;
676 this.executor = executor;
677 }
678 public final void run() {
679 final CompletableFuture<? extends T> a;
680 final Consumer<? super T> fn;
681 final CompletableFuture<Void> dst;
682 Object r; T t; Throwable ex;
683 if ((dst = this.dst) != null &&
684 (fn = this.fn) != null &&
685 (a = this.src) != null &&
686 (r = a.result) != null &&
687 compareAndSet(0, 1)) {
688 if (r instanceof AltResult) {
689 ex = ((AltResult)r).ex;
690 t = null;
691 }
692 else {
693 ex = null;
694 @SuppressWarnings("unchecked") T tr = (T) r;
695 t = tr;
696 }
697 Executor e = executor;
698 if (ex == null) {
699 try {
700 if (e != null)
701 e.execute(new AsyncAccept<T>(t, fn, dst));
702 else
703 fn.accept(t);
704 } catch (Throwable rex) {
705 ex = rex;
706 }
707 }
708 if (e == null || ex != null)
709 dst.internalComplete(null, ex);
710 }
711 }
712 private static final long serialVersionUID = 5232453952276885070L;
713 }
714
715 static final class RunCompletion<T> extends Completion {
716 final CompletableFuture<? extends T> src;
717 final Runnable fn;
718 final CompletableFuture<Void> dst;
719 final Executor executor;
720 RunCompletion(CompletableFuture<? extends T> src,
721 Runnable fn,
722 CompletableFuture<Void> dst,
723 Executor executor) {
724 this.src = src; this.fn = fn; this.dst = dst;
725 this.executor = executor;
726 }
727 public final void run() {
728 final CompletableFuture<? extends T> a;
729 final Runnable fn;
730 final CompletableFuture<Void> dst;
731 Object r; Throwable ex;
732 if ((dst = this.dst) != null &&
733 (fn = this.fn) != null &&
734 (a = this.src) != null &&
735 (r = a.result) != null &&
736 compareAndSet(0, 1)) {
737 if (r instanceof AltResult)
738 ex = ((AltResult)r).ex;
739 else
740 ex = null;
741 Executor e = executor;
742 if (ex == null) {
743 try {
744 if (e != null)
745 e.execute(new AsyncRun(fn, dst));
746 else
747 fn.run();
748 } catch (Throwable rex) {
749 ex = rex;
750 }
751 }
752 if (e == null || ex != null)
753 dst.internalComplete(null, ex);
754 }
755 }
756 private static final long serialVersionUID = 5232453952276885070L;
757 }
758
759 static final class BiApplyCompletion<T,U,V> extends Completion {
760 final CompletableFuture<? extends T> src;
761 final CompletableFuture<? extends U> snd;
762 final BiFunction<? super T,? super U,? extends V> fn;
763 final CompletableFuture<V> dst;
764 final Executor executor;
765 BiApplyCompletion(CompletableFuture<? extends T> src,
766 CompletableFuture<? extends U> snd,
767 BiFunction<? super T,? super U,? extends V> fn,
768 CompletableFuture<V> dst, Executor executor) {
769 this.src = src; this.snd = snd;
770 this.fn = fn; this.dst = dst;
771 this.executor = executor;
772 }
773 public final void run() {
774 final CompletableFuture<? extends T> a;
775 final CompletableFuture<? extends U> b;
776 final BiFunction<? super T,? super U,? extends V> fn;
777 final CompletableFuture<V> dst;
778 Object r, s; T t; U u; Throwable ex;
779 if ((dst = this.dst) != null &&
780 (fn = this.fn) != null &&
781 (a = this.src) != null &&
782 (r = a.result) != null &&
783 (b = this.snd) != null &&
784 (s = b.result) != null &&
785 compareAndSet(0, 1)) {
786 if (r instanceof AltResult) {
787 ex = ((AltResult)r).ex;
788 t = null;
789 }
790 else {
791 ex = null;
792 @SuppressWarnings("unchecked") T tr = (T) r;
793 t = tr;
794 }
795 if (ex != null)
796 u = null;
797 else if (s instanceof AltResult) {
798 ex = ((AltResult)s).ex;
799 u = null;
800 }
801 else {
802 @SuppressWarnings("unchecked") U us = (U) s;
803 u = us;
804 }
805 Executor e = executor;
806 V v = null;
807 if (ex == null) {
808 try {
809 if (e != null)
810 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
811 else
812 v = fn.apply(t, u);
813 } catch (Throwable rex) {
814 ex = rex;
815 }
816 }
817 if (e == null || ex != null)
818 dst.internalComplete(v, ex);
819 }
820 }
821 private static final long serialVersionUID = 5232453952276885070L;
822 }
823
824 static final class BiAcceptCompletion<T,U> extends Completion {
825 final CompletableFuture<? extends T> src;
826 final CompletableFuture<? extends U> snd;
827 final BiConsumer<? super T,? super U> fn;
828 final CompletableFuture<Void> dst;
829 final Executor executor;
830 BiAcceptCompletion(CompletableFuture<? extends T> src,
831 CompletableFuture<? extends U> snd,
832 BiConsumer<? super T,? super U> fn,
833 CompletableFuture<Void> dst, Executor executor) {
834 this.src = src; this.snd = snd;
835 this.fn = fn; this.dst = dst;
836 this.executor = executor;
837 }
838 public final void run() {
839 final CompletableFuture<? extends T> a;
840 final CompletableFuture<? extends U> b;
841 final BiConsumer<? super T,? super U> fn;
842 final CompletableFuture<Void> dst;
843 Object r, s; T t; U u; Throwable ex;
844 if ((dst = this.dst) != null &&
845 (fn = this.fn) != null &&
846 (a = this.src) != null &&
847 (r = a.result) != null &&
848 (b = this.snd) != null &&
849 (s = b.result) != null &&
850 compareAndSet(0, 1)) {
851 if (r instanceof AltResult) {
852 ex = ((AltResult)r).ex;
853 t = null;
854 }
855 else {
856 ex = null;
857 @SuppressWarnings("unchecked") T tr = (T) r;
858 t = tr;
859 }
860 if (ex != null)
861 u = null;
862 else if (s instanceof AltResult) {
863 ex = ((AltResult)s).ex;
864 u = null;
865 }
866 else {
867 @SuppressWarnings("unchecked") U us = (U) s;
868 u = us;
869 }
870 Executor e = executor;
871 if (ex == null) {
872 try {
873 if (e != null)
874 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
875 else
876 fn.accept(t, u);
877 } catch (Throwable rex) {
878 ex = rex;
879 }
880 }
881 if (e == null || ex != null)
882 dst.internalComplete(null, ex);
883 }
884 }
885 private static final long serialVersionUID = 5232453952276885070L;
886 }
887
888 static final class BiRunCompletion<T> extends Completion {
889 final CompletableFuture<? extends T> src;
890 final CompletableFuture<?> snd;
891 final Runnable fn;
892 final CompletableFuture<Void> dst;
893 final Executor executor;
894 BiRunCompletion(CompletableFuture<? extends T> src,
895 CompletableFuture<?> snd,
896 Runnable fn,
897 CompletableFuture<Void> dst, Executor executor) {
898 this.src = src; this.snd = snd;
899 this.fn = fn; this.dst = dst;
900 this.executor = executor;
901 }
902 public final void run() {
903 final CompletableFuture<? extends T> a;
904 final CompletableFuture<?> b;
905 final Runnable fn;
906 final CompletableFuture<Void> dst;
907 Object r, s; Throwable ex;
908 if ((dst = this.dst) != null &&
909 (fn = this.fn) != null &&
910 (a = this.src) != null &&
911 (r = a.result) != null &&
912 (b = this.snd) != null &&
913 (s = b.result) != null &&
914 compareAndSet(0, 1)) {
915 if (r instanceof AltResult)
916 ex = ((AltResult)r).ex;
917 else
918 ex = null;
919 if (ex == null && (s instanceof AltResult))
920 ex = ((AltResult)s).ex;
921 Executor e = executor;
922 if (ex == null) {
923 try {
924 if (e != null)
925 e.execute(new AsyncRun(fn, dst));
926 else
927 fn.run();
928 } catch (Throwable rex) {
929 ex = rex;
930 }
931 }
932 if (e == null || ex != null)
933 dst.internalComplete(null, ex);
934 }
935 }
936 private static final long serialVersionUID = 5232453952276885070L;
937 }
938
939 static final class AndCompletion extends Completion {
940 final CompletableFuture<?> src;
941 final CompletableFuture<?> snd;
942 final CompletableFuture<Void> dst;
943 AndCompletion(CompletableFuture<?> src,
944 CompletableFuture<?> snd,
945 CompletableFuture<Void> dst) {
946 this.src = src; this.snd = snd; this.dst = dst;
947 }
948 public final void run() {
949 final CompletableFuture<?> a;
950 final CompletableFuture<?> b;
951 final CompletableFuture<Void> dst;
952 Object r, s; Throwable ex;
953 if ((dst = this.dst) != null &&
954 (a = this.src) != null &&
955 (r = a.result) != null &&
956 (b = this.snd) != null &&
957 (s = b.result) != null &&
958 compareAndSet(0, 1)) {
959 if (r instanceof AltResult)
960 ex = ((AltResult)r).ex;
961 else
962 ex = null;
963 if (ex == null && (s instanceof AltResult))
964 ex = ((AltResult)s).ex;
965 dst.internalComplete(null, ex);
966 }
967 }
968 private static final long serialVersionUID = 5232453952276885070L;
969 }
970
971 static final class OrApplyCompletion<T,U> extends Completion {
972 final CompletableFuture<? extends T> src;
973 final CompletableFuture<? extends T> snd;
974 final Function<? super T,? extends U> fn;
975 final CompletableFuture<U> dst;
976 final Executor executor;
977 OrApplyCompletion(CompletableFuture<? extends T> src,
978 CompletableFuture<? extends T> snd,
979 Function<? super T,? extends U> fn,
980 CompletableFuture<U> dst, Executor executor) {
981 this.src = src; this.snd = snd;
982 this.fn = fn; this.dst = dst;
983 this.executor = executor;
984 }
985 public final void run() {
986 final CompletableFuture<? extends T> a;
987 final CompletableFuture<? extends T> b;
988 final Function<? super T,? extends U> fn;
989 final CompletableFuture<U> dst;
990 Object r; T t; Throwable ex;
991 if ((dst = this.dst) != null &&
992 (fn = this.fn) != null &&
993 (((a = this.src) != null && (r = a.result) != null) ||
994 ((b = this.snd) != null && (r = b.result) != null)) &&
995 compareAndSet(0, 1)) {
996 if (r instanceof AltResult) {
997 ex = ((AltResult)r).ex;
998 t = null;
999 }
1000 else {
1001 ex = null;
1002 @SuppressWarnings("unchecked") T tr = (T) r;
1003 t = tr;
1004 }
1005 Executor e = executor;
1006 U u = null;
1007 if (ex == null) {
1008 try {
1009 if (e != null)
1010 e.execute(new AsyncApply<T,U>(t, fn, dst));
1011 else
1012 u = fn.apply(t);
1013 } catch (Throwable rex) {
1014 ex = rex;
1015 }
1016 }
1017 if (e == null || ex != null)
1018 dst.internalComplete(u, ex);
1019 }
1020 }
1021 private static final long serialVersionUID = 5232453952276885070L;
1022 }
1023
1024 static final class OrAcceptCompletion<T> extends Completion {
1025 final CompletableFuture<? extends T> src;
1026 final CompletableFuture<? extends T> snd;
1027 final Consumer<? super T> fn;
1028 final CompletableFuture<Void> dst;
1029 final Executor executor;
1030 OrAcceptCompletion(CompletableFuture<? extends T> src,
1031 CompletableFuture<? extends T> snd,
1032 Consumer<? super T> fn,
1033 CompletableFuture<Void> dst, Executor executor) {
1034 this.src = src; this.snd = snd;
1035 this.fn = fn; this.dst = dst;
1036 this.executor = executor;
1037 }
1038 public final void run() {
1039 final CompletableFuture<? extends T> a;
1040 final CompletableFuture<? extends T> b;
1041 final Consumer<? super T> fn;
1042 final CompletableFuture<Void> dst;
1043 Object r; T t; Throwable ex;
1044 if ((dst = this.dst) != null &&
1045 (fn = this.fn) != null &&
1046 (((a = this.src) != null && (r = a.result) != null) ||
1047 ((b = this.snd) != null && (r = b.result) != null)) &&
1048 compareAndSet(0, 1)) {
1049 if (r instanceof AltResult) {
1050 ex = ((AltResult)r).ex;
1051 t = null;
1052 }
1053 else {
1054 ex = null;
1055 @SuppressWarnings("unchecked") T tr = (T) r;
1056 t = tr;
1057 }
1058 Executor e = executor;
1059 if (ex == null) {
1060 try {
1061 if (e != null)
1062 e.execute(new AsyncAccept<T>(t, fn, dst));
1063 else
1064 fn.accept(t);
1065 } catch (Throwable rex) {
1066 ex = rex;
1067 }
1068 }
1069 if (e == null || ex != null)
1070 dst.internalComplete(null, ex);
1071 }
1072 }
1073 private static final long serialVersionUID = 5232453952276885070L;
1074 }
1075
1076 static final class OrRunCompletion<T> extends Completion {
1077 final CompletableFuture<? extends T> src;
1078 final CompletableFuture<?> snd;
1079 final Runnable fn;
1080 final CompletableFuture<Void> dst;
1081 final Executor executor;
1082 OrRunCompletion(CompletableFuture<? extends T> src,
1083 CompletableFuture<?> snd,
1084 Runnable fn,
1085 CompletableFuture<Void> dst, Executor executor) {
1086 this.src = src; this.snd = snd;
1087 this.fn = fn; this.dst = dst;
1088 this.executor = executor;
1089 }
1090 public final void run() {
1091 final CompletableFuture<? extends T> a;
1092 final CompletableFuture<?> b;
1093 final Runnable fn;
1094 final CompletableFuture<Void> dst;
1095 Object r; Throwable ex;
1096 if ((dst = this.dst) != null &&
1097 (fn = this.fn) != null &&
1098 (((a = this.src) != null && (r = a.result) != null) ||
1099 ((b = this.snd) != null && (r = b.result) != null)) &&
1100 compareAndSet(0, 1)) {
1101 if (r instanceof AltResult)
1102 ex = ((AltResult)r).ex;
1103 else
1104 ex = null;
1105 Executor e = executor;
1106 if (ex == null) {
1107 try {
1108 if (e != null)
1109 e.execute(new AsyncRun(fn, dst));
1110 else
1111 fn.run();
1112 } catch (Throwable rex) {
1113 ex = rex;
1114 }
1115 }
1116 if (e == null || ex != null)
1117 dst.internalComplete(null, ex);
1118 }
1119 }
1120 private static final long serialVersionUID = 5232453952276885070L;
1121 }
1122
1123 static final class OrCompletion extends Completion {
1124 final CompletableFuture<?> src;
1125 final CompletableFuture<?> snd;
1126 final CompletableFuture<Object> dst;
1127 OrCompletion(CompletableFuture<?> src,
1128 CompletableFuture<?> snd,
1129 CompletableFuture<Object> dst) {
1130 this.src = src; this.snd = snd; this.dst = dst;
1131 }
1132 public final void run() {
1133 final CompletableFuture<?> a;
1134 final CompletableFuture<?> b;
1135 final CompletableFuture<Object> dst;
1136 Object r, t; Throwable ex;
1137 if ((dst = this.dst) != null &&
1138 (((a = this.src) != null && (r = a.result) != null) ||
1139 ((b = this.snd) != null && (r = b.result) != null)) &&
1140 compareAndSet(0, 1)) {
1141 if (r instanceof AltResult) {
1142 ex = ((AltResult)r).ex;
1143 t = null;
1144 }
1145 else {
1146 ex = null;
1147 t = r;
1148 }
1149 dst.internalComplete(t, ex);
1150 }
1151 }
1152 private static final long serialVersionUID = 5232453952276885070L;
1153 }
1154
1155 static final class ExceptionCompletion<T> extends Completion {
1156 final CompletableFuture<? extends T> src;
1157 final Function<? super Throwable, ? extends T> fn;
1158 final CompletableFuture<T> dst;
1159 ExceptionCompletion(CompletableFuture<? extends T> src,
1160 Function<? super Throwable, ? extends T> fn,
1161 CompletableFuture<T> dst) {
1162 this.src = src; this.fn = fn; this.dst = dst;
1163 }
1164 public final void run() {
1165 final CompletableFuture<? extends T> a;
1166 final Function<? super Throwable, ? extends T> fn;
1167 final CompletableFuture<T> dst;
1168 Object r; T t = null; Throwable ex, dx = null;
1169 if ((dst = this.dst) != null &&
1170 (fn = this.fn) != null &&
1171 (a = this.src) != null &&
1172 (r = a.result) != null &&
1173 compareAndSet(0, 1)) {
1174 if ((r instanceof AltResult) &&
1175 (ex = ((AltResult)r).ex) != null) {
1176 try {
1177 t = fn.apply(ex);
1178 } catch (Throwable rex) {
1179 dx = rex;
1180 }
1181 }
1182 else {
1183 @SuppressWarnings("unchecked") T tr = (T) r;
1184 t = tr;
1185 }
1186 dst.internalComplete(t, dx);
1187 }
1188 }
1189 private static final long serialVersionUID = 5232453952276885070L;
1190 }
1191
1192 static final class ThenCopy<T> extends Completion {
1193 final CompletableFuture<?> src;
1194 final CompletableFuture<T> dst;
1195 ThenCopy(CompletableFuture<?> src,
1196 CompletableFuture<T> dst) {
1197 this.src = src; this.dst = dst;
1198 }
1199 public final void run() {
1200 final CompletableFuture<?> a;
1201 final CompletableFuture<T> dst;
1202 Object r; T t; Throwable ex;
1203 if ((dst = this.dst) != null &&
1204 (a = this.src) != null &&
1205 (r = a.result) != null &&
1206 compareAndSet(0, 1)) {
1207 if (r instanceof AltResult) {
1208 ex = ((AltResult)r).ex;
1209 t = null;
1210 }
1211 else {
1212 ex = null;
1213 @SuppressWarnings("unchecked") T tr = (T) r;
1214 t = tr;
1215 }
1216 dst.internalComplete(t, ex);
1217 }
1218 }
1219 private static final long serialVersionUID = 5232453952276885070L;
1220 }
1221
1222 // version of ThenCopy for CompletableFuture<Void> dst
1223 static final class ThenPropagate extends Completion {
1224 final CompletableFuture<?> src;
1225 final CompletableFuture<Void> dst;
1226 ThenPropagate(CompletableFuture<?> src,
1227 CompletableFuture<Void> dst) {
1228 this.src = src; this.dst = dst;
1229 }
1230 public final void run() {
1231 final CompletableFuture<?> a;
1232 final CompletableFuture<Void> dst;
1233 Object r; Throwable ex;
1234 if ((dst = this.dst) != null &&
1235 (a = this.src) != null &&
1236 (r = a.result) != null &&
1237 compareAndSet(0, 1)) {
1238 if (r instanceof AltResult)
1239 ex = ((AltResult)r).ex;
1240 else
1241 ex = null;
1242 dst.internalComplete(null, ex);
1243 }
1244 }
1245 private static final long serialVersionUID = 5232453952276885070L;
1246 }
1247
1248 static final class HandleCompletion<T,U> extends Completion {
1249 final CompletableFuture<? extends T> src;
1250 final BiFunction<? super T, Throwable, ? extends U> fn;
1251 final CompletableFuture<U> dst;
1252 HandleCompletion(CompletableFuture<? extends T> src,
1253 BiFunction<? super T, Throwable, ? extends U> fn,
1254 CompletableFuture<U> dst) {
1255 this.src = src; this.fn = fn; this.dst = dst;
1256 }
1257 public final void run() {
1258 final CompletableFuture<? extends T> a;
1259 final BiFunction<? super T, Throwable, ? extends U> fn;
1260 final CompletableFuture<U> dst;
1261 Object r; T t; Throwable ex;
1262 if ((dst = this.dst) != null &&
1263 (fn = this.fn) != null &&
1264 (a = this.src) != null &&
1265 (r = a.result) != null &&
1266 compareAndSet(0, 1)) {
1267 if (r instanceof AltResult) {
1268 ex = ((AltResult)r).ex;
1269 t = null;
1270 }
1271 else {
1272 ex = null;
1273 @SuppressWarnings("unchecked") T tr = (T) r;
1274 t = tr;
1275 }
1276 U u = null; Throwable dx = null;
1277 try {
1278 u = fn.apply(t, ex);
1279 } catch (Throwable rex) {
1280 dx = rex;
1281 }
1282 dst.internalComplete(u, dx);
1283 }
1284 }
1285 private static final long serialVersionUID = 5232453952276885070L;
1286 }
1287
1288 static final class ComposeCompletion<T,U> extends Completion {
1289 final CompletableFuture<? extends T> src;
1290 final Function<? super T, CompletableFuture<U>> fn;
1291 final CompletableFuture<U> dst;
1292 final Executor executor;
1293 ComposeCompletion(CompletableFuture<? extends T> src,
1294 Function<? super T, CompletableFuture<U>> fn,
1295 CompletableFuture<U> dst, Executor executor) {
1296 this.src = src; this.fn = fn; this.dst = dst;
1297 this.executor = executor;
1298 }
1299 public final void run() {
1300 final CompletableFuture<? extends T> a;
1301 final Function<? super T, CompletableFuture<U>> fn;
1302 final CompletableFuture<U> dst;
1303 Object r; T t; Throwable ex; Executor e;
1304 if ((dst = this.dst) != null &&
1305 (fn = this.fn) != null &&
1306 (a = this.src) != null &&
1307 (r = a.result) != null &&
1308 compareAndSet(0, 1)) {
1309 if (r instanceof AltResult) {
1310 ex = ((AltResult)r).ex;
1311 t = null;
1312 }
1313 else {
1314 ex = null;
1315 @SuppressWarnings("unchecked") T tr = (T) r;
1316 t = tr;
1317 }
1318 CompletableFuture<U> c = null;
1319 U u = null;
1320 boolean complete = false;
1321 if (ex == null) {
1322 if ((e = executor) != null)
1323 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1324 else {
1325 try {
1326 if ((c = fn.apply(t)) == null)
1327 ex = new NullPointerException();
1328 } catch (Throwable rex) {
1329 ex = rex;
1330 }
1331 }
1332 }
1333 if (c != null) {
1334 ThenCopy<U> d = null;
1335 Object s;
1336 if ((s = c.result) == null) {
1337 CompletionNode p = new CompletionNode
1338 (d = new ThenCopy<U>(c, dst));
1339 while ((s = c.result) == null) {
1340 if (UNSAFE.compareAndSwapObject
1341 (c, COMPLETIONS, p.next = c.completions, p))
1342 break;
1343 }
1344 }
1345 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1346 complete = true;
1347 if (s instanceof AltResult) {
1348 ex = ((AltResult)s).ex; // no rewrap
1349 u = null;
1350 }
1351 else {
1352 @SuppressWarnings("unchecked") U us = (U) s;
1353 u = us;
1354 }
1355 }
1356 }
1357 if (complete || ex != null)
1358 dst.internalComplete(u, ex);
1359 if (c != null)
1360 c.helpPostComplete();
1361 }
1362 }
1363 private static final long serialVersionUID = 5232453952276885070L;
1364 }
1365
1366 // public methods
1367
1368 /**
1369 * Creates a new incomplete CompletableFuture.
1370 */
1371 public CompletableFuture() {
1372 }
1373
1374 /**
1375 * Returns a new CompletableFuture that is asynchronously completed
1376 * by a task running in the {@link ForkJoinPool#commonPool()} with
1377 * the value obtained by calling the given Supplier.
1378 *
1379 * @param supplier a function returning the value to be used
1380 * to complete the returned CompletableFuture
1381 * @return the new CompletableFuture
1382 */
1383 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1384 if (supplier == null) throw new NullPointerException();
1385 CompletableFuture<U> f = new CompletableFuture<U>();
1386 ForkJoinPool.commonPool().
1387 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1388 return f;
1389 }
1390
1391 /**
1392 * Returns a new CompletableFuture that is asynchronously completed
1393 * by a task running in the given executor with the value obtained
1394 * by calling the given Supplier.
1395 *
1396 * @param supplier a function returning the value to be used
1397 * to complete the returned CompletableFuture
1398 * @param executor the executor to use for asynchronous execution
1399 * @return the new CompletableFuture
1400 */
1401 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1402 Executor executor) {
1403 if (executor == null || supplier == null)
1404 throw new NullPointerException();
1405 CompletableFuture<U> f = new CompletableFuture<U>();
1406 executor.execute(new AsyncSupply<U>(supplier, f));
1407 return f;
1408 }
1409
1410 /**
1411 * Returns a new CompletableFuture that is asynchronously completed
1412 * by a task running in the {@link ForkJoinPool#commonPool()} after
1413 * it runs the given action.
1414 *
1415 * @param runnable the action to run before completing the
1416 * returned CompletableFuture
1417 * @return the new CompletableFuture
1418 */
1419 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1420 if (runnable == null) throw new NullPointerException();
1421 CompletableFuture<Void> f = new CompletableFuture<Void>();
1422 ForkJoinPool.commonPool().
1423 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1424 return f;
1425 }
1426
1427 /**
1428 * Returns a new CompletableFuture that is asynchronously completed
1429 * by a task running in the given executor after it runs the given
1430 * action.
1431 *
1432 * @param runnable the action to run before completing the
1433 * returned CompletableFuture
1434 * @param executor the executor to use for asynchronous execution
1435 * @return the new CompletableFuture
1436 */
1437 public static CompletableFuture<Void> runAsync(Runnable runnable,
1438 Executor executor) {
1439 if (executor == null || runnable == null)
1440 throw new NullPointerException();
1441 CompletableFuture<Void> f = new CompletableFuture<Void>();
1442 executor.execute(new AsyncRun(runnable, f));
1443 return f;
1444 }
1445
1446 /**
1447 * Returns a new CompletableFuture that is already completed with
1448 * the given value.
1449 *
1450 * @param value the value
1451 * @return the completed CompletableFuture
1452 */
1453 public static <U> CompletableFuture<U> completedFuture(U value) {
1454 CompletableFuture<U> f = new CompletableFuture<U>();
1455 f.result = (value == null) ? NIL : value;
1456 return f;
1457 }
1458
1459 /**
1460 * Returns {@code true} if completed in any fashion: normally,
1461 * exceptionally, or via cancellation.
1462 *
1463 * @return {@code true} if completed
1464 */
1465 public boolean isDone() {
1466 return result != null;
1467 }
1468
1469 /**
1470 * Waits if necessary for this future to complete, and then
1471 * returns its result.
1472 *
1473 * @return the result value
1474 * @throws CancellationException if this future was cancelled
1475 * @throws ExecutionException if this future completed exceptionally
1476 * @throws InterruptedException if the current thread was interrupted
1477 * while waiting
1478 */
1479 public T get() throws InterruptedException, ExecutionException {
1480 Object r; Throwable ex, cause;
1481 if ((r = result) == null && (r = waitingGet(true)) == null)
1482 throw new InterruptedException();
1483 if (!(r instanceof AltResult)) {
1484 @SuppressWarnings("unchecked") T tr = (T) r;
1485 return tr;
1486 }
1487 if ((ex = ((AltResult)r).ex) == null)
1488 return null;
1489 if (ex instanceof CancellationException)
1490 throw (CancellationException)ex;
1491 if ((ex instanceof CompletionException) &&
1492 (cause = ex.getCause()) != null)
1493 ex = cause;
1494 throw new ExecutionException(ex);
1495 }
1496
1497 /**
1498 * Waits if necessary for at most the given time for this future
1499 * to complete, and then returns its result, if available.
1500 *
1501 * @param timeout the maximum time to wait
1502 * @param unit the time unit of the timeout argument
1503 * @return the result value
1504 * @throws CancellationException if this future was cancelled
1505 * @throws ExecutionException if this future completed exceptionally
1506 * @throws InterruptedException if the current thread was interrupted
1507 * while waiting
1508 * @throws TimeoutException if the wait timed out
1509 */
1510 public T get(long timeout, TimeUnit unit)
1511 throws InterruptedException, ExecutionException, TimeoutException {
1512 Object r; Throwable ex, cause;
1513 long nanos = unit.toNanos(timeout);
1514 if (Thread.interrupted())
1515 throw new InterruptedException();
1516 if ((r = result) == null)
1517 r = timedAwaitDone(nanos);
1518 if (!(r instanceof AltResult)) {
1519 @SuppressWarnings("unchecked") T tr = (T) r;
1520 return tr;
1521 }
1522 if ((ex = ((AltResult)r).ex) == null)
1523 return null;
1524 if (ex instanceof CancellationException)
1525 throw (CancellationException)ex;
1526 if ((ex instanceof CompletionException) &&
1527 (cause = ex.getCause()) != null)
1528 ex = cause;
1529 throw new ExecutionException(ex);
1530 }
1531
1532 /**
1533 * Returns the result value when complete, or throws an
1534 * (unchecked) exception if completed exceptionally. To better
1535 * conform with the use of common functional forms, if a
1536 * computation involved in the completion of this
1537 * CompletableFuture threw an exception, this method throws an
1538 * (unchecked) {@link CompletionException} with the underlying
1539 * exception as its cause.
1540 *
1541 * @return the result value
1542 * @throws CancellationException if the computation was cancelled
1543 * @throws CompletionException if this future completed
1544 * exceptionally or a completion computation threw an exception
1545 */
1546 public T join() {
1547 Object r; Throwable ex;
1548 if ((r = result) == null)
1549 r = waitingGet(false);
1550 if (!(r instanceof AltResult)) {
1551 @SuppressWarnings("unchecked") T tr = (T) r;
1552 return tr;
1553 }
1554 if ((ex = ((AltResult)r).ex) == null)
1555 return null;
1556 if (ex instanceof CancellationException)
1557 throw (CancellationException)ex;
1558 if (ex instanceof CompletionException)
1559 throw (CompletionException)ex;
1560 throw new CompletionException(ex);
1561 }
1562
1563 /**
1564 * Returns the result value (or throws any encountered exception)
1565 * if completed, else returns the given valueIfAbsent.
1566 *
1567 * @param valueIfAbsent the value to return if not completed
1568 * @return the result value, if completed, else the given valueIfAbsent
1569 * @throws CancellationException if the computation was cancelled
1570 * @throws CompletionException if this future completed
1571 * exceptionally or a completion computation threw an exception
1572 */
1573 public T getNow(T valueIfAbsent) {
1574 Object r; Throwable ex;
1575 if ((r = result) == null)
1576 return valueIfAbsent;
1577 if (!(r instanceof AltResult)) {
1578 @SuppressWarnings("unchecked") T tr = (T) r;
1579 return tr;
1580 }
1581 if ((ex = ((AltResult)r).ex) == null)
1582 return null;
1583 if (ex instanceof CancellationException)
1584 throw (CancellationException)ex;
1585 if (ex instanceof CompletionException)
1586 throw (CompletionException)ex;
1587 throw new CompletionException(ex);
1588 }
1589
1590 /**
1591 * If not already completed, sets the value returned by {@link
1592 * #get()} and related methods to the given value.
1593 *
1594 * @param value the result value
1595 * @return {@code true} if this invocation caused this CompletableFuture
1596 * to transition to a completed state, else {@code false}
1597 */
1598 public boolean complete(T value) {
1599 boolean triggered = result == null &&
1600 UNSAFE.compareAndSwapObject(this, RESULT, null,
1601 value == null ? NIL : value);
1602 postComplete();
1603 return triggered;
1604 }
1605
1606 /**
1607 * If not already completed, causes invocations of {@link #get()}
1608 * and related methods to throw the given exception.
1609 *
1610 * @param ex the exception
1611 * @return {@code true} if this invocation caused this CompletableFuture
1612 * to transition to a completed state, else {@code false}
1613 */
1614 public boolean completeExceptionally(Throwable ex) {
1615 if (ex == null) throw new NullPointerException();
1616 boolean triggered = result == null &&
1617 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1618 postComplete();
1619 return triggered;
1620 }
1621
1622 /**
1623 * Returns a new CompletableFuture that is completed
1624 * when this CompletableFuture completes, with the result of the
1625 * given function of this CompletableFuture's result.
1626 *
1627 * <p>If this CompletableFuture completes exceptionally, or the
1628 * supplied function throws an exception, then the returned
1629 * CompletableFuture completes exceptionally with a
1630 * CompletionException holding the exception as its cause.
1631 *
1632 * @param fn the function to use to compute the value of
1633 * the returned CompletableFuture
1634 * @return the new CompletableFuture
1635 */
1636 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1637 return doThenApply(fn, null);
1638 }
1639
1640 /**
1641 * Returns a new CompletableFuture that is asynchronously completed
1642 * when this CompletableFuture completes, with the result of the
1643 * given function of this CompletableFuture's result from a
1644 * task running in the {@link ForkJoinPool#commonPool()}.
1645 *
1646 * <p>If this CompletableFuture completes exceptionally, or the
1647 * supplied function throws an exception, then the returned
1648 * CompletableFuture completes exceptionally with a
1649 * CompletionException holding the exception as its cause.
1650 *
1651 * @param fn the function to use to compute the value of
1652 * the returned CompletableFuture
1653 * @return the new CompletableFuture
1654 */
1655 public <U> CompletableFuture<U> thenApplyAsync
1656 (Function<? super T,? extends U> fn) {
1657 return doThenApply(fn, ForkJoinPool.commonPool());
1658 }
1659
1660 /**
1661 * Returns a new CompletableFuture that is asynchronously completed
1662 * when this CompletableFuture completes, with the result of the
1663 * given function of this CompletableFuture's result from a
1664 * task running in the given executor.
1665 *
1666 * <p>If this CompletableFuture completes exceptionally, or the
1667 * supplied function throws an exception, then the returned
1668 * CompletableFuture completes exceptionally with a
1669 * CompletionException holding the exception as its cause.
1670 *
1671 * @param fn the function to use to compute the value of
1672 * the returned CompletableFuture
1673 * @param executor the executor to use for asynchronous execution
1674 * @return the new CompletableFuture
1675 */
1676 public <U> CompletableFuture<U> thenApplyAsync
1677 (Function<? super T,? extends U> fn,
1678 Executor executor) {
1679 if (executor == null) throw new NullPointerException();
1680 return doThenApply(fn, executor);
1681 }
1682
1683 private <U> CompletableFuture<U> doThenApply
1684 (Function<? super T,? extends U> fn,
1685 Executor e) {
1686 if (fn == null) throw new NullPointerException();
1687 CompletableFuture<U> dst = new CompletableFuture<U>();
1688 ApplyCompletion<T,U> d = null;
1689 Object r;
1690 if ((r = result) == null) {
1691 CompletionNode p = new CompletionNode
1692 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1693 while ((r = result) == null) {
1694 if (UNSAFE.compareAndSwapObject
1695 (this, COMPLETIONS, p.next = completions, p))
1696 break;
1697 }
1698 }
1699 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1700 T t; Throwable ex;
1701 if (r instanceof AltResult) {
1702 ex = ((AltResult)r).ex;
1703 t = null;
1704 }
1705 else {
1706 ex = null;
1707 @SuppressWarnings("unchecked") T tr = (T) r;
1708 t = tr;
1709 }
1710 U u = null;
1711 if (ex == null) {
1712 try {
1713 if (e != null)
1714 e.execute(new AsyncApply<T,U>(t, fn, dst));
1715 else
1716 u = fn.apply(t);
1717 } catch (Throwable rex) {
1718 ex = rex;
1719 }
1720 }
1721 if (e == null || ex != null)
1722 dst.internalComplete(u, ex);
1723 }
1724 helpPostComplete();
1725 return dst;
1726 }
1727
1728 /**
1729 * Returns a new CompletableFuture that is completed
1730 * when this CompletableFuture completes, after performing the given
1731 * action with this CompletableFuture's result.
1732 *
1733 * <p>If this CompletableFuture completes exceptionally, or the
1734 * supplied action throws an exception, then the returned
1735 * CompletableFuture completes exceptionally with a
1736 * CompletionException holding the exception as its cause.
1737 *
1738 * @param block the action to perform before completing the
1739 * returned CompletableFuture
1740 * @return the new CompletableFuture
1741 */
1742 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1743 return doThenAccept(block, null);
1744 }
1745
1746 /**
1747 * Returns a new CompletableFuture that is asynchronously completed
1748 * when this CompletableFuture completes, after performing the given
1749 * action with this CompletableFuture's result from a task running
1750 * in the {@link ForkJoinPool#commonPool()}.
1751 *
1752 * <p>If this CompletableFuture completes exceptionally, or the
1753 * supplied action throws an exception, then the returned
1754 * CompletableFuture completes exceptionally with a
1755 * CompletionException holding the exception as its cause.
1756 *
1757 * @param block the action to perform before completing the
1758 * returned CompletableFuture
1759 * @return the new CompletableFuture
1760 */
1761 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1762 return doThenAccept(block, ForkJoinPool.commonPool());
1763 }
1764
1765 /**
1766 * Returns a new CompletableFuture that is asynchronously completed
1767 * when this CompletableFuture completes, after performing the given
1768 * action with this CompletableFuture's result from a task running
1769 * in the given executor.
1770 *
1771 * <p>If this CompletableFuture completes exceptionally, or the
1772 * supplied action throws an exception, then the returned
1773 * CompletableFuture completes exceptionally with a
1774 * CompletionException holding the exception as its cause.
1775 *
1776 * @param block the action to perform before completing the
1777 * returned CompletableFuture
1778 * @param executor the executor to use for asynchronous execution
1779 * @return the new CompletableFuture
1780 */
1781 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1782 Executor executor) {
1783 if (executor == null) throw new NullPointerException();
1784 return doThenAccept(block, executor);
1785 }
1786
1787 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1788 Executor e) {
1789 if (fn == null) throw new NullPointerException();
1790 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1791 AcceptCompletion<T> d = null;
1792 Object r;
1793 if ((r = result) == null) {
1794 CompletionNode p = new CompletionNode
1795 (d = new AcceptCompletion<T>(this, fn, dst, e));
1796 while ((r = result) == null) {
1797 if (UNSAFE.compareAndSwapObject
1798 (this, COMPLETIONS, p.next = completions, p))
1799 break;
1800 }
1801 }
1802 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1803 T t; Throwable ex;
1804 if (r instanceof AltResult) {
1805 ex = ((AltResult)r).ex;
1806 t = null;
1807 }
1808 else {
1809 ex = null;
1810 @SuppressWarnings("unchecked") T tr = (T) r;
1811 t = tr;
1812 }
1813 if (ex == null) {
1814 try {
1815 if (e != null)
1816 e.execute(new AsyncAccept<T>(t, fn, dst));
1817 else
1818 fn.accept(t);
1819 } catch (Throwable rex) {
1820 ex = rex;
1821 }
1822 }
1823 if (e == null || ex != null)
1824 dst.internalComplete(null, ex);
1825 }
1826 helpPostComplete();
1827 return dst;
1828 }
1829
1830 /**
1831 * Returns a new CompletableFuture that is completed
1832 * when this CompletableFuture completes, after performing the given
1833 * action.
1834 *
1835 * <p>If this CompletableFuture completes exceptionally, or the
1836 * supplied action throws an exception, then the returned
1837 * CompletableFuture completes exceptionally with a
1838 * CompletionException holding the exception as its cause.
1839 *
1840 * @param action the action to perform before completing the
1841 * returned CompletableFuture
1842 * @return the new CompletableFuture
1843 */
1844 public CompletableFuture<Void> thenRun(Runnable action) {
1845 return doThenRun(action, null);
1846 }
1847
1848 /**
1849 * Returns a new CompletableFuture that is asynchronously completed
1850 * when this CompletableFuture completes, after performing the given
1851 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1852 *
1853 * <p>If this CompletableFuture completes exceptionally, or the
1854 * supplied action throws an exception, then the returned
1855 * CompletableFuture completes exceptionally with a
1856 * CompletionException holding the exception as its cause.
1857 *
1858 * @param action the action to perform before completing the
1859 * returned CompletableFuture
1860 * @return the new CompletableFuture
1861 */
1862 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1863 return doThenRun(action, ForkJoinPool.commonPool());
1864 }
1865
1866 /**
1867 * Returns a new CompletableFuture that is asynchronously completed
1868 * when this CompletableFuture completes, after performing the given
1869 * action from a task running in the given executor.
1870 *
1871 * <p>If this CompletableFuture completes exceptionally, or the
1872 * supplied action throws an exception, then the returned
1873 * CompletableFuture completes exceptionally with a
1874 * CompletionException holding the exception as its cause.
1875 *
1876 * @param action the action to perform before completing the
1877 * returned CompletableFuture
1878 * @param executor the executor to use for asynchronous execution
1879 * @return the new CompletableFuture
1880 */
1881 public CompletableFuture<Void> thenRunAsync(Runnable action,
1882 Executor executor) {
1883 if (executor == null) throw new NullPointerException();
1884 return doThenRun(action, executor);
1885 }
1886
1887 private CompletableFuture<Void> doThenRun(Runnable action,
1888 Executor e) {
1889 if (action == null) throw new NullPointerException();
1890 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1891 RunCompletion<T> d = null;
1892 Object r;
1893 if ((r = result) == null) {
1894 CompletionNode p = new CompletionNode
1895 (d = new RunCompletion<T>(this, action, dst, e));
1896 while ((r = result) == null) {
1897 if (UNSAFE.compareAndSwapObject
1898 (this, COMPLETIONS, p.next = completions, p))
1899 break;
1900 }
1901 }
1902 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1903 Throwable ex;
1904 if (r instanceof AltResult)
1905 ex = ((AltResult)r).ex;
1906 else
1907 ex = null;
1908 if (ex == null) {
1909 try {
1910 if (e != null)
1911 e.execute(new AsyncRun(action, dst));
1912 else
1913 action.run();
1914 } catch (Throwable rex) {
1915 ex = rex;
1916 }
1917 }
1918 if (e == null || ex != null)
1919 dst.internalComplete(null, ex);
1920 }
1921 helpPostComplete();
1922 return dst;
1923 }
1924
1925 /**
1926 * Returns a new CompletableFuture that is completed
1927 * when both this and the other given CompletableFuture complete,
1928 * with the result of the given function of the results of the two
1929 * CompletableFutures.
1930 *
1931 * <p>If this and/or the other CompletableFuture complete
1932 * exceptionally, or the supplied function throws an exception,
1933 * then the returned CompletableFuture completes exceptionally
1934 * with a CompletionException holding the exception as its cause.
1935 *
1936 * @param other the other CompletableFuture
1937 * @param fn the function to use to compute the value of
1938 * the returned CompletableFuture
1939 * @return the new CompletableFuture
1940 */
1941 public <U,V> CompletableFuture<V> thenCombine
1942 (CompletableFuture<? extends U> other,
1943 BiFunction<? super T,? super U,? extends V> fn) {
1944 return doThenBiApply(other, fn, null);
1945 }
1946
1947 /**
1948 * Returns a new CompletableFuture that is asynchronously completed
1949 * when both this and the other given CompletableFuture complete,
1950 * with the result of the given function of the results of the two
1951 * CompletableFutures from a task running in the
1952 * {@link ForkJoinPool#commonPool()}.
1953 *
1954 * <p>If this and/or the other CompletableFuture complete
1955 * exceptionally, or the supplied function throws an exception,
1956 * then the returned CompletableFuture completes exceptionally
1957 * with a CompletionException holding the exception as its cause.
1958 *
1959 * @param other the other CompletableFuture
1960 * @param fn the function to use to compute the value of
1961 * the returned CompletableFuture
1962 * @return the new CompletableFuture
1963 */
1964 public <U,V> CompletableFuture<V> thenCombineAsync
1965 (CompletableFuture<? extends U> other,
1966 BiFunction<? super T,? super U,? extends V> fn) {
1967 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1968 }
1969
1970 /**
1971 * Returns a new CompletableFuture that is asynchronously completed
1972 * when both this and the other given CompletableFuture complete,
1973 * with the result of the given function of the results of the two
1974 * CompletableFutures from a task running in the given executor.
1975 *
1976 * <p>If this and/or the other CompletableFuture complete
1977 * exceptionally, or the supplied function throws an exception,
1978 * then the returned CompletableFuture completes exceptionally
1979 * with a CompletionException holding the exception as its cause.
1980 *
1981 * @param other the other CompletableFuture
1982 * @param fn the function to use to compute the value of
1983 * the returned CompletableFuture
1984 * @param executor the executor to use for asynchronous execution
1985 * @return the new CompletableFuture
1986 */
1987 public <U,V> CompletableFuture<V> thenCombineAsync
1988 (CompletableFuture<? extends U> other,
1989 BiFunction<? super T,? super U,? extends V> fn,
1990 Executor executor) {
1991 if (executor == null) throw new NullPointerException();
1992 return doThenBiApply(other, fn, executor);
1993 }
1994
1995 private <U,V> CompletableFuture<V> doThenBiApply
1996 (CompletableFuture<? extends U> other,
1997 BiFunction<? super T,? super U,? extends V> fn,
1998 Executor e) {
1999 if (other == null || fn == null) throw new NullPointerException();
2000 CompletableFuture<V> dst = new CompletableFuture<V>();
2001 BiApplyCompletion<T,U,V> d = null;
2002 Object r, s = null;
2003 if ((r = result) == null || (s = other.result) == null) {
2004 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
2005 CompletionNode q = null, p = new CompletionNode(d);
2006 while ((r == null && (r = result) == null) ||
2007 (s == null && (s = other.result) == null)) {
2008 if (q != null) {
2009 if (s != null ||
2010 UNSAFE.compareAndSwapObject
2011 (other, COMPLETIONS, q.next = other.completions, q))
2012 break;
2013 }
2014 else if (r != null ||
2015 UNSAFE.compareAndSwapObject
2016 (this, COMPLETIONS, p.next = completions, p)) {
2017 if (s != null)
2018 break;
2019 q = new CompletionNode(d);
2020 }
2021 }
2022 }
2023 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2024 T t; U u; Throwable ex;
2025 if (r instanceof AltResult) {
2026 ex = ((AltResult)r).ex;
2027 t = null;
2028 }
2029 else {
2030 ex = null;
2031 @SuppressWarnings("unchecked") T tr = (T) r;
2032 t = tr;
2033 }
2034 if (ex != null)
2035 u = null;
2036 else if (s instanceof AltResult) {
2037 ex = ((AltResult)s).ex;
2038 u = null;
2039 }
2040 else {
2041 @SuppressWarnings("unchecked") U us = (U) s;
2042 u = us;
2043 }
2044 V v = null;
2045 if (ex == null) {
2046 try {
2047 if (e != null)
2048 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
2049 else
2050 v = fn.apply(t, u);
2051 } catch (Throwable rex) {
2052 ex = rex;
2053 }
2054 }
2055 if (e == null || ex != null)
2056 dst.internalComplete(v, ex);
2057 }
2058 helpPostComplete();
2059 other.helpPostComplete();
2060 return dst;
2061 }
2062
2063 /**
2064 * Returns a new CompletableFuture that is completed
2065 * when both this and the other given CompletableFuture complete,
2066 * after performing the given action with the results of the two
2067 * CompletableFutures.
2068 *
2069 * <p>If this and/or the other CompletableFuture complete
2070 * exceptionally, or the supplied action throws an exception,
2071 * then the returned CompletableFuture completes exceptionally
2072 * with a CompletionException holding the exception as its cause.
2073 *
2074 * @param other the other CompletableFuture
2075 * @param block the action to perform before completing the
2076 * returned CompletableFuture
2077 * @return the new CompletableFuture
2078 */
2079 public <U> CompletableFuture<Void> thenAcceptBoth
2080 (CompletableFuture<? extends U> other,
2081 BiConsumer<? super T, ? super U> block) {
2082 return doThenBiAccept(other, block, null);
2083 }
2084
2085 /**
2086 * Returns a new CompletableFuture that is asynchronously completed
2087 * when both this and the other given CompletableFuture complete,
2088 * after performing the given action with the results of the two
2089 * CompletableFutures from a task running in the {@link
2090 * ForkJoinPool#commonPool()}.
2091 *
2092 * <p>If this and/or the other CompletableFuture complete
2093 * exceptionally, or the supplied action throws an exception,
2094 * then the returned CompletableFuture completes exceptionally
2095 * with a CompletionException holding the exception as its cause.
2096 *
2097 * @param other the other CompletableFuture
2098 * @param block the action to perform before completing the
2099 * returned CompletableFuture
2100 * @return the new CompletableFuture
2101 */
2102 public <U> CompletableFuture<Void> thenAcceptBothAsync
2103 (CompletableFuture<? extends U> other,
2104 BiConsumer<? super T, ? super U> block) {
2105 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2106 }
2107
2108 /**
2109 * Returns a new CompletableFuture that is asynchronously completed
2110 * when both this and the other given CompletableFuture complete,
2111 * after performing the given action with the results of the two
2112 * CompletableFutures from a task running in the given executor.
2113 *
2114 * <p>If this and/or the other CompletableFuture complete
2115 * exceptionally, or the supplied action throws an exception,
2116 * then the returned CompletableFuture completes exceptionally
2117 * with a CompletionException holding the exception as its cause.
2118 *
2119 * @param other the other CompletableFuture
2120 * @param block the action to perform before completing the
2121 * returned CompletableFuture
2122 * @param executor the executor to use for asynchronous execution
2123 * @return the new CompletableFuture
2124 */
2125 public <U> CompletableFuture<Void> thenAcceptBothAsync
2126 (CompletableFuture<? extends U> other,
2127 BiConsumer<? super T, ? super U> block,
2128 Executor executor) {
2129 if (executor == null) throw new NullPointerException();
2130 return doThenBiAccept(other, block, executor);
2131 }
2132
2133 private <U> CompletableFuture<Void> doThenBiAccept
2134 (CompletableFuture<? extends U> other,
2135 BiConsumer<? super T,? super U> fn,
2136 Executor e) {
2137 if (other == null || fn == null) throw new NullPointerException();
2138 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2139 BiAcceptCompletion<T,U> d = null;
2140 Object r, s = null;
2141 if ((r = result) == null || (s = other.result) == null) {
2142 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2143 CompletionNode q = null, p = new CompletionNode(d);
2144 while ((r == null && (r = result) == null) ||
2145 (s == null && (s = other.result) == null)) {
2146 if (q != null) {
2147 if (s != null ||
2148 UNSAFE.compareAndSwapObject
2149 (other, COMPLETIONS, q.next = other.completions, q))
2150 break;
2151 }
2152 else if (r != null ||
2153 UNSAFE.compareAndSwapObject
2154 (this, COMPLETIONS, p.next = completions, p)) {
2155 if (s != null)
2156 break;
2157 q = new CompletionNode(d);
2158 }
2159 }
2160 }
2161 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2162 T t; U u; Throwable ex;
2163 if (r instanceof AltResult) {
2164 ex = ((AltResult)r).ex;
2165 t = null;
2166 }
2167 else {
2168 ex = null;
2169 @SuppressWarnings("unchecked") T tr = (T) r;
2170 t = tr;
2171 }
2172 if (ex != null)
2173 u = null;
2174 else if (s instanceof AltResult) {
2175 ex = ((AltResult)s).ex;
2176 u = null;
2177 }
2178 else {
2179 @SuppressWarnings("unchecked") U us = (U) s;
2180 u = us;
2181 }
2182 if (ex == null) {
2183 try {
2184 if (e != null)
2185 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2186 else
2187 fn.accept(t, u);
2188 } catch (Throwable rex) {
2189 ex = rex;
2190 }
2191 }
2192 if (e == null || ex != null)
2193 dst.internalComplete(null, ex);
2194 }
2195 helpPostComplete();
2196 other.helpPostComplete();
2197 return dst;
2198 }
2199
2200 /**
2201 * Returns a new CompletableFuture that is completed
2202 * when both this and the other given CompletableFuture complete,
2203 * after performing the given action.
2204 *
2205 * <p>If this and/or the other CompletableFuture complete
2206 * exceptionally, or the supplied action throws an exception,
2207 * then the returned CompletableFuture completes exceptionally
2208 * with a CompletionException holding the exception as its cause.
2209 *
2210 * @param other the other CompletableFuture
2211 * @param action the action to perform before completing the
2212 * returned CompletableFuture
2213 * @return the new CompletableFuture
2214 */
2215 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2216 Runnable action) {
2217 return doThenBiRun(other, action, null);
2218 }
2219
2220 /**
2221 * Returns a new CompletableFuture that is asynchronously completed
2222 * when both this and the other given CompletableFuture complete,
2223 * after performing the given action from a task running in the
2224 * {@link ForkJoinPool#commonPool()}.
2225 *
2226 * <p>If this and/or the other CompletableFuture complete
2227 * exceptionally, or the supplied action throws an exception,
2228 * then the returned CompletableFuture completes exceptionally
2229 * with a CompletionException holding the exception as its cause.
2230 *
2231 * @param other the other CompletableFuture
2232 * @param action the action to perform before completing the
2233 * returned CompletableFuture
2234 * @return the new CompletableFuture
2235 */
2236 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2237 Runnable action) {
2238 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2239 }
2240
2241 /**
2242 * Returns a new CompletableFuture that is asynchronously completed
2243 * when both this and the other given CompletableFuture complete,
2244 * after performing the given action from a task running in the
2245 * given executor.
2246 *
2247 * <p>If this and/or the other CompletableFuture complete
2248 * exceptionally, or the supplied action throws an exception,
2249 * then the returned CompletableFuture completes exceptionally
2250 * with a CompletionException holding the exception as its cause.
2251 *
2252 * @param other the other CompletableFuture
2253 * @param action the action to perform before completing the
2254 * returned CompletableFuture
2255 * @param executor the executor to use for asynchronous execution
2256 * @return the new CompletableFuture
2257 */
2258 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2259 Runnable action,
2260 Executor executor) {
2261 if (executor == null) throw new NullPointerException();
2262 return doThenBiRun(other, action, executor);
2263 }
2264
2265 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2266 Runnable action,
2267 Executor e) {
2268 if (other == null || action == null) throw new NullPointerException();
2269 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2270 BiRunCompletion<T> d = null;
2271 Object r, s = null;
2272 if ((r = result) == null || (s = other.result) == null) {
2273 d = new BiRunCompletion<T>(this, other, action, dst, e);
2274 CompletionNode q = null, p = new CompletionNode(d);
2275 while ((r == null && (r = result) == null) ||
2276 (s == null && (s = other.result) == null)) {
2277 if (q != null) {
2278 if (s != null ||
2279 UNSAFE.compareAndSwapObject
2280 (other, COMPLETIONS, q.next = other.completions, q))
2281 break;
2282 }
2283 else if (r != null ||
2284 UNSAFE.compareAndSwapObject
2285 (this, COMPLETIONS, p.next = completions, p)) {
2286 if (s != null)
2287 break;
2288 q = new CompletionNode(d);
2289 }
2290 }
2291 }
2292 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2293 Throwable ex;
2294 if (r instanceof AltResult)
2295 ex = ((AltResult)r).ex;
2296 else
2297 ex = null;
2298 if (ex == null && (s instanceof AltResult))
2299 ex = ((AltResult)s).ex;
2300 if (ex == null) {
2301 try {
2302 if (e != null)
2303 e.execute(new AsyncRun(action, dst));
2304 else
2305 action.run();
2306 } catch (Throwable rex) {
2307 ex = rex;
2308 }
2309 }
2310 if (e == null || ex != null)
2311 dst.internalComplete(null, ex);
2312 }
2313 helpPostComplete();
2314 other.helpPostComplete();
2315 return dst;
2316 }
2317
2318 /**
2319 * Returns a new CompletableFuture that is completed
2320 * when either this or the other given CompletableFuture completes,
2321 * with the result of the given function of either this or the other
2322 * CompletableFuture's result.
2323 *
2324 * <p>If this and/or the other CompletableFuture complete
2325 * exceptionally, then the returned CompletableFuture may also do so,
2326 * with a CompletionException holding one of these exceptions as its
2327 * cause. No guarantees are made about which result or exception is
2328 * used in the returned CompletableFuture. If the supplied function
2329 * throws an exception, then the returned CompletableFuture completes
2330 * exceptionally with a CompletionException holding the exception as
2331 * its cause.
2332 *
2333 * @param other the other CompletableFuture
2334 * @param fn the function to use to compute the value of
2335 * the returned CompletableFuture
2336 * @return the new CompletableFuture
2337 */
2338 public <U> CompletableFuture<U> applyToEither
2339 (CompletableFuture<? extends T> other,
2340 Function<? super T, U> fn) {
2341 return doOrApply(other, fn, null);
2342 }
2343
2344 /**
2345 * Returns a new CompletableFuture that is asynchronously completed
2346 * when either this or the other given CompletableFuture completes,
2347 * with the result of the given function of either this or the other
2348 * CompletableFuture's result from a task running in the
2349 * {@link ForkJoinPool#commonPool()}.
2350 *
2351 * <p>If this and/or the other CompletableFuture complete
2352 * exceptionally, then the returned CompletableFuture may also do so,
2353 * with a CompletionException holding one of these exceptions as its
2354 * cause. No guarantees are made about which result or exception is
2355 * used in the returned CompletableFuture. If the supplied function
2356 * throws an exception, then the returned CompletableFuture completes
2357 * exceptionally with a CompletionException holding the exception as
2358 * its cause.
2359 *
2360 * @param other the other CompletableFuture
2361 * @param fn the function to use to compute the value of
2362 * the returned CompletableFuture
2363 * @return the new CompletableFuture
2364 */
2365 public <U> CompletableFuture<U> applyToEitherAsync
2366 (CompletableFuture<? extends T> other,
2367 Function<? super T, U> fn) {
2368 return doOrApply(other, fn, ForkJoinPool.commonPool());
2369 }
2370
2371 /**
2372 * Returns a new CompletableFuture that is asynchronously completed
2373 * when either this or the other given CompletableFuture completes,
2374 * with the result of the given function of either this or the other
2375 * CompletableFuture's result from a task running in the
2376 * given executor.
2377 *
2378 * <p>If this and/or the other CompletableFuture complete
2379 * exceptionally, then the returned CompletableFuture may also do so,
2380 * with a CompletionException holding one of these exceptions as its
2381 * cause. No guarantees are made about which result or exception is
2382 * used in the returned CompletableFuture. If the supplied function
2383 * throws an exception, then the returned CompletableFuture completes
2384 * exceptionally with a CompletionException holding the exception as
2385 * its cause.
2386 *
2387 * @param other the other CompletableFuture
2388 * @param fn the function to use to compute the value of
2389 * the returned CompletableFuture
2390 * @param executor the executor to use for asynchronous execution
2391 * @return the new CompletableFuture
2392 */
2393 public <U> CompletableFuture<U> applyToEitherAsync
2394 (CompletableFuture<? extends T> other,
2395 Function<? super T, U> fn,
2396 Executor executor) {
2397 if (executor == null) throw new NullPointerException();
2398 return doOrApply(other, fn, executor);
2399 }
2400
2401 private <U> CompletableFuture<U> doOrApply
2402 (CompletableFuture<? extends T> other,
2403 Function<? super T, U> fn,
2404 Executor e) {
2405 if (other == null || fn == null) throw new NullPointerException();
2406 CompletableFuture<U> dst = new CompletableFuture<U>();
2407 OrApplyCompletion<T,U> d = null;
2408 Object r;
2409 if ((r = result) == null && (r = other.result) == null) {
2410 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2411 CompletionNode q = null, p = new CompletionNode(d);
2412 while ((r = result) == null && (r = other.result) == null) {
2413 if (q != null) {
2414 if (UNSAFE.compareAndSwapObject
2415 (other, COMPLETIONS, q.next = other.completions, q))
2416 break;
2417 }
2418 else if (UNSAFE.compareAndSwapObject
2419 (this, COMPLETIONS, p.next = completions, p))
2420 q = new CompletionNode(d);
2421 }
2422 }
2423 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2424 T t; Throwable ex;
2425 if (r instanceof AltResult) {
2426 ex = ((AltResult)r).ex;
2427 t = null;
2428 }
2429 else {
2430 ex = null;
2431 @SuppressWarnings("unchecked") T tr = (T) r;
2432 t = tr;
2433 }
2434 U u = null;
2435 if (ex == null) {
2436 try {
2437 if (e != null)
2438 e.execute(new AsyncApply<T,U>(t, fn, dst));
2439 else
2440 u = fn.apply(t);
2441 } catch (Throwable rex) {
2442 ex = rex;
2443 }
2444 }
2445 if (e == null || ex != null)
2446 dst.internalComplete(u, ex);
2447 }
2448 helpPostComplete();
2449 other.helpPostComplete();
2450 return dst;
2451 }
2452
2453 /**
2454 * Returns a new CompletableFuture that is completed
2455 * when either this or the other given CompletableFuture completes,
2456 * after performing the given action with the result of either this
2457 * or the other CompletableFuture's result.
2458 *
2459 * <p>If this and/or the other CompletableFuture complete
2460 * exceptionally, then the returned CompletableFuture may also do so,
2461 * with a CompletionException holding one of these exceptions as its
2462 * cause. No guarantees are made about which result or exception is
2463 * used in the returned CompletableFuture. If the supplied action
2464 * throws an exception, then the returned CompletableFuture completes
2465 * exceptionally with a CompletionException holding the exception as
2466 * its cause.
2467 *
2468 * @param other the other CompletableFuture
2469 * @param block the action to perform before completing the
2470 * returned CompletableFuture
2471 * @return the new CompletableFuture
2472 */
2473 public CompletableFuture<Void> acceptEither
2474 (CompletableFuture<? extends T> other,
2475 Consumer<? super T> block) {
2476 return doOrAccept(other, block, null);
2477 }
2478
2479 /**
2480 * Returns a new CompletableFuture that is asynchronously completed
2481 * when either this or the other given CompletableFuture completes,
2482 * after performing the given action with the result of either this
2483 * or the other CompletableFuture's result from a task running in
2484 * the {@link ForkJoinPool#commonPool()}.
2485 *
2486 * <p>If this and/or the other CompletableFuture complete
2487 * exceptionally, then the returned CompletableFuture may also do so,
2488 * with a CompletionException holding one of these exceptions as its
2489 * cause. No guarantees are made about which result or exception is
2490 * used in the returned CompletableFuture. If the supplied action
2491 * throws an exception, then the returned CompletableFuture completes
2492 * exceptionally with a CompletionException holding the exception as
2493 * its cause.
2494 *
2495 * @param other the other CompletableFuture
2496 * @param block the action to perform before completing the
2497 * returned CompletableFuture
2498 * @return the new CompletableFuture
2499 */
2500 public CompletableFuture<Void> acceptEitherAsync
2501 (CompletableFuture<? extends T> other,
2502 Consumer<? super T> block) {
2503 return doOrAccept(other, block, ForkJoinPool.commonPool());
2504 }
2505
2506 /**
2507 * Returns a new CompletableFuture that is asynchronously completed
2508 * when either this or the other given CompletableFuture completes,
2509 * after performing the given action with the result of either this
2510 * or the other CompletableFuture's result from a task running in
2511 * the given executor.
2512 *
2513 * <p>If this and/or the other CompletableFuture complete
2514 * exceptionally, then the returned CompletableFuture may also do so,
2515 * with a CompletionException holding one of these exceptions as its
2516 * cause. No guarantees are made about which result or exception is
2517 * used in the returned CompletableFuture. If the supplied action
2518 * throws an exception, then the returned CompletableFuture completes
2519 * exceptionally with a CompletionException holding the exception as
2520 * its cause.
2521 *
2522 * @param other the other CompletableFuture
2523 * @param block the action to perform before completing the
2524 * returned CompletableFuture
2525 * @param executor the executor to use for asynchronous execution
2526 * @return the new CompletableFuture
2527 */
2528 public CompletableFuture<Void> acceptEitherAsync
2529 (CompletableFuture<? extends T> other,
2530 Consumer<? super T> block,
2531 Executor executor) {
2532 if (executor == null) throw new NullPointerException();
2533 return doOrAccept(other, block, executor);
2534 }
2535
2536 private CompletableFuture<Void> doOrAccept
2537 (CompletableFuture<? extends T> other,
2538 Consumer<? super T> fn,
2539 Executor e) {
2540 if (other == null || fn == null) throw new NullPointerException();
2541 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2542 OrAcceptCompletion<T> d = null;
2543 Object r;
2544 if ((r = result) == null && (r = other.result) == null) {
2545 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2546 CompletionNode q = null, p = new CompletionNode(d);
2547 while ((r = result) == null && (r = other.result) == null) {
2548 if (q != null) {
2549 if (UNSAFE.compareAndSwapObject
2550 (other, COMPLETIONS, q.next = other.completions, q))
2551 break;
2552 }
2553 else if (UNSAFE.compareAndSwapObject
2554 (this, COMPLETIONS, p.next = completions, p))
2555 q = new CompletionNode(d);
2556 }
2557 }
2558 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2559 T t; Throwable ex;
2560 if (r instanceof AltResult) {
2561 ex = ((AltResult)r).ex;
2562 t = null;
2563 }
2564 else {
2565 ex = null;
2566 @SuppressWarnings("unchecked") T tr = (T) r;
2567 t = tr;
2568 }
2569 if (ex == null) {
2570 try {
2571 if (e != null)
2572 e.execute(new AsyncAccept<T>(t, fn, dst));
2573 else
2574 fn.accept(t);
2575 } catch (Throwable rex) {
2576 ex = rex;
2577 }
2578 }
2579 if (e == null || ex != null)
2580 dst.internalComplete(null, ex);
2581 }
2582 helpPostComplete();
2583 other.helpPostComplete();
2584 return dst;
2585 }
2586
2587 /**
2588 * Returns a new CompletableFuture that is completed
2589 * when either this or the other given CompletableFuture completes,
2590 * after performing the given action.
2591 *
2592 * <p>If this and/or the other CompletableFuture complete
2593 * exceptionally, then the returned CompletableFuture may also do so,
2594 * with a CompletionException holding one of these exceptions as its
2595 * cause. No guarantees are made about which result or exception is
2596 * used in the returned CompletableFuture. If the supplied action
2597 * throws an exception, then the returned CompletableFuture completes
2598 * exceptionally with a CompletionException holding the exception as
2599 * its cause.
2600 *
2601 * @param other the other CompletableFuture
2602 * @param action the action to perform before completing the
2603 * returned CompletableFuture
2604 * @return the new CompletableFuture
2605 */
2606 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2607 Runnable action) {
2608 return doOrRun(other, action, null);
2609 }
2610
2611 /**
2612 * Returns a new CompletableFuture that is asynchronously completed
2613 * when either this or the other given CompletableFuture completes,
2614 * after performing the given action from a task running in the
2615 * {@link ForkJoinPool#commonPool()}.
2616 *
2617 * <p>If this and/or the other CompletableFuture complete
2618 * exceptionally, then the returned CompletableFuture may also do so,
2619 * with a CompletionException holding one of these exceptions as its
2620 * cause. No guarantees are made about which result or exception is
2621 * used in the returned CompletableFuture. If the supplied action
2622 * throws an exception, then the returned CompletableFuture completes
2623 * exceptionally with a CompletionException holding the exception as
2624 * its cause.
2625 *
2626 * @param other the other CompletableFuture
2627 * @param action the action to perform before completing the
2628 * returned CompletableFuture
2629 * @return the new CompletableFuture
2630 */
2631 public CompletableFuture<Void> runAfterEitherAsync
2632 (CompletableFuture<?> other,
2633 Runnable action) {
2634 return doOrRun(other, action, ForkJoinPool.commonPool());
2635 }
2636
2637 /**
2638 * Returns a new CompletableFuture that is asynchronously completed
2639 * when either this or the other given CompletableFuture completes,
2640 * after performing the given action from a task running in the
2641 * given executor.
2642 *
2643 * <p>If this and/or the other CompletableFuture complete
2644 * exceptionally, then the returned CompletableFuture may also do so,
2645 * with a CompletionException holding one of these exceptions as its
2646 * cause. No guarantees are made about which result or exception is
2647 * used in the returned CompletableFuture. If the supplied action
2648 * throws an exception, then the returned CompletableFuture completes
2649 * exceptionally with a CompletionException holding the exception as
2650 * its cause.
2651 *
2652 * @param other the other CompletableFuture
2653 * @param action the action to perform before completing the
2654 * returned CompletableFuture
2655 * @param executor the executor to use for asynchronous execution
2656 * @return the new CompletableFuture
2657 */
2658 public CompletableFuture<Void> runAfterEitherAsync
2659 (CompletableFuture<?> other,
2660 Runnable action,
2661 Executor executor) {
2662 if (executor == null) throw new NullPointerException();
2663 return doOrRun(other, action, executor);
2664 }
2665
2666 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2667 Runnable action,
2668 Executor e) {
2669 if (other == null || action == null) throw new NullPointerException();
2670 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2671 OrRunCompletion<T> d = null;
2672 Object r;
2673 if ((r = result) == null && (r = other.result) == null) {
2674 d = new OrRunCompletion<T>(this, other, action, dst, e);
2675 CompletionNode q = null, p = new CompletionNode(d);
2676 while ((r = result) == null && (r = other.result) == null) {
2677 if (q != null) {
2678 if (UNSAFE.compareAndSwapObject
2679 (other, COMPLETIONS, q.next = other.completions, q))
2680 break;
2681 }
2682 else if (UNSAFE.compareAndSwapObject
2683 (this, COMPLETIONS, p.next = completions, p))
2684 q = new CompletionNode(d);
2685 }
2686 }
2687 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2688 Throwable ex;
2689 if (r instanceof AltResult)
2690 ex = ((AltResult)r).ex;
2691 else
2692 ex = null;
2693 if (ex == null) {
2694 try {
2695 if (e != null)
2696 e.execute(new AsyncRun(action, dst));
2697 else
2698 action.run();
2699 } catch (Throwable rex) {
2700 ex = rex;
2701 }
2702 }
2703 if (e == null || ex != null)
2704 dst.internalComplete(null, ex);
2705 }
2706 helpPostComplete();
2707 other.helpPostComplete();
2708 return dst;
2709 }
2710
2711 /**
2712 * Returns a CompletableFuture that upon completion, has the same
2713 * value as produced by the given function of the result of this
2714 * CompletableFuture.
2715 *
2716 * <p>If this CompletableFuture completes exceptionally, then the
2717 * returned CompletableFuture also does so, with a
2718 * CompletionException holding this exception as its cause.
2719 * Similarly, if the computed CompletableFuture completes
2720 * exceptionally, then so does the returned CompletableFuture.
2721 *
2722 * @param fn the function returning a new CompletableFuture
2723 * @return the CompletableFuture
2724 */
2725 public <U> CompletableFuture<U> thenCompose
2726 (Function<? super T, CompletableFuture<U>> fn) {
2727 return doCompose(fn, null);
2728 }
2729
2730 /**
2731 * Returns a CompletableFuture that upon completion, has the same
2732 * value as that produced asynchronously using the {@link
2733 * ForkJoinPool#commonPool()} by the given function of the result
2734 * of this CompletableFuture.
2735 *
2736 * <p>If this CompletableFuture completes exceptionally, then the
2737 * returned CompletableFuture also does so, with a
2738 * CompletionException holding this exception as its cause.
2739 * Similarly, if the computed CompletableFuture completes
2740 * exceptionally, then so does the returned CompletableFuture.
2741 *
2742 * @param fn the function returning a new CompletableFuture
2743 * @return the CompletableFuture
2744 */
2745 public <U> CompletableFuture<U> thenComposeAsync
2746 (Function<? super T, CompletableFuture<U>> fn) {
2747 return doCompose(fn, ForkJoinPool.commonPool());
2748 }
2749
2750 /**
2751 * Returns a CompletableFuture that upon completion, has the same
2752 * value as that produced asynchronously using the given executor
2753 * by the given function of this CompletableFuture.
2754 *
2755 * <p>If this CompletableFuture completes exceptionally, then the
2756 * returned CompletableFuture also does so, with a
2757 * CompletionException holding this exception as its cause.
2758 * Similarly, if the computed CompletableFuture completes
2759 * exceptionally, then so does the returned CompletableFuture.
2760 *
2761 * @param fn the function returning a new CompletableFuture
2762 * @param executor the executor to use for asynchronous execution
2763 * @return the CompletableFuture
2764 */
2765 public <U> CompletableFuture<U> thenComposeAsync
2766 (Function<? super T, CompletableFuture<U>> fn,
2767 Executor executor) {
2768 if (executor == null) throw new NullPointerException();
2769 return doCompose(fn, executor);
2770 }
2771
2772 private <U> CompletableFuture<U> doCompose
2773 (Function<? super T, CompletableFuture<U>> fn,
2774 Executor e) {
2775 if (fn == null) throw new NullPointerException();
2776 CompletableFuture<U> dst = null;
2777 ComposeCompletion<T,U> d = null;
2778 Object r;
2779 if ((r = result) == null) {
2780 dst = new CompletableFuture<U>();
2781 CompletionNode p = new CompletionNode
2782 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2783 while ((r = result) == null) {
2784 if (UNSAFE.compareAndSwapObject
2785 (this, COMPLETIONS, p.next = completions, p))
2786 break;
2787 }
2788 }
2789 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2790 T t; Throwable ex;
2791 if (r instanceof AltResult) {
2792 ex = ((AltResult)r).ex;
2793 t = null;
2794 }
2795 else {
2796 ex = null;
2797 @SuppressWarnings("unchecked") T tr = (T) r;
2798 t = tr;
2799 }
2800 if (ex == null) {
2801 if (e != null) {
2802 if (dst == null)
2803 dst = new CompletableFuture<U>();
2804 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2805 }
2806 else {
2807 try {
2808 if ((dst = fn.apply(t)) == null)
2809 ex = new NullPointerException();
2810 } catch (Throwable rex) {
2811 ex = rex;
2812 }
2813 if (dst == null)
2814 dst = new CompletableFuture<U>();
2815 }
2816 }
2817 if (e == null && ex != null)
2818 dst.internalComplete(null, ex);
2819 }
2820 helpPostComplete();
2821 dst.helpPostComplete();
2822 return dst;
2823 }
2824
2825 /**
2826 * Returns a new CompletableFuture that is completed when this
2827 * CompletableFuture completes, with the result of the given
2828 * function of the exception triggering this CompletableFuture's
2829 * completion when it completes exceptionally; otherwise, if this
2830 * CompletableFuture completes normally, then the returned
2831 * CompletableFuture also completes normally with the same value.
2832 *
2833 * @param fn the function to use to compute the value of the
2834 * returned CompletableFuture if this CompletableFuture completed
2835 * exceptionally
2836 * @return the new CompletableFuture
2837 */
2838 public CompletableFuture<T> exceptionally
2839 (Function<Throwable, ? extends T> fn) {
2840 if (fn == null) throw new NullPointerException();
2841 CompletableFuture<T> dst = new CompletableFuture<T>();
2842 ExceptionCompletion<T> d = null;
2843 Object r;
2844 if ((r = result) == null) {
2845 CompletionNode p =
2846 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2847 while ((r = result) == null) {
2848 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2849 p.next = completions, p))
2850 break;
2851 }
2852 }
2853 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2854 T t = null; Throwable ex, dx = null;
2855 if (r instanceof AltResult) {
2856 if ((ex = ((AltResult)r).ex) != null) {
2857 try {
2858 t = fn.apply(ex);
2859 } catch (Throwable rex) {
2860 dx = rex;
2861 }
2862 }
2863 }
2864 else {
2865 @SuppressWarnings("unchecked") T tr = (T) r;
2866 t = tr;
2867 }
2868 dst.internalComplete(t, dx);
2869 }
2870 helpPostComplete();
2871 return dst;
2872 }
2873
2874 /**
2875 * Returns a new CompletableFuture that is completed when this
2876 * CompletableFuture completes, with the result of the given
2877 * function of the result and exception of this CompletableFuture's
2878 * completion. The given function is invoked with the result (or
2879 * {@code null} if none) and the exception (or {@code null} if none)
2880 * of this CompletableFuture when complete.
2881 *
2882 * @param fn the function to use to compute the value of the
2883 * returned CompletableFuture
2884 * @return the new CompletableFuture
2885 */
2886 public <U> CompletableFuture<U> handle
2887 (BiFunction<? super T, Throwable, ? extends U> fn) {
2888 if (fn == null) throw new NullPointerException();
2889 CompletableFuture<U> dst = new CompletableFuture<U>();
2890 HandleCompletion<T,U> d = null;
2891 Object r;
2892 if ((r = result) == null) {
2893 CompletionNode p =
2894 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2895 while ((r = result) == null) {
2896 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2897 p.next = completions, p))
2898 break;
2899 }
2900 }
2901 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2902 T t; Throwable ex;
2903 if (r instanceof AltResult) {
2904 ex = ((AltResult)r).ex;
2905 t = null;
2906 }
2907 else {
2908 ex = null;
2909 @SuppressWarnings("unchecked") T tr = (T) r;
2910 t = tr;
2911 }
2912 U u; Throwable dx;
2913 try {
2914 u = fn.apply(t, ex);
2915 dx = null;
2916 } catch (Throwable rex) {
2917 dx = rex;
2918 u = null;
2919 }
2920 dst.internalComplete(u, dx);
2921 }
2922 helpPostComplete();
2923 return dst;
2924 }
2925
2926
2927 /* ------------- Arbitrary-arity constructions -------------- */
2928
2929 /*
2930 * The basic plan of attack is to recursively form binary
2931 * completion trees of elements. This can be overkill for small
2932 * sets, but scales nicely. The And/All vs Or/Any forms use the
2933 * same idea, but details differ.
2934 */
2935
2936 /**
2937 * Returns a new CompletableFuture that is completed when all of
2938 * the given CompletableFutures complete. If any of the given
2939 * CompletableFutures complete exceptionally, then the returned
2940 * CompletableFuture also does so, with a CompletionException
2941 * holding this exception as its cause. Otherwise, the results,
2942 * if any, of the given CompletableFutures are not reflected in
2943 * the returned CompletableFuture, but may be obtained by
2944 * inspecting them individually. If no CompletableFutures are
2945 * provided, returns a CompletableFuture completed with the value
2946 * {@code null}.
2947 *
2948 * <p>Among the applications of this method is to await completion
2949 * of a set of independent CompletableFutures before continuing a
2950 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2951 * c3).join();}.
2952 *
2953 * @param cfs the CompletableFutures
2954 * @return a new CompletableFuture that is completed when all of the
2955 * given CompletableFutures complete
2956 * @throws NullPointerException if the array or any of its elements are
2957 * {@code null}
2958 */
2959 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2960 int len = cfs.length; // Directly handle empty and singleton cases
2961 if (len > 1)
2962 return allTree(cfs, 0, len - 1);
2963 else {
2964 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2965 CompletableFuture<?> f;
2966 if (len == 0)
2967 dst.result = NIL;
2968 else if ((f = cfs[0]) == null)
2969 throw new NullPointerException();
2970 else {
2971 ThenPropagate d = null;
2972 CompletionNode p = null;
2973 Object r;
2974 while ((r = f.result) == null) {
2975 if (d == null)
2976 d = new ThenPropagate(f, dst);
2977 else if (p == null)
2978 p = new CompletionNode(d);
2979 else if (UNSAFE.compareAndSwapObject
2980 (f, COMPLETIONS, p.next = f.completions, p))
2981 break;
2982 }
2983 if (r != null && (d == null || d.compareAndSet(0, 1)))
2984 dst.internalComplete(null, (r instanceof AltResult) ?
2985 ((AltResult)r).ex : null);
2986 f.helpPostComplete();
2987 }
2988 return dst;
2989 }
2990 }
2991
2992 /**
2993 * Recursively constructs an And'ed tree of CompletableFutures.
2994 * Called only when array known to have at least two elements.
2995 */
2996 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2997 int lo, int hi) {
2998 CompletableFuture<?> fst, snd;
2999 int mid = (lo + hi) >>> 1;
3000 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
3001 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
3002 throw new NullPointerException();
3003 CompletableFuture<Void> dst = new CompletableFuture<Void>();
3004 AndCompletion d = null;
3005 CompletionNode p = null, q = null;
3006 Object r = null, s = null;
3007 while ((r = fst.result) == null || (s = snd.result) == null) {
3008 if (d == null)
3009 d = new AndCompletion(fst, snd, dst);
3010 else if (p == null)
3011 p = new CompletionNode(d);
3012 else if (q == null) {
3013 if (UNSAFE.compareAndSwapObject
3014 (fst, COMPLETIONS, p.next = fst.completions, p))
3015 q = new CompletionNode(d);
3016 }
3017 else if (UNSAFE.compareAndSwapObject
3018 (snd, COMPLETIONS, q.next = snd.completions, q))
3019 break;
3020 }
3021 if ((r != null || (r = fst.result) != null) &&
3022 (s != null || (s = snd.result) != null) &&
3023 (d == null || d.compareAndSet(0, 1))) {
3024 Throwable ex;
3025 if (r instanceof AltResult)
3026 ex = ((AltResult)r).ex;
3027 else
3028 ex = null;
3029 if (ex == null && (s instanceof AltResult))
3030 ex = ((AltResult)s).ex;
3031 dst.internalComplete(null, ex);
3032 }
3033 fst.helpPostComplete();
3034 snd.helpPostComplete();
3035 return dst;
3036 }
3037
3038 /**
3039 * Returns a new CompletableFuture that is completed when any of
3040 * the given CompletableFutures complete, with the same result.
3041 * Otherwise, if it completed exceptionally, the returned
3042 * CompletableFuture also does so, with a CompletionException
3043 * holding this exception as its cause. If no CompletableFutures
3044 * are provided, returns an incomplete CompletableFuture.
3045 *
3046 * @param cfs the CompletableFutures
3047 * @return a new CompletableFuture that is completed with the
3048 * result or exception of any of the given CompletableFutures when
3049 * one completes
3050 * @throws NullPointerException if the array or any of its elements are
3051 * {@code null}
3052 */
3053 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
3054 int len = cfs.length; // Same idea as allOf
3055 if (len > 1)
3056 return anyTree(cfs, 0, len - 1);
3057 else {
3058 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3059 CompletableFuture<?> f;
3060 if (len == 0)
3061 ; // skip
3062 else if ((f = cfs[0]) == null)
3063 throw new NullPointerException();
3064 else {
3065 ThenCopy<Object> d = null;
3066 CompletionNode p = null;
3067 Object r;
3068 while ((r = f.result) == null) {
3069 if (d == null)
3070 d = new ThenCopy<Object>(f, dst);
3071 else if (p == null)
3072 p = new CompletionNode(d);
3073 else if (UNSAFE.compareAndSwapObject
3074 (f, COMPLETIONS, p.next = f.completions, p))
3075 break;
3076 }
3077 if (r != null && (d == null || d.compareAndSet(0, 1))) {
3078 Throwable ex; Object t;
3079 if (r instanceof AltResult) {
3080 ex = ((AltResult)r).ex;
3081 t = null;
3082 }
3083 else {
3084 ex = null;
3085 t = r;
3086 }
3087 dst.internalComplete(t, ex);
3088 }
3089 f.helpPostComplete();
3090 }
3091 return dst;
3092 }
3093 }
3094
3095 /**
3096 * Recursively constructs an Or'ed tree of CompletableFutures.
3097 */
3098 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
3099 int lo, int hi) {
3100 CompletableFuture<?> fst, snd;
3101 int mid = (lo + hi) >>> 1;
3102 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3103 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3104 throw new NullPointerException();
3105 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3106 OrCompletion d = null;
3107 CompletionNode p = null, q = null;
3108 Object r;
3109 while ((r = fst.result) == null && (r = snd.result) == null) {
3110 if (d == null)
3111 d = new OrCompletion(fst, snd, dst);
3112 else if (p == null)
3113 p = new CompletionNode(d);
3114 else if (q == null) {
3115 if (UNSAFE.compareAndSwapObject
3116 (fst, COMPLETIONS, p.next = fst.completions, p))
3117 q = new CompletionNode(d);
3118 }
3119 else if (UNSAFE.compareAndSwapObject
3120 (snd, COMPLETIONS, q.next = snd.completions, q))
3121 break;
3122 }
3123 if ((r != null || (r = fst.result) != null ||
3124 (r = snd.result) != null) &&
3125 (d == null || d.compareAndSet(0, 1))) {
3126 Throwable ex; Object t;
3127 if (r instanceof AltResult) {
3128 ex = ((AltResult)r).ex;
3129 t = null;
3130 }
3131 else {
3132 ex = null;
3133 t = r;
3134 }
3135 dst.internalComplete(t, ex);
3136 }
3137 fst.helpPostComplete();
3138 snd.helpPostComplete();
3139 return dst;
3140 }
3141
3142 /* ------------- Control and status methods -------------- */
3143
3144 /**
3145 * If not already completed, completes this CompletableFuture with
3146 * a {@link CancellationException}. Dependent CompletableFutures
3147 * that have not already completed will also complete
3148 * exceptionally, with a {@link CompletionException} caused by
3149 * this {@code CancellationException}.
3150 *
3151 * @param mayInterruptIfRunning this value has no effect in this
3152 * implementation because interrupts are not used to control
3153 * processing.
3154 *
3155 * @return {@code true} if this task is now cancelled
3156 */
3157 public boolean cancel(boolean mayInterruptIfRunning) {
3158 boolean cancelled = (result == null) &&
3159 UNSAFE.compareAndSwapObject
3160 (this, RESULT, null, new AltResult(new CancellationException()));
3161 postComplete();
3162 return cancelled || isCancelled();
3163 }
3164
3165 /**
3166 * Returns {@code true} if this CompletableFuture was cancelled
3167 * before it completed normally.
3168 *
3169 * @return {@code true} if this CompletableFuture was cancelled
3170 * before it completed normally
3171 */
3172 public boolean isCancelled() {
3173 Object r;
3174 return ((r = result) instanceof AltResult) &&
3175 (((AltResult)r).ex instanceof CancellationException);
3176 }
3177
3178 /**
3179 * Forcibly sets or resets the value subsequently returned by
3180 * method {@link #get()} and related methods, whether or not
3181 * already completed. This method is designed for use only in
3182 * error recovery actions, and even in such situations may result
3183 * in ongoing dependent completions using established versus
3184 * overwritten outcomes.
3185 *
3186 * @param value the completion value
3187 */
3188 public void obtrudeValue(T value) {
3189 result = (value == null) ? NIL : value;
3190 postComplete();
3191 }
3192
3193 /**
3194 * Forcibly causes subsequent invocations of method {@link #get()}
3195 * and related methods to throw the given exception, whether or
3196 * not already completed. This method is designed for use only in
3197 * recovery actions, and even in such situations may result in
3198 * ongoing dependent completions using established versus
3199 * overwritten outcomes.
3200 *
3201 * @param ex the exception
3202 */
3203 public void obtrudeException(Throwable ex) {
3204 if (ex == null) throw new NullPointerException();
3205 result = new AltResult(ex);
3206 postComplete();
3207 }
3208
3209 /**
3210 * Returns the estimated number of CompletableFutures whose
3211 * completions are awaiting completion of this CompletableFuture.
3212 * This method is designed for use in monitoring system state, not
3213 * for synchronization control.
3214 *
3215 * @return the number of dependent CompletableFutures
3216 */
3217 public int getNumberOfDependents() {
3218 int count = 0;
3219 for (CompletionNode p = completions; p != null; p = p.next)
3220 ++count;
3221 return count;
3222 }
3223
3224 /**
3225 * Returns a string identifying this CompletableFuture, as well as
3226 * its completion state. The state, in brackets, contains the
3227 * String {@code "Completed Normally"} or the String {@code
3228 * "Completed Exceptionally"}, or the String {@code "Not
3229 * completed"} followed by the number of CompletableFutures
3230 * dependent upon its completion, if any.
3231 *
3232 * @return a string identifying this CompletableFuture, as well as its state
3233 */
3234 public String toString() {
3235 Object r = result;
3236 int count;
3237 return super.toString() +
3238 ((r == null) ?
3239 (((count = getNumberOfDependents()) == 0) ?
3240 "[Not completed]" :
3241 "[Not completed, " + count + " dependents]") :
3242 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3243 "[Completed exceptionally]" :
3244 "[Completed normally]"));
3245 }
3246
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" that are passed to its
// compare-and-swap calls. (WAITERS is not referenced in this part of
// the file.)
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        final sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
        final Class<?> self = CompletableFuture.class;
        UNSAFE = unsafe;
        RESULT = unsafe.objectFieldOffset(self.getDeclaredField("result"));
        WAITERS = unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        COMPLETIONS =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
    } catch (Exception failure) {
        // Any failure while resolving the fields makes the class
        // unusable; surface it as an Error from class initialization.
        throw new Error(failure);
    }
}
3266 }