ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CompletableFuture.java
Revision: 1.21
Committed: Tue Nov 26 18:36:21 2013 UTC (10 years, 5 months ago) by dl
Branch: MAIN
Changes since 1.20: +1 -1 lines
Log Message:
Fix bad conditional

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.FutureTask;
10 import java.util.concurrent.TimeUnit;
11 import java.util.concurrent.Executor;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeoutException;
14 import java.util.concurrent.CancellationException;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.locks.LockSupport;
17
18 /**
19 * A {@link Future} that may be explicitly completed (setting its
20 * value and status), and may include dependent functions and actions
21 * that trigger upon its completion.
22 *
23 * <p>When two or more threads attempt to
24 * {@link #complete complete},
25 * {@link #completeExceptionally completeExceptionally}, or
26 * {@link #cancel cancel}
27 * a CompletableFuture, only one of them succeeds.
28 *
29 * <p>Methods are available for adding dependents based on
30 * user-provided Functions, Actions, or Runnables. The appropriate
31 * form to use depends on whether actions require arguments and/or
32 * produce results. Completion of a dependent action will trigger the
33 * completion of another CompletableFuture. Actions may also be
34 * triggered after either or both the current and another
35 * CompletableFuture complete. Multiple CompletableFutures may also
36 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
37 * {@link #allOf(CompletableFuture...)}.
38 *
39 * <p>CompletableFutures themselves do not execute asynchronously.
40 * However, actions supplied for dependent completions of another
41 * CompletableFuture may do so, depending on whether they are provided
42 * via one of the <em>async</em> methods (that is, methods with names
43 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
44 * methods provide a way to commence asynchronous processing of an
45 * action using either a given {@link Executor} or by default the
46 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
47 * debugging, and tracking, all generated asynchronous tasks are
48 * instances of the marker interface {@link AsynchronousCompletionTask}.
49 *
50 * <p>Actions supplied for dependent completions of <em>non-async</em>
51 * methods may be performed by the thread that completes the current
52 * CompletableFuture, or by any other caller of these methods. There
53 * are no guarantees about the order of processing completions unless
54 * constrained by these methods.
55 *
56 * <p>Since (unlike {@link FutureTask}) this class has no direct
57 * control over the computation that causes it to be completed,
58 * cancellation is treated as just another form of exceptional completion.
59 * Method {@link #cancel cancel} has the same effect as
60 * {@code completeExceptionally(new CancellationException())}.
61 *
62 * <p>Upon exceptional completion (including cancellation), or when a
63 * completion entails an additional computation which terminates
64 * abruptly with an (unchecked) exception or error, then all of their
65 * dependent completions (and their dependents in turn) generally act
66 * as {@code completeExceptionally} with a {@link CompletionException}
67 * holding that exception as its cause. However, the {@link
68 * #exceptionally exceptionally} and {@link #handle handle}
69 * completions <em>are</em> able to handle exceptional completions of
70 * the CompletableFutures they depend on.
71 *
72 * <p>In case of exceptional completion with a CompletionException,
73 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
74 * {@link ExecutionException} with the same cause as held in the
75 * corresponding CompletionException. However, in these cases,
76 * methods {@link #join()} and {@link #getNow} throw the
77 * CompletionException, which simplifies usage.
78 *
79 * <p>Arguments used to pass a completion result (that is, for parameters
80 * of type {@code T}) may be null, but passing a null value for any other
81 * parameter will result in a {@link NullPointerException} being thrown.
82 *
83 * @author Doug Lea
84 */
85 public class CompletableFuture<T> implements Future<T> {
86 // jsr166e nested interfaces
87
88 /** Interface describing a void action of one argument */
89 public interface Action<A> { void accept(A a); }
90 /** Interface describing a void action of two arguments */
91 public interface BiAction<A,B> { void accept(A a, B b); }
/**
 * Function mapping one argument to a result.
 *
 * @param <A> type of the argument
 * @param <T> type of the result
 */
public interface Fun<A,T> {
    T apply(A a);
}
/**
 * Function mapping two arguments to a result.
 *
 * @param <A> type of the first argument
 * @param <B> type of the second argument
 * @param <T> type of the result
 */
public interface BiFun<A,B,T> {
    T apply(A a, B b);
}
/**
 * Function that takes no arguments and produces a result.
 *
 * @param <T> type of the produced result
 */
public interface Generator<T> {
    T get();
}
98
99
100 /*
101 * Overview:
102 *
103 * 1. Non-nullness of field result (set via CAS) indicates done.
104 * An AltResult is used to box null as a result, as well as to
105 * hold exceptions. Using a single field makes completion fast
106 * and simple to detect and trigger, at the expense of a lot of
107 * encoding and decoding that infiltrates many methods. One minor
108 * simplification relies on the (static) NIL (to box null results)
109 * being the only AltResult with a null exception field, so we
110 * don't usually need explicit comparisons with NIL. The CF
111 * exception propagation mechanics surrounding decoding rely on
112 * unchecked casts of decoded results really being unchecked,
113 * where user type errors are caught at point of use, as is
114 * currently the case in Java. These are highlighted by using
115 * SuppressWarnings-annotated temporaries.
116 *
117 * 2. Waiters are held in a Treiber stack similar to the one used
118 * in FutureTask, Phaser, and SynchronousQueue. See their
119 * internal documentation for algorithmic details.
120 *
121 * 3. Completions are also kept in a list/stack, and pulled off
122 * and run when completion is triggered. (We could even use the
123 * same stack as for waiters, but would give up the potential
124 * parallelism obtained because woken waiters help release/run
125 * others -- see method postComplete). Because post-processing
126 * may race with direct calls, class Completion opportunistically
127 * extends AtomicInteger so callers can claim the action via
128 * compareAndSet(0, 1). The Completion.run methods are all
129 * written in a boringly similar uniform way (that sometimes includes
130 * unnecessary-looking checks, kept to maintain uniformity).
131 * There are enough dimensions upon which they differ that
132 * attempts to factor commonalities while maintaining efficiency
133 * require more lines of code than they would save.
134 *
135 * 4. The exported then/and/or methods do support a bit of
136 * factoring (see doThenApply etc). They must cope with the
137 * intrinsic races surrounding addition of a dependent action
138 * versus performing the action directly because the task is
139 * already complete. For example, a CF may not be complete upon
140 * entry, so a dependent completion is added, but by the time it
141 * is added, the target CF is complete, so must be directly
142 * executed. This is all done while avoiding unnecessary object
143 * construction in safe-bypass cases.
144 */
145
146 // preliminaries
147
148 static final class AltResult {
149 final Throwable ex; // null only for NIL
150 AltResult(Throwable ex) { this.ex = ex; }
151 }
152
153 static final AltResult NIL = new AltResult(null);
154
155 // Fields
156
157 volatile Object result; // Either the result or boxed AltResult
158 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
159 volatile CompletionNode completions; // list (Treiber stack) of completions
160
161 // Basic utilities for triggering and processing completions
162
163 /**
164 * Removes and signals all waiting threads and runs all completions.
165 */
166 final void postComplete() {
167 WaitNode q; Thread t;
168 while ((q = waiters) != null) {
169 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
170 (t = q.thread) != null) {
171 q.thread = null;
172 LockSupport.unpark(t);
173 }
174 }
175
176 CompletionNode h; Completion c;
177 while ((h = completions) != null) {
178 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
179 (c = h.completion) != null)
180 c.run();
181 }
182 }
183
184 /**
185 * Triggers completion with the encoding of the given arguments:
186 * if the exception is non-null, encodes it as a wrapped
187 * CompletionException unless it is one already. Otherwise uses
188 * the given result, boxed as NIL if null.
189 */
190 final void internalComplete(T v, Throwable ex) {
191 if (result == null)
192 UNSAFE.compareAndSwapObject
193 (this, RESULT, null,
194 (ex == null) ? (v == null) ? NIL : v :
195 new AltResult((ex instanceof CompletionException) ? ex :
196 new CompletionException(ex)));
197 postComplete(); // help out even if not triggered
198 }
199
200 /**
201 * If triggered, helps release and/or process completions.
202 */
203 final void helpPostComplete() {
204 if (result != null)
205 postComplete();
206 }
207
208 /* ------------- waiting for completions -------------- */
209
210 /** Number of processors, for spin control */
211 static final int NCPU = Runtime.getRuntime().availableProcessors();
212
213 /**
214 * Heuristic spin value for waitingGet() before blocking on
215 * multiprocessors
216 */
217 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
218
219 /**
220 * Linked nodes to record waiting threads in a Treiber stack. See
221 * other classes such as Phaser and SynchronousQueue for more
222 * detailed explanation. This class implements ManagedBlocker to
223 * avoid starvation when blocking actions pile up in
224 * ForkJoinPools.
225 */
226 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
227 long nanos; // wait time if timed
228 final long deadline; // non-zero if timed
229 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
230 volatile Thread thread;
231 volatile WaitNode next;
232 WaitNode(boolean interruptible, long nanos, long deadline) {
233 this.thread = Thread.currentThread();
234 this.interruptControl = interruptible ? 1 : 0;
235 this.nanos = nanos;
236 this.deadline = deadline;
237 }
238 public boolean isReleasable() {
239 if (thread == null)
240 return true;
241 if (Thread.interrupted()) {
242 int i = interruptControl;
243 interruptControl = -1;
244 if (i > 0)
245 return true;
246 }
247 if (deadline != 0L &&
248 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
249 thread = null;
250 return true;
251 }
252 return false;
253 }
254 public boolean block() {
255 if (isReleasable())
256 return true;
257 else if (deadline == 0L)
258 LockSupport.park(this);
259 else if (nanos > 0L)
260 LockSupport.parkNanos(this, nanos);
261 return isReleasable();
262 }
263 }
264
265 /**
266 * Returns raw result after waiting, or null if interruptible and
267 * interrupted.
268 */
269 private Object waitingGet(boolean interruptible) {
270 WaitNode q = null;
271 boolean queued = false;
272 int spins = SPINS;
273 for (Object r;;) {
274 if ((r = result) != null) {
275 if (q != null) { // suppress unpark
276 q.thread = null;
277 if (q.interruptControl < 0) {
278 if (interruptible) {
279 removeWaiter(q);
280 return null;
281 }
282 Thread.currentThread().interrupt();
283 }
284 }
285 postComplete(); // help release others
286 return r;
287 }
288 else if (spins > 0) {
289 int rnd = ThreadLocalRandom.current().nextInt();
290 if (rnd >= 0)
291 --spins;
292 }
293 else if (q == null)
294 q = new WaitNode(interruptible, 0L, 0L);
295 else if (!queued)
296 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
297 q.next = waiters, q);
298 else if (interruptible && q.interruptControl < 0) {
299 removeWaiter(q);
300 return null;
301 }
302 else if (q.thread != null && result == null) {
303 try {
304 ForkJoinPool.managedBlock(q);
305 } catch (InterruptedException ex) {
306 q.interruptControl = -1;
307 }
308 }
309 }
310 }
311
312 /**
313 * Awaits completion or aborts on interrupt or timeout.
314 *
315 * @param nanos time to wait
316 * @return raw result
317 */
318 private Object timedAwaitDone(long nanos)
319 throws InterruptedException, TimeoutException {
320 WaitNode q = null;
321 boolean queued = false;
322 for (Object r;;) {
323 if ((r = result) != null) {
324 if (q != null) {
325 q.thread = null;
326 if (q.interruptControl < 0) {
327 removeWaiter(q);
328 throw new InterruptedException();
329 }
330 }
331 postComplete();
332 return r;
333 }
334 else if (q == null) {
335 if (nanos <= 0L)
336 throw new TimeoutException();
337 long d = System.nanoTime() + nanos;
338 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
339 }
340 else if (!queued)
341 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
342 q.next = waiters, q);
343 else if (q.interruptControl < 0) {
344 removeWaiter(q);
345 throw new InterruptedException();
346 }
347 else if (q.nanos <= 0L) {
348 if (result == null) {
349 removeWaiter(q);
350 throw new TimeoutException();
351 }
352 }
353 else if (q.thread != null && result == null) {
354 try {
355 ForkJoinPool.managedBlock(q);
356 } catch (InterruptedException ex) {
357 q.interruptControl = -1;
358 }
359 }
360 }
361 }
362
363 /**
364 * Tries to unlink a timed-out or interrupted wait node to avoid
365 * accumulating garbage. Internal nodes are simply unspliced
366 * without CAS since it is harmless if they are traversed anyway
367 * by releasers. To avoid effects of unsplicing from already
368 * removed nodes, the list is retraversed in case of an apparent
369 * race. This is slow when there are a lot of nodes, but we don't
370 * expect lists to be long enough to outweigh higher-overhead
371 * schemes.
372 */
373 private void removeWaiter(WaitNode node) {
374 if (node != null) {
375 node.thread = null;
376 retry:
377 for (;;) { // restart on removeWaiter race
378 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
379 s = q.next;
380 if (q.thread != null)
381 pred = q;
382 else if (pred != null) {
383 pred.next = s;
384 if (pred.thread == null) // check for race
385 continue retry;
386 }
387 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
388 continue retry;
389 }
390 break;
391 }
392 }
393 }
394
395 /* ------------- Async tasks -------------- */
396
/**
 * Marker interface (no methods) identifying the asynchronous tasks
 * produced by {@code async} methods. This may be useful for
 * monitoring, debugging, and tracking asynchronous activities.
 *
 * @since 1.8
 */
