ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CompletableFuture.java
Revision: 1.22
Committed: Sun Jan 18 20:17:33 2015 UTC (9 years, 3 months ago) by jsr166
Branch: MAIN
CVS Tags: HEAD
Changes since 1.21: +1 -0 lines
Log Message:
exactly one blank line before and after package statements

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8
9 import java.util.concurrent.Future;
10 import java.util.concurrent.FutureTask;
11 import java.util.concurrent.TimeUnit;
12 import java.util.concurrent.Executor;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.TimeoutException;
15 import java.util.concurrent.CancellationException;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.concurrent.locks.LockSupport;
18
19 /**
20 * A {@link Future} that may be explicitly completed (setting its
21 * value and status), and may include dependent functions and actions
22 * that trigger upon its completion.
23 *
24 * <p>When two or more threads attempt to
25 * {@link #complete complete},
26 * {@link #completeExceptionally completeExceptionally}, or
27 * {@link #cancel cancel}
28 * a CompletableFuture, only one of them succeeds.
29 *
30 * <p>Methods are available for adding dependents based on
31 * user-provided Functions, Actions, or Runnables. The appropriate
32 * form to use depends on whether actions require arguments and/or
33 * produce results. Completion of a dependent action will trigger the
34 * completion of another CompletableFuture. Actions may also be
35 * triggered after either or both the current and another
36 * CompletableFuture complete. Multiple CompletableFutures may also
37 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
38 * {@link #allOf(CompletableFuture...)}.
39 *
40 * <p>CompletableFutures themselves do not execute asynchronously.
41 * However, actions supplied for dependent completions of another
42 * CompletableFuture may do so, depending on whether they are provided
43 * via one of the <em>async</em> methods (that is, methods with names
44 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
45 * methods provide a way to commence asynchronous processing of an
46 * action using either a given {@link Executor} or by default the
47 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
48 * debugging, and tracking, all generated asynchronous tasks are
49 * instances of the marker interface {@link AsynchronousCompletionTask}.
50 *
51 * <p>Actions supplied for dependent completions of <em>non-async</em>
52 * methods may be performed by the thread that completes the current
53 * CompletableFuture, or by any other caller of these methods. There
54 * are no guarantees about the order of processing completions unless
55 * constrained by these methods.
56 *
57 * <p>Since (unlike {@link FutureTask}) this class has no direct
58 * control over the computation that causes it to be completed,
59 * cancellation is treated as just another form of exceptional completion.
60 * Method {@link #cancel cancel} has the same effect as
61 * {@code completeExceptionally(new CancellationException())}.
62 *
63 * <p>Upon exceptional completion (including cancellation), or when a
64 * completion entails an additional computation which terminates
65 * abruptly with an (unchecked) exception or error, then all of their
66 * dependent completions (and their dependents in turn) generally act
67 * as {@code completeExceptionally} with a {@link CompletionException}
68 * holding that exception as its cause. However, the {@link
69 * #exceptionally exceptionally} and {@link #handle handle}
70 * completions <em>are</em> able to handle exceptional completions of
71 * the CompletableFutures they depend on.
72 *
73 * <p>In case of exceptional completion with a CompletionException,
74 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
75 * {@link ExecutionException} with the same cause as held in the
76 * corresponding CompletionException. However, in these cases,
77 * methods {@link #join()} and {@link #getNow} throw the
78 * CompletionException, which simplifies usage.
79 *
80 * <p>Arguments used to pass a completion result (that is, for parameters
81 * of type {@code T}) may be null, but passing a null value for any other
82 * parameter will result in a {@link NullPointerException} being thrown.
83 *
84 * @author Doug Lea
85 */
86 public class CompletableFuture<T> implements Future<T> {
87 // jsr166e nested interfaces
88
/**
 * Interface describing a void action of one argument.
 *
 * @param <A> the type of the argument
 */
public interface Action<A> {
    /** Performs this action on the given argument. */
    void accept(A a);
}
/**
 * Interface describing a void action of two arguments.
 *
 * @param <A> the type of the first argument
 * @param <B> the type of the second argument
 */
public interface BiAction<A,B> {
    /** Performs this action on the given pair of arguments. */
    void accept(A a, B b);
}
/**
 * Interface describing a function of one argument.
 *
 * @param <A> the type of the argument
 * @param <T> the type of the result
 */
public interface Fun<A,T> {
    /** Applies this function to the given argument and returns the result. */
    T apply(A a);
}
/**
 * Interface describing a function of two arguments.
 *
 * @param <A> the type of the first argument
 * @param <B> the type of the second argument
 * @param <T> the type of the result
 */
public interface BiFun<A,B,T> {
    /** Applies this function to the given arguments and returns the result. */
    T apply(A a, B b);
}
/**
 * Interface describing a function of no arguments.
 *
 * @param <T> the type of the produced value
 */
public interface Generator<T> {
    /** Produces a value. */
    T get();
}
99
100
101 /*
102 * Overview:
103 *
104 * 1. Non-nullness of field result (set via CAS) indicates done.
105 * An AltResult is used to box null as a result, as well as to
106 * hold exceptions. Using a single field makes completion fast
107 * and simple to detect and trigger, at the expense of a lot of
108 * encoding and decoding that infiltrates many methods. One minor
109 * simplification relies on the (static) NIL (to box null results)
110 * being the only AltResult with a null exception field, so we
111 * don't usually need explicit comparisons with NIL. The CF
112 * exception propagation mechanics surrounding decoding rely on
113 * unchecked casts of decoded results really being unchecked,
114 * where user type errors are caught at point of use, as is
115 * currently the case in Java. These are highlighted by using
116 * SuppressWarnings-annotated temporaries.
117 *
118 * 2. Waiters are held in a Treiber stack similar to the one used
119 * in FutureTask, Phaser, and SynchronousQueue. See their
120 * internal documentation for algorithmic details.
121 *
122 * 3. Completions are also kept in a list/stack, and pulled off
123 * and run when completion is triggered. (We could even use the
124 * same stack as for waiters, but would give up the potential
125 * parallelism obtained because woken waiters help release/run
126 * others -- see method postComplete). Because post-processing
127 * may race with direct calls, class Completion opportunistically
128 * extends AtomicInteger so callers can claim the action via
129 * compareAndSet(0, 1). The Completion.run methods are all
130 * written in a boringly similar uniform way (that sometimes includes
131 * unnecessary-looking checks, kept to maintain uniformity).
132 * There are enough dimensions upon which they differ that
133 * attempts to factor commonalities while maintaining efficiency
134 * require more lines of code than they would save.
135 *
136 * 4. The exported then/and/or methods do support a bit of
137 * factoring (see doThenApply etc). They must cope with the
138 * intrinsic races surrounding addition of a dependent action
139 * versus performing the action directly because the task is
140 * already complete. For example, a CF may not be complete upon
141 * entry, so a dependent completion is added, but by the time it
142 * is added, the target CF is complete, so must be directly
143 * executed. This is all done while avoiding unnecessary object
144 * construction in safe-bypass cases.
145 */
146
147 // preliminaries
148
149 static final class AltResult {
150 final Throwable ex; // null only for NIL
151 AltResult(Throwable ex) { this.ex = ex; }
152 }
153
154 static final AltResult NIL = new AltResult(null); // shared box for a null result; carries no exception
155
156 // Fields
157
158 volatile Object result; // Either the result or boxed AltResult; non-null means done
159 volatile WaitNode waiters; // Treiber stack of threads blocked on get(); drained by postComplete
160 volatile CompletionNode completions; // list (Treiber stack) of completions; popped and run by postComplete
161
162 // Basic utilities for triggering and processing completions
163
164 /**
165 * Removes and signals all waiting threads and runs all completions.
166 */
167 final void postComplete() {
168 WaitNode q; Thread t;
169 while ((q = waiters) != null) {
170 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
171 (t = q.thread) != null) {
172 q.thread = null;
173 LockSupport.unpark(t);
174 }
175 }
176
177 CompletionNode h; Completion c;
178 while ((h = completions) != null) {
179 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
180 (c = h.completion) != null)
181 c.run();
182 }
183 }
184
185 /**
186 * Triggers completion with the encoding of the given arguments:
187 * if the exception is non-null, encodes it as a wrapped
188 * CompletionException unless it is one already. Otherwise uses
189 * the given result, boxed as NIL if null.
190 */
191 final void internalComplete(T v, Throwable ex) {
192 if (result == null)
193 UNSAFE.compareAndSwapObject
194 (this, RESULT, null,
195 (ex == null) ? (v == null) ? NIL : v :
196 new AltResult((ex instanceof CompletionException) ? ex :
197 new CompletionException(ex)));
198 postComplete(); // help out even if not triggered
199 }
200
201 /**
202 * If triggered, helps release and/or process completions.
203 */
204 final void helpPostComplete() {
205 if (result != null)
206 postComplete();
207 }
208
209 /* ------------- waiting for completions -------------- */
210
211 /** Number of processors reported by the runtime at class initialization, for spin control */
212 static final int NCPU = Runtime.getRuntime().availableProcessors();
213
214 /**
215 * Heuristic spin value for waitingGet() before blocking:
216 * 1 << 8 on multiprocessors, 0 when only one processor is reported
217 */
218 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
219
220 /**
221 * Linked nodes to record waiting threads in a Treiber stack. See
222 * other classes such as Phaser and SynchronousQueue for more
223 * detailed explanation. This class implements ManagedBlocker to
224 * avoid starvation when blocking actions pile up in
225 * ForkJoinPools.
226 */
227 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
228 long nanos; // wait time if timed
229 final long deadline; // non-zero if timed
230 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
231 volatile Thread thread;
232 volatile WaitNode next;
233 WaitNode(boolean interruptible, long nanos, long deadline) {
234 this.thread = Thread.currentThread();
235 this.interruptControl = interruptible ? 1 : 0;
236 this.nanos = nanos;
237 this.deadline = deadline;
238 }
239 public boolean isReleasable() {
240 if (thread == null)
241 return true;
242 if (Thread.interrupted()) {
243 int i = interruptControl;
244 interruptControl = -1;
245 if (i > 0)
246 return true;
247 }
248 if (deadline != 0L &&
249 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
250 thread = null;
251 return true;
252 }
253 return false;
254 }
255 public boolean block() {
256 if (isReleasable())
257 return true;
258 else if (deadline == 0L)
259 LockSupport.park(this);
260 else if (nanos > 0L)
261 LockSupport.parkNanos(this, nanos);
262 return isReleasable();
263 }
264 }
265
266 /**
267 * Returns raw result after waiting, or null if interruptible and
268 * interrupted.
269 */
270 private Object waitingGet(boolean interruptible) {
271 WaitNode q = null;
272 boolean queued = false;
273 int spins = SPINS;
274 for (Object r;;) {
275 if ((r = result) != null) {
276 if (q != null) { // suppress unpark
277 q.thread = null;
278 if (q.interruptControl < 0) {
279 if (interruptible) {
280 removeWaiter(q);
281 return null;
282 }
283 Thread.currentThread().interrupt();
284 }
285 }
286 postComplete(); // help release others
287 return r;
288 }
289 else if (spins > 0) {
290 int rnd = ThreadLocalRandom.current().nextInt();
291 if (rnd >= 0)
292 --spins;
293 }
294 else if (q == null)
295 q = new WaitNode(interruptible, 0L, 0L);
296 else if (!queued)
297 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
298 q.next = waiters, q);
299 else if (interruptible && q.interruptControl < 0) {
300 removeWaiter(q);
301 return null;
302 }
303 else if (q.thread != null && result == null) {
304 try {
305 ForkJoinPool.managedBlock(q);
306 } catch (InterruptedException ex) {
307 q.interruptControl = -1;
308 }
309 }
310 }
311 }
312
313 /**
314 * Awaits completion or aborts on interrupt or timeout.
315 *
316 * @param nanos time to wait
317 * @return raw result
318 */
319 private Object timedAwaitDone(long nanos)
320 throws InterruptedException, TimeoutException {
321 WaitNode q = null;
322 boolean queued = false;
323 for (Object r;;) {
324 if ((r = result) != null) {
325 if (q != null) {
326 q.thread = null;
327 if (q.interruptControl < 0) {
328 removeWaiter(q);
329 throw new InterruptedException();
330 }
331 }
332 postComplete();
333 return r;
334 }
335 else if (q == null) {
336 if (nanos <= 0L)
337 throw new TimeoutException();
338 long d = System.nanoTime() + nanos;
339 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
340 }
341 else if (!queued)
342 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
343 q.next = waiters, q);
344 else if (q.interruptControl < 0) {
345 removeWaiter(q);
346 throw new InterruptedException();
347 }
348 else if (q.nanos <= 0L) {
349 if (result == null) {
350 removeWaiter(q);
351 throw new TimeoutException();
352 }
353 }
354 else if (q.thread != null && result == null) {
355 try {
356 ForkJoinPool.managedBlock(q);
357 } catch (InterruptedException ex) {
358 q.interruptControl = -1;
359 }
360 }
361 }
362 }
363
364 /**
365 * Tries to unlink a timed-out or interrupted wait node to avoid
366 * accumulating garbage. Internal nodes are simply unspliced
367 * without CAS since it is harmless if they are traversed anyway
368 * by releasers. To avoid effects of unsplicing from already
369 * removed nodes, the list is retraversed in case of an apparent
370 * race. This is slow when there are a lot of nodes, but we don't
371 * expect lists to be long enough to outweigh higher-overhead
372 * schemes.
373 */
374 private void removeWaiter(WaitNode node) {
375 if (node != null) {
376 node.thread = null;
377 retry:
378 for (;;) { // restart on removeWaiter race
379 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
380 s = q.next;
381 if (q.thread != null)
382 pred = q;
383 else if (pred != null) {
384 pred.next = s;
385 if (pred.thread == null) // check for race
386 continue retry;
387 }
388 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
389 continue retry;
390 }
391 break;
392 }
393 }
394 }
395
396 /* ------------- Async tasks -------------- */
397
/**
 * Marker interface implemented by the asynchronous tasks that the
 * {@code async} methods produce. It declares no methods; it exists so
 * that such tasks can be recognized when monitoring, debugging, or
 * tracking asynchronous activities.
 *
 * @since 1.8
 */