public interface AsynchronousCompletionTask {
}
406
407 /** Base class can act as either FJ or plain Runnable */
408 abstract static class Async extends ForkJoinTask<Void>
409 implements Runnable, AsynchronousCompletionTask {
410 public final Void getRawResult() { return null; }
411 public final void setRawResult(Void v) { }
412 public final void run() { exec(); }
413 }
414
415 static final class AsyncRun extends Async {
416 final Runnable fn;
417 final CompletableFuture<Void> dst;
418 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
419 this.fn = fn; this.dst = dst;
420 }
421 public final boolean exec() {
422 CompletableFuture<Void> d; Throwable ex;
423 if ((d = this.dst) != null && d.result == null) {
424 try {
425 fn.run();
426 ex = null;
427 } catch (Throwable rex) {
428 ex = rex;
429 }
430 d.internalComplete(null, ex);
431 }
432 return true;
433 }
434 private static final long serialVersionUID = 5232453952276885070L;
435 }
436
437 static final class AsyncSupply<U> extends Async {
438 final Generator<U> fn;
439 final CompletableFuture<U> dst;
440 AsyncSupply(Generator<U> fn, CompletableFuture<U> dst) {
441 this.fn = fn; this.dst = dst;
442 }
443 public final boolean exec() {
444 CompletableFuture<U> d; U u; Throwable ex;
445 if ((d = this.dst) != null && d.result == null) {
446 try {
447 u = fn.get();
448 ex = null;
449 } catch (Throwable rex) {
450 ex = rex;
451 u = null;
452 }
453 d.internalComplete(u, ex);
454 }
455 return true;
456 }
457 private static final long serialVersionUID = 5232453952276885070L;
458 }
459
460 static final class AsyncApply<T,U> extends Async {
461 final T arg;
462 final Fun<? super T,? extends U> fn;
463 final CompletableFuture<U> dst;
464 AsyncApply(T arg, Fun<? super T,? extends U> fn,
465 CompletableFuture<U> dst) {
466 this.arg = arg; this.fn = fn; this.dst = dst;
467 }
468 public final boolean exec() {
469 CompletableFuture<U> d; U u; Throwable ex;
470 if ((d = this.dst) != null && d.result == null) {
471 try {
472 u = fn.apply(arg);
473 ex = null;
474 } catch (Throwable rex) {
475 ex = rex;
476 u = null;
477 }
478 d.internalComplete(u, ex);
479 }
480 return true;
481 }
482 private static final long serialVersionUID = 5232453952276885070L;
483 }
484
485 static final class AsyncCombine<T,U,V> extends Async {
486 final T arg1;
487 final U arg2;
488 final BiFun<? super T,? super U,? extends V> fn;
489 final CompletableFuture<V> dst;
490 AsyncCombine(T arg1, U arg2,
491 BiFun<? super T,? super U,? extends V> fn,
492 CompletableFuture<V> dst) {
493 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
494 }
495 public final boolean exec() {
496 CompletableFuture<V> d; V v; Throwable ex;
497 if ((d = this.dst) != null && d.result == null) {
498 try {
499 v = fn.apply(arg1, arg2);
500 ex = null;
501 } catch (Throwable rex) {
502 ex = rex;
503 v = null;
504 }
505 d.internalComplete(v, ex);
506 }
507 return true;
508 }
509 private static final long serialVersionUID = 5232453952276885070L;
510 }
511
512 static final class AsyncAccept<T> extends Async {
513 final T arg;
514 final Action<? super T> fn;
515 final CompletableFuture<Void> dst;
516 AsyncAccept(T arg, Action<? super T> fn,
517 CompletableFuture<Void> dst) {
518 this.arg = arg; this.fn = fn; this.dst = dst;
519 }
520 public final boolean exec() {
521 CompletableFuture<Void> d; Throwable ex;
522 if ((d = this.dst) != null && d.result == null) {
523 try {
524 fn.accept(arg);
525 ex = null;
526 } catch (Throwable rex) {
527 ex = rex;
528 }
529 d.internalComplete(null, ex);
530 }
531 return true;
532 }
533 private static final long serialVersionUID = 5232453952276885070L;
534 }
535
536 static final class AsyncAcceptBoth<T,U> extends Async {
537 final T arg1;
538 final U arg2;
539 final BiAction<? super T,? super U> fn;
540 final CompletableFuture<Void> dst;
541 AsyncAcceptBoth(T arg1, U arg2,
542 BiAction<? super T,? super U> fn,
543 CompletableFuture<Void> dst) {
544 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
545 }
546 public final boolean exec() {
547 CompletableFuture<Void> d; Throwable ex;
548 if ((d = this.dst) != null && d.result == null) {
549 try {
550 fn.accept(arg1, arg2);
551 ex = null;
552 } catch (Throwable rex) {
553 ex = rex;
554 }
555 d.internalComplete(null, ex);
556 }
557 return true;
558 }
559 private static final long serialVersionUID = 5232453952276885070L;
560 }
561
562 static final class AsyncCompose<T,U> extends Async {
563 final T arg;
564 final Fun<? super T, CompletableFuture<U>> fn;
565 final CompletableFuture<U> dst;
566 AsyncCompose(T arg,
567 Fun<? super T, CompletableFuture<U>> fn,
568 CompletableFuture<U> dst) {
569 this.arg = arg; this.fn = fn; this.dst = dst;
570 }
571 public final boolean exec() {
572 CompletableFuture<U> d, fr; U u; Throwable ex;
573 if ((d = this.dst) != null && d.result == null) {
574 try {
575 fr = fn.apply(arg);
576 ex = (fr == null) ? new NullPointerException() : null;
577 } catch (Throwable rex) {
578 ex = rex;
579 fr = null;
580 }
581 if (ex != null)
582 u = null;
583 else {
584 Object r = fr.result;
585 if (r == null)
586 r = fr.waitingGet(false);
587 if (r instanceof AltResult) {
588 ex = ((AltResult)r).ex;
589 u = null;
590 }
591 else {
592 @SuppressWarnings("unchecked") U ur = (U) r;
593 u = ur;
594 }
595 }
596 d.internalComplete(u, ex);
597 }
598 return true;
599 }
600 private static final long serialVersionUID = 5232453952276885070L;
601 }
602
603 /* ------------- Completions -------------- */
604
605 /**
606 * Simple linked list nodes to record completions, used in
607 * basically the same way as WaitNodes. (We separate nodes from
608 * the Completions themselves mainly because for the And and Or
609 * methods, the same Completion object resides in two lists.)
610 */
611 static final class CompletionNode {
612 final Completion completion;
613 volatile CompletionNode next;
614 CompletionNode(Completion completion) { this.completion = completion; }
615 }
616
617 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
618 abstract static class Completion extends AtomicInteger implements Runnable {
619 }
620
621 static final class ThenApply<T,U> extends Completion {
622 final CompletableFuture<? extends T> src;
623 final Fun<? super T,? extends U> fn;
624 final CompletableFuture<U> dst;
625 final Executor executor;
626 ThenApply(CompletableFuture<? extends T> src,
627 Fun<? super T,? extends U> fn,
628 CompletableFuture<U> dst,
629 Executor executor) {
630 this.src = src; this.fn = fn; this.dst = dst;
631 this.executor = executor;
632 }
633 public final void run() {
634 final CompletableFuture<? extends T> a;
635 final Fun<? super T,? extends U> fn;
636 final CompletableFuture<U> dst;
637 Object r; T t; Throwable ex;
638 if ((dst = this.dst) != null &&
639 (fn = this.fn) != null &&
640 (a = this.src) != null &&
641 (r = a.result) != null &&
642 compareAndSet(0, 1)) {
643 if (r instanceof AltResult) {
644 ex = ((AltResult)r).ex;
645 t = null;
646 }
647 else {
648 ex = null;
649 @SuppressWarnings("unchecked") T tr = (T) r;
650 t = tr;
651 }
652 Executor e = executor;
653 U u = null;
654 if (ex == null) {
655 try {
656 if (e != null)
657 e.execute(new AsyncApply<T,U>(t, fn, dst));
658 else
659 u = fn.apply(t);
660 } catch (Throwable rex) {
661 ex = rex;
662 }
663 }
664 if (e == null || ex != null)
665 dst.internalComplete(u, ex);
666 }
667 }
668 private static final long serialVersionUID = 5232453952276885070L;
669 }
670
671 static final class ThenAccept<T> extends Completion {
672 final CompletableFuture<? extends T> src;
673 final Action<? super T> fn;
674 final CompletableFuture<Void> dst;
675 final Executor executor;
676 ThenAccept(CompletableFuture<? extends T> src,
677 Action<? super T> fn,
678 CompletableFuture<Void> dst,
679 Executor executor) {
680 this.src = src; this.fn = fn; this.dst = dst;
681 this.executor = executor;
682 }
683 public final void run() {
684 final CompletableFuture<? extends T> a;
685 final Action<? super T> fn;
686 final CompletableFuture<Void> dst;
687 Object r; T t; Throwable ex;
688 if ((dst = this.dst) != null &&
689 (fn = this.fn) != null &&
690 (a = this.src) != null &&
691 (r = a.result) != null &&
692 compareAndSet(0, 1)) {
693 if (r instanceof AltResult) {
694 ex = ((AltResult)r).ex;
695 t = null;
696 }
697 else {
698 ex = null;
699 @SuppressWarnings("unchecked") T tr = (T) r;
700 t = tr;
701 }
702 Executor e = executor;
703 if (ex == null) {
704 try {
705 if (e != null)
706 e.execute(new AsyncAccept<T>(t, fn, dst));
707 else
708 fn.accept(t);
709 } catch (Throwable rex) {
710 ex = rex;
711 }
712 }
713 if (e == null || ex != null)
714 dst.internalComplete(null, ex);
715 }
716 }
717 private static final long serialVersionUID = 5232453952276885070L;
718 }
719
720 static final class ThenRun extends Completion {
721 final CompletableFuture<?> src;
722 final Runnable fn;
723 final CompletableFuture<Void> dst;
724 final Executor executor;
725 ThenRun(CompletableFuture<?> src,
726 Runnable fn,
727 CompletableFuture<Void> dst,
728 Executor executor) {
729 this.src = src; this.fn = fn; this.dst = dst;
730 this.executor = executor;
731 }
732 public final void run() {
733 final CompletableFuture<?> a;
734 final Runnable fn;
735 final CompletableFuture<Void> dst;
736 Object r; Throwable ex;
737 if ((dst = this.dst) != null &&
738 (fn = this.fn) != null &&
739 (a = this.src) != null &&
740 (r = a.result) != null &&
741 compareAndSet(0, 1)) {
742 if (r instanceof AltResult)
743 ex = ((AltResult)r).ex;
744 else
745 ex = null;
746 Executor e = executor;
747 if (ex == null) {
748 try {
749 if (e != null)
750 e.execute(new AsyncRun(fn, dst));
751 else
752 fn.run();
753 } catch (Throwable rex) {
754 ex = rex;
755 }
756 }
757 if (e == null || ex != null)
758 dst.internalComplete(null, ex);
759 }
760 }
761 private static final long serialVersionUID = 5232453952276885070L;
762 }
763
764 static final class ThenCombine<T,U,V> extends Completion {
765 final CompletableFuture<? extends T> src;
766 final CompletableFuture<? extends U> snd;
767 final BiFun<? super T,? super U,? extends V> fn;
768 final CompletableFuture<V> dst;
769 final Executor executor;
770 ThenCombine(CompletableFuture<? extends T> src,
771 CompletableFuture<? extends U> snd,
772 BiFun<? super T,? super U,? extends V> fn,
773 CompletableFuture<V> dst,
774 Executor executor) {
775 this.src = src; this.snd = snd;
776 this.fn = fn; this.dst = dst;
777 this.executor = executor;
778 }
779 public final void run() {
780 final CompletableFuture<? extends T> a;
781 final CompletableFuture<? extends U> b;
782 final BiFun<? super T,? super U,? extends V> fn;
783 final CompletableFuture<V> dst;
784 Object r, s; T t; U u; Throwable ex;
785 if ((dst = this.dst) != null &&
786 (fn = this.fn) != null &&
787 (a = this.src) != null &&
788 (r = a.result) != null &&
789 (b = this.snd) != null &&
790 (s = b.result) != null &&
791 compareAndSet(0, 1)) {
792 if (r instanceof AltResult) {
793 ex = ((AltResult)r).ex;
794 t = null;
795 }
796 else {
797 ex = null;
798 @SuppressWarnings("unchecked") T tr = (T) r;
799 t = tr;
800 }
801 if (ex != null)
802 u = null;
803 else if (s instanceof AltResult) {
804 ex = ((AltResult)s).ex;
805 u = null;
806 }
807 else {
808 @SuppressWarnings("unchecked") U us = (U) s;
809 u = us;
810 }
811 Executor e = executor;
812 V v = null;
813 if (ex == null) {
814 try {
815 if (e != null)
816 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
817 else
818 v = fn.apply(t, u);
819 } catch (Throwable rex) {
820 ex = rex;
821 }
822 }
823 if (e == null || ex != null)
824 dst.internalComplete(v, ex);
825 }
826 }
827 private static final long serialVersionUID = 5232453952276885070L;
828 }
829
830 static final class ThenAcceptBoth<T,U> extends Completion {
831 final CompletableFuture<? extends T> src;
832 final CompletableFuture<? extends U> snd;
833 final BiAction<? super T,? super U> fn;
834 final CompletableFuture<Void> dst;
835 final Executor executor;
836 ThenAcceptBoth(CompletableFuture<? extends T> src,
837 CompletableFuture<? extends U> snd,
838 BiAction<? super T,? super U> fn,
839 CompletableFuture<Void> dst,
840 Executor executor) {
841 this.src = src; this.snd = snd;
842 this.fn = fn; this.dst = dst;
843 this.executor = executor;
844 }
845 public final void run() {
846 final CompletableFuture<? extends T> a;
847 final CompletableFuture<? extends U> b;
848 final BiAction<? super T,? super U> fn;
849 final CompletableFuture<Void> dst;
850 Object r, s; T t; U u; Throwable ex;
851 if ((dst = this.dst) != null &&
852 (fn = this.fn) != null &&
853 (a = this.src) != null &&
854 (r = a.result) != null &&
855 (b = this.snd) != null &&
856 (s = b.result) != null &&
857 compareAndSet(0, 1)) {
858 if (r instanceof AltResult) {
859 ex = ((AltResult)r).ex;
860 t = null;
861 }
862 else {
863 ex = null;
864 @SuppressWarnings("unchecked") T tr = (T) r;
865 t = tr;
866 }
867 if (ex != null)
868 u = null;
869 else if (s instanceof AltResult) {
870 ex = ((AltResult)s).ex;
871 u = null;
872 }
873 else {
874 @SuppressWarnings("unchecked") U us = (U) s;
875 u = us;
876 }
877 Executor e = executor;
878 if (ex == null) {
879 try {
880 if (e != null)
881 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
882 else
883 fn.accept(t, u);
884 } catch (Throwable rex) {
885 ex = rex;
886 }
887 }
888 if (e == null || ex != null)
889 dst.internalComplete(null, ex);
890 }
891 }
892 private static final long serialVersionUID = 5232453952276885070L;
893 }
894
895 static final class RunAfterBoth extends Completion {
896 final CompletableFuture<?> src;
897 final CompletableFuture<?> snd;
898 final Runnable fn;
899 final CompletableFuture<Void> dst;
900 final Executor executor;
901 RunAfterBoth(CompletableFuture<?> src,
902 CompletableFuture<?> snd,
903 Runnable fn,
904 CompletableFuture<Void> dst,
905 Executor executor) {
906 this.src = src; this.snd = snd;
907 this.fn = fn; this.dst = dst;
908 this.executor = executor;
909 }
910 public final void run() {
911 final CompletableFuture<?> a;
912 final CompletableFuture<?> b;
913 final Runnable fn;
914 final CompletableFuture<Void> dst;
915 Object r, s; Throwable ex;
916 if ((dst = this.dst) != null &&
917 (fn = this.fn) != null &&
918 (a = this.src) != null &&
919 (r = a.result) != null &&
920 (b = this.snd) != null &&
921 (s = b.result) != null &&
922 compareAndSet(0, 1)) {
923 if (r instanceof AltResult)
924 ex = ((AltResult)r).ex;
925 else
926 ex = null;
927 if (ex == null && (s instanceof AltResult))
928 ex = ((AltResult)s).ex;
929 Executor e = executor;
930 if (ex == null) {
931 try {
932 if (e != null)
933 e.execute(new AsyncRun(fn, dst));
934 else
935 fn.run();
936 } catch (Throwable rex) {
937 ex = rex;
938 }
939 }
940 if (e == null || ex != null)
941 dst.internalComplete(null, ex);
942 }
943 }
944 private static final long serialVersionUID = 5232453952276885070L;
945 }
946
947 static final class AndCompletion extends Completion {
948 final CompletableFuture<?> src;
949 final CompletableFuture<?> snd;
950 final CompletableFuture<Void> dst;
951 AndCompletion(CompletableFuture<?> src,
952 CompletableFuture<?> snd,
953 CompletableFuture<Void> dst) {
954 this.src = src; this.snd = snd; this.dst = dst;
955 }
956 public final void run() {
957 final CompletableFuture<?> a;
958 final CompletableFuture<?> b;
959 final CompletableFuture<Void> dst;
960 Object r, s; Throwable ex;
961 if ((dst = this.dst) != null &&
962 (a = this.src) != null &&
963 (r = a.result) != null &&
964 (b = this.snd) != null &&
965 (s = b.result) != null &&
966 compareAndSet(0, 1)) {
967 if (r instanceof AltResult)
968 ex = ((AltResult)r).ex;
969 else
970 ex = null;
971 if (ex == null && (s instanceof AltResult))
972 ex = ((AltResult)s).ex;
973 dst.internalComplete(null, ex);
974 }
975 }
976 private static final long serialVersionUID = 5232453952276885070L;
977 }
978
979 static final class ApplyToEither<T,U> extends Completion {
980 final CompletableFuture<? extends T> src;
981 final CompletableFuture<? extends T> snd;
982 final Fun<? super T,? extends U> fn;
983 final CompletableFuture<U> dst;
984 final Executor executor;
985 ApplyToEither(CompletableFuture<? extends T> src,
986 CompletableFuture<? extends T> snd,
987 Fun<? super T,? extends U> fn,
988 CompletableFuture<U> dst,
989 Executor executor) {
990 this.src = src; this.snd = snd;
991 this.fn = fn; this.dst = dst;
992 this.executor = executor;
993 }
994 public final void run() {
995 final CompletableFuture<? extends T> a;
996 final CompletableFuture<? extends T> b;
997 final Fun<? super T,? extends U> fn;
998 final CompletableFuture<U> dst;
999 Object r; T t; Throwable ex;
1000 if ((dst = this.dst) != null &&
1001 (fn = this.fn) != null &&
1002 (((a = this.src) != null && (r = a.result) != null) ||
1003 ((b = this.snd) != null && (r = b.result) != null)) &&
1004 compareAndSet(0, 1)) {
1005 if (r instanceof AltResult) {
1006 ex = ((AltResult)r).ex;
1007 t = null;
1008 }
1009 else {
1010 ex = null;
1011 @SuppressWarnings("unchecked") T tr = (T) r;
1012 t = tr;
1013 }
1014 Executor e = executor;
1015 U u = null;
1016 if (ex == null) {
1017 try {
1018 if (e != null)
1019 e.execute(new AsyncApply<T,U>(t, fn, dst));
1020 else
1021 u = fn.apply(t);
1022 } catch (Throwable rex) {
1023 ex = rex;
1024 }
1025 }
1026 if (e == null || ex != null)
1027 dst.internalComplete(u, ex);
1028 }
1029 }
1030 private static final long serialVersionUID = 5232453952276885070L;
1031 }
1032
1033 static final class AcceptEither<T> extends Completion {
1034 final CompletableFuture<? extends T> src;
1035 final CompletableFuture<? extends T> snd;
1036 final Action<? super T> fn;
1037 final CompletableFuture<Void> dst;
1038 final Executor executor;
1039 AcceptEither(CompletableFuture<? extends T> src,
1040 CompletableFuture<? extends T> snd,
1041 Action<? super T> fn,
1042 CompletableFuture<Void> dst,
1043 Executor executor) {
1044 this.src = src; this.snd = snd;
1045 this.fn = fn; this.dst = dst;
1046 this.executor = executor;
1047 }
1048 public final void run() {
1049 final CompletableFuture<? extends T> a;
1050 final CompletableFuture<? extends T> b;
1051 final Action<? super T> fn;
1052 final CompletableFuture<Void> dst;
1053 Object r; T t; Throwable ex;
1054 if ((dst = this.dst) != null &&
1055 (fn = this.fn) != null &&
1056 (((a = this.src) != null && (r = a.result) != null) ||
1057 ((b = this.snd) != null && (r = b.result) != null)) &&
1058 compareAndSet(0, 1)) {
1059 if (r instanceof AltResult) {
1060 ex = ((AltResult)r).ex;
1061 t = null;
1062 }
1063 else {
1064 ex = null;
1065 @SuppressWarnings("unchecked") T tr = (T) r;
1066 t = tr;
1067 }
1068 Executor e = executor;
1069 if (ex == null) {
1070 try {
1071 if (e != null)
1072 e.execute(new AsyncAccept<T>(t, fn, dst));
1073 else
1074 fn.accept(t);
1075 } catch (Throwable rex) {
1076 ex = rex;
1077 }
1078 }
1079 if (e == null || ex != null)
1080 dst.internalComplete(null, ex);
1081 }
1082 }
1083 private static final long serialVersionUID = 5232453952276885070L;
1084 }
1085
1086 static final class RunAfterEither extends Completion {
1087 final CompletableFuture<?> src;
1088 final CompletableFuture<?> snd;
1089 final Runnable fn;
1090 final CompletableFuture<Void> dst;
1091 final Executor executor;
1092 RunAfterEither(CompletableFuture<?> src,
1093 CompletableFuture<?> snd,
1094 Runnable fn,
1095 CompletableFuture<Void> dst,
1096 Executor executor) {
1097 this.src = src; this.snd = snd;
1098 this.fn = fn; this.dst = dst;
1099 this.executor = executor;
1100 }
1101 public final void run() {
1102 final CompletableFuture<?> a;
1103 final CompletableFuture<?> b;
1104 final Runnable fn;
1105 final CompletableFuture<Void> dst;
1106 Object r; Throwable ex;
1107 if ((dst = this.dst) != null &&
1108 (fn = this.fn) != null &&
1109 (((a = this.src) != null && (r = a.result) != null) ||
1110 ((b = this.snd) != null && (r = b.result) != null)) &&
1111 compareAndSet(0, 1)) {
1112 if (r instanceof AltResult)
1113 ex = ((AltResult)r).ex;
1114 else
1115 ex = null;
1116 Executor e = executor;
1117 if (ex == null) {
1118 try {
1119 if (e != null)
1120 e.execute(new AsyncRun(fn, dst));
1121 else
1122 fn.run();
1123 } catch (Throwable rex) {
1124 ex = rex;
1125 }
1126 }
1127 if (e == null || ex != null)
1128 dst.internalComplete(null, ex);
1129 }
1130 }
1131 private static final long serialVersionUID = 5232453952276885070L;
1132 }
1133
1134 static final class OrCompletion extends Completion {
1135 final CompletableFuture<?> src;
1136 final CompletableFuture<?> snd;
1137 final CompletableFuture<Object> dst;
1138 OrCompletion(CompletableFuture<?> src,
1139 CompletableFuture<?> snd,
1140 CompletableFuture<Object> dst) {
1141 this.src = src; this.snd = snd; this.dst = dst;
1142 }
1143 public final void run() {
1144 final CompletableFuture<?> a;
1145 final CompletableFuture<?> b;
1146 final CompletableFuture<Object> dst;
1147 Object r, t; Throwable ex;
1148 if ((dst = this.dst) != null &&
1149 (((a = this.src) != null && (r = a.result) != null) ||
1150 ((b = this.snd) != null && (r = b.result) != null)) &&
1151 compareAndSet(0, 1)) {
1152 if (r instanceof AltResult) {
1153 ex = ((AltResult)r).ex;
1154 t = null;
1155 }
1156 else {
1157 ex = null;
1158 t = r;
1159 }
1160 dst.internalComplete(t, ex);
1161 }
1162 }
1163 private static final long serialVersionUID = 5232453952276885070L;
1164 }
1165
1166 static final class ExceptionCompletion<T> extends Completion {
1167 final CompletableFuture<? extends T> src;
1168 final Fun<? super Throwable, ? extends T> fn;
1169 final CompletableFuture<T> dst;
1170 ExceptionCompletion(CompletableFuture<? extends T> src,
1171 Fun<? super Throwable, ? extends T> fn,
1172 CompletableFuture<T> dst) {
1173 this.src = src; this.fn = fn; this.dst = dst;
1174 }
1175 public final void run() {
1176 final CompletableFuture<? extends T> a;
1177 final Fun<? super Throwable, ? extends T> fn;
1178 final CompletableFuture<T> dst;
1179 Object r; T t = null; Throwable ex, dx = null;
1180 if ((dst = this.dst) != null &&
1181 (fn = this.fn) != null &&
1182 (a = this.src) != null &&
1183 (r = a.result) != null &&
1184 compareAndSet(0, 1)) {
1185 if ((r instanceof AltResult) &&
1186 (ex = ((AltResult)r).ex) != null) {
1187 try {
1188 t = fn.apply(ex);
1189 } catch (Throwable rex) {
1190 dx = rex;
1191 }
1192 }
1193 else {
1194 @SuppressWarnings("unchecked") T tr = (T) r;
1195 t = tr;
1196 }
1197 dst.internalComplete(t, dx);
1198 }
1199 }
1200 private static final long serialVersionUID = 5232453952276885070L;
1201 }
1202
1203 static final class ThenCopy<T> extends Completion {
1204 final CompletableFuture<?> src;
1205 final CompletableFuture<T> dst;
1206 ThenCopy(CompletableFuture<?> src,
1207 CompletableFuture<T> dst) {
1208 this.src = src; this.dst = dst;
1209 }
1210 public final void run() {
1211 final CompletableFuture<?> a;
1212 final CompletableFuture<T> dst;
1213 Object r; T t; Throwable ex;
1214 if ((dst = this.dst) != null &&
1215 (a = this.src) != null &&
1216 (r = a.result) != null &&
1217 compareAndSet(0, 1)) {
1218 if (r instanceof AltResult) {
1219 ex = ((AltResult)r).ex;
1220 t = null;
1221 }
1222 else {
1223 ex = null;
1224 @SuppressWarnings("unchecked") T tr = (T) r;
1225 t = tr;
1226 }
1227 dst.internalComplete(t, ex);
1228 }
1229 }
1230 private static final long serialVersionUID = 5232453952276885070L;
1231 }
1232
1233 // version of ThenCopy for CompletableFuture<Void> dst
1234 static final class ThenPropagate extends Completion {
1235 final CompletableFuture<?> src;
1236 final CompletableFuture<Void> dst;
1237 ThenPropagate(CompletableFuture<?> src,
1238 CompletableFuture<Void> dst) {
1239 this.src = src; this.dst = dst;
1240 }
1241 public final void run() {
1242 final CompletableFuture<?> a;
1243 final CompletableFuture<Void> dst;
1244 Object r; Throwable ex;
1245 if ((dst = this.dst) != null &&
1246 (a = this.src) != null &&
1247 (r = a.result) != null &&
1248 compareAndSet(0, 1)) {
1249 if (r instanceof AltResult)
1250 ex = ((AltResult)r).ex;
1251 else
1252 ex = null;
1253 dst.internalComplete(null, ex);
1254 }
1255 }
1256 private static final long serialVersionUID = 5232453952276885070L;
1257 }
1258
1259 static final class HandleCompletion<T,U> extends Completion {
1260 final CompletableFuture<? extends T> src;
1261 final BiFun<? super T, Throwable, ? extends U> fn;
1262 final CompletableFuture<U> dst;
1263 HandleCompletion(CompletableFuture<? extends T> src,
1264 BiFun<? super T, Throwable, ? extends U> fn,
1265 CompletableFuture<U> dst) {
1266 this.src = src; this.fn = fn; this.dst = dst;
1267 }
1268 public final void run() {
1269 final CompletableFuture<? extends T> a;
1270 final BiFun<? super T, Throwable, ? extends U> fn;
1271 final CompletableFuture<U> dst;
1272 Object r; T t; Throwable ex;
1273 if ((dst = this.dst) != null &&
1274 (fn = this.fn) != null &&
1275 (a = this.src) != null &&
1276 (r = a.result) != null &&
1277 compareAndSet(0, 1)) {
1278 if (r instanceof AltResult) {
1279 ex = ((AltResult)r).ex;
1280 t = null;
1281 }
1282 else {
1283 ex = null;
1284 @SuppressWarnings("unchecked") T tr = (T) r;
1285 t = tr;
1286 }
1287 U u = null; Throwable dx = null;
1288 try {
1289 u = fn.apply(t, ex);
1290 } catch (Throwable rex) {
1291 dx = rex;
1292 }
1293 dst.internalComplete(u, dx);
1294 }
1295 }
1296 private static final long serialVersionUID = 5232453952276885070L;
1297 }
1298
1299 static final class ThenCompose<T,U> extends Completion {
1300 final CompletableFuture<? extends T> src;
1301 final Fun<? super T, CompletableFuture<U>> fn;
1302 final CompletableFuture<U> dst;
1303 final Executor executor;
1304 ThenCompose(CompletableFuture<? extends T> src,
1305 Fun<? super T, CompletableFuture<U>> fn,
1306 CompletableFuture<U> dst,
1307 Executor executor) {
1308 this.src = src; this.fn = fn; this.dst = dst;
1309 this.executor = executor;
1310 }
1311 public final void run() {
1312 final CompletableFuture<? extends T> a;
1313 final Fun<? super T, CompletableFuture<U>> fn;
1314 final CompletableFuture<U> dst;
1315 Object r; T t; Throwable ex; Executor e;
1316 if ((dst = this.dst) != null &&
1317 (fn = this.fn) != null &&
1318 (a = this.src) != null &&
1319 (r = a.result) != null &&
1320 compareAndSet(0, 1)) {
1321 if (r instanceof AltResult) {
1322 ex = ((AltResult)r).ex;
1323 t = null;
1324 }
1325 else {
1326 ex = null;
1327 @SuppressWarnings("unchecked") T tr = (T) r;
1328 t = tr;
1329 }
1330 CompletableFuture<U> c = null;
1331 U u = null;
1332 boolean complete = false;
1333 if (ex == null) {
1334 if ((e = executor) != null)
1335 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1336 else {
1337 try {
1338 if ((c = fn.apply(t)) == null)
1339 ex = new NullPointerException();
1340 } catch (Throwable rex) {
1341 ex = rex;
1342 }
1343 }
1344 }
1345 if (c != null) {
1346 ThenCopy<U> d = null;
1347 Object s;
1348 if ((s = c.result) == null) {
1349 CompletionNode p = new CompletionNode
1350 (d = new ThenCopy<U>(c, dst));
1351 while ((s = c.result) == null) {
1352 if (UNSAFE.compareAndSwapObject
1353 (c, COMPLETIONS, p.next = c.completions, p))
1354 break;
1355 }
1356 }
1357 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1358 complete = true;
1359 if (s instanceof AltResult) {
1360 ex = ((AltResult)s).ex; // no rewrap
1361 u = null;
1362 }
1363 else {
1364 @SuppressWarnings("unchecked") U us = (U) s;
1365 u = us;
1366 }
1367 }
1368 }
1369 if (complete || ex != null)
1370 dst.internalComplete(u, ex);
1371 if (c != null)
1372 c.helpPostComplete();
1373 }
1374 }
1375 private static final long serialVersionUID = 5232453952276885070L;
1376 }
1377
1378 // public methods
1379
1380 /**
1381 * Creates a new incomplete CompletableFuture.
1382 */
1383 public CompletableFuture() {
1384 }
1385
1386 /**
1387 * Returns a new CompletableFuture that is asynchronously completed
1388 * by a task running in the {@link ForkJoinPool#commonPool()} with
1389 * the value obtained by calling the given Generator.
1390 *
1391 * @param supplier a function returning the value to be used
1392 * to complete the returned CompletableFuture
1393 * @param <U> the function's return type
1394 * @return the new CompletableFuture
1395 */
1396 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier) {
1397 if (supplier == null) throw new NullPointerException();
1398 CompletableFuture<U> f = new CompletableFuture<U>();
1399 ForkJoinPool.commonPool().
1400 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1401 return f;
1402 }
1403
1404 /**
1405 * Returns a new CompletableFuture that is asynchronously completed
1406 * by a task running in the given executor with the value obtained
1407 * by calling the given Generator.
1408 *
1409 * @param supplier a function returning the value to be used
1410 * to complete the returned CompletableFuture
1411 * @param executor the executor to use for asynchronous execution
1412 * @param <U> the function's return type
1413 * @return the new CompletableFuture
1414 */
1415 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier,
1416 Executor executor) {
1417 if (executor == null || supplier == null)
1418 throw new NullPointerException();
1419 CompletableFuture<U> f = new CompletableFuture<U>();
1420 executor.execute(new AsyncSupply<U>(supplier, f));
1421 return f;
1422 }
1423
1424 /**
1425 * Returns a new CompletableFuture that is asynchronously completed
1426 * by a task running in the {@link ForkJoinPool#commonPool()} after
1427 * it runs the given action.
1428 *
1429 * @param runnable the action to run before completing the
1430 * returned CompletableFuture
1431 * @return the new CompletableFuture
1432 */
1433 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1434 if (runnable == null) throw new NullPointerException();
1435 CompletableFuture<Void> f = new CompletableFuture<Void>();
1436 ForkJoinPool.commonPool().
1437 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1438 return f;
1439 }
1440
1441 /**
1442 * Returns a new CompletableFuture that is asynchronously completed
1443 * by a task running in the given executor after it runs the given
1444 * action.
1445 *
1446 * @param runnable the action to run before completing the
1447 * returned CompletableFuture
1448 * @param executor the executor to use for asynchronous execution
1449 * @return the new CompletableFuture
1450 */
1451 public static CompletableFuture<Void> runAsync(Runnable runnable,
1452 Executor executor) {
1453 if (executor == null || runnable == null)
1454 throw new NullPointerException();
1455 CompletableFuture<Void> f = new CompletableFuture<Void>();
1456 executor.execute(new AsyncRun(runnable, f));
1457 return f;
1458 }
1459
1460 /**
1461 * Returns a new CompletableFuture that is already completed with
1462 * the given value.
1463 *
1464 * @param value the value
1465 * @param <U> the type of the value
1466 * @return the completed CompletableFuture
1467 */
1468 public static <U> CompletableFuture<U> completedFuture(U value) {
1469 CompletableFuture<U> f = new CompletableFuture<U>();
1470 f.result = (value == null) ? NIL : value;
1471 return f;
1472 }
1473
1474 /**
1475 * Returns {@code true} if completed in any fashion: normally,
1476 * exceptionally, or via cancellation.
1477 *
1478 * @return {@code true} if completed
1479 */
1480 public boolean isDone() {
1481 return result != null;
1482 }
1483
1484 /**
1485 * Waits if necessary for this future to complete, and then
1486 * returns its result.
1487 *
1488 * @return the result value
1489 * @throws CancellationException if this future was cancelled
1490 * @throws ExecutionException if this future completed exceptionally
1491 * @throws InterruptedException if the current thread was interrupted
1492 * while waiting
1493 */
1494 public T get() throws InterruptedException, ExecutionException {
1495 Object r; Throwable ex, cause;
1496 if ((r = result) == null && (r = waitingGet(true)) == null)
1497 throw new InterruptedException();
1498 if (!(r instanceof AltResult)) {
1499 @SuppressWarnings("unchecked") T tr = (T) r;
1500 return tr;
1501 }
1502 if ((ex = ((AltResult)r).ex) == null)
1503 return null;
1504 if (ex instanceof CancellationException)
1505 throw (CancellationException)ex;
1506 if ((ex instanceof CompletionException) &&
1507 (cause = ex.getCause()) != null)
1508 ex = cause;
1509 throw new ExecutionException(ex);
1510 }
1511
1512 /**
1513 * Waits if necessary for at most the given time for this future
1514 * to complete, and then returns its result, if available.
1515 *
1516 * @param timeout the maximum time to wait
1517 * @param unit the time unit of the timeout argument
1518 * @return the result value
1519 * @throws CancellationException if this future was cancelled
1520 * @throws ExecutionException if this future completed exceptionally
1521 * @throws InterruptedException if the current thread was interrupted
1522 * while waiting
1523 * @throws TimeoutException if the wait timed out
1524 */
1525 public T get(long timeout, TimeUnit unit)
1526 throws InterruptedException, ExecutionException, TimeoutException {
1527 Object r; Throwable ex, cause;
1528 long nanos = unit.toNanos(timeout);
1529 if (Thread.interrupted())
1530 throw new InterruptedException();
1531 if ((r = result) == null)
1532 r = timedAwaitDone(nanos);
1533 if (!(r instanceof AltResult)) {
1534 @SuppressWarnings("unchecked") T tr = (T) r;
1535 return tr;
1536 }
1537 if ((ex = ((AltResult)r).ex) == null)
1538 return null;
1539 if (ex instanceof CancellationException)
1540 throw (CancellationException)ex;
1541 if ((ex instanceof CompletionException) &&
1542 (cause = ex.getCause()) != null)
1543 ex = cause;
1544 throw new ExecutionException(ex);
1545 }
1546
1547 /**
1548 * Returns the result value when complete, or throws an
1549 * (unchecked) exception if completed exceptionally. To better
1550 * conform with the use of common functional forms, if a
1551 * computation involved in the completion of this
1552 * CompletableFuture threw an exception, this method throws an
1553 * (unchecked) {@link CompletionException} with the underlying
1554 * exception as its cause.
1555 *
1556 * @return the result value
1557 * @throws CancellationException if the computation was cancelled
1558 * @throws CompletionException if this future completed
1559 * exceptionally or a completion computation threw an exception
1560 */
1561 public T join() {
1562 Object r; Throwable ex;
1563 if ((r = result) == null)
1564 r = waitingGet(false);
1565 if (!(r instanceof AltResult)) {
1566 @SuppressWarnings("unchecked") T tr = (T) r;
1567 return tr;
1568 }
1569 if ((ex = ((AltResult)r).ex) == null)
1570 return null;
1571 if (ex instanceof CancellationException)
1572 throw (CancellationException)ex;
1573 if (ex instanceof CompletionException)
1574 throw (CompletionException)ex;
1575 throw new CompletionException(ex);
1576 }
1577
1578 /**
1579 * Returns the result value (or throws any encountered exception)
1580 * if completed, else returns the given valueIfAbsent.
1581 *
1582 * @param valueIfAbsent the value to return if not completed
1583 * @return the result value, if completed, else the given valueIfAbsent
1584 * @throws CancellationException if the computation was cancelled
1585 * @throws CompletionException if this future completed
1586 * exceptionally or a completion computation threw an exception
1587 */
1588 public T getNow(T valueIfAbsent) {
1589 Object r; Throwable ex;
1590 if ((r = result) == null)
1591 return valueIfAbsent;
1592 if (!(r instanceof AltResult)) {
1593 @SuppressWarnings("unchecked") T tr = (T) r;
1594 return tr;
1595 }
1596 if ((ex = ((AltResult)r).ex) == null)
1597 return null;
1598 if (ex instanceof CancellationException)
1599 throw (CancellationException)ex;
1600 if (ex instanceof CompletionException)
1601 throw (CompletionException)ex;
1602 throw new CompletionException(ex);
1603 }
1604
1605 /**
1606 * If not already completed, sets the value returned by {@link
1607 * #get()} and related methods to the given value.
1608 *
1609 * @param value the result value
1610 * @return {@code true} if this invocation caused this CompletableFuture
1611 * to transition to a completed state, else {@code false}
1612 */
1613 public boolean complete(T value) {
1614 boolean triggered = result == null &&
1615 UNSAFE.compareAndSwapObject(this, RESULT, null,
1616 value == null ? NIL : value);
1617 postComplete();
1618 return triggered;
1619 }
1620
1621 /**
1622 * If not already completed, causes invocations of {@link #get()}
1623 * and related methods to throw the given exception.
1624 *
1625 * @param ex the exception
1626 * @return {@code true} if this invocation caused this CompletableFuture
1627 * to transition to a completed state, else {@code false}
1628 */
1629 public boolean completeExceptionally(Throwable ex) {
1630 if (ex == null) throw new NullPointerException();
1631 boolean triggered = result == null &&
1632 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1633 postComplete();
1634 return triggered;
1635 }
1636
1637 /**
1638 * Returns a new CompletableFuture that is completed
1639 * when this CompletableFuture completes, with the result of the
1640 * given function of this CompletableFuture's result.
1641 *
1642 * <p>If this CompletableFuture completes exceptionally, or the
1643 * supplied function throws an exception, then the returned
1644 * CompletableFuture completes exceptionally with a
1645 * CompletionException holding the exception as its cause.
1646 *
1647 * @param fn the function to use to compute the value of
1648 * the returned CompletableFuture
1649 * @return the new CompletableFuture
1650 */
1651 public <U> CompletableFuture<U> thenApply(Fun<? super T,? extends U> fn) {
1652 return doThenApply(fn, null);
1653 }
1654
1655 /**
1656 * Returns a new CompletableFuture that is asynchronously completed
1657 * when this CompletableFuture completes, with the result of the
1658 * given function of this CompletableFuture's result from a
1659 * task running in the {@link ForkJoinPool#commonPool()}.
1660 *
1661 * <p>If this CompletableFuture completes exceptionally, or the
1662 * supplied function throws an exception, then the returned
1663 * CompletableFuture completes exceptionally with a
1664 * CompletionException holding the exception as its cause.
1665 *
1666 * @param fn the function to use to compute the value of
1667 * the returned CompletableFuture
1668 * @return the new CompletableFuture
1669 */
1670 public <U> CompletableFuture<U> thenApplyAsync
1671 (Fun<? super T,? extends U> fn) {
1672 return doThenApply(fn, ForkJoinPool.commonPool());
1673 }
1674
1675 /**
1676 * Returns a new CompletableFuture that is asynchronously completed
1677 * when this CompletableFuture completes, with the result of the
1678 * given function of this CompletableFuture's result from a
1679 * task running in the given executor.
1680 *
1681 * <p>If this CompletableFuture completes exceptionally, or the
1682 * supplied function throws an exception, then the returned
1683 * CompletableFuture completes exceptionally with a
1684 * CompletionException holding the exception as its cause.
1685 *
1686 * @param fn the function to use to compute the value of
1687 * the returned CompletableFuture
1688 * @param executor the executor to use for asynchronous execution
1689 * @return the new CompletableFuture
1690 */
1691 public <U> CompletableFuture<U> thenApplyAsync
1692 (Fun<? super T,? extends U> fn,
1693 Executor executor) {
1694 if (executor == null) throw new NullPointerException();
1695 return doThenApply(fn, executor);
1696 }
1697
1698 private <U> CompletableFuture<U> doThenApply
1699 (Fun<? super T,? extends U> fn,
1700 Executor e) {
1701 if (fn == null) throw new NullPointerException();
1702 CompletableFuture<U> dst = new CompletableFuture<U>();
1703 ThenApply<T,U> d = null;
1704 Object r;
1705 if ((r = result) == null) {
1706 CompletionNode p = new CompletionNode
1707 (d = new ThenApply<T,U>(this, fn, dst, e));
1708 while ((r = result) == null) {
1709 if (UNSAFE.compareAndSwapObject
1710 (this, COMPLETIONS, p.next = completions, p))
1711 break;
1712 }
1713 }
1714 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1715 T t; Throwable ex;
1716 if (r instanceof AltResult) {
1717 ex = ((AltResult)r).ex;
1718 t = null;
1719 }
1720 else {
1721 ex = null;
1722 @SuppressWarnings("unchecked") T tr = (T) r;
1723 t = tr;
1724 }
1725 U u = null;
1726 if (ex == null) {
1727 try {
1728 if (e != null)
1729 e.execute(new AsyncApply<T,U>(t, fn, dst));
1730 else
1731 u = fn.apply(t);
1732 } catch (Throwable rex) {
1733 ex = rex;
1734 }
1735 }
1736 if (e == null || ex != null)
1737 dst.internalComplete(u, ex);
1738 }
1739 helpPostComplete();
1740 return dst;
1741 }
1742
1743 /**
1744 * Returns a new CompletableFuture that is completed
1745 * when this CompletableFuture completes, after performing the given
1746 * action with this CompletableFuture's result.
1747 *
1748 * <p>If this CompletableFuture completes exceptionally, or the
1749 * supplied action throws an exception, then the returned
1750 * CompletableFuture completes exceptionally with a
1751 * CompletionException holding the exception as its cause.
1752 *
1753 * @param block the action to perform before completing the
1754 * returned CompletableFuture
1755 * @return the new CompletableFuture
1756 */
1757 public CompletableFuture<Void> thenAccept(Action<? super T> block) {
1758 return doThenAccept(block, null);
1759 }
1760
1761 /**
1762 * Returns a new CompletableFuture that is asynchronously completed
1763 * when this CompletableFuture completes, after performing the given
1764 * action with this CompletableFuture's result from a task running
1765 * in the {@link ForkJoinPool#commonPool()}.
1766 *
1767 * <p>If this CompletableFuture completes exceptionally, or the
1768 * supplied action throws an exception, then the returned
1769 * CompletableFuture completes exceptionally with a
1770 * CompletionException holding the exception as its cause.
1771 *
1772 * @param block the action to perform before completing the
1773 * returned CompletableFuture
1774 * @return the new CompletableFuture
1775 */
1776 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block) {
1777 return doThenAccept(block, ForkJoinPool.commonPool());
1778 }
1779
1780 /**
1781 * Returns a new CompletableFuture that is asynchronously completed
1782 * when this CompletableFuture completes, after performing the given
1783 * action with this CompletableFuture's result from a task running
1784 * in the given executor.
1785 *
1786 * <p>If this CompletableFuture completes exceptionally, or the
1787 * supplied action throws an exception, then the returned
1788 * CompletableFuture completes exceptionally with a
1789 * CompletionException holding the exception as its cause.
1790 *
1791 * @param block the action to perform before completing the
1792 * returned CompletableFuture
1793 * @param executor the executor to use for asynchronous execution
1794 * @return the new CompletableFuture
1795 */
1796 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block,
1797 Executor executor) {
1798 if (executor == null) throw new NullPointerException();
1799 return doThenAccept(block, executor);
1800 }
1801
1802 private CompletableFuture<Void> doThenAccept(Action<? super T> fn,
1803 Executor e) {
1804 if (fn == null) throw new NullPointerException();
1805 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1806 ThenAccept<T> d = null;
1807 Object r;
1808 if ((r = result) == null) {
1809 CompletionNode p = new CompletionNode
1810 (d = new ThenAccept<T>(this, fn, dst, e));
1811 while ((r = result) == null) {
1812 if (UNSAFE.compareAndSwapObject
1813 (this, COMPLETIONS, p.next = completions, p))
1814 break;
1815 }
1816 }
1817 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1818 T t; Throwable ex;
1819 if (r instanceof AltResult) {
1820 ex = ((AltResult)r).ex;
1821 t = null;
1822 }
1823 else {
1824 ex = null;
1825 @SuppressWarnings("unchecked") T tr = (T) r;
1826 t = tr;
1827 }
1828 if (ex == null) {
1829 try {
1830 if (e != null)
1831 e.execute(new AsyncAccept<T>(t, fn, dst));
1832 else
1833 fn.accept(t);
1834 } catch (Throwable rex) {
1835 ex = rex;
1836 }
1837 }
1838 if (e == null || ex != null)
1839 dst.internalComplete(null, ex);
1840 }
1841 helpPostComplete();
1842 return dst;
1843 }
1844
1845 /**
1846 * Returns a new CompletableFuture that is completed
1847 * when this CompletableFuture completes, after performing the given
1848 * action.
1849 *
1850 * <p>If this CompletableFuture completes exceptionally, or the
1851 * supplied action throws an exception, then the returned
1852 * CompletableFuture completes exceptionally with a
1853 * CompletionException holding the exception as its cause.
1854 *
1855 * @param action the action to perform before completing the
1856 * returned CompletableFuture
1857 * @return the new CompletableFuture
1858 */
1859 public CompletableFuture<Void> thenRun(Runnable action) {
1860 return doThenRun(action, null);
1861 }
1862
1863 /**
1864 * Returns a new CompletableFuture that is asynchronously completed
1865 * when this CompletableFuture completes, after performing the given
1866 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1867 *
1868 * <p>If this CompletableFuture completes exceptionally, or the
1869 * supplied action throws an exception, then the returned
1870 * CompletableFuture completes exceptionally with a
1871 * CompletionException holding the exception as its cause.
1872 *
1873 * @param action the action to perform before completing the
1874 * returned CompletableFuture
1875 * @return the new CompletableFuture
1876 */
1877 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1878 return doThenRun(action, ForkJoinPool.commonPool());
1879 }
1880
1881 /**
1882 * Returns a new CompletableFuture that is asynchronously completed
1883 * when this CompletableFuture completes, after performing the given
1884 * action from a task running in the given executor.
1885 *
1886 * <p>If this CompletableFuture completes exceptionally, or the
1887 * supplied action throws an exception, then the returned
1888 * CompletableFuture completes exceptionally with a
1889 * CompletionException holding the exception as its cause.
1890 *
1891 * @param action the action to perform before completing the
1892 * returned CompletableFuture
1893 * @param executor the executor to use for asynchronous execution
1894 * @return the new CompletableFuture
1895 */
1896 public CompletableFuture<Void> thenRunAsync(Runnable action,
1897 Executor executor) {
1898 if (executor == null) throw new NullPointerException();
1899 return doThenRun(action, executor);
1900 }
1901
1902 private CompletableFuture<Void> doThenRun(Runnable action,
1903 Executor e) {
1904 if (action == null) throw new NullPointerException();
1905 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1906 ThenRun d = null;
1907 Object r;
1908 if ((r = result) == null) {
1909 CompletionNode p = new CompletionNode
1910 (d = new ThenRun(this, action, dst, e));
1911 while ((r = result) == null) {
1912 if (UNSAFE.compareAndSwapObject
1913 (this, COMPLETIONS, p.next = completions, p))
1914 break;
1915 }
1916 }
1917 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1918 Throwable ex;
1919 if (r instanceof AltResult)
1920 ex = ((AltResult)r).ex;
1921 else
1922 ex = null;
1923 if (ex == null) {
1924 try {
1925 if (e != null)
1926 e.execute(new AsyncRun(action, dst));
1927 else
1928 action.run();
1929 } catch (Throwable rex) {
1930 ex = rex;
1931 }
1932 }
1933 if (e == null || ex != null)
1934 dst.internalComplete(null, ex);
1935 }
1936 helpPostComplete();
1937 return dst;
1938 }
1939
1940 /**
1941 * Returns a new CompletableFuture that is completed
1942 * when both this and the other given CompletableFuture complete,
1943 * with the result of the given function of the results of the two
1944 * CompletableFutures.
1945 *
1946 * <p>If this and/or the other CompletableFuture complete
1947 * exceptionally, or the supplied function throws an exception,
1948 * then the returned CompletableFuture completes exceptionally
1949 * with a CompletionException holding the exception as its cause.
1950 *
1951 * @param other the other CompletableFuture
1952 * @param fn the function to use to compute the value of
1953 * the returned CompletableFuture
1954 * @return the new CompletableFuture
1955 */
1956 public <U,V> CompletableFuture<V> thenCombine
1957 (CompletableFuture<? extends U> other,
1958 BiFun<? super T,? super U,? extends V> fn) {
1959 return doThenCombine(other, fn, null);
1960 }
1961
1962 /**
1963 * Returns a new CompletableFuture that is asynchronously completed
1964 * when both this and the other given CompletableFuture complete,
1965 * with the result of the given function of the results of the two
1966 * CompletableFutures from a task running in the
1967 * {@link ForkJoinPool#commonPool()}.
1968 *
1969 * <p>If this and/or the other CompletableFuture complete
1970 * exceptionally, or the supplied function throws an exception,
1971 * then the returned CompletableFuture completes exceptionally
1972 * with a CompletionException holding the exception as its cause.
1973 *
1974 * @param other the other CompletableFuture
1975 * @param fn the function to use to compute the value of
1976 * the returned CompletableFuture
1977 * @return the new CompletableFuture
1978 */
1979 public <U,V> CompletableFuture<V> thenCombineAsync
1980 (CompletableFuture<? extends U> other,
1981 BiFun<? super T,? super U,? extends V> fn) {
1982 return doThenCombine(other, fn, ForkJoinPool.commonPool());
1983 }
1984
1985 /**
1986 * Returns a new CompletableFuture that is asynchronously completed
1987 * when both this and the other given CompletableFuture complete,
1988 * with the result of the given function of the results of the two
1989 * CompletableFutures from a task running in the given executor.
1990 *
1991 * <p>If this and/or the other CompletableFuture complete
1992 * exceptionally, or the supplied function throws an exception,
1993 * then the returned CompletableFuture completes exceptionally
1994 * with a CompletionException holding the exception as its cause.
1995 *
1996 * @param other the other CompletableFuture
1997 * @param fn the function to use to compute the value of
1998 * the returned CompletableFuture
1999 * @param executor the executor to use for asynchronous execution
2000 * @return the new CompletableFuture
2001 */
2002 public <U,V> CompletableFuture<V> thenCombineAsync
2003 (CompletableFuture<? extends U> other,
2004 BiFun<? super T,? super U,? extends V> fn,
2005 Executor executor) {
2006 if (executor == null) throw new NullPointerException();
2007 return doThenCombine(other, fn, executor);
2008 }
2009
2010 private <U,V> CompletableFuture<V> doThenCombine
2011 (CompletableFuture<? extends U> other,
2012 BiFun<? super T,? super U,? extends V> fn,
2013 Executor e) {
     // Shared implementation behind thenCombine and both thenCombineAsync
     // overloads. Returns a new future (dst) fed by fn(thisResult, otherResult).
     //   other - the second source; must be non-null
     //   fn    - the combining function; must be non-null
     //   e     - executor to submit the function to, or null to apply it
     //           directly in the thread that reaches the call below
     // Throws NullPointerException if other or fn is null.
2014 if (other == null || fn == null) throw new NullPointerException();
2015 CompletableFuture<V> dst = new CompletableFuture<V>();
2016 ThenCombine<T,U,V> d = null;
     // r and s are local snapshots of this.result and other.result; a null
     // snapshot is treated below as "no result available yet".
2017 Object r, s = null;
2018 if ((r = result) == null || (s = other.result) == null) {
     // At least one source has no result: build a completion object (d) and
     // try to link it into the completions list of each source that is
     // still pending. p is the node for this future, q the node for other.
2019 d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
2020 CompletionNode q = null, p = new CompletionNode(d);
     // Each pass re-reads only the snapshots that are still null.
2021 while ((r == null && (r = result) == null) ||
2022 (s == null && (s = other.result) == null)) {
2023 if (q != null) {
     // Second phase: this side is already dealt with; push q onto
     // other.completions unless other's result has been seen.
2024 if (s != null ||
2025 UNSAFE.compareAndSwapObject
2026 (other, COMPLETIONS, q.next = other.completions, q))
2027 break;
2028 }
     // First phase: push p onto this.completions unless this future's
     // result has been seen; a failed CAS simply retries on the next pass.
2029 else if (r != null ||
2030 UNSAFE.compareAndSwapObject
2031 (this, COMPLETIONS, p.next = completions, p)) {
2032 if (s != null)
2033 break;
2034 q = new CompletionNode(d);
2035 }
2036 }
2037 }
     // Run here only when both results are in hand. If d was created, the
     // 0 -> 1 compareAndSet presumably claims the one-shot right to run, in
     // case a registered node fires concurrently (ThenCombine is declared
     // outside this view; confirm).
2038 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2039 T t; U u; Throwable ex;
     // An AltResult wraps a non-plain outcome; its ex field may be null,
     // in which case t stays null and no exception is propagated.
2040 if (r instanceof AltResult) {
2041 ex = ((AltResult)r).ex;
2042 t = null;
2043 }
2044 else {
2045 ex = null;
2046 @SuppressWarnings("unchecked") T tr = (T) r;
2047 t = tr;
2048 }
     // The exception from this future wins; other's outcome is inspected
     // only when this future produced no exception.
2049 if (ex != null)
2050 u = null;
2051 else if (s instanceof AltResult) {
2052 ex = ((AltResult)s).ex;
2053 u = null;
2054 }
2055 else {
2056 @SuppressWarnings("unchecked") U us = (U) s;
2057 u = us;
2058 }
2059 V v = null;
2060 if (ex == null) {
     // Anything thrown by fn, or by execute() itself, is captured in ex.
2061 try {
2062 if (e != null)
2063 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
2064 else
2065 v = fn.apply(t, u);
2066 } catch (Throwable rex) {
2067 ex = rex;
2068 }
2069 }
     // With an executor and no failure, dst is not completed here; that is
     // presumably left to the submitted AsyncCombine task.
2070 if (e == null || ex != null)
2071 dst.internalComplete(v, ex);
2072 }
     // helpPostComplete is defined outside this view; NOTE(review): presumably
     // runs completions still queued on an already-completed future - confirm.
2073 helpPostComplete();
2074 other.helpPostComplete();
2075 return dst;
2076 }
2077
2078 /**
2079 * Returns a new CompletableFuture that is completed
2080 * when both this and the other given CompletableFuture complete,
2081 * after performing the given action with the results of the two
2082 * CompletableFutures.
2083 *
2084 * <p>If this and/or the other CompletableFuture complete
2085 * exceptionally, or the supplied action throws an exception,
2086 * then the returned CompletableFuture completes exceptionally
2087 * with a CompletionException holding the exception as its cause.
2088 *
2089 * @param other the other CompletableFuture
2090 * @param block the action to perform before completing the
2091 * returned CompletableFuture
2092 * @return the new CompletableFuture
2093 */
2094 public <U> CompletableFuture<Void> thenAcceptBoth
2095 (CompletableFuture<? extends U> other,
2096 BiAction<? super T, ? super U> block) {
2097 return doThenAcceptBoth(other, block, null);
2098 }
2099
2100 /**
2101 * Returns a new CompletableFuture that is asynchronously completed
2102 * when both this and the other given CompletableFuture complete,
2103 * after performing the given action with the results of the two
2104 * CompletableFutures from a task running in the {@link
2105 * ForkJoinPool#commonPool()}.
2106 *
2107 * <p>If this and/or the other CompletableFuture complete
2108 * exceptionally, or the supplied action throws an exception,
2109 * then the returned CompletableFuture completes exceptionally
2110 * with a CompletionException holding the exception as its cause.
2111 *
2112 * @param other the other CompletableFuture
2113 * @param block the action to perform before completing the
2114 * returned CompletableFuture
2115 * @return the new CompletableFuture
2116 */
2117 public <U> CompletableFuture<Void> thenAcceptBothAsync
2118 (CompletableFuture<? extends U> other,
2119 BiAction<? super T, ? super U> block) {
2120 return doThenAcceptBoth(other, block, ForkJoinPool.commonPool());
2121 }
2122
2123 /**
2124 * Returns a new CompletableFuture that is asynchronously completed
2125 * when both this and the other given CompletableFuture complete,
2126 * after performing the given action with the results of the two
2127 * CompletableFutures from a task running in the given executor.
2128 *
2129 * <p>If this and/or the other CompletableFuture complete
2130 * exceptionally, or the supplied action throws an exception,
2131 * then the returned CompletableFuture completes exceptionally
2132 * with a CompletionException holding the exception as its cause.
2133 *
2134 * @param other the other CompletableFuture
2135 * @param block the action to perform before completing the
2136 * returned CompletableFuture
2137 * @param executor the executor to use for asynchronous execution
2138 * @return the new CompletableFuture
2139 */
2140 public <U> CompletableFuture<Void> thenAcceptBothAsync
2141 (CompletableFuture<? extends U> other,
2142 BiAction<? super T, ? super U> block,
2143 Executor executor) {
2144 if (executor == null) throw new NullPointerException();
2145 return doThenAcceptBoth(other, block, executor);
2146 }
2147
2148 private <U> CompletableFuture<Void> doThenAcceptBoth
2149 (CompletableFuture<? extends U> other,
2150 BiAction<? super T,? super U> fn,
2151 Executor e) {
     // Shared implementation behind thenAcceptBoth and both
     // thenAcceptBothAsync overloads. Returns a new future (dst) that is
     // completed after fn has consumed the results of this and other.
     //   other - the second source; must be non-null
     //   fn    - the two-argument action; must be non-null
     //   e     - executor to submit the action to, or null to call it directly
     // Throws NullPointerException if other or fn is null.
2152 if (other == null || fn == null) throw new NullPointerException();
2153 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2154 ThenAcceptBoth<T,U> d = null;
     // r / s: snapshots of this.result / other.result; null is treated as
     // "no result available yet".
2155 Object r, s = null;
2156 if ((r = result) == null || (s = other.result) == null) {
     // Something is still pending: create the completion object and link a
     // node for it into each pending source (p for this, q for other).
2157 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
2158 CompletionNode q = null, p = new CompletionNode(d);
     // Only snapshots that are still null are re-read on each pass.
2159 while ((r == null && (r = result) == null) ||
2160 (s == null && (s = other.result) == null)) {
2161 if (q != null) {
     // Second phase: push q onto other.completions unless s is known.
2162 if (s != null ||
2163 UNSAFE.compareAndSwapObject
2164 (other, COMPLETIONS, q.next = other.completions, q))
2165 break;
2166 }
     // First phase: push p onto this.completions unless r is known;
     // a failed CAS retries on the next pass.
2167 else if (r != null ||
2168 UNSAFE.compareAndSwapObject
2169 (this, COMPLETIONS, p.next = completions, p)) {
2170 if (s != null)
2171 break;
2172 q = new CompletionNode(d);
2173 }
2174 }
2175 }
     // Run here only with both results in hand; when d exists, the 0 -> 1
     // compareAndSet presumably guards against the action running twice
     // (ThenAcceptBoth is declared outside this view; confirm).
2176 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2177 T t; U u; Throwable ex;
     // AltResult carries an exception in ex (possibly null); t stays null
     // for any AltResult.
2178 if (r instanceof AltResult) {
2179 ex = ((AltResult)r).ex;
2180 t = null;
2181 }
2182 else {
2183 ex = null;
2184 @SuppressWarnings("unchecked") T tr = (T) r;
2185 t = tr;
2186 }
     // other's outcome is examined only if this future had no exception.
2187 if (ex != null)
2188 u = null;
2189 else if (s instanceof AltResult) {
2190 ex = ((AltResult)s).ex;
2191 u = null;
2192 }
2193 else {
2194 @SuppressWarnings("unchecked") U us = (U) s;
2195 u = us;
2196 }
2197 if (ex == null) {
     // Anything thrown by the action or by execute() is captured in ex.
2198 try {
2199 if (e != null)
2200 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
2201 else
2202 fn.accept(t, u);
2203 } catch (Throwable rex) {
2204 ex = rex;
2205 }
2206 }
     // With an executor and no failure, dst is not completed here; that is
     // presumably left to the submitted AsyncAcceptBoth task.
2207 if (e == null || ex != null)
2208 dst.internalComplete(null, ex);
2209 }
     // helpPostComplete is defined outside this view; NOTE(review): presumably
     // runs completions still queued on an already-completed future - confirm.