public interface AsynchronousCompletionTask {
}
407
408 /** Base class can act as either FJ or plain Runnable */
409 abstract static class Async extends ForkJoinTask<Void>
410 implements Runnable, AsynchronousCompletionTask {
411 public final Void getRawResult() { return null; }
412 public final void setRawResult(Void v) { }
413 public final void run() { exec(); }
414 }
415
416 static final class AsyncRun extends Async {
417 final Runnable fn;
418 final CompletableFuture<Void> dst;
419 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
420 this.fn = fn; this.dst = dst;
421 }
422 public final boolean exec() {
423 CompletableFuture<Void> d; Throwable ex;
424 if ((d = this.dst) != null && d.result == null) {
425 try {
426 fn.run();
427 ex = null;
428 } catch (Throwable rex) {
429 ex = rex;
430 }
431 d.internalComplete(null, ex);
432 }
433 return true;
434 }
435 private static final long serialVersionUID = 5232453952276885070L;
436 }
437
438 static final class AsyncSupply<U> extends Async {
439 final Generator<U> fn;
440 final CompletableFuture<U> dst;
441 AsyncSupply(Generator<U> fn, CompletableFuture<U> dst) {
442 this.fn = fn; this.dst = dst;
443 }
444 public final boolean exec() {
445 CompletableFuture<U> d; U u; Throwable ex;
446 if ((d = this.dst) != null && d.result == null) {
447 try {
448 u = fn.get();
449 ex = null;
450 } catch (Throwable rex) {
451 ex = rex;
452 u = null;
453 }
454 d.internalComplete(u, ex);
455 }
456 return true;
457 }
458 private static final long serialVersionUID = 5232453952276885070L;
459 }
460
461 static final class AsyncApply<T,U> extends Async {
462 final T arg;
463 final Fun<? super T,? extends U> fn;
464 final CompletableFuture<U> dst;
465 AsyncApply(T arg, Fun<? super T,? extends U> fn,
466 CompletableFuture<U> dst) {
467 this.arg = arg; this.fn = fn; this.dst = dst;
468 }
469 public final boolean exec() {
470 CompletableFuture<U> d; U u; Throwable ex;
471 if ((d = this.dst) != null && d.result == null) {
472 try {
473 u = fn.apply(arg);
474 ex = null;
475 } catch (Throwable rex) {
476 ex = rex;
477 u = null;
478 }
479 d.internalComplete(u, ex);
480 }
481 return true;
482 }
483 private static final long serialVersionUID = 5232453952276885070L;
484 }
485
486 static final class AsyncCombine<T,U,V> extends Async {
487 final T arg1;
488 final U arg2;
489 final BiFun<? super T,? super U,? extends V> fn;
490 final CompletableFuture<V> dst;
491 AsyncCombine(T arg1, U arg2,
492 BiFun<? super T,? super U,? extends V> fn,
493 CompletableFuture<V> dst) {
494 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
495 }
496 public final boolean exec() {
497 CompletableFuture<V> d; V v; Throwable ex;
498 if ((d = this.dst) != null && d.result == null) {
499 try {
500 v = fn.apply(arg1, arg2);
501 ex = null;
502 } catch (Throwable rex) {
503 ex = rex;
504 v = null;
505 }
506 d.internalComplete(v, ex);
507 }
508 return true;
509 }
510 private static final long serialVersionUID = 5232453952276885070L;
511 }
512
513 static final class AsyncAccept<T> extends Async {
514 final T arg;
515 final Action<? super T> fn;
516 final CompletableFuture<Void> dst;
517 AsyncAccept(T arg, Action<? super T> fn,
518 CompletableFuture<Void> dst) {
519 this.arg = arg; this.fn = fn; this.dst = dst;
520 }
521 public final boolean exec() {
522 CompletableFuture<Void> d; Throwable ex;
523 if ((d = this.dst) != null && d.result == null) {
524 try {
525 fn.accept(arg);
526 ex = null;
527 } catch (Throwable rex) {
528 ex = rex;
529 }
530 d.internalComplete(null, ex);
531 }
532 return true;
533 }
534 private static final long serialVersionUID = 5232453952276885070L;
535 }
536
537 static final class AsyncAcceptBoth<T,U> extends Async {
538 final T arg1;
539 final U arg2;
540 final BiAction<? super T,? super U> fn;
541 final CompletableFuture<Void> dst;
542 AsyncAcceptBoth(T arg1, U arg2,
543 BiAction<? super T,? super U> fn,
544 CompletableFuture<Void> dst) {
545 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
546 }
547 public final boolean exec() {
548 CompletableFuture<Void> d; Throwable ex;
549 if ((d = this.dst) != null && d.result == null) {
550 try {
551 fn.accept(arg1, arg2);
552 ex = null;
553 } catch (Throwable rex) {
554 ex = rex;
555 }
556 d.internalComplete(null, ex);
557 }
558 return true;
559 }
560 private static final long serialVersionUID = 5232453952276885070L;
561 }
562
563 static final class AsyncCompose<T,U> extends Async {
564 final T arg;
565 final Fun<? super T, CompletableFuture<U>> fn;
566 final CompletableFuture<U> dst;
567 AsyncCompose(T arg,
568 Fun<? super T, CompletableFuture<U>> fn,
569 CompletableFuture<U> dst) {
570 this.arg = arg; this.fn = fn; this.dst = dst;
571 }
572 public final boolean exec() {
573 CompletableFuture<U> d, fr; U u; Throwable ex;
574 if ((d = this.dst) != null && d.result == null) {
575 try {
576 fr = fn.apply(arg);
577 ex = (fr == null) ? new NullPointerException() : null;
578 } catch (Throwable rex) {
579 ex = rex;
580 fr = null;
581 }
582 if (ex != null)
583 u = null;
584 else {
585 Object r = fr.result;
586 if (r == null)
587 r = fr.waitingGet(false);
588 if (r instanceof AltResult) {
589 ex = ((AltResult)r).ex;
590 u = null;
591 }
592 else {
593 @SuppressWarnings("unchecked") U ur = (U) r;
594 u = ur;
595 }
596 }
597 d.internalComplete(u, ex);
598 }
599 return true;
600 }
601 private static final long serialVersionUID = 5232453952276885070L;
602 }
603
604 /* ------------- Completions -------------- */
605
606 /**
607 * Simple linked list nodes to record completions, used in
608 * basically the same way as WaitNodes. (We separate nodes from
609 * the Completions themselves mainly because for the And and Or
610 * methods, the same Completion object resides in two lists.)
611 */
612 static final class CompletionNode {
613 final Completion completion;
614 volatile CompletionNode next;
615 CompletionNode(Completion completion) { this.completion = completion; }
616 }
617
618 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
619 abstract static class Completion extends AtomicInteger implements Runnable {
620 }
621
622 static final class ThenApply<T,U> extends Completion {
623 final CompletableFuture<? extends T> src;
624 final Fun<? super T,? extends U> fn;
625 final CompletableFuture<U> dst;
626 final Executor executor;
627 ThenApply(CompletableFuture<? extends T> src,
628 Fun<? super T,? extends U> fn,
629 CompletableFuture<U> dst,
630 Executor executor) {
631 this.src = src; this.fn = fn; this.dst = dst;
632 this.executor = executor;
633 }
634 public final void run() {
635 final CompletableFuture<? extends T> a;
636 final Fun<? super T,? extends U> fn;
637 final CompletableFuture<U> dst;
638 Object r; T t; Throwable ex;
639 if ((dst = this.dst) != null &&
640 (fn = this.fn) != null &&
641 (a = this.src) != null &&
642 (r = a.result) != null &&
643 compareAndSet(0, 1)) {
644 if (r instanceof AltResult) {
645 ex = ((AltResult)r).ex;
646 t = null;
647 }
648 else {
649 ex = null;
650 @SuppressWarnings("unchecked") T tr = (T) r;
651 t = tr;
652 }
653 Executor e = executor;
654 U u = null;
655 if (ex == null) {
656 try {
657 if (e != null)
658 e.execute(new AsyncApply<T,U>(t, fn, dst));
659 else
660 u = fn.apply(t);
661 } catch (Throwable rex) {
662 ex = rex;
663 }
664 }
665 if (e == null || ex != null)
666 dst.internalComplete(u, ex);
667 }
668 }
669 private static final long serialVersionUID = 5232453952276885070L;
670 }
671
672 static final class ThenAccept<T> extends Completion {
673 final CompletableFuture<? extends T> src;
674 final Action<? super T> fn;
675 final CompletableFuture<Void> dst;
676 final Executor executor;
677 ThenAccept(CompletableFuture<? extends T> src,
678 Action<? super T> fn,
679 CompletableFuture<Void> dst,
680 Executor executor) {
681 this.src = src; this.fn = fn; this.dst = dst;
682 this.executor = executor;
683 }
684 public final void run() {
685 final CompletableFuture<? extends T> a;
686 final Action<? super T> fn;
687 final CompletableFuture<Void> dst;
688 Object r; T t; Throwable ex;
689 if ((dst = this.dst) != null &&
690 (fn = this.fn) != null &&
691 (a = this.src) != null &&
692 (r = a.result) != null &&
693 compareAndSet(0, 1)) {
694 if (r instanceof AltResult) {
695 ex = ((AltResult)r).ex;
696 t = null;
697 }
698 else {
699 ex = null;
700 @SuppressWarnings("unchecked") T tr = (T) r;
701 t = tr;
702 }
703 Executor e = executor;
704 if (ex == null) {
705 try {
706 if (e != null)
707 e.execute(new AsyncAccept<T>(t, fn, dst));
708 else
709 fn.accept(t);
710 } catch (Throwable rex) {
711 ex = rex;
712 }
713 }
714 if (e == null || ex != null)
715 dst.internalComplete(null, ex);
716 }
717 }
718 private static final long serialVersionUID = 5232453952276885070L;
719 }
720
721 static final class ThenRun extends Completion {
722 final CompletableFuture<?> src;
723 final Runnable fn;
724 final CompletableFuture<Void> dst;
725 final Executor executor;
726 ThenRun(CompletableFuture<?> src,
727 Runnable fn,
728 CompletableFuture<Void> dst,
729 Executor executor) {
730 this.src = src; this.fn = fn; this.dst = dst;
731 this.executor = executor;
732 }
733 public final void run() {
734 final CompletableFuture<?> a;
735 final Runnable fn;
736 final CompletableFuture<Void> dst;
737 Object r; Throwable ex;
738 if ((dst = this.dst) != null &&
739 (fn = this.fn) != null &&
740 (a = this.src) != null &&
741 (r = a.result) != null &&
742 compareAndSet(0, 1)) {
743 if (r instanceof AltResult)
744 ex = ((AltResult)r).ex;
745 else
746 ex = null;
747 Executor e = executor;
748 if (ex == null) {
749 try {
750 if (e != null)
751 e.execute(new AsyncRun(fn, dst));
752 else
753 fn.run();
754 } catch (Throwable rex) {
755 ex = rex;
756 }
757 }
758 if (e == null || ex != null)
759 dst.internalComplete(null, ex);
760 }
761 }
762 private static final long serialVersionUID = 5232453952276885070L;
763 }
764
765 static final class ThenCombine<T,U,V> extends Completion {
766 final CompletableFuture<? extends T> src;
767 final CompletableFuture<? extends U> snd;
768 final BiFun<? super T,? super U,? extends V> fn;
769 final CompletableFuture<V> dst;
770 final Executor executor;
771 ThenCombine(CompletableFuture<? extends T> src,
772 CompletableFuture<? extends U> snd,
773 BiFun<? super T,? super U,? extends V> fn,
774 CompletableFuture<V> dst,
775 Executor executor) {
776 this.src = src; this.snd = snd;
777 this.fn = fn; this.dst = dst;
778 this.executor = executor;
779 }
780 public final void run() {
781 final CompletableFuture<? extends T> a;
782 final CompletableFuture<? extends U> b;
783 final BiFun<? super T,? super U,? extends V> fn;
784 final CompletableFuture<V> dst;
785 Object r, s; T t; U u; Throwable ex;
786 if ((dst = this.dst) != null &&
787 (fn = this.fn) != null &&
788 (a = this.src) != null &&
789 (r = a.result) != null &&
790 (b = this.snd) != null &&
791 (s = b.result) != null &&
792 compareAndSet(0, 1)) {
793 if (r instanceof AltResult) {
794 ex = ((AltResult)r).ex;
795 t = null;
796 }
797 else {
798 ex = null;
799 @SuppressWarnings("unchecked") T tr = (T) r;
800 t = tr;
801 }
802 if (ex != null)
803 u = null;
804 else if (s instanceof AltResult) {
805 ex = ((AltResult)s).ex;
806 u = null;
807 }
808 else {
809 @SuppressWarnings("unchecked") U us = (U) s;
810 u = us;
811 }
812 Executor e = executor;
813 V v = null;
814 if (ex == null) {
815 try {
816 if (e != null)
817 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
818 else
819 v = fn.apply(t, u);
820 } catch (Throwable rex) {
821 ex = rex;
822 }
823 }
824 if (e == null || ex != null)
825 dst.internalComplete(v, ex);
826 }
827 }
828 private static final long serialVersionUID = 5232453952276885070L;
829 }
830
831 static final class ThenAcceptBoth<T,U> extends Completion {
832 final CompletableFuture<? extends T> src;
833 final CompletableFuture<? extends U> snd;
834 final BiAction<? super T,? super U> fn;
835 final CompletableFuture<Void> dst;
836 final Executor executor;
837 ThenAcceptBoth(CompletableFuture<? extends T> src,
838 CompletableFuture<? extends U> snd,
839 BiAction<? super T,? super U> fn,
840 CompletableFuture<Void> dst,
841 Executor executor) {
842 this.src = src; this.snd = snd;
843 this.fn = fn; this.dst = dst;
844 this.executor = executor;
845 }
846 public final void run() {
847 final CompletableFuture<? extends T> a;
848 final CompletableFuture<? extends U> b;
849 final BiAction<? super T,? super U> fn;
850 final CompletableFuture<Void> dst;
851 Object r, s; T t; U u; Throwable ex;
852 if ((dst = this.dst) != null &&
853 (fn = this.fn) != null &&
854 (a = this.src) != null &&
855 (r = a.result) != null &&
856 (b = this.snd) != null &&
857 (s = b.result) != null &&
858 compareAndSet(0, 1)) {
859 if (r instanceof AltResult) {
860 ex = ((AltResult)r).ex;
861 t = null;
862 }
863 else {
864 ex = null;
865 @SuppressWarnings("unchecked") T tr = (T) r;
866 t = tr;
867 }
868 if (ex != null)
869 u = null;
870 else if (s instanceof AltResult) {
871 ex = ((AltResult)s).ex;
872 u = null;
873 }
874 else {
875 @SuppressWarnings("unchecked") U us = (U) s;
876 u = us;
877 }
878 Executor e = executor;
879 if (ex == null) {
880 try {
881 if (e != null)
882 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
883 else
884 fn.accept(t, u);
885 } catch (Throwable rex) {
886 ex = rex;
887 }
888 }
889 if (e == null || ex != null)
890 dst.internalComplete(null, ex);
891 }
892 }
893 private static final long serialVersionUID = 5232453952276885070L;
894 }
895
896 static final class RunAfterBoth extends Completion {
897 final CompletableFuture<?> src;
898 final CompletableFuture<?> snd;
899 final Runnable fn;
900 final CompletableFuture<Void> dst;
901 final Executor executor;
902 RunAfterBoth(CompletableFuture<?> src,
903 CompletableFuture<?> snd,
904 Runnable fn,
905 CompletableFuture<Void> dst,
906 Executor executor) {
907 this.src = src; this.snd = snd;
908 this.fn = fn; this.dst = dst;
909 this.executor = executor;
910 }
911 public final void run() {
912 final CompletableFuture<?> a;
913 final CompletableFuture<?> b;
914 final Runnable fn;
915 final CompletableFuture<Void> dst;
916 Object r, s; Throwable ex;
917 if ((dst = this.dst) != null &&
918 (fn = this.fn) != null &&
919 (a = this.src) != null &&
920 (r = a.result) != null &&
921 (b = this.snd) != null &&
922 (s = b.result) != null &&
923 compareAndSet(0, 1)) {
924 if (r instanceof AltResult)
925 ex = ((AltResult)r).ex;
926 else
927 ex = null;
928 if (ex == null && (s instanceof AltResult))
929 ex = ((AltResult)s).ex;
930 Executor e = executor;
931 if (ex == null) {
932 try {
933 if (e != null)
934 e.execute(new AsyncRun(fn, dst));
935 else
936 fn.run();
937 } catch (Throwable rex) {
938 ex = rex;
939 }
940 }
941 if (e == null || ex != null)
942 dst.internalComplete(null, ex);
943 }
944 }
945 private static final long serialVersionUID = 5232453952276885070L;
946 }
947
948 static final class AndCompletion extends Completion {
949 final CompletableFuture<?> src;
950 final CompletableFuture<?> snd;
951 final CompletableFuture<Void> dst;
952 AndCompletion(CompletableFuture<?> src,
953 CompletableFuture<?> snd,
954 CompletableFuture<Void> dst) {
955 this.src = src; this.snd = snd; this.dst = dst;
956 }
957 public final void run() {
958 final CompletableFuture<?> a;
959 final CompletableFuture<?> b;
960 final CompletableFuture<Void> dst;
961 Object r, s; Throwable ex;
962 if ((dst = this.dst) != null &&
963 (a = this.src) != null &&
964 (r = a.result) != null &&
965 (b = this.snd) != null &&
966 (s = b.result) != null &&
967 compareAndSet(0, 1)) {
968 if (r instanceof AltResult)
969 ex = ((AltResult)r).ex;
970 else
971 ex = null;
972 if (ex == null && (s instanceof AltResult))
973 ex = ((AltResult)s).ex;
974 dst.internalComplete(null, ex);
975 }
976 }
977 private static final long serialVersionUID = 5232453952276885070L;
978 }
979
980 static final class ApplyToEither<T,U> extends Completion {
981 final CompletableFuture<? extends T> src;
982 final CompletableFuture<? extends T> snd;
983 final Fun<? super T,? extends U> fn;
984 final CompletableFuture<U> dst;
985 final Executor executor;
986 ApplyToEither(CompletableFuture<? extends T> src,
987 CompletableFuture<? extends T> snd,
988 Fun<? super T,? extends U> fn,
989 CompletableFuture<U> dst,
990 Executor executor) {
991 this.src = src; this.snd = snd;
992 this.fn = fn; this.dst = dst;
993 this.executor = executor;
994 }
995 public final void run() {
996 final CompletableFuture<? extends T> a;
997 final CompletableFuture<? extends T> b;
998 final Fun<? super T,? extends U> fn;
999 final CompletableFuture<U> dst;
1000 Object r; T t; Throwable ex;
1001 if ((dst = this.dst) != null &&
1002 (fn = this.fn) != null &&
1003 (((a = this.src) != null && (r = a.result) != null) ||
1004 ((b = this.snd) != null && (r = b.result) != null)) &&
1005 compareAndSet(0, 1)) {
1006 if (r instanceof AltResult) {
1007 ex = ((AltResult)r).ex;
1008 t = null;
1009 }
1010 else {
1011 ex = null;
1012 @SuppressWarnings("unchecked") T tr = (T) r;
1013 t = tr;
1014 }
1015 Executor e = executor;
1016 U u = null;
1017 if (ex == null) {
1018 try {
1019 if (e != null)
1020 e.execute(new AsyncApply<T,U>(t, fn, dst));
1021 else
1022 u = fn.apply(t);
1023 } catch (Throwable rex) {
1024 ex = rex;
1025 }
1026 }
1027 if (e == null || ex != null)
1028 dst.internalComplete(u, ex);
1029 }
1030 }
1031 private static final long serialVersionUID = 5232453952276885070L;
1032 }
1033
1034 static final class AcceptEither<T> extends Completion {
1035 final CompletableFuture<? extends T> src;
1036 final CompletableFuture<? extends T> snd;
1037 final Action<? super T> fn;
1038 final CompletableFuture<Void> dst;
1039 final Executor executor;
1040 AcceptEither(CompletableFuture<? extends T> src,
1041 CompletableFuture<? extends T> snd,
1042 Action<? super T> fn,
1043 CompletableFuture<Void> dst,
1044 Executor executor) {
1045 this.src = src; this.snd = snd;
1046 this.fn = fn; this.dst = dst;
1047 this.executor = executor;
1048 }
1049 public final void run() {
1050 final CompletableFuture<? extends T> a;
1051 final CompletableFuture<? extends T> b;
1052 final Action<? super T> fn;
1053 final CompletableFuture<Void> dst;
1054 Object r; T t; Throwable ex;
1055 if ((dst = this.dst) != null &&
1056 (fn = this.fn) != null &&
1057 (((a = this.src) != null && (r = a.result) != null) ||
1058 ((b = this.snd) != null && (r = b.result) != null)) &&
1059 compareAndSet(0, 1)) {
1060 if (r instanceof AltResult) {
1061 ex = ((AltResult)r).ex;
1062 t = null;
1063 }
1064 else {
1065 ex = null;
1066 @SuppressWarnings("unchecked") T tr = (T) r;
1067 t = tr;
1068 }
1069 Executor e = executor;
1070 if (ex == null) {
1071 try {
1072 if (e != null)
1073 e.execute(new AsyncAccept<T>(t, fn, dst));
1074 else
1075 fn.accept(t);
1076 } catch (Throwable rex) {
1077 ex = rex;
1078 }
1079 }
1080 if (e == null || ex != null)
1081 dst.internalComplete(null, ex);
1082 }
1083 }
1084 private static final long serialVersionUID = 5232453952276885070L;
1085 }
1086
1087 static final class RunAfterEither extends Completion {
1088 final CompletableFuture<?> src;
1089 final CompletableFuture<?> snd;
1090 final Runnable fn;
1091 final CompletableFuture<Void> dst;
1092 final Executor executor;
1093 RunAfterEither(CompletableFuture<?> src,
1094 CompletableFuture<?> snd,
1095 Runnable fn,
1096 CompletableFuture<Void> dst,
1097 Executor executor) {
1098 this.src = src; this.snd = snd;
1099 this.fn = fn; this.dst = dst;
1100 this.executor = executor;
1101 }
1102 public final void run() {
1103 final CompletableFuture<?> a;
1104 final CompletableFuture<?> b;
1105 final Runnable fn;
1106 final CompletableFuture<Void> dst;
1107 Object r; Throwable ex;
1108 if ((dst = this.dst) != null &&
1109 (fn = this.fn) != null &&
1110 (((a = this.src) != null && (r = a.result) != null) ||
1111 ((b = this.snd) != null && (r = b.result) != null)) &&
1112 compareAndSet(0, 1)) {
1113 if (r instanceof AltResult)
1114 ex = ((AltResult)r).ex;
1115 else
1116 ex = null;
1117 Executor e = executor;
1118 if (ex == null) {
1119 try {
1120 if (e != null)
1121 e.execute(new AsyncRun(fn, dst));
1122 else
1123 fn.run();
1124 } catch (Throwable rex) {
1125 ex = rex;
1126 }
1127 }
1128 if (e == null || ex != null)
1129 dst.internalComplete(null, ex);
1130 }
1131 }
1132 private static final long serialVersionUID = 5232453952276885070L;
1133 }
1134
1135 static final class OrCompletion extends Completion {
1136 final CompletableFuture<?> src;
1137 final CompletableFuture<?> snd;
1138 final CompletableFuture<Object> dst;
1139 OrCompletion(CompletableFuture<?> src,
1140 CompletableFuture<?> snd,
1141 CompletableFuture<Object> dst) {
1142 this.src = src; this.snd = snd; this.dst = dst;
1143 }
1144 public final void run() {
1145 final CompletableFuture<?> a;
1146 final CompletableFuture<?> b;
1147 final CompletableFuture<Object> dst;
1148 Object r, t; Throwable ex;
1149 if ((dst = this.dst) != null &&
1150 (((a = this.src) != null && (r = a.result) != null) ||
1151 ((b = this.snd) != null && (r = b.result) != null)) &&
1152 compareAndSet(0, 1)) {
1153 if (r instanceof AltResult) {
1154 ex = ((AltResult)r).ex;
1155 t = null;
1156 }
1157 else {
1158 ex = null;
1159 t = r;
1160 }
1161 dst.internalComplete(t, ex);
1162 }
1163 }
1164 private static final long serialVersionUID = 5232453952276885070L;
1165 }
1166
1167 static final class ExceptionCompletion<T> extends Completion {
1168 final CompletableFuture<? extends T> src;
1169 final Fun<? super Throwable, ? extends T> fn;
1170 final CompletableFuture<T> dst;
1171 ExceptionCompletion(CompletableFuture<? extends T> src,
1172 Fun<? super Throwable, ? extends T> fn,
1173 CompletableFuture<T> dst) {
1174 this.src = src; this.fn = fn; this.dst = dst;
1175 }
1176 public final void run() {
1177 final CompletableFuture<? extends T> a;
1178 final Fun<? super Throwable, ? extends T> fn;
1179 final CompletableFuture<T> dst;
1180 Object r; T t = null; Throwable ex, dx = null;
1181 if ((dst = this.dst) != null &&
1182 (fn = this.fn) != null &&
1183 (a = this.src) != null &&
1184 (r = a.result) != null &&
1185 compareAndSet(0, 1)) {
1186 if ((r instanceof AltResult) &&
1187 (ex = ((AltResult)r).ex) != null) {
1188 try {
1189 t = fn.apply(ex);
1190 } catch (Throwable rex) {
1191 dx = rex;
1192 }
1193 }
1194 else {
1195 @SuppressWarnings("unchecked") T tr = (T) r;
1196 t = tr;
1197 }
1198 dst.internalComplete(t, dx);
1199 }
1200 }
1201 private static final long serialVersionUID = 5232453952276885070L;
1202 }
1203
1204 static final class ThenCopy<T> extends Completion {
1205 final CompletableFuture<?> src;
1206 final CompletableFuture<T> dst;
1207 ThenCopy(CompletableFuture<?> src,
1208 CompletableFuture<T> dst) {
1209 this.src = src; this.dst = dst;
1210 }
1211 public final void run() {
1212 final CompletableFuture<?> a;
1213 final CompletableFuture<T> dst;
1214 Object r; T t; Throwable ex;
1215 if ((dst = this.dst) != null &&
1216 (a = this.src) != null &&
1217 (r = a.result) != null &&
1218 compareAndSet(0, 1)) {
1219 if (r instanceof AltResult) {
1220 ex = ((AltResult)r).ex;
1221 t = null;
1222 }
1223 else {
1224 ex = null;
1225 @SuppressWarnings("unchecked") T tr = (T) r;
1226 t = tr;
1227 }
1228 dst.internalComplete(t, ex);
1229 }
1230 }
1231 private static final long serialVersionUID = 5232453952276885070L;
1232 }
1233
1234 // version of ThenCopy for CompletableFuture<Void> dst
1235 static final class ThenPropagate extends Completion {
1236 final CompletableFuture<?> src;
1237 final CompletableFuture<Void> dst;
1238 ThenPropagate(CompletableFuture<?> src,
1239 CompletableFuture<Void> dst) {
1240 this.src = src; this.dst = dst;
1241 }
1242 public final void run() {
1243 final CompletableFuture<?> a;
1244 final CompletableFuture<Void> dst;
1245 Object r; Throwable ex;
1246 if ((dst = this.dst) != null &&
1247 (a = this.src) != null &&
1248 (r = a.result) != null &&
1249 compareAndSet(0, 1)) {
1250 if (r instanceof AltResult)
1251 ex = ((AltResult)r).ex;
1252 else
1253 ex = null;
1254 dst.internalComplete(null, ex);
1255 }
1256 }
1257 private static final long serialVersionUID = 5232453952276885070L;
1258 }
1259
1260 static final class HandleCompletion<T,U> extends Completion {
1261 final CompletableFuture<? extends T> src;
1262 final BiFun<? super T, Throwable, ? extends U> fn;
1263 final CompletableFuture<U> dst;
1264 HandleCompletion(CompletableFuture<? extends T> src,
1265 BiFun<? super T, Throwable, ? extends U> fn,
1266 CompletableFuture<U> dst) {
1267 this.src = src; this.fn = fn; this.dst = dst;
1268 }
1269 public final void run() {
1270 final CompletableFuture<? extends T> a;
1271 final BiFun<? super T, Throwable, ? extends U> fn;
1272 final CompletableFuture<U> dst;
1273 Object r; T t; Throwable ex;
1274 if ((dst = this.dst) != null &&
1275 (fn = this.fn) != null &&
1276 (a = this.src) != null &&
1277 (r = a.result) != null &&
1278 compareAndSet(0, 1)) {
1279 if (r instanceof AltResult) {
1280 ex = ((AltResult)r).ex;
1281 t = null;
1282 }
1283 else {
1284 ex = null;
1285 @SuppressWarnings("unchecked") T tr = (T) r;
1286 t = tr;
1287 }
1288 U u = null; Throwable dx = null;
1289 try {
1290 u = fn.apply(t, ex);
1291 } catch (Throwable rex) {
1292 dx = rex;
1293 }
1294 dst.internalComplete(u, dx);
1295 }
1296 }
1297 private static final long serialVersionUID = 5232453952276885070L;
1298 }
1299
1300 static final class ThenCompose<T,U> extends Completion {
1301 final CompletableFuture<? extends T> src;
1302 final Fun<? super T, CompletableFuture<U>> fn;
1303 final CompletableFuture<U> dst;
1304 final Executor executor;
1305 ThenCompose(CompletableFuture<? extends T> src,
1306 Fun<? super T, CompletableFuture<U>> fn,
1307 CompletableFuture<U> dst,
1308 Executor executor) {
1309 this.src = src; this.fn = fn; this.dst = dst;
1310 this.executor = executor;
1311 }
1312 public final void run() {
1313 final CompletableFuture<? extends T> a;
1314 final Fun<? super T, CompletableFuture<U>> fn;
1315 final CompletableFuture<U> dst;
1316 Object r; T t; Throwable ex; Executor e;
1317 if ((dst = this.dst) != null &&
1318 (fn = this.fn) != null &&
1319 (a = this.src) != null &&
1320 (r = a.result) != null &&
1321 compareAndSet(0, 1)) {
1322 if (r instanceof AltResult) {
1323 ex = ((AltResult)r).ex;
1324 t = null;
1325 }
1326 else {
1327 ex = null;
1328 @SuppressWarnings("unchecked") T tr = (T) r;
1329 t = tr;
1330 }
1331 CompletableFuture<U> c = null;
1332 U u = null;
1333 boolean complete = false;
1334 if (ex == null) {
1335 if ((e = executor) != null)
1336 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1337 else {
1338 try {
1339 if ((c = fn.apply(t)) == null)
1340 ex = new NullPointerException();
1341 } catch (Throwable rex) {
1342 ex = rex;
1343 }
1344 }
1345 }
1346 if (c != null) {
1347 ThenCopy<U> d = null;
1348 Object s;
1349 if ((s = c.result) == null) {
1350 CompletionNode p = new CompletionNode
1351 (d = new ThenCopy<U>(c, dst));
1352 while ((s = c.result) == null) {
1353 if (UNSAFE.compareAndSwapObject
1354 (c, COMPLETIONS, p.next = c.completions, p))
1355 break;
1356 }
1357 }
1358 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1359 complete = true;
1360 if (s instanceof AltResult) {
1361 ex = ((AltResult)s).ex; // no rewrap
1362 u = null;
1363 }
1364 else {
1365 @SuppressWarnings("unchecked") U us = (U) s;
1366 u = us;
1367 }
1368 }
1369 }
1370 if (complete || ex != null)
1371 dst.internalComplete(u, ex);
1372 if (c != null)
1373 c.helpPostComplete();
1374 }
1375 }
1376 private static final long serialVersionUID = 5232453952276885070L;
1377 }
1378
1379 // public methods
1380
1381 /**
1382 * Creates a new incomplete CompletableFuture.
1383 */
1384 public CompletableFuture() {
1385 }
1386
1387 /**
1388 * Returns a new CompletableFuture that is asynchronously completed
1389 * by a task running in the {@link ForkJoinPool#commonPool()} with
1390 * the value obtained by calling the given Generator.
1391 *
1392 * @param supplier a function returning the value to be used
1393 * to complete the returned CompletableFuture
1394 * @param <U> the function's return type
1395 * @return the new CompletableFuture
1396 */
1397 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier) {
1398 if (supplier == null) throw new NullPointerException();
1399 CompletableFuture<U> f = new CompletableFuture<U>();
1400 ForkJoinPool.commonPool().
1401 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1402 return f;
1403 }
1404
1405 /**
1406 * Returns a new CompletableFuture that is asynchronously completed
1407 * by a task running in the given executor with the value obtained
1408 * by calling the given Generator.
1409 *
1410 * @param supplier a function returning the value to be used
1411 * to complete the returned CompletableFuture
1412 * @param executor the executor to use for asynchronous execution
1413 * @param <U> the function's return type
1414 * @return the new CompletableFuture
1415 */
1416 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier,
1417 Executor executor) {
1418 if (executor == null || supplier == null)
1419 throw new NullPointerException();
1420 CompletableFuture<U> f = new CompletableFuture<U>();
1421 executor.execute(new AsyncSupply<U>(supplier, f));
1422 return f;
1423 }
1424
1425 /**
1426 * Returns a new CompletableFuture that is asynchronously completed
1427 * by a task running in the {@link ForkJoinPool#commonPool()} after
1428 * it runs the given action.
1429 *
1430 * @param runnable the action to run before completing the
1431 * returned CompletableFuture
1432 * @return the new CompletableFuture
1433 */
1434 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1435 if (runnable == null) throw new NullPointerException();
1436 CompletableFuture<Void> f = new CompletableFuture<Void>();
1437 ForkJoinPool.commonPool().
1438 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1439 return f;
1440 }
1441
1442 /**
1443 * Returns a new CompletableFuture that is asynchronously completed
1444 * by a task running in the given executor after it runs the given
1445 * action.
1446 *
1447 * @param runnable the action to run before completing the
1448 * returned CompletableFuture
1449 * @param executor the executor to use for asynchronous execution
1450 * @return the new CompletableFuture
1451 */
1452 public static CompletableFuture<Void> runAsync(Runnable runnable,
1453 Executor executor) {
1454 if (executor == null || runnable == null)
1455 throw new NullPointerException();
1456 CompletableFuture<Void> f = new CompletableFuture<Void>();
1457 executor.execute(new AsyncRun(runnable, f));
1458 return f;
1459 }
1460
1461 /**
1462 * Returns a new CompletableFuture that is already completed with
1463 * the given value.
1464 *
1465 * @param value the value
1466 * @param <U> the type of the value
1467 * @return the completed CompletableFuture
1468 */
1469 public static <U> CompletableFuture<U> completedFuture(U value) {
1470 CompletableFuture<U> f = new CompletableFuture<U>();
1471 f.result = (value == null) ? NIL : value;
1472 return f;
1473 }
1474
1475 /**
1476 * Returns {@code true} if completed in any fashion: normally,
1477 * exceptionally, or via cancellation.
1478 *
1479 * @return {@code true} if completed
1480 */
1481 public boolean isDone() {
1482 return result != null;
1483 }
1484
1485 /**
1486 * Waits if necessary for this future to complete, and then
1487 * returns its result.
1488 *
1489 * @return the result value
1490 * @throws CancellationException if this future was cancelled
1491 * @throws ExecutionException if this future completed exceptionally
1492 * @throws InterruptedException if the current thread was interrupted
1493 * while waiting
1494 */
1495 public T get() throws InterruptedException, ExecutionException {
1496 Object r; Throwable ex, cause;
1497 if ((r = result) == null && (r = waitingGet(true)) == null)
1498 throw new InterruptedException();
1499 if (!(r instanceof AltResult)) {
1500 @SuppressWarnings("unchecked") T tr = (T) r;
1501 return tr;
1502 }
1503 if ((ex = ((AltResult)r).ex) == null)
1504 return null;
1505 if (ex instanceof CancellationException)
1506 throw (CancellationException)ex;
1507 if ((ex instanceof CompletionException) &&
1508 (cause = ex.getCause()) != null)
1509 ex = cause;
1510 throw new ExecutionException(ex);
1511 }
1512
1513 /**
1514 * Waits if necessary for at most the given time for this future
1515 * to complete, and then returns its result, if available.
1516 *
1517 * @param timeout the maximum time to wait
1518 * @param unit the time unit of the timeout argument
1519 * @return the result value
1520 * @throws CancellationException if this future was cancelled
1521 * @throws ExecutionException if this future completed exceptionally
1522 * @throws InterruptedException if the current thread was interrupted
1523 * while waiting
1524 * @throws TimeoutException if the wait timed out
1525 */
1526 public T get(long timeout, TimeUnit unit)
1527 throws InterruptedException, ExecutionException, TimeoutException {
1528 Object r; Throwable ex, cause;
1529 long nanos = unit.toNanos(timeout);
1530 if (Thread.interrupted())
1531 throw new InterruptedException();
1532 if ((r = result) == null)
1533 r = timedAwaitDone(nanos);
1534 if (!(r instanceof AltResult)) {
1535 @SuppressWarnings("unchecked") T tr = (T) r;
1536 return tr;
1537 }
1538 if ((ex = ((AltResult)r).ex) == null)
1539 return null;
1540 if (ex instanceof CancellationException)
1541 throw (CancellationException)ex;
1542 if ((ex instanceof CompletionException) &&
1543 (cause = ex.getCause()) != null)
1544 ex = cause;
1545 throw new ExecutionException(ex);
1546 }
1547
1548 /**
1549 * Returns the result value when complete, or throws an
1550 * (unchecked) exception if completed exceptionally. To better
1551 * conform with the use of common functional forms, if a
1552 * computation involved in the completion of this
1553 * CompletableFuture threw an exception, this method throws an
1554 * (unchecked) {@link CompletionException} with the underlying
1555 * exception as its cause.
1556 *
1557 * @return the result value
1558 * @throws CancellationException if the computation was cancelled
1559 * @throws CompletionException if this future completed
1560 * exceptionally or a completion computation threw an exception
1561 */
1562 public T join() {
1563 Object r; Throwable ex;
1564 if ((r = result) == null)
1565 r = waitingGet(false);
1566 if (!(r instanceof AltResult)) {
1567 @SuppressWarnings("unchecked") T tr = (T) r;
1568 return tr;
1569 }
1570 if ((ex = ((AltResult)r).ex) == null)
1571 return null;
1572 if (ex instanceof CancellationException)
1573 throw (CancellationException)ex;
1574 if (ex instanceof CompletionException)
1575 throw (CompletionException)ex;
1576 throw new CompletionException(ex);
1577 }
1578
1579 /**
1580 * Returns the result value (or throws any encountered exception)
1581 * if completed, else returns the given valueIfAbsent.
1582 *
1583 * @param valueIfAbsent the value to return if not completed
1584 * @return the result value, if completed, else the given valueIfAbsent
1585 * @throws CancellationException if the computation was cancelled
1586 * @throws CompletionException if this future completed
1587 * exceptionally or a completion computation threw an exception
1588 */
1589 public T getNow(T valueIfAbsent) {
1590 Object r; Throwable ex;
1591 if ((r = result) == null)
1592 return valueIfAbsent;
1593 if (!(r instanceof AltResult)) {
1594 @SuppressWarnings("unchecked") T tr = (T) r;
1595 return tr;
1596 }
1597 if ((ex = ((AltResult)r).ex) == null)
1598 return null;
1599 if (ex instanceof CancellationException)
1600 throw (CancellationException)ex;
1601 if (ex instanceof CompletionException)
1602 throw (CompletionException)ex;
1603 throw new CompletionException(ex);
1604 }
1605
1606 /**
1607 * If not already completed, sets the value returned by {@link
1608 * #get()} and related methods to the given value.
1609 *
1610 * @param value the result value
1611 * @return {@code true} if this invocation caused this CompletableFuture
1612 * to transition to a completed state, else {@code false}
1613 */
1614 public boolean complete(T value) {
1615 boolean triggered = result == null &&
1616 UNSAFE.compareAndSwapObject(this, RESULT, null,
1617 value == null ? NIL : value);
1618 postComplete();
1619 return triggered;
1620 }
1621
1622 /**
1623 * If not already completed, causes invocations of {@link #get()}
1624 * and related methods to throw the given exception.
1625 *
1626 * @param ex the exception
1627 * @return {@code true} if this invocation caused this CompletableFuture
1628 * to transition to a completed state, else {@code false}
1629 */
1630 public boolean completeExceptionally(Throwable ex) {
1631 if (ex == null) throw new NullPointerException();
1632 boolean triggered = result == null &&
1633 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1634 postComplete();
1635 return triggered;
1636 }
1637
1638 /**
1639 * Returns a new CompletableFuture that is completed
1640 * when this CompletableFuture completes, with the result of the
1641 * given function of this CompletableFuture's result.
1642 *
1643 * <p>If this CompletableFuture completes exceptionally, or the
1644 * supplied function throws an exception, then the returned
1645 * CompletableFuture completes exceptionally with a
1646 * CompletionException holding the exception as its cause.
1647 *
1648 * @param fn the function to use to compute the value of
1649 * the returned CompletableFuture
1650 * @return the new CompletableFuture
1651 */
1652 public <U> CompletableFuture<U> thenApply(Fun<? super T,? extends U> fn) {
1653 return doThenApply(fn, null);
1654 }
1655
1656 /**
1657 * Returns a new CompletableFuture that is asynchronously completed
1658 * when this CompletableFuture completes, with the result of the
1659 * given function of this CompletableFuture's result from a
1660 * task running in the {@link ForkJoinPool#commonPool()}.
1661 *
1662 * <p>If this CompletableFuture completes exceptionally, or the
1663 * supplied function throws an exception, then the returned
1664 * CompletableFuture completes exceptionally with a
1665 * CompletionException holding the exception as its cause.
1666 *
1667 * @param fn the function to use to compute the value of
1668 * the returned CompletableFuture
1669 * @return the new CompletableFuture
1670 */
1671 public <U> CompletableFuture<U> thenApplyAsync
1672 (Fun<? super T,? extends U> fn) {
1673 return doThenApply(fn, ForkJoinPool.commonPool());
1674 }
1675
1676 /**
1677 * Returns a new CompletableFuture that is asynchronously completed
1678 * when this CompletableFuture completes, with the result of the
1679 * given function of this CompletableFuture's result from a
1680 * task running in the given executor.
1681 *
1682 * <p>If this CompletableFuture completes exceptionally, or the
1683 * supplied function throws an exception, then the returned
1684 * CompletableFuture completes exceptionally with a
1685 * CompletionException holding the exception as its cause.
1686 *
1687 * @param fn the function to use to compute the value of
1688 * the returned CompletableFuture
1689 * @param executor the executor to use for asynchronous execution
1690 * @return the new CompletableFuture
1691 */
1692 public <U> CompletableFuture<U> thenApplyAsync
1693 (Fun<? super T,? extends U> fn,
1694 Executor executor) {
1695 if (executor == null) throw new NullPointerException();
1696 return doThenApply(fn, executor);
1697 }
1698
1699 private <U> CompletableFuture<U> doThenApply
1700 (Fun<? super T,? extends U> fn,
1701 Executor e) {
1702 if (fn == null) throw new NullPointerException();
1703 CompletableFuture<U> dst = new CompletableFuture<U>();
1704 ThenApply<T,U> d = null;
1705 Object r;
1706 if ((r = result) == null) {
1707 CompletionNode p = new CompletionNode
1708 (d = new ThenApply<T,U>(this, fn, dst, e));
1709 while ((r = result) == null) {
1710 if (UNSAFE.compareAndSwapObject
1711 (this, COMPLETIONS, p.next = completions, p))
1712 break;
1713 }
1714 }
1715 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1716 T t; Throwable ex;
1717 if (r instanceof AltResult) {
1718 ex = ((AltResult)r).ex;
1719 t = null;
1720 }
1721 else {
1722 ex = null;
1723 @SuppressWarnings("unchecked") T tr = (T) r;
1724 t = tr;
1725 }
1726 U u = null;
1727 if (ex == null) {
1728 try {
1729 if (e != null)
1730 e.execute(new AsyncApply<T,U>(t, fn, dst));
1731 else
1732 u = fn.apply(t);
1733 } catch (Throwable rex) {
1734 ex = rex;
1735 }
1736 }
1737 if (e == null || ex != null)
1738 dst.internalComplete(u, ex);
1739 }
1740 helpPostComplete();
1741 return dst;
1742 }
1743
1744 /**
1745 * Returns a new CompletableFuture that is completed
1746 * when this CompletableFuture completes, after performing the given
1747 * action with this CompletableFuture's result.
1748 *
1749 * <p>If this CompletableFuture completes exceptionally, or the
1750 * supplied action throws an exception, then the returned
1751 * CompletableFuture completes exceptionally with a
1752 * CompletionException holding the exception as its cause.
1753 *
1754 * @param block the action to perform before completing the
1755 * returned CompletableFuture
1756 * @return the new CompletableFuture
1757 */
1758 public CompletableFuture<Void> thenAccept(Action<? super T> block) {
1759 return doThenAccept(block, null);
1760 }
1761
1762 /**
1763 * Returns a new CompletableFuture that is asynchronously completed
1764 * when this CompletableFuture completes, after performing the given
1765 * action with this CompletableFuture's result from a task running
1766 * in the {@link ForkJoinPool#commonPool()}.
1767 *
1768 * <p>If this CompletableFuture completes exceptionally, or the
1769 * supplied action throws an exception, then the returned
1770 * CompletableFuture completes exceptionally with a
1771 * CompletionException holding the exception as its cause.
1772 *
1773 * @param block the action to perform before completing the
1774 * returned CompletableFuture
1775 * @return the new CompletableFuture
1776 */
1777 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block) {
1778 return doThenAccept(block, ForkJoinPool.commonPool());
1779 }
1780
1781 /**
1782 * Returns a new CompletableFuture that is asynchronously completed
1783 * when this CompletableFuture completes, after performing the given
1784 * action with this CompletableFuture's result from a task running
1785 * in the given executor.
1786 *
1787 * <p>If this CompletableFuture completes exceptionally, or the
1788 * supplied action throws an exception, then the returned
1789 * CompletableFuture completes exceptionally with a
1790 * CompletionException holding the exception as its cause.
1791 *
1792 * @param block the action to perform before completing the
1793 * returned CompletableFuture
1794 * @param executor the executor to use for asynchronous execution
1795 * @return the new CompletableFuture
1796 */
1797 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block,
1798 Executor executor) {
1799 if (executor == null) throw new NullPointerException();
1800 return doThenAccept(block, executor);
1801 }
1802
1803 private CompletableFuture<Void> doThenAccept(Action<? super T> fn,
1804 Executor e) {
1805 if (fn == null) throw new NullPointerException();
1806 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1807 ThenAccept<T> d = null;
1808 Object r;
1809 if ((r = result) == null) {
1810 CompletionNode p = new CompletionNode
1811 (d = new ThenAccept<T>(this, fn, dst, e));
1812 while ((r = result) == null) {
1813 if (UNSAFE.compareAndSwapObject
1814 (this, COMPLETIONS, p.next = completions, p))
1815 break;
1816 }
1817 }
1818 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1819 T t; Throwable ex;
1820 if (r instanceof AltResult) {
1821 ex = ((AltResult)r).ex;
1822 t = null;
1823 }
1824 else {
1825 ex = null;
1826 @SuppressWarnings("unchecked") T tr = (T) r;
1827 t = tr;
1828 }
1829 if (ex == null) {
1830 try {
1831 if (e != null)
1832 e.execute(new AsyncAccept<T>(t, fn, dst));
1833 else
1834 fn.accept(t);
1835 } catch (Throwable rex) {
1836 ex = rex;
1837 }
1838 }
1839 if (e == null || ex != null)
1840 dst.internalComplete(null, ex);
1841 }
1842 helpPostComplete();
1843 return dst;
1844 }
1845
1846 /**
1847 * Returns a new CompletableFuture that is completed
1848 * when this CompletableFuture completes, after performing the given
1849 * action.
1850 *
1851 * <p>If this CompletableFuture completes exceptionally, or the
1852 * supplied action throws an exception, then the returned
1853 * CompletableFuture completes exceptionally with a
1854 * CompletionException holding the exception as its cause.
1855 *
1856 * @param action the action to perform before completing the
1857 * returned CompletableFuture
1858 * @return the new CompletableFuture
1859 */
1860 public CompletableFuture<Void> thenRun(Runnable action) {
1861 return doThenRun(action, null);
1862 }
1863
1864 /**
1865 * Returns a new CompletableFuture that is asynchronously completed
1866 * when this CompletableFuture completes, after performing the given
1867 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1868 *
1869 * <p>If this CompletableFuture completes exceptionally, or the
1870 * supplied action throws an exception, then the returned
1871 * CompletableFuture completes exceptionally with a
1872 * CompletionException holding the exception as its cause.
1873 *
1874 * @param action the action to perform before completing the
1875 * returned CompletableFuture
1876 * @return the new CompletableFuture
1877 */
1878 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1879 return doThenRun(action, ForkJoinPool.commonPool());
1880 }
1881
1882 /**
1883 * Returns a new CompletableFuture that is asynchronously completed
1884 * when this CompletableFuture completes, after performing the given
1885 * action from a task running in the given executor.
1886 *
1887 * <p>If this CompletableFuture completes exceptionally, or the
1888 * supplied action throws an exception, then the returned
1889 * CompletableFuture completes exceptionally with a
1890 * CompletionException holding the exception as its cause.
1891 *
1892 * @param action the action to perform before completing the
1893 * returned CompletableFuture
1894 * @param executor the executor to use for asynchronous execution
1895 * @return the new CompletableFuture
1896 */
1897 public CompletableFuture<Void> thenRunAsync(Runnable action,
1898 Executor executor) {
1899 if (executor == null) throw new NullPointerException();
1900 return doThenRun(action, executor);
1901 }
1902
1903 private CompletableFuture<Void> doThenRun(Runnable action,
1904 Executor e) {
1905 if (action == null) throw new NullPointerException();
1906 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1907 ThenRun d = null;
1908 Object r;
1909 if ((r = result) == null) {
1910 CompletionNode p = new CompletionNode
1911 (d = new ThenRun(this, action, dst, e));
1912 while ((r = result) == null) {
1913 if (UNSAFE.compareAndSwapObject
1914 (this, COMPLETIONS, p.next = completions, p))
1915 break;
1916 }
1917 }
1918 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1919 Throwable ex;
1920 if (r instanceof AltResult)
1921 ex = ((AltResult)r).ex;
1922 else
1923 ex = null;
1924 if (ex == null) {
1925 try {
1926 if (e != null)
1927 e.execute(new AsyncRun(action, dst));
1928 else
1929 action.run();
1930 } catch (Throwable rex) {
1931 ex = rex;
1932 }
1933 }
1934 if (e == null || ex != null)
1935 dst.internalComplete(null, ex);
1936 }
1937 helpPostComplete();
1938 return dst;
1939 }
1940
1941 /**
1942 * Returns a new CompletableFuture that is completed
1943 * when both this and the other given CompletableFuture complete,
1944 * with the result of the given function of the results of the two
1945 * CompletableFutures.
1946 *
1947 * <p>If this and/or the other CompletableFuture complete
1948 * exceptionally, or the supplied function throws an exception,
1949 * then the returned CompletableFuture completes exceptionally
1950 * with a CompletionException holding the exception as its cause.
1951 *
1952 * @param other the other CompletableFuture
1953 * @param fn the function to use to compute the value of
1954 * the returned CompletableFuture
1955 * @return the new CompletableFuture
1956 */
1957 public <U,V> CompletableFuture<V> thenCombine
1958 (CompletableFuture<? extends U> other,
1959 BiFun<? super T,? super U,? extends V> fn) {
1960 return doThenCombine(other, fn, null);
1961 }
1962
1963 /**
1964 * Returns a new CompletableFuture that is asynchronously completed
1965 * when both this and the other given CompletableFuture complete,
1966 * with the result of the given function of the results of the two
1967 * CompletableFutures from a task running in the
1968 * {@link ForkJoinPool#commonPool()}.
1969 *
1970 * <p>If this and/or the other CompletableFuture complete
1971 * exceptionally, or the supplied function throws an exception,
1972 * then the returned CompletableFuture completes exceptionally
1973 * with a CompletionException holding the exception as its cause.
1974 *
1975 * @param other the other CompletableFuture
1976 * @param fn the function to use to compute the value of
1977 * the returned CompletableFuture
1978 * @return the new CompletableFuture
1979 */
1980 public <U,V> CompletableFuture<V> thenCombineAsync
1981 (CompletableFuture<? extends U> other,
1982 BiFun<? super T,? super U,? extends V> fn) {
1983 return doThenCombine(other, fn, ForkJoinPool.commonPool());
1984 }
1985
1986 /**
1987 * Returns a new CompletableFuture that is asynchronously completed
1988 * when both this and the other given CompletableFuture complete,
1989 * with the result of the given function of the results of the two
1990 * CompletableFutures from a task running in the given executor.
1991 *
1992 * <p>If this and/or the other CompletableFuture complete
1993 * exceptionally, or the supplied function throws an exception,
1994 * then the returned CompletableFuture completes exceptionally
1995 * with a CompletionException holding the exception as its cause.
1996 *
1997 * @param other the other CompletableFuture
1998 * @param fn the function to use to compute the value of
1999 * the returned CompletableFuture
2000 * @param executor the executor to use for asynchronous execution
2001 * @return the new CompletableFuture
2002 */
2003 public <U,V> CompletableFuture<V> thenCombineAsync
2004 (CompletableFuture<? extends U> other,
2005 BiFun<? super T,? super U,? extends V> fn,
2006 Executor executor) {
2007 if (executor == null) throw new NullPointerException();
2008 return doThenCombine(other, fn, executor);
2009 }
2010
/*
 * Shared implementation behind thenCombine and both thenCombineAsync
 * overloads. Arranges for fn to be applied to the results of this
 * future and of "other" once both are available, and returns the
 * future (dst) that receives the outcome.
 *
 * other - second source future; null is rejected with NullPointerException
 * fn    - combining function; null is rejected with NullPointerException
 * e     - executor to hand the function to, or null to apply it directly
 *
 * A null "result" field is treated throughout as "no result yet".
 */