2210 helpPostComplete();
2211 other.helpPostComplete();
2212 return dst;
2213 }
2214
2215 /**
2216 * Returns a new CompletableFuture that is completed
2217 * when both this and the other given CompletableFuture complete,
2218 * after performing the given action.
2219 *
2220 * <p>If this and/or the other CompletableFuture complete
2221 * exceptionally, or the supplied action throws an exception,
2222 * then the returned CompletableFuture completes exceptionally
2223 * with a CompletionException holding the exception as its cause.
2224 *
2225 * @param other the other CompletableFuture
2226 * @param action the action to perform before completing the
2227 * returned CompletableFuture
2228 * @return the new CompletableFuture
2229 */
2230 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2231 Runnable action) {
2232 return doRunAfterBoth(other, action, null);
2233 }
2234
2235 /**
2236 * Returns a new CompletableFuture that is asynchronously completed
2237 * when both this and the other given CompletableFuture complete,
2238 * after performing the given action from a task running in the
2239 * {@link ForkJoinPool#commonPool()}.
2240 *
2241 * <p>If this and/or the other CompletableFuture complete
2242 * exceptionally, or the supplied action throws an exception,
2243 * then the returned CompletableFuture completes exceptionally
2244 * with a CompletionException holding the exception as its cause.
2245 *
2246 * @param other the other CompletableFuture
2247 * @param action the action to perform before completing the
2248 * returned CompletableFuture
2249 * @return the new CompletableFuture
2250 */
2251 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2252 Runnable action) {
2253 return doRunAfterBoth(other, action, ForkJoinPool.commonPool());
2254 }
2255
2256 /**
2257 * Returns a new CompletableFuture that is asynchronously completed
2258 * when both this and the other given CompletableFuture complete,
2259 * after performing the given action from a task running in the
2260 * given executor.
2261 *
2262 * <p>If this and/or the other CompletableFuture complete
2263 * exceptionally, or the supplied action throws an exception,
2264 * then the returned CompletableFuture completes exceptionally
2265 * with a CompletionException holding the exception as its cause.
2266 *
2267 * @param other the other CompletableFuture
2268 * @param action the action to perform before completing the
2269 * returned CompletableFuture
2270 * @param executor the executor to use for asynchronous execution
2271 * @return the new CompletableFuture
2272 */
2273 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2274 Runnable action,
2275 Executor executor) {
2276 if (executor == null) throw new NullPointerException();
2277 return doRunAfterBoth(other, action, executor);
2278 }
2279
2280 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
2281 Runnable action,
2282 Executor e) {
     // Shared implementation behind runAfterBoth and both runAfterBothAsync
     // overloads. Returns a new future (dst) that is completed after action
     // has run, once both this and other have results.
     //   other  - the second source; must be non-null
     //   action - the Runnable to run; must be non-null
     //   e      - executor to submit the action to, or null to run it directly
     // Throws NullPointerException if other or action is null.
2283 if (other == null || action == null) throw new NullPointerException();
2284 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2285 RunAfterBoth d = null;
     // r / s: snapshots of this.result / other.result; null is treated as
     // "no result available yet".
2286 Object r, s = null;
2287 if ((r = result) == null || (s = other.result) == null) {
     // Something is still pending: create the completion object and link a
     // node for it into each pending source (p for this, q for other).
2288 d = new RunAfterBoth(this, other, action, dst, e);
2289 CompletionNode q = null, p = new CompletionNode(d);
     // Only snapshots that are still null are re-read on each pass.
2290 while ((r == null && (r = result) == null) ||
2291 (s == null && (s = other.result) == null)) {
2292 if (q != null) {
     // Second phase: push q onto other.completions unless s is known.
2293 if (s != null ||
2294 UNSAFE.compareAndSwapObject
2295 (other, COMPLETIONS, q.next = other.completions, q))
2296 break;
2297 }
     // First phase: push p onto this.completions unless r is known;
     // a failed CAS retries on the next pass.
2298 else if (r != null ||
2299 UNSAFE.compareAndSwapObject
2300 (this, COMPLETIONS, p.next = completions, p)) {
2301 if (s != null)
2302 break;
2303 q = new CompletionNode(d);
2304 }
2305 }
2306 }
     // Run here only with both results in hand; when d exists, the 0 -> 1
     // compareAndSet presumably guards against the action running twice
     // (RunAfterBoth is declared outside this view; confirm).
2307 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2308 Throwable ex;
     // Only the exception (if any) matters here; result values are unused.
2309 if (r instanceof AltResult)
2310 ex = ((AltResult)r).ex;
2311 else
2312 ex = null;
     // other's exception is consulted only if this future had none.
2313 if (ex == null && (s instanceof AltResult))
2314 ex = ((AltResult)s).ex;
2315 if (ex == null) {
     // Anything thrown by the action or by execute() is captured in ex.
2316 try {
2317 if (e != null)
2318 e.execute(new AsyncRun(action, dst));
2319 else
2320 action.run();
2321 } catch (Throwable rex) {
2322 ex = rex;
2323 }
2324 }
     // With an executor and no failure, dst is not completed here; that is
     // presumably left to the submitted AsyncRun task.
2325 if (e == null || ex != null)
2326 dst.internalComplete(null, ex);
2327 }
     // helpPostComplete is defined outside this view; NOTE(review): presumably
     // runs completions still queued on an already-completed future - confirm.
2328 helpPostComplete();
2329 other.helpPostComplete();
2330 return dst;
2331 }
2332
2333 /**
2334 * Returns a new CompletableFuture that is completed
2335 * when either this or the other given CompletableFuture completes,
2336 * with the result of the given function of either this or the other
2337 * CompletableFuture's result.
2338 *
2339 * <p>If this and/or the other CompletableFuture complete
2340 * exceptionally, then the returned CompletableFuture may also do so,
2341 * with a CompletionException holding one of these exceptions as its
2342 * cause. No guarantees are made about which result or exception is
2343 * used in the returned CompletableFuture. If the supplied function
2344 * throws an exception, then the returned CompletableFuture completes
2345 * exceptionally with a CompletionException holding the exception as
2346 * its cause.
2347 *
2348 * @param other the other CompletableFuture
2349 * @param fn the function to use to compute the value of
2350 * the returned CompletableFuture
2351 * @return the new CompletableFuture
2352 */
2353 public <U> CompletableFuture<U> applyToEither
2354 (CompletableFuture<? extends T> other,
2355 Fun<? super T, U> fn) {
2356 return doApplyToEither(other, fn, null);
2357 }
2358
2359 /**
2360 * Returns a new CompletableFuture that is asynchronously completed
2361 * when either this or the other given CompletableFuture completes,
2362 * with the result of the given function of either this or the other
2363 * CompletableFuture's result from a task running in the
2364 * {@link ForkJoinPool#commonPool()}.
2365 *
2366 * <p>If this and/or the other CompletableFuture complete
2367 * exceptionally, then the returned CompletableFuture may also do so,
2368 * with a CompletionException holding one of these exceptions as its
2369 * cause. No guarantees are made about which result or exception is
2370 * used in the returned CompletableFuture. If the supplied function
2371 * throws an exception, then the returned CompletableFuture completes
2372 * exceptionally with a CompletionException holding the exception as
2373 * its cause.
2374 *
2375 * @param other the other CompletableFuture
2376 * @param fn the function to use to compute the value of
2377 * the returned CompletableFuture
2378 * @return the new CompletableFuture
2379 */
2380 public <U> CompletableFuture<U> applyToEitherAsync
2381 (CompletableFuture<? extends T> other,
2382 Fun<? super T, U> fn) {
2383 return doApplyToEither(other, fn, ForkJoinPool.commonPool());
2384 }
2385
2386 /**
2387 * Returns a new CompletableFuture that is asynchronously completed
2388 * when either this or the other given CompletableFuture completes,
2389 * with the result of the given function of either this or the other
2390 * CompletableFuture's result from a task running in the
2391 * given executor.
2392 *
2393 * <p>If this and/or the other CompletableFuture complete
2394 * exceptionally, then the returned CompletableFuture may also do so,
2395 * with a CompletionException holding one of these exceptions as its
2396 * cause. No guarantees are made about which result or exception is
2397 * used in the returned CompletableFuture. If the supplied function
2398 * throws an exception, then the returned CompletableFuture completes
2399 * exceptionally with a CompletionException holding the exception as
2400 * its cause.
2401 *
2402 * @param other the other CompletableFuture
2403 * @param fn the function to use to compute the value of
2404 * the returned CompletableFuture
2405 * @param executor the executor to use for asynchronous execution
2406 * @return the new CompletableFuture
2407 */
2408 public <U> CompletableFuture<U> applyToEitherAsync
2409 (CompletableFuture<? extends T> other,
2410 Fun<? super T, U> fn,
2411 Executor executor) {
2412 if (executor == null) throw new NullPointerException();
2413 return doApplyToEither(other, fn, executor);
2414 }
2415
2416 private <U> CompletableFuture<U> doApplyToEither
2417 (CompletableFuture<? extends T> other,
2418 Fun<? super T, U> fn,
2419 Executor e) {
     // Shared implementation behind applyToEither and both applyToEitherAsync
     // overloads. Returns a new future (dst) fed by fn applied to whichever
     // source result is observed first.
     //   other - the alternative source; must be non-null
     //   fn    - the function to apply; must be non-null
     //   e     - executor to submit the function to, or null to apply directly
     // Throws NullPointerException if other or fn is null.
2420 if (other == null || fn == null) throw new NullPointerException();
2421 CompletableFuture<U> dst = new CompletableFuture<U>();
2422 ApplyToEither<T,U> d = null;
     // r ends up holding this.result if that is non-null, else other.result.
2423 Object r;
2424 if ((r = result) == null && (r = other.result) == null) {
     // Neither source has a result: create the completion object and link a
     // node for it into both sources (p for this, then q for other).
2425 d = new ApplyToEither<T,U>(this, other, fn, dst, e);
2426 CompletionNode q = null, p = new CompletionNode(d);
     // Both results are re-read on every pass; the loop exits as soon as
     // either is non-null, or once q has been pushed onto other.completions.
2427 while ((r = result) == null && (r = other.result) == null) {
2428 if (q != null) {
2429 if (UNSAFE.compareAndSwapObject
2430 (other, COMPLETIONS, q.next = other.completions, q))
2431 break;
2432 }
     // p must be linked into this.completions before q is created.
2433 else if (UNSAFE.compareAndSwapObject
2434 (this, COMPLETIONS, p.next = completions, p))
2435 q = new CompletionNode(d);
2436 }
2437 }
     // Run here only if a result was observed; when d exists, the 0 -> 1
     // compareAndSet presumably guards against the function running twice
     // (ApplyToEither is declared outside this view; confirm).
2438 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2439 T t; Throwable ex;
     // AltResult carries an exception in ex (possibly null); t stays null
     // for any AltResult.
2440 if (r instanceof AltResult) {
2441 ex = ((AltResult)r).ex;
2442 t = null;
2443 }
2444 else {
2445 ex = null;
2446 @SuppressWarnings("unchecked") T tr = (T) r;
2447 t = tr;
2448 }
2449 U u = null;
2450 if (ex == null) {
     // Anything thrown by fn or by execute() is captured in ex.
2451 try {
2452 if (e != null)
2453 e.execute(new AsyncApply<T,U>(t, fn, dst));
2454 else
2455 u = fn.apply(t);
2456 } catch (Throwable rex) {
2457 ex = rex;
2458 }
2459 }
     // With an executor and no failure, dst is not completed here; that is
     // presumably left to the submitted AsyncApply task.
2460 if (e == null || ex != null)
2461 dst.internalComplete(u, ex);
2462 }
     // helpPostComplete is defined outside this view; NOTE(review): presumably
     // runs completions still queued on an already-completed future - confirm.