private <U,V> CompletableFuture<V> doThenCombine
    (CompletableFuture<? extends U> other,
     BiFun<? super T,? super U,? extends V> fn,
     Executor e) {
    if (other == null || fn == null) throw new NullPointerException();
    CompletableFuture<V> dst = new CompletableFuture<V>();
    // d stays null when both sources already had a result on entry.
    ThenCombine<T,U,V> d = null;
    // r: snapshot of this.result; s: snapshot of other.result.
    // s is only read once r is known to be non-null (short-circuit ||).
    Object r, s = null;
    if ((r = result) == null || (s = other.result) == null) {
        // At least one source is pending: create the deferred completion
        // and link it into the completions list of each pending source.
        d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
        // p is the node for this.completions; q (for other.completions)
        // is created only after this side has been dealt with.
        CompletionNode q = null, p = new CompletionNode(d);
        // Each pass re-reads whichever result is still unknown; the loop
        // ends when both are known or when a break below fires.
        while ((r == null && (r = result) == null) ||
               (s == null && (s = other.result) == null)) {
            if (q != null) {
                // Second phase: stop if other's result is known, or once
                // q has been pushed as the new head of other.completions
                // (q.next is pointed at the old head before the CAS).
                if (s != null ||
                    UNSAFE.compareAndSwapObject
                    (other, COMPLETIONS, q.next = other.completions, q))
                    break;
            }
            // First phase: done when this side has a result, or p has
            // been pushed as the new head of this.completions.
            else if (r != null ||
                     UNSAFE.compareAndSwapObject
                     (this, COMPLETIONS, p.next = completions, p)) {
                if (s != null)
                    break;          // other already has a result: no second node needed
                q = new CompletionNode(d);
            }
        }
    }
    // Proceed here only when both results were observed and either no
    // deferred completion exists or this thread wins the 0->1 CAS on d.
    // NOTE(review): that CAS presumably prevents a completing thread from
    // also firing d -- confirm against ThenCombine.
    if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
        T t; U u; Throwable ex;
        if (r instanceof AltResult) {
            // AltResult carries an exception field; when that field is
            // null the value is taken to be null.
            ex = ((AltResult)r).ex;
            t = null;
        }
        else {
            ex = null;
            @SuppressWarnings("unchecked") T tr = (T) r;
            t = tr;
        }
        if (ex != null)
            u = null;               // this side failed: other's result is not examined
        else if (s instanceof AltResult) {
            ex = ((AltResult)s).ex;
            u = null;
        }
        else {
            @SuppressWarnings("unchecked") U us = (U) s;
            u = us;
        }
        V v = null;
        if (ex == null) {
            try {
                if (e != null)
                    e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
                else
                    v = fn.apply(t, u);
            } catch (Throwable rex) {
                // Covers a throwing fn as well as a throwing execute().
                ex = rex;
            }
        }
        // Complete dst here unless the work was handed to the executor
        // without error; in that case the AsyncCombine task was given dst
        // (presumably to complete it itself -- confirm).
        if (e == null || ex != null)
            dst.internalComplete(v, ex);
    }
    // NOTE(review): helpPostComplete is defined outside this view;
    // presumably it runs dependents still queued on an already-completed
    // source -- confirm.
    helpPostComplete();
    other.helpPostComplete();
    return dst;
}
2078
2079 /**
2080 * Returns a new CompletableFuture that is completed
2081 * when both this and the other given CompletableFuture complete,
2082 * after performing the given action with the results of the two
2083 * CompletableFutures.
2084 *
2085 * <p>If this and/or the other CompletableFuture complete
2086 * exceptionally, or the supplied action throws an exception,
2087 * then the returned CompletableFuture completes exceptionally
2088 * with a CompletionException holding the exception as its cause.
2089 *
2090 * @param other the other CompletableFuture
2091 * @param block the action to perform before completing the
2092 * returned CompletableFuture
2093 * @return the new CompletableFuture
2094 */
2095 public <U> CompletableFuture<Void> thenAcceptBoth
2096 (CompletableFuture<? extends U> other,
2097 BiAction<? super T, ? super U> block) {
2098 return doThenAcceptBoth(other, block, null);
2099 }
2100
2101 /**
2102 * Returns a new CompletableFuture that is asynchronously completed
2103 * when both this and the other given CompletableFuture complete,
2104 * after performing the given action with the results of the two
2105 * CompletableFutures from a task running in the {@link
2106 * ForkJoinPool#commonPool()}.
2107 *
2108 * <p>If this and/or the other CompletableFuture complete
2109 * exceptionally, or the supplied action throws an exception,
2110 * then the returned CompletableFuture completes exceptionally
2111 * with a CompletionException holding the exception as its cause.
2112 *
2113 * @param other the other CompletableFuture
2114 * @param block the action to perform before completing the
2115 * returned CompletableFuture
2116 * @return the new CompletableFuture
2117 */
2118 public <U> CompletableFuture<Void> thenAcceptBothAsync
2119 (CompletableFuture<? extends U> other,
2120 BiAction<? super T, ? super U> block) {
2121 return doThenAcceptBoth(other, block, ForkJoinPool.commonPool());
2122 }
2123
2124 /**
2125 * Returns a new CompletableFuture that is asynchronously completed
2126 * when both this and the other given CompletableFuture complete,
2127 * after performing the given action with the results of the two
2128 * CompletableFutures from a task running in the given executor.
2129 *
2130 * <p>If this and/or the other CompletableFuture complete
2131 * exceptionally, or the supplied action throws an exception,
2132 * then the returned CompletableFuture completes exceptionally
2133 * with a CompletionException holding the exception as its cause.
2134 *
2135 * @param other the other CompletableFuture
2136 * @param block the action to perform before completing the
2137 * returned CompletableFuture
2138 * @param executor the executor to use for asynchronous execution
2139 * @return the new CompletableFuture
2140 */
2141 public <U> CompletableFuture<Void> thenAcceptBothAsync
2142 (CompletableFuture<? extends U> other,
2143 BiAction<? super T, ? super U> block,
2144 Executor executor) {
2145 if (executor == null) throw new NullPointerException();
2146 return doThenAcceptBoth(other, block, executor);
2147 }
2148
/*
 * Shared implementation behind thenAcceptBoth and both
 * thenAcceptBothAsync overloads. Arranges for fn to be invoked with the
 * results of this future and of "other" once both are available, and
 * returns the future (dst) that is completed afterwards.
 *
 * other - second source future; null is rejected with NullPointerException
 * fn    - two-argument action; null is rejected with NullPointerException
 * e     - executor to hand the action to, or null to invoke it directly
 *
 * A null "result" field is treated throughout as "no result yet".
 */
private <U> CompletableFuture<Void> doThenAcceptBoth
    (CompletableFuture<? extends U> other,
     BiAction<? super T,? super U> fn,
     Executor e) {
    if (other == null || fn == null) throw new NullPointerException();
    CompletableFuture<Void> dst = new CompletableFuture<Void>();
    // d stays null when both sources already had a result on entry.
    ThenAcceptBoth<T,U> d = null;
    // r: snapshot of this.result; s: snapshot of other.result.
    Object r, s = null;
    if ((r = result) == null || (s = other.result) == null) {
        // At least one source is pending: create the deferred completion
        // and link it into the completions list of each pending source.
        d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
        // p is the node for this.completions; q (for other.completions)
        // is created only after this side has been dealt with.
        CompletionNode q = null, p = new CompletionNode(d);
        // Each pass re-reads whichever result is still unknown.
        while ((r == null && (r = result) == null) ||
               (s == null && (s = other.result) == null)) {
            if (q != null) {
                // Second phase: stop if other's result is known, or once
                // q has been pushed as the new head of other.completions.
                if (s != null ||
                    UNSAFE.compareAndSwapObject
                    (other, COMPLETIONS, q.next = other.completions, q))
                    break;
            }
            // First phase: done when this side has a result, or p has
            // been pushed as the new head of this.completions.
            else if (r != null ||
                     UNSAFE.compareAndSwapObject
                     (this, COMPLETIONS, p.next = completions, p)) {
                if (s != null)
                    break;          // other already has a result: no second node needed
                q = new CompletionNode(d);
            }
        }
    }
    // Proceed here only when both results were observed and either no
    // deferred completion exists or this thread wins the 0->1 CAS on d.
    // NOTE(review): that CAS presumably prevents a completing thread from
    // also firing d -- confirm against ThenAcceptBoth.
    if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
        T t; U u; Throwable ex;
        if (r instanceof AltResult) {
            // AltResult carries an exception field; when that field is
            // null the value is taken to be null.
            ex = ((AltResult)r).ex;
            t = null;
        }
        else {
            ex = null;
            @SuppressWarnings("unchecked") T tr = (T) r;
            t = tr;
        }
        if (ex != null)
            u = null;               // this side failed: other's result is not examined
        else if (s instanceof AltResult) {
            ex = ((AltResult)s).ex;
            u = null;
        }
        else {
            @SuppressWarnings("unchecked") U us = (U) s;
            u = us;
        }
        if (ex == null) {
            try {
                if (e != null)
                    e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
                else
                    fn.accept(t, u);
            } catch (Throwable rex) {
                // Covers a throwing action as well as a throwing execute().
                ex = rex;
            }
        }
        // Complete dst here unless the work was handed to the executor
        // without error; in that case the AsyncAcceptBoth task was given
        // dst (presumably to complete it itself -- confirm).
        if (e == null || ex != null)
            dst.internalComplete(null, ex);
    }
    // NOTE(review): helpPostComplete is defined outside this view;
    // presumably it runs dependents still queued on an already-completed
    // source -- confirm.
    helpPostComplete();
    other.helpPostComplete();
    return dst;
}
2215
2216 /**
2217 * Returns a new CompletableFuture that is completed
2218 * when both this and the other given CompletableFuture complete,
2219 * after performing the given action.
2220 *
2221 * <p>If this and/or the other CompletableFuture complete
2222 * exceptionally, or the supplied action throws an exception,
2223 * then the returned CompletableFuture completes exceptionally
2224 * with a CompletionException holding the exception as its cause.
2225 *
2226 * @param other the other CompletableFuture
2227 * @param action the action to perform before completing the
2228 * returned CompletableFuture
2229 * @return the new CompletableFuture
2230 */
2231 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2232 Runnable action) {
2233 return doRunAfterBoth(other, action, null);
2234 }
2235
2236 /**
2237 * Returns a new CompletableFuture that is asynchronously completed
2238 * when both this and the other given CompletableFuture complete,
2239 * after performing the given action from a task running in the
2240 * {@link ForkJoinPool#commonPool()}.
2241 *
2242 * <p>If this and/or the other CompletableFuture complete
2243 * exceptionally, or the supplied action throws an exception,
2244 * then the returned CompletableFuture completes exceptionally
2245 * with a CompletionException holding the exception as its cause.
2246 *
2247 * @param other the other CompletableFuture
2248 * @param action the action to perform before completing the
2249 * returned CompletableFuture
2250 * @return the new CompletableFuture
2251 */
2252 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2253 Runnable action) {
2254 return doRunAfterBoth(other, action, ForkJoinPool.commonPool());
2255 }
2256
2257 /**
2258 * Returns a new CompletableFuture that is asynchronously completed
2259 * when both this and the other given CompletableFuture complete,
2260 * after performing the given action from a task running in the
2261 * given executor.
2262 *
2263 * <p>If this and/or the other CompletableFuture complete
2264 * exceptionally, or the supplied action throws an exception,
2265 * then the returned CompletableFuture completes exceptionally
2266 * with a CompletionException holding the exception as its cause.
2267 *
2268 * @param other the other CompletableFuture
2269 * @param action the action to perform before completing the
2270 * returned CompletableFuture
2271 * @param executor the executor to use for asynchronous execution
2272 * @return the new CompletableFuture
2273 */
2274 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2275 Runnable action,
2276 Executor executor) {
2277 if (executor == null) throw new NullPointerException();
2278 return doRunAfterBoth(other, action, executor);
2279 }
2280
/*
 * Shared implementation behind runAfterBoth and both runAfterBothAsync
 * overloads. Arranges for the action to run once this future and
 * "other" both have a result, and returns the future (dst) that is
 * completed afterwards.
 *
 * other  - second source future; null is rejected with NullPointerException
 * action - the Runnable to perform; null is rejected with NullPointerException
 * e      - executor to hand the action to, or null to run it directly
 *
 * A null "result" field is treated throughout as "no result yet".
 */