2463 helpPostComplete();
2464 other.helpPostComplete();
2465 return dst;
2466 }
2467
2468 /**
2469 * Returns a new CompletableFuture that is completed
2470 * when either this or the other given CompletableFuture completes,
2471 * after performing the given action with the result of either this
2472 * or the other CompletableFuture.
2473 *
2474 * <p>If this and/or the other CompletableFuture complete
2475 * exceptionally, then the returned CompletableFuture may also do so,
2476 * with a CompletionException holding one of these exceptions as its
2477 * cause. No guarantees are made about which result or exception is
2478 * used in the returned CompletableFuture. If the supplied action
2479 * throws an exception, then the returned CompletableFuture completes
2480 * exceptionally with a CompletionException holding the exception as
2481 * its cause.
2482 *
2483 * @param other the other CompletableFuture
2484 * @param block the action to perform before completing the
2485 * returned CompletableFuture
2486 * @return the new CompletableFuture
2487 */
2488 public CompletableFuture<Void> acceptEither
2489 (CompletableFuture<? extends T> other,
2490 Action<? super T> block) {
2491 return doAcceptEither(other, block, null);
2492 }
2493
2494 /**
2495 * Returns a new CompletableFuture that is asynchronously completed
2496 * when either this or the other given CompletableFuture completes,
2497 * after performing the given action with the result of either this
2498 * or the other CompletableFuture, from a task running in
2499 * the {@link ForkJoinPool#commonPool()}.
2500 *
2501 * <p>If this and/or the other CompletableFuture complete
2502 * exceptionally, then the returned CompletableFuture may also do so,
2503 * with a CompletionException holding one of these exceptions as its
2504 * cause. No guarantees are made about which result or exception is
2505 * used in the returned CompletableFuture. If the supplied action
2506 * throws an exception, then the returned CompletableFuture completes
2507 * exceptionally with a CompletionException holding the exception as
2508 * its cause.
2509 *
2510 * @param other the other CompletableFuture
2511 * @param block the action to perform before completing the
2512 * returned CompletableFuture
2513 * @return the new CompletableFuture
2514 */
2515 public CompletableFuture<Void> acceptEitherAsync
2516 (CompletableFuture<? extends T> other,
2517 Action<? super T> block) {
2518 return doAcceptEither(other, block, ForkJoinPool.commonPool());
2519 }
2520
2521 /**
2522 * Returns a new CompletableFuture that is asynchronously completed
2523 * when either this or the other given CompletableFuture completes,
2524 * after performing the given action with the result of either this
2525 * or the other CompletableFuture, from a task running in
2526 * the given executor.
2527 *
2528 * <p>If this and/or the other CompletableFuture complete
2529 * exceptionally, then the returned CompletableFuture may also do so,
2530 * with a CompletionException holding one of these exceptions as its
2531 * cause. No guarantees are made about which result or exception is
2532 * used in the returned CompletableFuture. If the supplied action
2533 * throws an exception, then the returned CompletableFuture completes
2534 * exceptionally with a CompletionException holding the exception as
2535 * its cause.
2536 *
2537 * @param other the other CompletableFuture
2538 * @param block the action to perform before completing the
2539 * returned CompletableFuture
2540 * @param executor the executor to use for asynchronous execution
2541 * @return the new CompletableFuture
2542 */
2543 public CompletableFuture<Void> acceptEitherAsync
2544 (CompletableFuture<? extends T> other,
2545 Action<? super T> block,
2546 Executor executor) {
2547 if (executor == null) throw new NullPointerException();
2548 return doAcceptEither(other, block, executor);
2549 }
2550
2551 private CompletableFuture<Void> doAcceptEither
2552 (CompletableFuture<? extends T> other,
2553 Action<? super T> fn,
2554 Executor e) {
     // Shared implementation behind acceptEither and both acceptEitherAsync
     // overloads. Returns a new future (dst) that is completed after fn has
     // consumed whichever source result is observed first.
     //   other - the alternative source; must be non-null
     //   fn    - the action to perform; must be non-null
     //   e     - executor to submit the action to, or null to call it directly
     // Throws NullPointerException if other or fn is null.
2555 if (other == null || fn == null) throw new NullPointerException();
2556 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2557 AcceptEither<T> d = null;
     // r ends up holding this.result if that is non-null, else other.result.
2558 Object r;
2559 if ((r = result) == null && (r = other.result) == null) {
     // Neither source has a result: create the completion object and link a
     // node for it into both sources (p for this, then q for other).
2560 d = new AcceptEither<T>(this, other, fn, dst, e);
2561 CompletionNode q = null, p = new CompletionNode(d);
     // Both results are re-read on every pass; the loop exits as soon as
     // either is non-null, or once q has been pushed onto other.completions.
2562 while ((r = result) == null && (r = other.result) == null) {
2563 if (q != null) {
2564 if (UNSAFE.compareAndSwapObject
2565 (other, COMPLETIONS, q.next = other.completions, q))
2566 break;
2567 }
     // p must be linked into this.completions before q is created.
2568 else if (UNSAFE.compareAndSwapObject
2569 (this, COMPLETIONS, p.next = completions, p))
2570 q = new CompletionNode(d);
2571 }
2572 }
     // Run here only if a result was observed; when d exists, the 0 -> 1
     // compareAndSet presumably guards against the action running twice
     // (AcceptEither is declared outside this view; confirm).
2573 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2574 T t; Throwable ex;
     // AltResult carries an exception in ex (possibly null); t stays null
     // for any AltResult.
2575 if (r instanceof AltResult) {
2576 ex = ((AltResult)r).ex;
2577 t = null;
2578 }
2579 else {
2580 ex = null;
2581 @SuppressWarnings("unchecked") T tr = (T) r;
2582 t = tr;
2583 }
2584 if (ex == null) {
     // Anything thrown by the action or by execute() is captured in ex.
2585 try {
2586 if (e != null)
2587 e.execute(new AsyncAccept<T>(t, fn, dst));
2588 else
2589 fn.accept(t);
2590 } catch (Throwable rex) {
2591 ex = rex;
2592 }
2593 }
     // With an executor and no failure, dst is not completed here; that is
     // presumably left to the submitted AsyncAccept task.
2594 if (e == null || ex != null)
2595 dst.internalComplete(null, ex);
2596 }
     // helpPostComplete is defined outside this view; NOTE(review): presumably
     // runs completions still queued on an already-completed future - confirm.