private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
                                               Runnable action,
                                               Executor e) {
    if (other == null || action == null) throw new NullPointerException();
    CompletableFuture<Void> dst = new CompletableFuture<Void>();
    // d stays null when both sources already had a result on entry.
    RunAfterBoth d = null;
    // r: snapshot of this.result; s: snapshot of other.result.
    Object r, s = null;
    if ((r = result) == null || (s = other.result) == null) {
        // At least one source is pending: create the deferred completion
        // and link it into the completions list of each pending source.
        d = new RunAfterBoth(this, other, action, dst, e);
        // p is the node for this.completions; q (for other.completions)
        // is created only after this side has been dealt with.
        CompletionNode q = null, p = new CompletionNode(d);
        // Each pass re-reads whichever result is still unknown.
        while ((r == null && (r = result) == null) ||
               (s == null && (s = other.result) == null)) {
            if (q != null) {
                // Second phase: stop if other's result is known, or once
                // q has been pushed as the new head of other.completions.
                if (s != null ||
                    UNSAFE.compareAndSwapObject
                    (other, COMPLETIONS, q.next = other.completions, q))
                    break;
            }
            // First phase: done when this side has a result, or p has
            // been pushed as the new head of this.completions.
            else if (r != null ||
                     UNSAFE.compareAndSwapObject
                     (this, COMPLETIONS, p.next = completions, p)) {
                if (s != null)
                    break;          // other already has a result: no second node needed
                q = new CompletionNode(d);
            }
        }
    }
    // Proceed here only when both results were observed and either no
    // deferred completion exists or this thread wins the 0->1 CAS on d.
    // NOTE(review): that CAS presumably prevents a completing thread from
    // also firing d -- confirm against RunAfterBoth.
    if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
        // Only the exceptions matter here; the result values are unused.
        Throwable ex;
        if (r instanceof AltResult)
            ex = ((AltResult)r).ex;
        else
            ex = null;
        // This side's exception takes precedence; other's is consulted
        // only when this side carries none.
        if (ex == null && (s instanceof AltResult))
            ex = ((AltResult)s).ex;
        if (ex == null) {
            try {
                if (e != null)
                    e.execute(new AsyncRun(action, dst));
                else
                    action.run();
            } catch (Throwable rex) {
                // Covers a throwing action as well as a throwing execute().
                ex = rex;
            }
        }
        // Complete dst here unless the work was handed to the executor
        // without error; in that case the AsyncRun task was given dst
        // (presumably to complete it itself -- confirm).
        if (e == null || ex != null)
            dst.internalComplete(null, ex);
    }
    // NOTE(review): helpPostComplete is defined outside this view;
    // presumably it runs dependents still queued on an already-completed
    // source -- confirm.
    helpPostComplete();
    other.helpPostComplete();
    return dst;
}
2333
2334 /**
2335 * Returns a new CompletableFuture that is completed
2336 * when either this or the other given CompletableFuture completes,
2337 * with the result of the given function of either this or the other
2338 * CompletableFuture's result.
2339 *
2340 * <p>If this and/or the other CompletableFuture complete
2341 * exceptionally, then the returned CompletableFuture may also do so,
2342 * with a CompletionException holding one of these exceptions as its
2343 * cause. No guarantees are made about which result or exception is
2344 * used in the returned CompletableFuture. If the supplied function
2345 * throws an exception, then the returned CompletableFuture completes
2346 * exceptionally with a CompletionException holding the exception as
2347 * its cause.
2348 *
2349 * @param other the other CompletableFuture
2350 * @param fn the function to use to compute the value of
2351 * the returned CompletableFuture
2352 * @return the new CompletableFuture
2353 */
2354 public <U> CompletableFuture<U> applyToEither
2355 (CompletableFuture<? extends T> other,
2356 Fun<? super T, U> fn) {
2357 return doApplyToEither(other, fn, null);
2358 }
2359
2360 /**
2361 * Returns a new CompletableFuture that is asynchronously completed
2362 * when either this or the other given CompletableFuture completes,
2363 * with the result of the given function of either this or the other
2364 * CompletableFuture's result from a task running in the
2365 * {@link ForkJoinPool#commonPool()}.
2366 *
2367 * <p>If this and/or the other CompletableFuture complete
2368 * exceptionally, then the returned CompletableFuture may also do so,
2369 * with a CompletionException holding one of these exceptions as its
2370 * cause. No guarantees are made about which result or exception is
2371 * used in the returned CompletableFuture. If the supplied function
2372 * throws an exception, then the returned CompletableFuture completes
2373 * exceptionally with a CompletionException holding the exception as
2374 * its cause.
2375 *
2376 * @param other the other CompletableFuture
2377 * @param fn the function to use to compute the value of
2378 * the returned CompletableFuture
2379 * @return the new CompletableFuture
2380 */
2381 public <U> CompletableFuture<U> applyToEitherAsync
2382 (CompletableFuture<? extends T> other,
2383 Fun<? super T, U> fn) {
2384 return doApplyToEither(other, fn, ForkJoinPool.commonPool());
2385 }
2386
2387 /**
2388 * Returns a new CompletableFuture that is asynchronously completed
2389 * when either this or the other given CompletableFuture completes,
2390 * with the result of the given function of either this or the other
2391 * CompletableFuture's result from a task running in the
2392 * given executor.
2393 *
2394 * <p>If this and/or the other CompletableFuture complete
2395 * exceptionally, then the returned CompletableFuture may also do so,
2396 * with a CompletionException holding one of these exceptions as its
2397 * cause. No guarantees are made about which result or exception is
2398 * used in the returned CompletableFuture. If the supplied function
2399 * throws an exception, then the returned CompletableFuture completes
2400 * exceptionally with a CompletionException holding the exception as
2401 * its cause.
2402 *
2403 * @param other the other CompletableFuture
2404 * @param fn the function to use to compute the value of
2405 * the returned CompletableFuture
2406 * @param executor the executor to use for asynchronous execution
2407 * @return the new CompletableFuture
2408 */
2409 public <U> CompletableFuture<U> applyToEitherAsync
2410 (CompletableFuture<? extends T> other,
2411 Fun<? super T, U> fn,
2412 Executor executor) {
2413 if (executor == null) throw new NullPointerException();
2414 return doApplyToEither(other, fn, executor);
2415 }
2416
/*
 * Shared implementation behind applyToEither and both
 * applyToEitherAsync overloads. Arranges for fn to be applied to the
 * first available result of this future or of "other", and returns
 * the future (dst) that receives the outcome.
 *
 * other - second source future; null is rejected with NullPointerException
 * fn    - the function to apply; null is rejected with NullPointerException
 * e     - executor to hand the function to, or null to apply it directly
 *
 * A null "result" field is treated throughout as "no result yet".
 */