2597 helpPostComplete();
2598 other.helpPostComplete();
2599 return dst;
2600 }
2601
2602 /**
2603 * Returns a new CompletableFuture that is completed
2604 * when either this or the other given CompletableFuture completes,
2605 * after performing the given action.
2606 *
2607 * <p>If this and/or the other CompletableFuture complete
2608 * exceptionally, then the returned CompletableFuture may also do so,
2609 * with a CompletionException holding one of these exceptions as its
2610 * cause. No guarantees are made about which result or exception is
2611 * used in the returned CompletableFuture. If the supplied action
2612 * throws an exception, then the returned CompletableFuture completes
2613 * exceptionally with a CompletionException holding the exception as
2614 * its cause.
2615 *
2616 * @param other the other CompletableFuture
2617 * @param action the action to perform before completing the
2618 * returned CompletableFuture
2619 * @return the new CompletableFuture
2620 */
2621 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2622 Runnable action) {
2623 return doRunAfterEither(other, action, null);
2624 }
2625
2626 /**
2627 * Returns a new CompletableFuture that is asynchronously completed
2628 * when either this or the other given CompletableFuture completes,
2629 * after performing the given action from a task running in the
2630 * {@link ForkJoinPool#commonPool()}.
2631 *
2632 * <p>If this and/or the other CompletableFuture complete
2633 * exceptionally, then the returned CompletableFuture may also do so,
2634 * with a CompletionException holding one of these exceptions as its
2635 * cause. No guarantees are made about which result or exception is
2636 * used in the returned CompletableFuture. If the supplied action
2637 * throws an exception, then the returned CompletableFuture completes
2638 * exceptionally with a CompletionException holding the exception as
2639 * its cause.
2640 *
2641 * @param other the other CompletableFuture
2642 * @param action the action to perform before completing the
2643 * returned CompletableFuture
2644 * @return the new CompletableFuture
2645 */
2646 public CompletableFuture<Void> runAfterEitherAsync
2647 (CompletableFuture<?> other,
2648 Runnable action) {
2649 return doRunAfterEither(other, action, ForkJoinPool.commonPool());
2650 }
2651
2652 /**
2653 * Returns a new CompletableFuture that is asynchronously completed
2654 * when either this or the other given CompletableFuture completes,
2655 * after performing the given action from a task running in the
2656 * given executor.
2657 *
2658 * <p>If this and/or the other CompletableFuture complete
2659 * exceptionally, then the returned CompletableFuture may also do so,
2660 * with a CompletionException holding one of these exceptions as its
2661 * cause. No guarantees are made about which result or exception is
2662 * used in the returned CompletableFuture. If the supplied action
2663 * throws an exception, then the returned CompletableFuture completes
2664 * exceptionally with a CompletionException holding the exception as
2665 * its cause.
2666 *
2667 * @param other the other CompletableFuture
2668 * @param action the action to perform before completing the
2669 * returned CompletableFuture
2670 * @param executor the executor to use for asynchronous execution
2671 * @return the new CompletableFuture
2672 */
2673 public CompletableFuture<Void> runAfterEitherAsync
2674 (CompletableFuture<?> other,
2675 Runnable action,
2676 Executor executor) {
2677 if (executor == null) throw new NullPointerException();
2678 return doRunAfterEither(other, action, executor);
2679 }
2680
2681 private CompletableFuture<Void> doRunAfterEither
2682 (CompletableFuture<?> other,
2683 Runnable action,
2684 Executor e) {
     // Shared implementation behind runAfterEither and both
     // runAfterEitherAsync overloads. Returns a new future (dst) that is
     // completed after action has run, once either source has a result.
     //   other  - the alternative source; must be non-null
     //   action - the Runnable to run; must be non-null
     //   e      - executor to submit the action to, or null to run it directly
     // Throws NullPointerException if other or action is null.
2685 if (other == null || action == null) throw new NullPointerException();
2686 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2687 RunAfterEither d = null;
     // r ends up holding this.result if that is non-null, else other.result.
2688 Object r;
2689 if ((r = result) == null && (r = other.result) == null) {
     // Neither source has a result: create the completion object and link a
     // node for it into both sources (p for this, then q for other).
2690 d = new RunAfterEither(this, other, action, dst, e);
2691 CompletionNode q = null, p = new CompletionNode(d);
     // Both results are re-read on every pass; the loop exits as soon as
     // either is non-null, or once q has been pushed onto other.completions.
2692 while ((r = result) == null && (r = other.result) == null) {
2693 if (q != null) {
2694 if (UNSAFE.compareAndSwapObject
2695 (other, COMPLETIONS, q.next = other.completions, q))
2696 break;
2697 }
     // p must be linked into this.completions before q is created.
2698 else if (UNSAFE.compareAndSwapObject
2699 (this, COMPLETIONS, p.next = completions, p))
2700 q = new CompletionNode(d);
2701 }
2702 }
     // Run here only if a result was observed; when d exists, the 0 -> 1
     // compareAndSet presumably guards against the action running twice
     // (RunAfterEither is declared outside this view; confirm).
2703 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2704 Throwable ex;
     // Only the exception (if any) matters here; the result value is unused.
2705 if (r instanceof AltResult)
2706 ex = ((AltResult)r).ex;
2707 else
2708 ex = null;
2709 if (ex == null) {
     // Anything thrown by the action or by execute() is captured in ex.
2710 try {
2711 if (e != null)
2712 e.execute(new AsyncRun(action, dst));
2713 else
2714 action.run();
2715 } catch (Throwable rex) {
2716 ex = rex;
2717 }
2718 }
     // With an executor and no failure, dst is not completed here; that is
     // presumably left to the submitted AsyncRun task.
2719 if (e == null || ex != null)
2720 dst.internalComplete(null, ex);
2721 }
     // helpPostComplete is defined outside this view; NOTE(review): presumably
     // runs completions still queued on an already-completed future - confirm.
2722 helpPostComplete();
2723 other.helpPostComplete();
2724 return dst;
2725 }
2726
2727 /**
2728 * Returns a CompletableFuture that upon completion, has the same
2729 * value as produced by the given function of the result of this
2730 * CompletableFuture.
2731 *
2732 * <p>If this CompletableFuture completes exceptionally, then the
2733 * returned CompletableFuture also does so, with a
2734 * CompletionException holding this exception as its cause.
2735 * Similarly, if the computed CompletableFuture completes
2736 * exceptionally, then so does the returned CompletableFuture.
2737 *
2738 * @param fn the function returning a new CompletableFuture
2739 * @return the CompletableFuture
2740 */
2741 public <U> CompletableFuture<U> thenCompose
2742 (Fun<? super T, CompletableFuture<U>> fn) {
2743 return doThenCompose(fn, null);
2744 }
2745
2746 /**
2747 * Returns a CompletableFuture that upon completion, has the same
2748 * value as that produced asynchronously using the {@link
2749 * ForkJoinPool#commonPool()} by the given function of the result
2750 * of this CompletableFuture.
2751 *
2752 * <p>If this CompletableFuture completes exceptionally, then the
2753 * returned CompletableFuture also does so, with a
2754 * CompletionException holding this exception as its cause.
2755 * Similarly, if the computed CompletableFuture completes
2756 * exceptionally, then so does the returned CompletableFuture.
2757 *
2758 * @param fn the function returning a new CompletableFuture
2759 * @return the CompletableFuture
2760 */
2761 public <U> CompletableFuture<U> thenComposeAsync
2762 (Fun<? super T, CompletableFuture<U>> fn) {
2763 return doThenCompose(fn, ForkJoinPool.commonPool());
2764 }
2765
2766 /**
2767 * Returns a CompletableFuture that upon completion, has the same
2768 * value as that produced asynchronously using the given executor
2769 * by the given function of the result of this CompletableFuture.
2770 *
2771 * <p>If this CompletableFuture completes exceptionally, then the
2772 * returned CompletableFuture also does so, with a
2773 * CompletionException holding this exception as its cause.
2774 * Similarly, if the computed CompletableFuture completes
2775 * exceptionally, then so does the returned CompletableFuture.
2776 *
2777 * @param fn the function returning a new CompletableFuture
2778 * @param executor the executor to use for asynchronous execution
2779 * @return the CompletableFuture
2780 */
2781 public <U> CompletableFuture<U> thenComposeAsync
2782 (Fun<? super T, CompletableFuture<U>> fn,
2783 Executor executor) {
2784 if (executor == null) throw new NullPointerException();
2785 return doThenCompose(fn, executor);
2786 }
2787
2788 private <U> CompletableFuture<U> doThenCompose
2789 (Fun<? super T, CompletableFuture<U>> fn,
2790 Executor e) {
     // Shared implementation behind thenCompose and both thenComposeAsync
     // overloads.
     //   fn - function producing the next future from this result; non-null
     //   e  - executor to submit the function to, or null to apply it directly
     // Returns either a freshly created future or, on the direct path, the
     // very future that fn returned. Throws NullPointerException if fn is null.
2791 if (fn == null) throw new NullPointerException();
     // dst is created lazily: on the direct path with a result already
     // available, fn's own return value is used as dst.
2792 CompletableFuture<U> dst = null;
2793 ThenCompose<T,U> d = null;
2794 Object r;
2795 if ((r = result) == null) {
     // No result yet: create dst and a completion object, then try to push
     // its node onto this.completions, re-reading result after each failed CAS.
2796 dst = new CompletableFuture<U>();
2797 CompletionNode p = new CompletionNode
2798 (d = new ThenCompose<T,U>(this, fn, dst, e));
2799 while ((r = result) == null) {
2800 if (UNSAFE.compareAndSwapObject
2801 (this, COMPLETIONS, p.next = completions, p))
2802 break;
2803 }
2804 }
     // Run here only if a result was observed; when d exists, the 0 -> 1
     // compareAndSet presumably guards against the function running twice
     // (ThenCompose is declared outside this view; confirm).
2805 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2806 T t; Throwable ex;
     // AltResult carries an exception in ex (possibly null); t stays null
     // for any AltResult.
2807 if (r instanceof AltResult) {
2808 ex = ((AltResult)r).ex;
2809 t = null;
2810 }
2811 else {
2812 ex = null;
2813 @SuppressWarnings("unchecked") T tr = (T) r;
2814 t = tr;
2815 }
2816 if (ex == null) {
2817 if (e != null) {
     // Executor path: note that execute() is not wrapped in a try block
     // here, so anything it throws propagates to the caller.
2818 if (dst == null)
2819 dst = new CompletableFuture<U>();
2820 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2821 }
2822 else {
     // Direct path: dst is replaced by whatever fn returns; a null return
     // is turned into a NullPointerException outcome.
2823 try {
2824 if ((dst = fn.apply(t)) == null)
2825 ex = new NullPointerException();
2826 } catch (Throwable rex) {
2827 ex = rex;
2828 }
2829 }
2830 }
     // Make sure there is a future to return, and complete it only when
     // there is an exception to report.
2831 if (dst == null)
2832 dst = new CompletableFuture<U>();
2833 if (ex != null)
2834 dst.internalComplete(null, ex);
2835 }
     // helpPostComplete is defined outside this view; NOTE(review): presumably
     // runs completions still queued on an already-completed future - confirm.