private <U> CompletableFuture<U> doApplyToEither
    (CompletableFuture<? extends T> other,
     Fun<? super T, U> fn,
     Executor e) {
    if (other == null || fn == null) throw new NullPointerException();
    CompletableFuture<U> dst = new CompletableFuture<U>();
    // d stays null when one of the sources already had a result on entry.
    ApplyToEither<T,U> d = null;
    // r receives the first non-null result found; this future is always
    // checked before other.
    Object r;
    if ((r = result) == null && (r = other.result) == null) {
        // Neither source has a result: create the deferred completion and
        // link it into the completions lists of both sources.
        d = new ApplyToEither<T,U>(this, other, fn, dst, e);
        // p is the node for this.completions; q (for other.completions)
        // is created only after p has been pushed.
        CompletionNode q = null, p = new CompletionNode(d);
        // Re-check both results on every pass so that a completion racing
        // with registration is noticed.
        while ((r = result) == null && (r = other.result) == null) {
            if (q != null) {
                // Second phase: push q as the new head of
                // other.completions; both registrations are then done.
                if (UNSAFE.compareAndSwapObject
                    (other, COMPLETIONS, q.next = other.completions, q))
                    break;
            }
            // First phase: push p as the new head of this.completions,
            // then prepare the node for the other source.
            else if (UNSAFE.compareAndSwapObject
                     (this, COMPLETIONS, p.next = completions, p))
                q = new CompletionNode(d);
        }
    }
    // Proceed here only when a result was observed and either no deferred
    // completion exists or this thread wins the 0->1 CAS on d.
    // NOTE(review): that CAS presumably prevents a completing thread from
    // also firing d -- confirm against ApplyToEither.
    if (r != null && (d == null || d.compareAndSet(0, 1))) {
        T t; Throwable ex;
        if (r instanceof AltResult) {
            // AltResult carries an exception field; when that field is
            // null the value is taken to be null.
            ex = ((AltResult)r).ex;
            t = null;
        }
        else {
            ex = null;
            @SuppressWarnings("unchecked") T tr = (T) r;
            t = tr;
        }
        U u = null;
        if (ex == null) {
            try {
                if (e != null)
                    e.execute(new AsyncApply<T,U>(t, fn, dst));
                else
                    u = fn.apply(t);
            } catch (Throwable rex) {
                // Covers a throwing fn as well as a throwing execute().
                ex = rex;
            }
        }
        // Complete dst here unless the work was handed to the executor
        // without error; in that case the AsyncApply task was given dst
        // (presumably to complete it itself -- confirm).
        if (e == null || ex != null)
            dst.internalComplete(u, ex);
    }
    // NOTE(review): helpPostComplete is defined outside this view;
    // presumably it runs dependents still queued on an already-completed
    // source -- confirm.
    helpPostComplete();
    other.helpPostComplete();
    return dst;
}
2468
2469 /**
2470 * Returns a new CompletableFuture that is completed
2471 * when either this or the other given CompletableFuture completes,
2472 * after performing the given action with the result of either this
2473 * or the other CompletableFuture's result.
2474 *
2475 * <p>If this and/or the other CompletableFuture complete
2476 * exceptionally, then the returned CompletableFuture may also do so,
2477 * with a CompletionException holding one of these exceptions as its
2478 * cause. No guarantees are made about which result or exception is
2479 * used in the returned CompletableFuture. If the supplied action
2480 * throws an exception, then the returned CompletableFuture completes
2481 * exceptionally with a CompletionException holding the exception as
2482 * its cause.
2483 *
2484 * @param other the other CompletableFuture
2485 * @param block the action to perform before completing the
2486 * returned CompletableFuture
2487 * @return the new CompletableFuture
2488 */
2489 public CompletableFuture<Void> acceptEither
2490 (CompletableFuture<? extends T> other,
2491 Action<? super T> block) {
2492 return doAcceptEither(other, block, null);
2493 }
2494
2495 /**
2496 * Returns a new CompletableFuture that is asynchronously completed
2497 * when either this or the other given CompletableFuture completes,
2498 * after performing the given action with the result of either this
2499 * or the other CompletableFuture's result from a task running in
2500 * the {@link ForkJoinPool#commonPool()}.
2501 *
2502 * <p>If this and/or the other CompletableFuture complete
2503 * exceptionally, then the returned CompletableFuture may also do so,
2504 * with a CompletionException holding one of these exceptions as its
2505 * cause. No guarantees are made about which result or exception is
2506 * used in the returned CompletableFuture. If the supplied action
2507 * throws an exception, then the returned CompletableFuture completes
2508 * exceptionally with a CompletionException holding the exception as
2509 * its cause.
2510 *
2511 * @param other the other CompletableFuture
2512 * @param block the action to perform before completing the
2513 * returned CompletableFuture
2514 * @return the new CompletableFuture
2515 */
2516 public CompletableFuture<Void> acceptEitherAsync
2517 (CompletableFuture<? extends T> other,
2518 Action<? super T> block) {
2519 return doAcceptEither(other, block, ForkJoinPool.commonPool());
2520 }
2521
2522 /**
2523 * Returns a new CompletableFuture that is asynchronously completed
2524 * when either this or the other given CompletableFuture completes,
2525 * after performing the given action with the result of either this
2526 * or the other CompletableFuture's result from a task running in
2527 * the given executor.
2528 *
2529 * <p>If this and/or the other CompletableFuture complete
2530 * exceptionally, then the returned CompletableFuture may also do so,
2531 * with a CompletionException holding one of these exceptions as its
2532 * cause. No guarantees are made about which result or exception is
2533 * used in the returned CompletableFuture. If the supplied action
2534 * throws an exception, then the returned CompletableFuture completes
2535 * exceptionally with a CompletionException holding the exception as
2536 * its cause.
2537 *
2538 * @param other the other CompletableFuture
2539 * @param block the action to perform before completing the
2540 * returned CompletableFuture
2541 * @param executor the executor to use for asynchronous execution
2542 * @return the new CompletableFuture
2543 */
2544 public CompletableFuture<Void> acceptEitherAsync
2545 (CompletableFuture<? extends T> other,
2546 Action<? super T> block,
2547 Executor executor) {
2548 if (executor == null) throw new NullPointerException();
2549 return doAcceptEither(other, block, executor);
2550 }
2551
/*
 * Shared implementation behind acceptEither and both acceptEitherAsync
 * overloads. Arranges for fn to be invoked with the first available
 * result of this future or of "other", and returns the future (dst)
 * that is completed afterwards.
 *
 * other - second source future; null is rejected with NullPointerException
 * fn    - the action to perform; null is rejected with NullPointerException
 * e     - executor to hand the action to, or null to invoke it directly
 *
 * A null "result" field is treated throughout as "no result yet".
 */