2836 helpPostComplete();
2837 dst.helpPostComplete();
2838 return dst;
2839 }
2840
2841 /**
2842 * Returns a new CompletableFuture that is completed when this
2843 * CompletableFuture completes, with the result of the given
2844 * function of the exception triggering this CompletableFuture's
2845 * completion when it completes exceptionally; otherwise, if this
2846 * CompletableFuture completes normally, then the returned
2847 * CompletableFuture also completes normally with the same value.
2848 *
 * <p>If the given function itself throws, the thrown exception is
 * used to complete the returned CompletableFuture instead of a value.
 *
2849 * @param fn the function to use to compute the value of the
2850 * returned CompletableFuture if this CompletableFuture completed
2851 * exceptionally
2852 * @return the new CompletableFuture
 * @throws NullPointerException if {@code fn} is {@code null}
2853 */
2854 public CompletableFuture<T> exceptionally
2855 (Fun<Throwable, ? extends T> fn) {
2856 if (fn == null) throw new NullPointerException();
2857 CompletableFuture<T> dst = new CompletableFuture<T>();
2858 ExceptionCompletion<T> d = null;
2859 Object r;
2860 if ((r = result) == null) { // not complete yet: register an ExceptionCompletion
2861 CompletionNode p =
2862 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2863 while ((r = result) == null) { // retry until pushed or a result appears
2864 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2865 p.next = completions, p)) // link p in front of the current head
2866 break;
2867 }
2868 }
2869 if (r != null && (d == null || d.compareAndSet(0, 1))) { // result available and (nothing registered, or claim won)
2870 T t = null; Throwable ex, dx = null;
2871 if (r instanceof AltResult) {
2872 if ((ex = ((AltResult)r).ex) != null) { // only a non-null exception triggers fn
2873 try {
2874 t = fn.apply(ex);
2875 } catch (Throwable rex) {
2876 dx = rex; // fn failed: dst receives this exception
2877 }
2878 }
 // an AltResult whose ex is null leaves t == null and dx == null
2879 }
2880 else {
2881 @SuppressWarnings("unchecked") T tr = (T) r; // normal completion: value is passed through unchanged
2882 t = tr;
2883 }
2884 dst.internalComplete(t, dx);
2885 }
2886 helpPostComplete(); // NOTE(review): helper defined outside this chunk; presumably helps fire pending dependents
2887 return dst;
2888 }
2889
2890 /**
2891 * Returns a new CompletableFuture that is completed when this
2892 * CompletableFuture completes, with the result of the given
2893 * function of the result and exception of this CompletableFuture's
2894 * completion. The given function is invoked with the result (or
2895 * {@code null} if none) and the exception (or {@code null} if none)
2896 * of this CompletableFuture when complete.
2897 *
 * <p>If the given function throws, the thrown exception is used to
 * complete the returned CompletableFuture instead of a value.
 *
2898 * @param fn the function to use to compute the value of the
2899 * returned CompletableFuture
2900 * @return the new CompletableFuture
 * @throws NullPointerException if {@code fn} is {@code null}
2901 */
2902 public <U> CompletableFuture<U> handle
2903 (BiFun<? super T, Throwable, ? extends U> fn) {
2904 if (fn == null) throw new NullPointerException();
2905 CompletableFuture<U> dst = new CompletableFuture<U>();
2906 HandleCompletion<T,U> d = null;
2907 Object r;
2908 if ((r = result) == null) { // not complete yet: register a HandleCompletion
2909 CompletionNode p =
2910 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2911 while ((r = result) == null) { // retry until pushed or a result appears
2912 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2913 p.next = completions, p)) // link p in front of the current head
2914 break;
2915 }
2916 }
2917 if (r != null && (d == null || d.compareAndSet(0, 1))) { // result available and (nothing registered, or claim won)
2918 T t; Throwable ex;
2919 if (r instanceof AltResult) { // AltResult carries the exception field; the value is treated as null
2920 ex = ((AltResult)r).ex;
2921 t = null;
2922 }
2923 else {
2924 ex = null;
2925 @SuppressWarnings("unchecked") T tr = (T) r;
2926 t = tr;
2927 }
2928 U u; Throwable dx;
2929 try {
2930 u = fn.apply(t, ex); // unlike exceptionally, fn runs for both normal and exceptional outcomes
2931 dx = null;
2932 } catch (Throwable rex) {
2933 dx = rex; // fn failed: dst receives this exception
2934 u = null;
2935 }
2936 dst.internalComplete(u, dx);
2937 }
2938 helpPostComplete(); // NOTE(review): helper defined outside this chunk; presumably helps fire pending dependents
2939 return dst;
2940 }
2941
2942
2943 /* ------------- Arbitrary-arity constructions -------------- */
2944
2945 /*
2946 * The basic plan of attack is to recursively form binary
2947 * completion trees of elements. This can be overkill for small
2948 * sets, but scales nicely. The And/All vs Or/Any forms use the
2949 * same idea, but details differ.
2950 */
2951
2952 /**
2953 * Returns a new CompletableFuture that is completed when all of
2954 * the given CompletableFutures complete. If any of the given
2955 * CompletableFutures complete exceptionally, then the returned
2956 * CompletableFuture also does so, with a CompletionException
2957 * holding this exception as its cause. Otherwise, the results,
2958 * if any, of the given CompletableFutures are not reflected in
2959 * the returned CompletableFuture, but may be obtained by
2960 * inspecting them individually. If no CompletableFutures are
2961 * provided, returns a CompletableFuture completed with the value
2962 * {@code null}.
2963 *
2964 * <p>Among the applications of this method is to await completion
2965 * of a set of independent CompletableFutures before continuing a
2966 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2967 * c3).join();}.
2968 *
2969 * @param cfs the CompletableFutures
2970 * @return a new CompletableFuture that is completed when all of the
2971 * given CompletableFutures complete
2972 * @throws NullPointerException if the array or any of its elements are
2973 * {@code null}
2974 */
2975 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2976 int len = cfs.length; // Directly handle empty and singleton cases; a null array fails here with NullPointerException
2977 if (len > 1)
2978 return allTree(cfs, 0, len - 1); // two or more: build the binary And tree
2979 else {
2980 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2981 CompletableFuture<?> f;
2982 if (len == 0)
2983 dst.result = NIL; // empty input: dst is written directly as already complete
2984 else if ((f = cfs[0]) == null)
2985 throw new NullPointerException();
2986 else {
2987 ThenPropagate d = null;
2988 CompletionNode p = null;
2989 Object r;
2990 while ((r = f.result) == null) { // one step per pass, re-reading f.result each time: allocate d, then p, then try the push
2991 if (d == null)
2992 d = new ThenPropagate(f, dst);
2993 else if (p == null)
2994 p = new CompletionNode(d);
2995 else if (UNSAFE.compareAndSwapObject
2996 (f, COMPLETIONS, p.next = f.completions, p)) // link p in front of f's current head
2997 break;
2998 }
2999 if (r != null && (d == null || d.compareAndSet(0, 1))) // f complete and (no completion allocated, or claim won)
3000 dst.internalComplete(null, (r instanceof AltResult) ?
3001 ((AltResult)r).ex : null); // only f's exception (if any) is forwarded, never its value
3002 f.helpPostComplete(); // NOTE(review): helper defined outside this chunk; presumably helps fire f's pending dependents
3003 }
3004 return dst;
3005 }
3006 }
3007
3008 /**
3009 * Recursively constructs an And'ed tree of CompletableFutures.
3010 * Called only when array known to have at least two elements.
 * The range {@code lo..hi} (inclusive) is split at its midpoint; a
 * half holding a single element uses that element directly, while a
 * larger half is reduced by a recursive call. A {@code null} element
 * causes a NullPointerException.
3011 */
3012 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
3013 int lo, int hi) {
3014 CompletableFuture<?> fst, snd;
3015 int mid = (lo + hi) >>> 1;
3016 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
3017 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
3018 throw new NullPointerException();
3019 CompletableFuture<Void> dst = new CompletableFuture<Void>();
3020 AndCompletion d = null;
3021 CompletionNode p = null, q = null;
3022 Object r = null, s = null;
3023 while ((r = fst.result) == null || (s = snd.result) == null) { // short-circuit: snd.result is not read while fst is incomplete
3024 if (d == null)
3025 d = new AndCompletion(fst, snd, dst);
3026 else if (p == null)
3027 p = new CompletionNode(d);
3028 else if (q == null) {
3029 if (UNSAFE.compareAndSwapObject
3030 (fst, COMPLETIONS, p.next = fst.completions, p)) // first register on fst ...
3031 q = new CompletionNode(d); // ... and only then prepare the node for snd (same completion d)
3032 }
3033 else if (UNSAFE.compareAndSwapObject
3034 (snd, COMPLETIONS, q.next = snd.completions, q)) // registered on both sources: stop looping
3035 break;
3036 }
3037 if ((r != null || (r = fst.result) != null) && // re-read whichever result the loop left null
3038 (s != null || (s = snd.result) != null) &&
3039 (d == null || d.compareAndSet(0, 1))) { // both complete and (no completion allocated, or claim won)
3040 Throwable ex;
3041 if (r instanceof AltResult)
3042 ex = ((AltResult)r).ex;
3043 else
3044 ex = null;
3045 if (ex == null && (s instanceof AltResult)) // fst's exception takes precedence over snd's
3046 ex = ((AltResult)s).ex;
3047 dst.internalComplete(null, ex);
3048 }
3049 fst.helpPostComplete(); // NOTE(review): helper defined outside this chunk; presumably helps fire pending dependents
3050 snd.helpPostComplete();
3051 return dst;
3052 }
3053
3054 /**
3055 * Returns a new CompletableFuture that is completed when any of
3056 * the given CompletableFutures complete, with the same result.
3057 * Otherwise, if it completed exceptionally, the returned
3058 * CompletableFuture also does so, with a CompletionException
3059 * holding this exception as its cause. If no CompletableFutures
3060 * are provided, returns an incomplete CompletableFuture.
3061 *
3062 * @param cfs the CompletableFutures
3063 * @return a new CompletableFuture that is completed with the
3064 * result or exception of any of the given CompletableFutures when
3065 * one completes
3066 * @throws NullPointerException if the array or any of its elements are
3067 * {@code null}
3068 */
3069 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
3070 int len = cfs.length; // Same idea as allOf; a null array fails here with NullPointerException
3071 if (len > 1)
3072 return anyTree(cfs, 0, len - 1); // two or more: build the binary Or tree
3073 else {
3074 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3075 CompletableFuture<?> f;
3076 if (len == 0)
3077 ; // skip: with no sources dst is returned without ever being completed here
3078 else if ((f = cfs[0]) == null)
3079 throw new NullPointerException();
3080 else {
3081 ThenCopy<Object> d = null;
3082 CompletionNode p = null;
3083 Object r;
3084 while ((r = f.result) == null) { // one step per pass, re-reading f.result each time: allocate d, then p, then try the push
3085 if (d == null)
3086 d = new ThenCopy<Object>(f, dst);
3087 else if (p == null)
3088 p = new CompletionNode(d);
3089 else if (UNSAFE.compareAndSwapObject
3090 (f, COMPLETIONS, p.next = f.completions, p)) // link p in front of f's current head
3091 break;
3092 }
3093 if (r != null && (d == null || d.compareAndSet(0, 1))) { // f complete and (no completion allocated, or claim won)
3094 Throwable ex; Object t;
3095 if (r instanceof AltResult) { // AltResult carries the exception field; the value is treated as null
3096 ex = ((AltResult)r).ex;
3097 t = null;
3098 }
3099 else {
3100 ex = null;
3101 t = r; // unlike allOf, the source's value is forwarded
3102 }
3103 dst.internalComplete(t, ex);
3104 }
3105 f.helpPostComplete(); // NOTE(review): helper defined outside this chunk; presumably helps fire f's pending dependents
3106 }
3107 return dst;
3108 }
3109 }
3110
3111 /**
3112 * Recursively constructs an Or'ed tree of CompletableFutures.
 * {@code anyOf} invokes this only when the array has more than one
 * element. The range {@code lo..hi} (inclusive) is split at its
 * midpoint; a half holding a single element uses that element
 * directly, while a larger half is reduced by a recursive call. A
 * {@code null} element causes a NullPointerException.
3113 */
3114 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
3115 int lo, int hi) {
3116 CompletableFuture<?> fst, snd;
3117 int mid = (lo + hi) >>> 1;
3118 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3119 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3120 throw new NullPointerException();
3121 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3122 OrCompletion d = null;
3123 CompletionNode p = null, q = null;
3124 Object r;
3125 while ((r = fst.result) == null && (r = snd.result) == null) { // loop only while neither source has a result; fst is checked first
3126 if (d == null)
3127 d = new OrCompletion(fst, snd, dst);
3128 else if (p == null)
3129 p = new CompletionNode(d);
3130 else if (q == null) {
3131 if (UNSAFE.compareAndSwapObject
3132 (fst, COMPLETIONS, p.next = fst.completions, p)) // first register on fst ...
3133 q = new CompletionNode(d); // ... and only then prepare the node for snd (same completion d)
3134 }
3135 else if (UNSAFE.compareAndSwapObject
3136 (snd, COMPLETIONS, q.next = snd.completions, q)) // registered on both sources: stop looping
3137 break;
3138 }
3139 if ((r != null || (r = fst.result) != null || // after a break r is null, so both sources are re-read
3140 (r = snd.result) != null) &&
3141 (d == null || d.compareAndSet(0, 1))) { // some result available and (no completion allocated, or claim won)
3142 Throwable ex; Object t;
3143 if (r instanceof AltResult) { // AltResult carries the exception field; the value is treated as null
3144 ex = ((AltResult)r).ex;
3145 t = null;
3146 }
3147 else {
3148 ex = null;
3149 t = r;
3150 }
3151 dst.internalComplete(t, ex);
3152 }
3153 fst.helpPostComplete(); // NOTE(review): helper defined outside this chunk; presumably helps fire pending dependents
3154 snd.helpPostComplete();
3155 return dst;
3156 }
3157
3158 /* ------------- Control and status methods -------------- */
3159
3160 /**
3161 * If not already completed, completes this CompletableFuture with
3162 * a {@link CancellationException}. Dependent CompletableFutures
3163 * that have not already completed will also complete
3164 * exceptionally, with a {@link CompletionException} caused by
3165 * this {@code CancellationException}.
3166 *
3167 * @param mayInterruptIfRunning this value has no effect in this
3168 * implementation because interrupts are not used to control
3169 * processing.
3170 *
3171 * @return {@code true} if this task is now cancelled
3172 */
3173 public boolean cancel(boolean mayInterruptIfRunning) {
3174 boolean cancelled = (result == null) &&
3175 UNSAFE.compareAndSwapObject
3176 (this, RESULT, null, new AltResult(new CancellationException()));
3177 postComplete();
3178 return cancelled || isCancelled();
3179 }
3180
3181 /**
3182 * Returns {@code true} if this CompletableFuture was cancelled
3183 * before it completed normally.
3184 *
3185 * @return {@code true} if this CompletableFuture was cancelled
3186 * before it completed normally
3187 */
3188 public boolean isCancelled() {
3189 Object r;
3190 return ((r = result) instanceof AltResult) &&
3191 (((AltResult)r).ex instanceof CancellationException);
3192 }
3193
3194 /**
3195 * Forcibly sets or resets the value subsequently returned by
3196 * method {@link #get()} and related methods, whether or not
3197 * already completed. This method is designed for use only in
3198 * error recovery actions, and even in such situations may result
3199 * in ongoing dependent completions using established versus
3200 * overwritten outcomes.
3201 *
3202 * @param value the completion value
3203 */
3204 public void obtrudeValue(T value) {
3205 result = (value == null) ? NIL : value;
3206 postComplete();
3207 }
3208
3209 /**
3210 * Forcibly causes subsequent invocations of method {@link #get()}
3211 * and related methods to throw the given exception, whether or
3212 * not already completed. This method is designed for use only in
3213 * recovery actions, and even in such situations may result in
3214 * ongoing dependent completions using established versus
3215 * overwritten outcomes.
3216 *
3217 * @param ex the exception
3218 */
3219 public void obtrudeException(Throwable ex) {
3220 if (ex == null) throw new NullPointerException();
3221 result = new AltResult(ex);
3222 postComplete();
3223 }
3224
3225 /**
3226 * Returns the estimated number of CompletableFutures whose
3227 * completions are awaiting completion of this CompletableFuture.
3228 * This method is designed for use in monitoring system state, not
3229 * for synchronization control.
3230 *
3231 * @return the number of dependent CompletableFutures
3232 */
3233 public int getNumberOfDependents() {
3234 int count = 0;
3235 for (CompletionNode p = completions; p != null; p = p.next)
3236 ++count;
3237 return count;
3238 }
3239
3240 /**
3241 * Returns a string identifying this CompletableFuture, as well as
3242 * its completion state. The state, in brackets, contains the
3243 * String {@code "Completed Normally"} or the String {@code
3244 * "Completed Exceptionally"}, or the String {@code "Not
3245 * completed"} followed by the number of CompletableFutures
3246 * dependent upon its completion, if any.
3247 *
3248 * @return a string identifying this CompletableFuture, as well as its state
3249 */
3250 public String toString() {
3251 Object r = result;
3252 int count;
3253 return super.toString() +
3254 ((r == null) ?
3255 (((count = getNumberOfDependents()) == 0) ?
3256 "[Not completed]" :
3257 "[Not completed, " + count + " dependents]") :
3258 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3259 "[Completed exceptionally]" :
3260 "[Completed normally]"));
3261 }
3262
// Unsafe mechanics: the Unsafe handle and the raw offsets of the
// result, waiters and completions fields, resolved once at class
// initialization. Any lookup failure aborts initialization with an Error.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        final sun.misc.Unsafe unsafe = getUnsafe();
        final Class<?> self = CompletableFuture.class;
        UNSAFE = unsafe;
        RESULT = unsafe.objectFieldOffset(
            self.getDeclaredField("result"));
        WAITERS = unsafe.objectFieldOffset(
            self.getDeclaredField("waiters"));
        COMPLETIONS = unsafe.objectFieldOffset(
            self.getDeclaredField("completions"));
    } catch (Exception e) {
        throw new Error(e);
    }
}
3282
3283 /**
3284 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
3285 * Replace with a simple call to Unsafe.getUnsafe when integrating
3286 * into a jdk.
3287 *
3288 * @return a sun.misc.Unsafe
3289 */
3290 private static sun.misc.Unsafe getUnsafe() {
3291 try {
3292 return sun.misc.Unsafe.getUnsafe();
3293 } catch (SecurityException tryReflectionInstead) {}
3294 try {
3295 return java.security.AccessController.doPrivileged
3296 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
3297 public sun.misc.Unsafe run() throws Exception {
3298 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
3299 for (java.lang.reflect.Field f : k.getDeclaredFields()) {
3300 f.setAccessible(true);
3301 Object x = f.get(null);
3302 if (k.isInstance(x))
3303 return k.cast(x);
3304 }
3305 throw new NoSuchFieldError("the Unsafe");
3306 }});
3307 } catch (java.security.PrivilegedActionException e) {
3308 throw new RuntimeException("Could not initialize intrinsics",
3309 e.getCause());
3310 }
3311 }
3312 }