private CompletableFuture<Void> doAcceptEither
    (CompletableFuture<? extends T> other,
     Action<? super T> fn,
     Executor e) {
    if (other == null || fn == null) throw new NullPointerException();
    CompletableFuture<Void> dst = new CompletableFuture<Void>();
    // d stays null when one of the sources already had a result on entry.
    AcceptEither<T> d = null;
    // r receives the first non-null result found; this future is always
    // checked before other.
    Object r;
    if ((r = result) == null && (r = other.result) == null) {
        // Neither source has a result: create the deferred completion and
        // link it into the completions lists of both sources.
        d = new AcceptEither<T>(this, other, fn, dst, e);
        // p is the node for this.completions; q (for other.completions)
        // is created only after p has been pushed.
        CompletionNode q = null, p = new CompletionNode(d);
        // Re-check both results on every pass so that a completion racing
        // with registration is noticed.
        while ((r = result) == null && (r = other.result) == null) {
            if (q != null) {
                // Second phase: push q as the new head of
                // other.completions; both registrations are then done.
                if (UNSAFE.compareAndSwapObject
                    (other, COMPLETIONS, q.next = other.completions, q))
                    break;
            }
            // First phase: push p as the new head of this.completions,
            // then prepare the node for the other source.
            else if (UNSAFE.compareAndSwapObject
                     (this, COMPLETIONS, p.next = completions, p))
                q = new CompletionNode(d);
        }
    }
    // Proceed here only when a result was observed and either no deferred
    // completion exists or this thread wins the 0->1 CAS on d.
    // NOTE(review): that CAS presumably prevents a completing thread from
    // also firing d -- confirm against AcceptEither.
    if (r != null && (d == null || d.compareAndSet(0, 1))) {
        T t; Throwable ex;
        if (r instanceof AltResult) {
            // AltResult carries an exception field; when that field is
            // null the value is taken to be null.
            ex = ((AltResult)r).ex;
            t = null;
        }
        else {
            ex = null;
            @SuppressWarnings("unchecked") T tr = (T) r;
            t = tr;
        }
        if (ex == null) {
            try {
                if (e != null)
                    e.execute(new AsyncAccept<T>(t, fn, dst));
                else
                    fn.accept(t);
            } catch (Throwable rex) {
                // Covers a throwing action as well as a throwing execute().
                ex = rex;
            }
        }
        // Complete dst here unless the work was handed to the executor
        // without error; in that case the AsyncAccept task was given dst
        // (presumably to complete it itself -- confirm).
        if (e == null || ex != null)
            dst.internalComplete(null, ex);
    }
    // NOTE(review): helpPostComplete is defined outside this view;
    // presumably it runs dependents still queued on an already-completed
    // source -- confirm.
    helpPostComplete();
    other.helpPostComplete();
    return dst;
}
2602
2603 /**
2604 * Returns a new CompletableFuture that is completed
2605 * when either this or the other given CompletableFuture completes,
2606 * after performing the given action.
2607 *
2608 * <p>If this and/or the other CompletableFuture complete
2609 * exceptionally, then the returned CompletableFuture may also do so,
2610 * with a CompletionException holding one of these exceptions as its
2611 * cause. No guarantees are made about which result or exception is
2612 * used in the returned CompletableFuture. If the supplied action
2613 * throws an exception, then the returned CompletableFuture completes
2614 * exceptionally with a CompletionException holding the exception as
2615 * its cause.
2616 *
2617 * @param other the other CompletableFuture
2618 * @param action the action to perform before completing the
2619 * returned CompletableFuture
2620 * @return the new CompletableFuture
2621 */
2622 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2623 Runnable action) {
2624 return doRunAfterEither(other, action, null);
2625 }
2626
2627 /**
2628 * Returns a new CompletableFuture that is asynchronously completed
2629 * when either this or the other given CompletableFuture completes,
2630 * after performing the given action from a task running in the
2631 * {@link ForkJoinPool#commonPool()}.
2632 *
2633 * <p>If this and/or the other CompletableFuture complete
2634 * exceptionally, then the returned CompletableFuture may also do so,
2635 * with a CompletionException holding one of these exceptions as its
2636 * cause. No guarantees are made about which result or exception is
2637 * used in the returned CompletableFuture. If the supplied action
2638 * throws an exception, then the returned CompletableFuture completes
2639 * exceptionally with a CompletionException holding the exception as
2640 * its cause.
2641 *
2642 * @param other the other CompletableFuture
2643 * @param action the action to perform before completing the
2644 * returned CompletableFuture
2645 * @return the new CompletableFuture
2646 */
2647 public CompletableFuture<Void> runAfterEitherAsync
2648 (CompletableFuture<?> other,
2649 Runnable action) {
2650 return doRunAfterEither(other, action, ForkJoinPool.commonPool());
2651 }
2652
2653 /**
2654 * Returns a new CompletableFuture that is asynchronously completed
2655 * when either this or the other given CompletableFuture completes,
2656 * after performing the given action from a task running in the
2657 * given executor.
2658 *
2659 * <p>If this and/or the other CompletableFuture complete
2660 * exceptionally, then the returned CompletableFuture may also do so,
2661 * with a CompletionException holding one of these exceptions as its
2662 * cause. No guarantees are made about which result or exception is
2663 * used in the returned CompletableFuture. If the supplied action
2664 * throws an exception, then the returned CompletableFuture completes
2665 * exceptionally with a CompletionException holding the exception as
2666 * its cause.
2667 *
2668 * @param other the other CompletableFuture
2669 * @param action the action to perform before completing the
2670 * returned CompletableFuture
2671 * @param executor the executor to use for asynchronous execution
2672 * @return the new CompletableFuture
2673 */
2674 public CompletableFuture<Void> runAfterEitherAsync
2675 (CompletableFuture<?> other,
2676 Runnable action,
2677 Executor executor) {
2678 if (executor == null) throw new NullPointerException();
2679 return doRunAfterEither(other, action, executor);
2680 }
2681
/*
 * Shared implementation behind runAfterEither and both
 * runAfterEitherAsync overloads. Arranges for the action to run once
 * this future or "other" has a result, and returns the future (dst)
 * that is completed afterwards.
 *
 * other  - second source future; null is rejected with NullPointerException
 * action - the Runnable to perform; null is rejected with NullPointerException
 * e      - executor to hand the action to, or null to run it directly
 *
 * A null "result" field is treated throughout as "no result yet".
 */
private CompletableFuture<Void> doRunAfterEither
    (CompletableFuture<?> other,
     Runnable action,
     Executor e) {
    if (other == null || action == null) throw new NullPointerException();
    CompletableFuture<Void> dst = new CompletableFuture<Void>();
    // d stays null when one of the sources already had a result on entry.
    RunAfterEither d = null;
    // r receives the first non-null result found; this future is always
    // checked before other.
    Object r;
    if ((r = result) == null && (r = other.result) == null) {
        // Neither source has a result: create the deferred completion and
        // link it into the completions lists of both sources.
        d = new RunAfterEither(this, other, action, dst, e);
        // p is the node for this.completions; q (for other.completions)
        // is created only after p has been pushed.
        CompletionNode q = null, p = new CompletionNode(d);
        // Re-check both results on every pass so that a completion racing
        // with registration is noticed.
        while ((r = result) == null && (r = other.result) == null) {
            if (q != null) {
                // Second phase: push q as the new head of
                // other.completions; both registrations are then done.
                if (UNSAFE.compareAndSwapObject
                    (other, COMPLETIONS, q.next = other.completions, q))
                    break;
            }
            // First phase: push p as the new head of this.completions,
            // then prepare the node for the other source.
            else if (UNSAFE.compareAndSwapObject
                     (this, COMPLETIONS, p.next = completions, p))
                q = new CompletionNode(d);
        }
    }
    // Proceed here only when a result was observed and either no deferred
    // completion exists or this thread wins the 0->1 CAS on d.
    // NOTE(review): that CAS presumably prevents a completing thread from
    // also firing d -- confirm against RunAfterEither.
    if (r != null && (d == null || d.compareAndSet(0, 1))) {
        // Only the exception matters here; the result value is unused.
        Throwable ex;
        if (r instanceof AltResult)
            ex = ((AltResult)r).ex;
        else
            ex = null;
        if (ex == null) {
            try {
                if (e != null)
                    e.execute(new AsyncRun(action, dst));
                else
                    action.run();
            } catch (Throwable rex) {
                // Covers a throwing action as well as a throwing execute().
                ex = rex;
            }
        }
        // Complete dst here unless the work was handed to the executor
        // without error; in that case the AsyncRun task was given dst
        // (presumably to complete it itself -- confirm).
        if (e == null || ex != null)
            dst.internalComplete(null, ex);
    }
    // NOTE(review): helpPostComplete is defined outside this view;
    // presumably it runs dependents still queued on an already-completed
    // source -- confirm.
    helpPostComplete();
    other.helpPostComplete();
    return dst;
}
2727
2728 /**
2729 * Returns a CompletableFuture that upon completion, has the same
2730 * value as produced by the given function of the result of this
2731 * CompletableFuture.
2732 *
2733 * <p>If this CompletableFuture completes exceptionally, then the
2734 * returned CompletableFuture also does so, with a
2735 * CompletionException holding this exception as its cause.
2736 * Similarly, if the computed CompletableFuture completes
2737 * exceptionally, then so does the returned CompletableFuture.
2738 *
2739 * @param fn the function returning a new CompletableFuture
2740 * @return the CompletableFuture
2741 */
2742 public <U> CompletableFuture<U> thenCompose
2743 (Fun<? super T, CompletableFuture<U>> fn) {
2744 return doThenCompose(fn, null);
2745 }
2746
2747 /**
2748 * Returns a CompletableFuture that upon completion, has the same
2749 * value as that produced asynchronously using the {@link
2750 * ForkJoinPool#commonPool()} by the given function of the result
2751 * of this CompletableFuture.
2752 *
2753 * <p>If this CompletableFuture completes exceptionally, then the
2754 * returned CompletableFuture also does so, with a
2755 * CompletionException holding this exception as its cause.
2756 * Similarly, if the computed CompletableFuture completes
2757 * exceptionally, then so does the returned CompletableFuture.
2758 *
2759 * @param fn the function returning a new CompletableFuture
2760 * @return the CompletableFuture
2761 */
2762 public <U> CompletableFuture<U> thenComposeAsync
2763 (Fun<? super T, CompletableFuture<U>> fn) {
2764 return doThenCompose(fn, ForkJoinPool.commonPool());
2765 }
2766
2767 /**
2768 * Returns a CompletableFuture that upon completion, has the same
2769 * value as that produced asynchronously using the given executor
2770 * by the given function of this CompletableFuture.
2771 *
2772 * <p>If this CompletableFuture completes exceptionally, then the
2773 * returned CompletableFuture also does so, with a
2774 * CompletionException holding this exception as its cause.
2775 * Similarly, if the computed CompletableFuture completes
2776 * exceptionally, then so does the returned CompletableFuture.
2777 *
2778 * @param fn the function returning a new CompletableFuture
2779 * @param executor the executor to use for asynchronous execution
2780 * @return the CompletableFuture
2781 */
2782 public <U> CompletableFuture<U> thenComposeAsync
2783 (Fun<? super T, CompletableFuture<U>> fn,
2784 Executor executor) {
2785 if (executor == null) throw new NullPointerException();
2786 return doThenCompose(fn, executor);
2787 }
2788
/*
 * Shared implementation behind thenCompose and both thenComposeAsync
 * overloads. Arranges for fn to be applied to this future's result
 * and returns a future reflecting the future that fn produces.
 *
 * fn - function producing the next future; null is rejected with
 *      NullPointerException
 * e  - executor to hand the function to, or null to apply it directly
 *
 * Unlike the sibling do* methods, dst is allocated lazily and may be
 * replaced: on the direct (e == null) path the future returned by fn
 * itself becomes the return value.
 *
 * A null "result" field is treated throughout as "no result yet".
 */
private <U> CompletableFuture<U> doThenCompose
    (Fun<? super T, CompletableFuture<U>> fn,
     Executor e) {
    if (fn == null) throw new NullPointerException();
    CompletableFuture<U> dst = null;
    // d stays null when this future already had a result on entry.
    ThenCompose<T,U> d = null;
    Object r;
    if ((r = result) == null) {
        // No result yet: a destination is needed now because the deferred
        // completion d is constructed with it.
        dst = new CompletableFuture<U>();
        CompletionNode p = new CompletionNode
            (d = new ThenCompose<T,U>(this, fn, dst, e));
        // Retry until a result shows up or p has been pushed as the new
        // head of this.completions (p.next is pointed at the old head
        // before the CAS).
        while ((r = result) == null) {
            if (UNSAFE.compareAndSwapObject
                (this, COMPLETIONS, p.next = completions, p))
                break;
        }
    }
    // Proceed here only when a result was observed and either no deferred
    // completion exists or this thread wins the 0->1 CAS on d.
    // NOTE(review): that CAS presumably prevents a completing thread from
    // also firing d -- confirm against ThenCompose.
    if (r != null && (d == null || d.compareAndSet(0, 1))) {
        T t; Throwable ex;
        if (r instanceof AltResult) {
            // AltResult carries an exception field; when that field is
            // null the value is taken to be null.
            ex = ((AltResult)r).ex;
            t = null;
        }
        else {
            ex = null;
            @SuppressWarnings("unchecked") T tr = (T) r;
            t = tr;
        }
        if (ex == null) {
            if (e != null) {
                if (dst == null)
                    dst = new CompletableFuture<U>();
                // Not wrapped in try/catch: an exception thrown by
                // execute() propagates to the caller of this method.
                e.execute(new AsyncCompose<T,U>(t, fn, dst));
            }
            else {
                try {
                    // The future returned by fn replaces any dst created
                    // above; a null return is reported as a
                    // NullPointerException.
                    if ((dst = fn.apply(t)) == null)
                        ex = new NullPointerException();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
        }
        // Guarantee a non-null return value (the source failed before any
        // dst existed, or fn returned null).
        if (dst == null)
            dst = new CompletableFuture<U>();
        if (ex != null)
            dst.internalComplete(null, ex);
    }
    // NOTE(review): helpPostComplete is defined outside this view;
    // presumably it runs dependents still queued on an already-completed
    // future -- confirm. dst is non-null on every path reaching here.
    helpPostComplete();
    dst.helpPostComplete();
    return dst;
}
2841
2842 /**
2843 * Returns a new CompletableFuture that is completed when this
2844 * CompletableFuture completes, with the result of the given
2845 * function of the exception triggering this CompletableFuture's
2846 * completion when it completes exceptionally; otherwise, if this
2847 * CompletableFuture completes normally, then the returned
2848 * CompletableFuture also completes normally with the same value.
2849 *
2850 * @param fn the function to use to compute the value of the
2851 * returned CompletableFuture if this CompletableFuture completed
2852 * exceptionally
2853 * @return the new CompletableFuture
2854 */
2855 public CompletableFuture<T> exceptionally
2856 (Fun<Throwable, ? extends T> fn) {
2857 if (fn == null) throw new NullPointerException();
2858 CompletableFuture<T> dst = new CompletableFuture<T>();
2859 ExceptionCompletion<T> d = null;
2860 Object r;
2861 if ((r = result) == null) {
2862 CompletionNode p =
2863 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2864 while ((r = result) == null) {
2865 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2866 p.next = completions, p))
2867 break;
2868 }
2869 }
2870 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2871 T t = null; Throwable ex, dx = null;
2872 if (r instanceof AltResult) {
2873 if ((ex = ((AltResult)r).ex) != null) {
2874 try {
2875 t = fn.apply(ex);
2876 } catch (Throwable rex) {
2877 dx = rex;
2878 }
2879 }
2880 }
2881 else {
2882 @SuppressWarnings("unchecked") T tr = (T) r;
2883 t = tr;
2884 }
2885 dst.internalComplete(t, dx);
2886 }
2887 helpPostComplete();
2888 return dst;
2889 }
2890
2891 /**
2892 * Returns a new CompletableFuture that is completed when this
2893 * CompletableFuture completes, with the result of the given
2894 * function of the result and exception of this CompletableFuture's
2895 * completion. The given function is invoked with the result (or
2896 * {@code null} if none) and the exception (or {@code null} if none)
2897 * of this CompletableFuture when complete.
2898 *
2899 * @param fn the function to use to compute the value of the
2900 * returned CompletableFuture
2901 * @return the new CompletableFuture
2902 */
2903 public <U> CompletableFuture<U> handle
2904 (BiFun<? super T, Throwable, ? extends U> fn) {
2905 if (fn == null) throw new NullPointerException();
2906 CompletableFuture<U> dst = new CompletableFuture<U>();
2907 HandleCompletion<T,U> d = null;
2908 Object r;
2909 if ((r = result) == null) {
2910 CompletionNode p =
2911 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2912 while ((r = result) == null) {
2913 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2914 p.next = completions, p))
2915 break;
2916 }
2917 }
2918 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2919 T t; Throwable ex;
2920 if (r instanceof AltResult) {
2921 ex = ((AltResult)r).ex;
2922 t = null;
2923 }
2924 else {
2925 ex = null;
2926 @SuppressWarnings("unchecked") T tr = (T) r;
2927 t = tr;
2928 }
2929 U u; Throwable dx;
2930 try {
2931 u = fn.apply(t, ex);
2932 dx = null;
2933 } catch (Throwable rex) {
2934 dx = rex;
2935 u = null;
2936 }
2937 dst.internalComplete(u, dx);
2938 }
2939 helpPostComplete();
2940 return dst;
2941 }
2942
2943
2944 /* ------------- Arbitrary-arity constructions -------------- */
2945
2946 /*
2947 * The basic plan of attack is to recursively form binary
2948 * completion trees of elements. This can be overkill for small
2949 * sets, but scales nicely. The And/All vs Or/Any forms use the
2950 * same idea, but details differ.
2951 */
2952
2953 /**
2954 * Returns a new CompletableFuture that is completed when all of
2955 * the given CompletableFutures complete. If any of the given
2956 * CompletableFutures complete exceptionally, then the returned
2957 * CompletableFuture also does so, with a CompletionException
2958 * holding this exception as its cause. Otherwise, the results,
2959 * if any, of the given CompletableFutures are not reflected in
2960 * the returned CompletableFuture, but may be obtained by
2961 * inspecting them individually. If no CompletableFutures are
2962 * provided, returns a CompletableFuture completed with the value
2963 * {@code null}.
2964 *
2965 * <p>Among the applications of this method is to await completion
2966 * of a set of independent CompletableFutures before continuing a
2967 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2968 * c3).join();}.
2969 *
2970 * @param cfs the CompletableFutures
2971 * @return a new CompletableFuture that is completed when all of the
2972 * given CompletableFutures complete
2973 * @throws NullPointerException if the array or any of its elements are
2974 * {@code null}
2975 */
2976 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2977 int len = cfs.length; // Directly handle empty and singleton cases
2978 if (len > 1)
2979 return allTree(cfs, 0, len - 1);
2980 else {
2981 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2982 CompletableFuture<?> f;
2983 if (len == 0)
2984 dst.result = NIL;
2985 else if ((f = cfs[0]) == null)
2986 throw new NullPointerException();
2987 else {
2988 ThenPropagate d = null;
2989 CompletionNode p = null;
2990 Object r;
2991 while ((r = f.result) == null) {
2992 if (d == null)
2993 d = new ThenPropagate(f, dst);
2994 else if (p == null)
2995 p = new CompletionNode(d);
2996 else if (UNSAFE.compareAndSwapObject
2997 (f, COMPLETIONS, p.next = f.completions, p))
2998 break;
2999 }
3000 if (r != null && (d == null || d.compareAndSet(0, 1)))
3001 dst.internalComplete(null, (r instanceof AltResult) ?
3002 ((AltResult)r).ex : null);
3003 f.helpPostComplete();
3004 }
3005 return dst;
3006 }
3007 }
3008
3009 /**
3010 * Recursively constructs an And'ed tree of CompletableFutures.
3011 * Called only when array known to have at least two elements.
3012 */
3013 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
3014 int lo, int hi) {
3015 CompletableFuture<?> fst, snd;
3016 int mid = (lo + hi) >>> 1;
3017 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
3018 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
3019 throw new NullPointerException();
3020 CompletableFuture<Void> dst = new CompletableFuture<Void>();
3021 AndCompletion d = null;
3022 CompletionNode p = null, q = null;
3023 Object r = null, s = null;
3024 while ((r = fst.result) == null || (s = snd.result) == null) {
3025 if (d == null)
3026 d = new AndCompletion(fst, snd, dst);
3027 else if (p == null)
3028 p = new CompletionNode(d);
3029 else if (q == null) {
3030 if (UNSAFE.compareAndSwapObject
3031 (fst, COMPLETIONS, p.next = fst.completions, p))
3032 q = new CompletionNode(d);
3033 }
3034 else if (UNSAFE.compareAndSwapObject
3035 (snd, COMPLETIONS, q.next = snd.completions, q))
3036 break;
3037 }
3038 if ((r != null || (r = fst.result) != null) &&
3039 (s != null || (s = snd.result) != null) &&
3040 (d == null || d.compareAndSet(0, 1))) {
3041 Throwable ex;
3042 if (r instanceof AltResult)
3043 ex = ((AltResult)r).ex;
3044 else
3045 ex = null;
3046 if (ex == null && (s instanceof AltResult))
3047 ex = ((AltResult)s).ex;
3048 dst.internalComplete(null, ex);
3049 }
3050 fst.helpPostComplete();
3051 snd.helpPostComplete();
3052 return dst;
3053 }
3054
3055 /**
3056 * Returns a new CompletableFuture that is completed when any of
3057 * the given CompletableFutures complete, with the same result.
3058 * Otherwise, if it completed exceptionally, the returned
3059 * CompletableFuture also does so, with a CompletionException
3060 * holding this exception as its cause. If no CompletableFutures
3061 * are provided, returns an incomplete CompletableFuture.
3062 *
3063 * @param cfs the CompletableFutures
3064 * @return a new CompletableFuture that is completed with the
3065 * result or exception of any of the given CompletableFutures when
3066 * one completes
3067 * @throws NullPointerException if the array or any of its elements are
3068 * {@code null}
3069 */
3070 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
3071 int len = cfs.length; // Same idea as allOf
3072 if (len > 1)
3073 return anyTree(cfs, 0, len - 1);
3074 else {
3075 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3076 CompletableFuture<?> f;
3077 if (len == 0)
3078 ; // skip
3079 else if ((f = cfs[0]) == null)
3080 throw new NullPointerException();
3081 else {
3082 ThenCopy<Object> d = null;
3083 CompletionNode p = null;
3084 Object r;
3085 while ((r = f.result) == null) {
3086 if (d == null)
3087 d = new ThenCopy<Object>(f, dst);
3088 else if (p == null)
3089 p = new CompletionNode(d);
3090 else if (UNSAFE.compareAndSwapObject
3091 (f, COMPLETIONS, p.next = f.completions, p))
3092 break;
3093 }
3094 if (r != null && (d == null || d.compareAndSet(0, 1))) {
3095 Throwable ex; Object t;
3096 if (r instanceof AltResult) {
3097 ex = ((AltResult)r).ex;
3098 t = null;
3099 }
3100 else {
3101 ex = null;
3102 t = r;
3103 }
3104 dst.internalComplete(t, ex);
3105 }
3106 f.helpPostComplete();
3107 }
3108 return dst;
3109 }
3110 }
3111
3112 /**
3113 * Recursively constructs an Or'ed tree of CompletableFutures.
3114 */
3115 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
3116 int lo, int hi) {
3117 CompletableFuture<?> fst, snd;
3118 int mid = (lo + hi) >>> 1;
3119 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3120 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3121 throw new NullPointerException();
3122 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3123 OrCompletion d = null;
3124 CompletionNode p = null, q = null;
3125 Object r;
3126 while ((r = fst.result) == null && (r = snd.result) == null) {
3127 if (d == null)
3128 d = new OrCompletion(fst, snd, dst);
3129 else if (p == null)
3130 p = new CompletionNode(d);
3131 else if (q == null) {
3132 if (UNSAFE.compareAndSwapObject
3133 (fst, COMPLETIONS, p.next = fst.completions, p))
3134 q = new CompletionNode(d);
3135 }
3136 else if (UNSAFE.compareAndSwapObject
3137 (snd, COMPLETIONS, q.next = snd.completions, q))
3138 break;
3139 }
3140 if ((r != null || (r = fst.result) != null ||
3141 (r = snd.result) != null) &&
3142 (d == null || d.compareAndSet(0, 1))) {
3143 Throwable ex; Object t;
3144 if (r instanceof AltResult) {
3145 ex = ((AltResult)r).ex;
3146 t = null;
3147 }
3148 else {
3149 ex = null;
3150 t = r;
3151 }
3152 dst.internalComplete(t, ex);
3153 }
3154 fst.helpPostComplete();
3155 snd.helpPostComplete();
3156 return dst;
3157 }
3158
3159 /* ------------- Control and status methods -------------- */
3160
3161 /**
3162 * If not already completed, completes this CompletableFuture with
3163 * a {@link CancellationException}. Dependent CompletableFutures
3164 * that have not already completed will also complete
3165 * exceptionally, with a {@link CompletionException} caused by
3166 * this {@code CancellationException}.
3167 *
3168 * @param mayInterruptIfRunning this value has no effect in this
3169 * implementation because interrupts are not used to control
3170 * processing.
3171 *
3172 * @return {@code true} if this task is now cancelled
3173 */
3174 public boolean cancel(boolean mayInterruptIfRunning) {
3175 boolean cancelled = (result == null) &&
3176 UNSAFE.compareAndSwapObject
3177 (this, RESULT, null, new AltResult(new CancellationException()));
3178 postComplete();
3179 return cancelled || isCancelled();
3180 }
3181
3182 /**
3183 * Returns {@code true} if this CompletableFuture was cancelled
3184 * before it completed normally.
3185 *
3186 * @return {@code true} if this CompletableFuture was cancelled
3187 * before it completed normally
3188 */
3189 public boolean isCancelled() {
3190 Object r;
3191 return ((r = result) instanceof AltResult) &&
3192 (((AltResult)r).ex instanceof CancellationException);
3193 }
3194
3195 /**
3196 * Forcibly sets or resets the value subsequently returned by
3197 * method {@link #get()} and related methods, whether or not
3198 * already completed. This method is designed for use only in
3199 * error recovery actions, and even in such situations may result
3200 * in ongoing dependent completions using established versus
3201 * overwritten outcomes.
3202 *
3203 * @param value the completion value
3204 */
3205 public void obtrudeValue(T value) {
3206 result = (value == null) ? NIL : value;
3207 postComplete();
3208 }
3209
3210 /**
3211 * Forcibly causes subsequent invocations of method {@link #get()}
3212 * and related methods to throw the given exception, whether or
3213 * not already completed. This method is designed for use only in
3214 * recovery actions, and even in such situations may result in
3215 * ongoing dependent completions using established versus
3216 * overwritten outcomes.
3217 *
3218 * @param ex the exception
3219 */
3220 public void obtrudeException(Throwable ex) {
3221 if (ex == null) throw new NullPointerException();
3222 result = new AltResult(ex);
3223 postComplete();
3224 }
3225
3226 /**
3227 * Returns the estimated number of CompletableFutures whose
3228 * completions are awaiting completion of this CompletableFuture.
3229 * This method is designed for use in monitoring system state, not
3230 * for synchronization control.
3231 *
3232 * @return the number of dependent CompletableFutures
3233 */
3234 public int getNumberOfDependents() {
3235 int count = 0;
3236 for (CompletionNode p = completions; p != null; p = p.next)
3237 ++count;
3238 return count;
3239 }
3240
3241 /**
3242 * Returns a string identifying this CompletableFuture, as well as
3243 * its completion state. The state, in brackets, contains the
3244 * String {@code "Completed Normally"} or the String {@code
3245 * "Completed Exceptionally"}, or the String {@code "Not
3246 * completed"} followed by the number of CompletableFutures
3247 * dependent upon its completion, if any.
3248 *
3249 * @return a string identifying this CompletableFuture, as well as its state
3250 */
3251 public String toString() {
3252 Object r = result;
3253 int count;
3254 return super.toString() +
3255 ((r == null) ?
3256 (((count = getNumberOfDependents()) == 0) ?
3257 "[Not completed]" :
3258 "[Not completed, " + count + " dependents]") :
3259 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3260 "[Completed exceptionally]" :
3261 "[Completed normally]"));
3262 }
3263
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" used for CAS operations.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        UNSAFE = getUnsafe();
        Class<?> self = CompletableFuture.class;
        java.lang.reflect.Field resultField =
            self.getDeclaredField("result");
        RESULT = UNSAFE.objectFieldOffset(resultField);
        java.lang.reflect.Field waitersField =
            self.getDeclaredField("waiters");
        WAITERS = UNSAFE.objectFieldOffset(waitersField);
        java.lang.reflect.Field completionsField =
            self.getDeclaredField("completions");
        COMPLETIONS = UNSAFE.objectFieldOffset(completionsField);
    } catch (Exception failure) {
        // Any failure here aborts class initialization.
        throw new Error(failure);
    }
}
3283
3284 /**
3285 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
3286 * Replace with a simple call to Unsafe.getUnsafe when integrating
3287 * into a jdk.
3288 *
3289 * @return a sun.misc.Unsafe
3290 */
3291 private static sun.misc.Unsafe getUnsafe() {
3292 try {
3293 return sun.misc.Unsafe.getUnsafe();
3294 } catch (SecurityException tryReflectionInstead) {}
3295 try {
3296 return java.security.AccessController.doPrivileged
3297 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
3298 public sun.misc.Unsafe run() throws Exception {
3299 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
3300 for (java.lang.reflect.Field f : k.getDeclaredFields()) {
3301 f.setAccessible(true);
3302 Object x = f.get(null);
3303 if (k.isInstance(x))
3304 return k.cast(x);
3305 }
3306 throw new NoSuchFieldError("the Unsafe");
3307 }});
3308 } catch (java.security.PrivilegedActionException e) {
3309 throw new RuntimeException("Could not initialize intrinsics",
3310 e.getCause());
3311 }
3312 }
3313 }