ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CompletableFuture.java
Revision: 1.18
Committed: Wed Jun 19 14:55:40 2013 UTC (10 years, 11 months ago) by dl
Branch: MAIN
Changes since 1.17: +1092 -497 lines
Log Message:
Sync with jdk8 versions

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.FutureTask;
10 import java.util.concurrent.TimeUnit;
11 import java.util.concurrent.Executor;
12 import java.util.concurrent.ThreadLocalRandom;
13 import java.util.concurrent.ExecutionException;
14 import java.util.concurrent.TimeoutException;
15 import java.util.concurrent.CancellationException;
16 import java.util.concurrent.atomic.AtomicInteger;
17 import java.util.concurrent.locks.LockSupport;
18
19 /**
20 * A {@link Future} that may be explicitly completed (setting its
21 * value and status), and may include dependent functions and actions
22 * that trigger upon its completion.
23 *
24 * <p>When two or more threads attempt to
25 * {@link #complete complete},
26 * {@link #completeExceptionally completeExceptionally}, or
27 * {@link #cancel cancel}
28 * a CompletableFuture, only one of them succeeds.
29 *
30 * <p>Methods are available for adding dependents based on
31 * user-provided Functions, Actions, or Runnables. The appropriate
32 * form to use depends on whether actions require arguments and/or
33 * produce results. Completion of a dependent action will trigger the
34 * completion of another CompletableFuture. Actions may also be
35 * triggered after either or both the current and another
36 * CompletableFuture complete. Multiple CompletableFutures may also
37 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
38 * {@link #allOf(CompletableFuture...)}.
39 *
40 * <p>CompletableFutures themselves do not execute asynchronously.
41 * However, actions supplied for dependent completions of another
42 * CompletableFuture may do so, depending on whether they are provided
43 * via one of the <em>async</em> methods (that is, methods with names
44 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
45 * methods provide a way to commence asynchronous processing of an
46 * action using either a given {@link Executor} or by default the
47 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
48 * debugging, and tracking, all generated asynchronous tasks are
49 * instances of the marker interface {@link AsynchronousCompletionTask}.
50 *
51 * <p>Actions supplied for dependent completions of <em>non-async</em>
52 * methods may be performed by the thread that completes the current
53 * CompletableFuture, or by any other caller of these methods. There
54 * are no guarantees about the order of processing completions unless
55 * constrained by these methods.
56 *
57 * <p>Since (unlike {@link FutureTask}) this class has no direct
58 * control over the computation that causes it to be completed,
59 * cancellation is treated as just another form of exceptional completion.
60 * Method {@link #cancel cancel} has the same effect as
61 * {@code completeExceptionally(new CancellationException())}.
62 *
63 * <p>Upon exceptional completion (including cancellation), or when a
64 * completion entails an additional computation which terminates
65 * abruptly with an (unchecked) exception or error, then all of their
66 * dependent completions (and their dependents in turn) generally act
67 * as {@code completeExceptionally} with a {@link CompletionException}
68 * holding that exception as its cause. However, the {@link
69 * #exceptionally exceptionally} and {@link #handle handle}
70 * completions <em>are</em> able to handle exceptional completions of
71 * the CompletableFutures they depend on.
72 *
73 * <p>In case of exceptional completion with a CompletionException,
74 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
75 * {@link ExecutionException} with the same cause as held in the
76 * corresponding CompletionException. However, in these cases,
77 * methods {@link #join()} and {@link #getNow} throw the
78 * CompletionException, which simplifies usage.
79 *
80 * <p>Arguments used to pass a completion result (that is, for parameters
81 * of type {@code T}) may be null, but passing a null value for any other
82 * parameter will result in a {@link NullPointerException} being thrown.
83 *
84 * @author Doug Lea
85 */
86 public class CompletableFuture<T> implements Future<T> {
87 // jsr166e nested interfaces
88
89 /** Interface describing a void action of one argument */
90 public interface Action<A> { void accept(A a); }
91 /** Interface describing a void action of two arguments */
92 public interface BiAction<A,B> { void accept(A a, B b); }
93 /** Interface describing a function of one argument */
94 public interface Fun<A,T> { T apply(A a); }
95 /** Interface describing a function of two arguments */
96 public interface BiFun<A,B,T> { T apply(A a, B b); }
97 /** Interface describing a function of no arguments */
98 public interface Generator<T> { T get(); }
99
100
101 /*
102 * Overview:
103 *
104 * 1. Non-nullness of field result (set via CAS) indicates done.
105 * An AltResult is used to box null as a result, as well as to
106 * hold exceptions. Using a single field makes completion fast
107 * and simple to detect and trigger, at the expense of a lot of
108 * encoding and decoding that infiltrates many methods. One minor
109 * simplification relies on the (static) NIL (to box null results)
110 * being the only AltResult with a null exception field, so we
111 * don't usually need explicit comparisons with NIL. The CF
112 * exception propagation mechanics surrounding decoding rely on
113 * unchecked casts of decoded results really being unchecked,
114 * where user type errors are caught at point of use, as is
115 * currently the case in Java. These are highlighted by using
116 * SuppressWarnings-annotated temporaries.
117 *
118 * 2. Waiters are held in a Treiber stack similar to the one used
119 * in FutureTask, Phaser, and SynchronousQueue. See their
120 * internal documentation for algorithmic details.
121 *
122 * 3. Completions are also kept in a list/stack, and pulled off
123 * and run when completion is triggered. (We could even use the
124 * same stack as for waiters, but would give up the potential
125 * parallelism obtained because woken waiters help release/run
126 * others -- see method postComplete). Because post-processing
127 * may race with direct calls, class Completion opportunistically
128 * extends AtomicInteger so callers can claim the action via
129 * compareAndSet(0, 1). The Completion.run methods are all
130 * written a boringly similar uniform way (that sometimes includes
131 * unnecessary-looking checks, kept to maintain uniformity).
132 * There are enough dimensions upon which they differ that
133 * attempts to factor commonalities while maintaining efficiency
134 * require more lines of code than they would save.
135 *
136 * 4. The exported then/and/or methods do support a bit of
137 * factoring (see doThenApply etc). They must cope with the
138 * intrinsic races surrounding addition of a dependent action
139 * versus performing the action directly because the task is
140 * already complete. For example, a CF may not be complete upon
141 * entry, so a dependent completion is added, but by the time it
142 * is added, the target CF is complete, so must be directly
143 * executed. This is all done while avoiding unnecessary object
144 * construction in safe-bypass cases.
145 */
146
147 // preliminaries
148
149 static final class AltResult {
150 final Throwable ex; // null only for NIL
151 AltResult(Throwable ex) { this.ex = ex; }
152 }
153
154 static final AltResult NIL = new AltResult(null);
155
156 // Fields
157
158 volatile Object result; // Either the result or boxed AltResult
159 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
160 volatile CompletionNode completions; // list (Treiber stack) of completions
161
162 // Basic utilities for triggering and processing completions
163
164 /**
165 * Removes and signals all waiting threads and runs all completions.
166 */
167 final void postComplete() {
168 WaitNode q; Thread t;
169 while ((q = waiters) != null) {
170 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
171 (t = q.thread) != null) {
172 q.thread = null;
173 LockSupport.unpark(t);
174 }
175 }
176
177 CompletionNode h; Completion c;
178 while ((h = completions) != null) {
179 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
180 (c = h.completion) != null)
181 c.run();
182 }
183 }
184
185 /**
186 * Triggers completion with the encoding of the given arguments:
187 * if the exception is non-null, encodes it as a wrapped
188 * CompletionException unless it is one already. Otherwise uses
189 * the given result, boxed as NIL if null.
190 */
191 final void internalComplete(T v, Throwable ex) {
192 if (result == null)
193 UNSAFE.compareAndSwapObject
194 (this, RESULT, null,
195 (ex == null) ? (v == null) ? NIL : v :
196 new AltResult((ex instanceof CompletionException) ? ex :
197 new CompletionException(ex)));
198 postComplete(); // help out even if not triggered
199 }
200
201 /**
202 * If triggered, helps release and/or process completions.
203 */
204 final void helpPostComplete() {
205 if (result != null)
206 postComplete();
207 }
208
209 /* ------------- waiting for completions -------------- */
210
211 /** Number of processors, for spin control */
212 static final int NCPU = Runtime.getRuntime().availableProcessors();
213
214 /**
215 * Heuristic spin value for waitingGet() before blocking on
216 * multiprocessors
217 */
218 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
219
220 /**
221 * Linked nodes to record waiting threads in a Treiber stack. See
222 * other classes such as Phaser and SynchronousQueue for more
223 * detailed explanation. This class implements ManagedBlocker to
224 * avoid starvation when blocking actions pile up in
225 * ForkJoinPools.
226 */
227 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
228 long nanos; // wait time if timed
229 final long deadline; // non-zero if timed
230 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
231 volatile Thread thread;
232 volatile WaitNode next;
233 WaitNode(boolean interruptible, long nanos, long deadline) {
234 this.thread = Thread.currentThread();
235 this.interruptControl = interruptible ? 1 : 0;
236 this.nanos = nanos;
237 this.deadline = deadline;
238 }
239 public boolean isReleasable() {
240 if (thread == null)
241 return true;
242 if (Thread.interrupted()) {
243 int i = interruptControl;
244 interruptControl = -1;
245 if (i > 0)
246 return true;
247 }
248 if (deadline != 0L &&
249 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
250 thread = null;
251 return true;
252 }
253 return false;
254 }
255 public boolean block() {
256 if (isReleasable())
257 return true;
258 else if (deadline == 0L)
259 LockSupport.park(this);
260 else if (nanos > 0L)
261 LockSupport.parkNanos(this, nanos);
262 return isReleasable();
263 }
264 }
265
266 /**
267 * Returns raw result after waiting, or null if interruptible and
268 * interrupted.
269 */
270 private Object waitingGet(boolean interruptible) {
271 WaitNode q = null;
272 boolean queued = false;
273 int spins = SPINS;
274 for (Object r;;) {
275 if ((r = result) != null) {
276 if (q != null) { // suppress unpark
277 q.thread = null;
278 if (q.interruptControl < 0) {
279 if (interruptible) {
280 removeWaiter(q);
281 return null;
282 }
283 Thread.currentThread().interrupt();
284 }
285 }
286 postComplete(); // help release others
287 return r;
288 }
289 else if (spins > 0) {
290 int rnd = ThreadLocalRandom.current().nextInt();
291 if (rnd >= 0)
292 --spins;
293 }
294 else if (q == null)
295 q = new WaitNode(interruptible, 0L, 0L);
296 else if (!queued)
297 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
298 q.next = waiters, q);
299 else if (interruptible && q.interruptControl < 0) {
300 removeWaiter(q);
301 return null;
302 }
303 else if (q.thread != null && result == null) {
304 try {
305 ForkJoinPool.managedBlock(q);
306 } catch (InterruptedException ex) {
307 q.interruptControl = -1;
308 }
309 }
310 }
311 }
312
313 /**
314 * Awaits completion or aborts on interrupt or timeout.
315 *
316 * @param nanos time to wait
317 * @return raw result
318 */
319 private Object timedAwaitDone(long nanos)
320 throws InterruptedException, TimeoutException {
321 WaitNode q = null;
322 boolean queued = false;
323 for (Object r;;) {
324 if ((r = result) != null) {
325 if (q != null) {
326 q.thread = null;
327 if (q.interruptControl < 0) {
328 removeWaiter(q);
329 throw new InterruptedException();
330 }
331 }
332 postComplete();
333 return r;
334 }
335 else if (q == null) {
336 if (nanos <= 0L)
337 throw new TimeoutException();
338 long d = System.nanoTime() + nanos;
339 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
340 }
341 else if (!queued)
342 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
343 q.next = waiters, q);
344 else if (q.interruptControl < 0) {
345 removeWaiter(q);
346 throw new InterruptedException();
347 }
348 else if (q.nanos <= 0L) {
349 if (result == null) {
350 removeWaiter(q);
351 throw new TimeoutException();
352 }
353 }
354 else if (q.thread != null && result == null) {
355 try {
356 ForkJoinPool.managedBlock(q);
357 } catch (InterruptedException ex) {
358 q.interruptControl = -1;
359 }
360 }
361 }
362 }
363
364 /**
365 * Tries to unlink a timed-out or interrupted wait node to avoid
366 * accumulating garbage. Internal nodes are simply unspliced
367 * without CAS since it is harmless if they are traversed anyway
368 * by releasers. To avoid effects of unsplicing from already
369 * removed nodes, the list is retraversed in case of an apparent
370 * race. This is slow when there are a lot of nodes, but we don't
371 * expect lists to be long enough to outweigh higher-overhead
372 * schemes.
373 */
374 private void removeWaiter(WaitNode node) {
375 if (node != null) {
376 node.thread = null;
377 retry:
378 for (;;) { // restart on removeWaiter race
379 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
380 s = q.next;
381 if (q.thread != null)
382 pred = q;
383 else if (pred != null) {
384 pred.next = s;
385 if (pred.thread == null) // check for race
386 continue retry;
387 }
388 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
389 continue retry;
390 }
391 break;
392 }
393 }
394 }
395
396 /* ------------- Async tasks -------------- */
397
398 /**
399 * A marker interface identifying asynchronous tasks produced by
400 * {@code async} methods. This may be useful for monitoring,
401 * debugging, and tracking asynchronous activities.
402 *
403 * @since 1.8
404 */
405 public static interface AsynchronousCompletionTask {
406 }
407
408 /** Base class can act as either FJ or plain Runnable */
409 abstract static class Async extends ForkJoinTask<Void>
410 implements Runnable, AsynchronousCompletionTask {
411 public final Void getRawResult() { return null; }
412 public final void setRawResult(Void v) { }
413 public final void run() { exec(); }
414 }
415
416 static final class AsyncRun extends Async {
417 final Runnable fn;
418 final CompletableFuture<Void> dst;
419 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
420 this.fn = fn; this.dst = dst;
421 }
422 public final boolean exec() {
423 CompletableFuture<Void> d; Throwable ex;
424 if ((d = this.dst) != null && d.result == null) {
425 try {
426 fn.run();
427 ex = null;
428 } catch (Throwable rex) {
429 ex = rex;
430 }
431 d.internalComplete(null, ex);
432 }
433 return true;
434 }
435 private static final long serialVersionUID = 5232453952276885070L;
436 }
437
438 static final class AsyncSupply<U> extends Async {
439 final Generator<U> fn;
440 final CompletableFuture<U> dst;
441 AsyncSupply(Generator<U> fn, CompletableFuture<U> dst) {
442 this.fn = fn; this.dst = dst;
443 }
444 public final boolean exec() {
445 CompletableFuture<U> d; U u; Throwable ex;
446 if ((d = this.dst) != null && d.result == null) {
447 try {
448 u = fn.get();
449 ex = null;
450 } catch (Throwable rex) {
451 ex = rex;
452 u = null;
453 }
454 d.internalComplete(u, ex);
455 }
456 return true;
457 }
458 private static final long serialVersionUID = 5232453952276885070L;
459 }
460
461 static final class AsyncApply<T,U> extends Async {
462 final T arg;
463 final Fun<? super T,? extends U> fn;
464 final CompletableFuture<U> dst;
465 AsyncApply(T arg, Fun<? super T,? extends U> fn,
466 CompletableFuture<U> dst) {
467 this.arg = arg; this.fn = fn; this.dst = dst;
468 }
469 public final boolean exec() {
470 CompletableFuture<U> d; U u; Throwable ex;
471 if ((d = this.dst) != null && d.result == null) {
472 try {
473 u = fn.apply(arg);
474 ex = null;
475 } catch (Throwable rex) {
476 ex = rex;
477 u = null;
478 }
479 d.internalComplete(u, ex);
480 }
481 return true;
482 }
483 private static final long serialVersionUID = 5232453952276885070L;
484 }
485
486 static final class AsyncCombine<T,U,V> extends Async {
487 final T arg1;
488 final U arg2;
489 final BiFun<? super T,? super U,? extends V> fn;
490 final CompletableFuture<V> dst;
491 AsyncCombine(T arg1, U arg2,
492 BiFun<? super T,? super U,? extends V> fn,
493 CompletableFuture<V> dst) {
494 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
495 }
496 public final boolean exec() {
497 CompletableFuture<V> d; V v; Throwable ex;
498 if ((d = this.dst) != null && d.result == null) {
499 try {
500 v = fn.apply(arg1, arg2);
501 ex = null;
502 } catch (Throwable rex) {
503 ex = rex;
504 v = null;
505 }
506 d.internalComplete(v, ex);
507 }
508 return true;
509 }
510 private static final long serialVersionUID = 5232453952276885070L;
511 }
512
513 static final class AsyncAccept<T> extends Async {
514 final T arg;
515 final Action<? super T> fn;
516 final CompletableFuture<Void> dst;
517 AsyncAccept(T arg, Action<? super T> fn,
518 CompletableFuture<Void> dst) {
519 this.arg = arg; this.fn = fn; this.dst = dst;
520 }
521 public final boolean exec() {
522 CompletableFuture<Void> d; Throwable ex;
523 if ((d = this.dst) != null && d.result == null) {
524 try {
525 fn.accept(arg);
526 ex = null;
527 } catch (Throwable rex) {
528 ex = rex;
529 }
530 d.internalComplete(null, ex);
531 }
532 return true;
533 }
534 private static final long serialVersionUID = 5232453952276885070L;
535 }
536
537 static final class AsyncAcceptBoth<T,U> extends Async {
538 final T arg1;
539 final U arg2;
540 final BiAction<? super T,? super U> fn;
541 final CompletableFuture<Void> dst;
542 AsyncAcceptBoth(T arg1, U arg2,
543 BiAction<? super T,? super U> fn,
544 CompletableFuture<Void> dst) {
545 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
546 }
547 public final boolean exec() {
548 CompletableFuture<Void> d; Throwable ex;
549 if ((d = this.dst) != null && d.result == null) {
550 try {
551 fn.accept(arg1, arg2);
552 ex = null;
553 } catch (Throwable rex) {
554 ex = rex;
555 }
556 d.internalComplete(null, ex);
557 }
558 return true;
559 }
560 private static final long serialVersionUID = 5232453952276885070L;
561 }
562
563 static final class AsyncCompose<T,U> extends Async {
564 final T arg;
565 final Fun<? super T, CompletableFuture<U>> fn;
566 final CompletableFuture<U> dst;
567 AsyncCompose(T arg,
568 Fun<? super T, CompletableFuture<U>> fn,
569 CompletableFuture<U> dst) {
570 this.arg = arg; this.fn = fn; this.dst = dst;
571 }
572 public final boolean exec() {
573 CompletableFuture<U> d, fr; U u; Throwable ex;
574 if ((d = this.dst) != null && d.result == null) {
575 try {
576 fr = fn.apply(arg);
577 ex = (fr == null) ? new NullPointerException() : null;
578 } catch (Throwable rex) {
579 ex = rex;
580 fr = null;
581 }
582 if (ex != null)
583 u = null;
584 else {
585 Object r = fr.result;
586 if (r == null)
587 r = fr.waitingGet(false);
588 if (r instanceof AltResult) {
589 ex = ((AltResult)r).ex;
590 u = null;
591 }
592 else {
593 @SuppressWarnings("unchecked") U ur = (U) r;
594 u = ur;
595 }
596 }
597 d.internalComplete(u, ex);
598 }
599 return true;
600 }
601 private static final long serialVersionUID = 5232453952276885070L;
602 }
603
604 /* ------------- Completions -------------- */
605
606 /**
607 * Simple linked list nodes to record completions, used in
608 * basically the same way as WaitNodes. (We separate nodes from
609 * the Completions themselves mainly because for the And and Or
610 * methods, the same Completion object resides in two lists.)
611 */
612 static final class CompletionNode {
613 final Completion completion;
614 volatile CompletionNode next;
615 CompletionNode(Completion completion) { this.completion = completion; }
616 }
617
618 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
619 abstract static class Completion extends AtomicInteger implements Runnable {
620 }
621
622 static final class ThenApply<T,U> extends Completion {
623 final CompletableFuture<? extends T> src;
624 final Fun<? super T,? extends U> fn;
625 final CompletableFuture<U> dst;
626 final Executor executor;
627 ThenApply(CompletableFuture<? extends T> src,
628 Fun<? super T,? extends U> fn,
629 CompletableFuture<U> dst,
630 Executor executor) {
631 this.src = src; this.fn = fn; this.dst = dst;
632 this.executor = executor;
633 }
634 public final void run() {
635 final CompletableFuture<? extends T> a;
636 final Fun<? super T,? extends U> fn;
637 final CompletableFuture<U> dst;
638 Object r; T t; Throwable ex;
639 if ((dst = this.dst) != null &&
640 (fn = this.fn) != null &&
641 (a = this.src) != null &&
642 (r = a.result) != null &&
643 compareAndSet(0, 1)) {
644 if (r instanceof AltResult) {
645 ex = ((AltResult)r).ex;
646 t = null;
647 }
648 else {
649 ex = null;
650 @SuppressWarnings("unchecked") T tr = (T) r;
651 t = tr;
652 }
653 Executor e = executor;
654 U u = null;
655 if (ex == null) {
656 try {
657 if (e != null)
658 e.execute(new AsyncApply<T,U>(t, fn, dst));
659 else
660 u = fn.apply(t);
661 } catch (Throwable rex) {
662 ex = rex;
663 }
664 }
665 if (e == null || ex != null)
666 dst.internalComplete(u, ex);
667 }
668 }
669 private static final long serialVersionUID = 5232453952276885070L;
670 }
671
672 static final class ThenAccept<T> extends Completion {
673 final CompletableFuture<? extends T> src;
674 final Action<? super T> fn;
675 final CompletableFuture<Void> dst;
676 final Executor executor;
677 ThenAccept(CompletableFuture<? extends T> src,
678 Action<? super T> fn,
679 CompletableFuture<Void> dst,
680 Executor executor) {
681 this.src = src; this.fn = fn; this.dst = dst;
682 this.executor = executor;
683 }
684 public final void run() {
685 final CompletableFuture<? extends T> a;
686 final Action<? super T> fn;
687 final CompletableFuture<Void> dst;
688 Object r; T t; Throwable ex;
689 if ((dst = this.dst) != null &&
690 (fn = this.fn) != null &&
691 (a = this.src) != null &&
692 (r = a.result) != null &&
693 compareAndSet(0, 1)) {
694 if (r instanceof AltResult) {
695 ex = ((AltResult)r).ex;
696 t = null;
697 }
698 else {
699 ex = null;
700 @SuppressWarnings("unchecked") T tr = (T) r;
701 t = tr;
702 }
703 Executor e = executor;
704 if (ex == null) {
705 try {
706 if (e != null)
707 e.execute(new AsyncAccept<T>(t, fn, dst));
708 else
709 fn.accept(t);
710 } catch (Throwable rex) {
711 ex = rex;
712 }
713 }
714 if (e == null || ex != null)
715 dst.internalComplete(null, ex);
716 }
717 }
718 private static final long serialVersionUID = 5232453952276885070L;
719 }
720
721 static final class ThenRun extends Completion {
722 final CompletableFuture<?> src;
723 final Runnable fn;
724 final CompletableFuture<Void> dst;
725 final Executor executor;
726 ThenRun(CompletableFuture<?> src,
727 Runnable fn,
728 CompletableFuture<Void> dst,
729 Executor executor) {
730 this.src = src; this.fn = fn; this.dst = dst;
731 this.executor = executor;
732 }
733 public final void run() {
734 final CompletableFuture<?> a;
735 final Runnable fn;
736 final CompletableFuture<Void> dst;
737 Object r; Throwable ex;
738 if ((dst = this.dst) != null &&
739 (fn = this.fn) != null &&
740 (a = this.src) != null &&
741 (r = a.result) != null &&
742 compareAndSet(0, 1)) {
743 if (r instanceof AltResult)
744 ex = ((AltResult)r).ex;
745 else
746 ex = null;
747 Executor e = executor;
748 if (ex == null) {
749 try {
750 if (e != null)
751 e.execute(new AsyncRun(fn, dst));
752 else
753 fn.run();
754 } catch (Throwable rex) {
755 ex = rex;
756 }
757 }
758 if (e == null || ex != null)
759 dst.internalComplete(null, ex);
760 }
761 }
762 private static final long serialVersionUID = 5232453952276885070L;
763 }
764
765 static final class ThenCombine<T,U,V> extends Completion {
766 final CompletableFuture<? extends T> src;
767 final CompletableFuture<? extends U> snd;
768 final BiFun<? super T,? super U,? extends V> fn;
769 final CompletableFuture<V> dst;
770 final Executor executor;
771 ThenCombine(CompletableFuture<? extends T> src,
772 CompletableFuture<? extends U> snd,
773 BiFun<? super T,? super U,? extends V> fn,
774 CompletableFuture<V> dst,
775 Executor executor) {
776 this.src = src; this.snd = snd;
777 this.fn = fn; this.dst = dst;
778 this.executor = executor;
779 }
780 public final void run() {
781 final CompletableFuture<? extends T> a;
782 final CompletableFuture<? extends U> b;
783 final BiFun<? super T,? super U,? extends V> fn;
784 final CompletableFuture<V> dst;
785 Object r, s; T t; U u; Throwable ex;
786 if ((dst = this.dst) != null &&
787 (fn = this.fn) != null &&
788 (a = this.src) != null &&
789 (r = a.result) != null &&
790 (b = this.snd) != null &&
791 (s = b.result) != null &&
792 compareAndSet(0, 1)) {
793 if (r instanceof AltResult) {
794 ex = ((AltResult)r).ex;
795 t = null;
796 }
797 else {
798 ex = null;
799 @SuppressWarnings("unchecked") T tr = (T) r;
800 t = tr;
801 }
802 if (ex != null)
803 u = null;
804 else if (s instanceof AltResult) {
805 ex = ((AltResult)s).ex;
806 u = null;
807 }
808 else {
809 @SuppressWarnings("unchecked") U us = (U) s;
810 u = us;
811 }
812 Executor e = executor;
813 V v = null;
814 if (ex == null) {
815 try {
816 if (e != null)
817 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
818 else
819 v = fn.apply(t, u);
820 } catch (Throwable rex) {
821 ex = rex;
822 }
823 }
824 if (e == null || ex != null)
825 dst.internalComplete(v, ex);
826 }
827 }
828 private static final long serialVersionUID = 5232453952276885070L;
829 }
830
831 static final class ThenAcceptBoth<T,U> extends Completion {
832 final CompletableFuture<? extends T> src;
833 final CompletableFuture<? extends U> snd;
834 final BiAction<? super T,? super U> fn;
835 final CompletableFuture<Void> dst;
836 final Executor executor;
837 ThenAcceptBoth(CompletableFuture<? extends T> src,
838 CompletableFuture<? extends U> snd,
839 BiAction<? super T,? super U> fn,
840 CompletableFuture<Void> dst,
841 Executor executor) {
842 this.src = src; this.snd = snd;
843 this.fn = fn; this.dst = dst;
844 this.executor = executor;
845 }
846 public final void run() {
847 final CompletableFuture<? extends T> a;
848 final CompletableFuture<? extends U> b;
849 final BiAction<? super T,? super U> fn;
850 final CompletableFuture<Void> dst;
851 Object r, s; T t; U u; Throwable ex;
852 if ((dst = this.dst) != null &&
853 (fn = this.fn) != null &&
854 (a = this.src) != null &&
855 (r = a.result) != null &&
856 (b = this.snd) != null &&
857 (s = b.result) != null &&
858 compareAndSet(0, 1)) {
859 if (r instanceof AltResult) {
860 ex = ((AltResult)r).ex;
861 t = null;
862 }
863 else {
864 ex = null;
865 @SuppressWarnings("unchecked") T tr = (T) r;
866 t = tr;
867 }
868 if (ex != null)
869 u = null;
870 else if (s instanceof AltResult) {
871 ex = ((AltResult)s).ex;
872 u = null;
873 }
874 else {
875 @SuppressWarnings("unchecked") U us = (U) s;
876 u = us;
877 }
878 Executor e = executor;
879 if (ex == null) {
880 try {
881 if (e != null)
882 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
883 else
884 fn.accept(t, u);
885 } catch (Throwable rex) {
886 ex = rex;
887 }
888 }
889 if (e == null || ex != null)
890 dst.internalComplete(null, ex);
891 }
892 }
893 private static final long serialVersionUID = 5232453952276885070L;
894 }
895
896 static final class RunAfterBoth extends Completion {
897 final CompletableFuture<?> src;
898 final CompletableFuture<?> snd;
899 final Runnable fn;
900 final CompletableFuture<Void> dst;
901 final Executor executor;
902 RunAfterBoth(CompletableFuture<?> src,
903 CompletableFuture<?> snd,
904 Runnable fn,
905 CompletableFuture<Void> dst,
906 Executor executor) {
907 this.src = src; this.snd = snd;
908 this.fn = fn; this.dst = dst;
909 this.executor = executor;
910 }
911 public final void run() {
912 final CompletableFuture<?> a;
913 final CompletableFuture<?> b;
914 final Runnable fn;
915 final CompletableFuture<Void> dst;
916 Object r, s; Throwable ex;
917 if ((dst = this.dst) != null &&
918 (fn = this.fn) != null &&
919 (a = this.src) != null &&
920 (r = a.result) != null &&
921 (b = this.snd) != null &&
922 (s = b.result) != null &&
923 compareAndSet(0, 1)) {
924 if (r instanceof AltResult)
925 ex = ((AltResult)r).ex;
926 else
927 ex = null;
928 if (ex == null && (s instanceof AltResult))
929 ex = ((AltResult)s).ex;
930 Executor e = executor;
931 if (ex == null) {
932 try {
933 if (e != null)
934 e.execute(new AsyncRun(fn, dst));
935 else
936 fn.run();
937 } catch (Throwable rex) {
938 ex = rex;
939 }
940 }
941 if (e == null || ex != null)
942 dst.internalComplete(null, ex);
943 }
944 }
945 private static final long serialVersionUID = 5232453952276885070L;
946 }
947
948 static final class AndCompletion extends Completion {
949 final CompletableFuture<?> src;
950 final CompletableFuture<?> snd;
951 final CompletableFuture<Void> dst;
952 AndCompletion(CompletableFuture<?> src,
953 CompletableFuture<?> snd,
954 CompletableFuture<Void> dst) {
955 this.src = src; this.snd = snd; this.dst = dst;
956 }
957 public final void run() {
958 final CompletableFuture<?> a;
959 final CompletableFuture<?> b;
960 final CompletableFuture<Void> dst;
961 Object r, s; Throwable ex;
962 if ((dst = this.dst) != null &&
963 (a = this.src) != null &&
964 (r = a.result) != null &&
965 (b = this.snd) != null &&
966 (s = b.result) != null &&
967 compareAndSet(0, 1)) {
968 if (r instanceof AltResult)
969 ex = ((AltResult)r).ex;
970 else
971 ex = null;
972 if (ex == null && (s instanceof AltResult))
973 ex = ((AltResult)s).ex;
974 dst.internalComplete(null, ex);
975 }
976 }
977 private static final long serialVersionUID = 5232453952276885070L;
978 }
979
980 static final class ApplyToEither<T,U> extends Completion {
981 final CompletableFuture<? extends T> src;
982 final CompletableFuture<? extends T> snd;
983 final Fun<? super T,? extends U> fn;
984 final CompletableFuture<U> dst;
985 final Executor executor;
986 ApplyToEither(CompletableFuture<? extends T> src,
987 CompletableFuture<? extends T> snd,
988 Fun<? super T,? extends U> fn,
989 CompletableFuture<U> dst,
990 Executor executor) {
991 this.src = src; this.snd = snd;
992 this.fn = fn; this.dst = dst;
993 this.executor = executor;
994 }
995 public final void run() {
996 final CompletableFuture<? extends T> a;
997 final CompletableFuture<? extends T> b;
998 final Fun<? super T,? extends U> fn;
999 final CompletableFuture<U> dst;
1000 Object r; T t; Throwable ex;
1001 if ((dst = this.dst) != null &&
1002 (fn = this.fn) != null &&
1003 (((a = this.src) != null && (r = a.result) != null) ||
1004 ((b = this.snd) != null && (r = b.result) != null)) &&
1005 compareAndSet(0, 1)) {
1006 if (r instanceof AltResult) {
1007 ex = ((AltResult)r).ex;
1008 t = null;
1009 }
1010 else {
1011 ex = null;
1012 @SuppressWarnings("unchecked") T tr = (T) r;
1013 t = tr;
1014 }
1015 Executor e = executor;
1016 U u = null;
1017 if (ex == null) {
1018 try {
1019 if (e != null)
1020 e.execute(new AsyncApply<T,U>(t, fn, dst));
1021 else
1022 u = fn.apply(t);
1023 } catch (Throwable rex) {
1024 ex = rex;
1025 }
1026 }
1027 if (e == null || ex != null)
1028 dst.internalComplete(u, ex);
1029 }
1030 }
1031 private static final long serialVersionUID = 5232453952276885070L;
1032 }
1033
1034 static final class AcceptEither<T> extends Completion {
1035 final CompletableFuture<? extends T> src;
1036 final CompletableFuture<? extends T> snd;
1037 final Action<? super T> fn;
1038 final CompletableFuture<Void> dst;
1039 final Executor executor;
1040 AcceptEither(CompletableFuture<? extends T> src,
1041 CompletableFuture<? extends T> snd,
1042 Action<? super T> fn,
1043 CompletableFuture<Void> dst,
1044 Executor executor) {
1045 this.src = src; this.snd = snd;
1046 this.fn = fn; this.dst = dst;
1047 this.executor = executor;
1048 }
1049 public final void run() {
1050 final CompletableFuture<? extends T> a;
1051 final CompletableFuture<? extends T> b;
1052 final Action<? super T> fn;
1053 final CompletableFuture<Void> dst;
1054 Object r; T t; Throwable ex;
1055 if ((dst = this.dst) != null &&
1056 (fn = this.fn) != null &&
1057 (((a = this.src) != null && (r = a.result) != null) ||
1058 ((b = this.snd) != null && (r = b.result) != null)) &&
1059 compareAndSet(0, 1)) {
1060 if (r instanceof AltResult) {
1061 ex = ((AltResult)r).ex;
1062 t = null;
1063 }
1064 else {
1065 ex = null;
1066 @SuppressWarnings("unchecked") T tr = (T) r;
1067 t = tr;
1068 }
1069 Executor e = executor;
1070 if (ex == null) {
1071 try {
1072 if (e != null)
1073 e.execute(new AsyncAccept<T>(t, fn, dst));
1074 else
1075 fn.accept(t);
1076 } catch (Throwable rex) {
1077 ex = rex;
1078 }
1079 }
1080 if (e == null || ex != null)
1081 dst.internalComplete(null, ex);
1082 }
1083 }
1084 private static final long serialVersionUID = 5232453952276885070L;
1085 }
1086
1087 static final class RunAfterEither extends Completion {
1088 final CompletableFuture<?> src;
1089 final CompletableFuture<?> snd;
1090 final Runnable fn;
1091 final CompletableFuture<Void> dst;
1092 final Executor executor;
1093 RunAfterEither(CompletableFuture<?> src,
1094 CompletableFuture<?> snd,
1095 Runnable fn,
1096 CompletableFuture<Void> dst,
1097 Executor executor) {
1098 this.src = src; this.snd = snd;
1099 this.fn = fn; this.dst = dst;
1100 this.executor = executor;
1101 }
1102 public final void run() {
1103 final CompletableFuture<?> a;
1104 final CompletableFuture<?> b;
1105 final Runnable fn;
1106 final CompletableFuture<Void> dst;
1107 Object r; Throwable ex;
1108 if ((dst = this.dst) != null &&
1109 (fn = this.fn) != null &&
1110 (((a = this.src) != null && (r = a.result) != null) ||
1111 ((b = this.snd) != null && (r = b.result) != null)) &&
1112 compareAndSet(0, 1)) {
1113 if (r instanceof AltResult)
1114 ex = ((AltResult)r).ex;
1115 else
1116 ex = null;
1117 Executor e = executor;
1118 if (ex == null) {
1119 try {
1120 if (e != null)
1121 e.execute(new AsyncRun(fn, dst));
1122 else
1123 fn.run();
1124 } catch (Throwable rex) {
1125 ex = rex;
1126 }
1127 }
1128 if (e == null || ex != null)
1129 dst.internalComplete(null, ex);
1130 }
1131 }
1132 private static final long serialVersionUID = 5232453952276885070L;
1133 }
1134
1135 static final class OrCompletion extends Completion {
1136 final CompletableFuture<?> src;
1137 final CompletableFuture<?> snd;
1138 final CompletableFuture<Object> dst;
1139 OrCompletion(CompletableFuture<?> src,
1140 CompletableFuture<?> snd,
1141 CompletableFuture<Object> dst) {
1142 this.src = src; this.snd = snd; this.dst = dst;
1143 }
1144 public final void run() {
1145 final CompletableFuture<?> a;
1146 final CompletableFuture<?> b;
1147 final CompletableFuture<Object> dst;
1148 Object r, t; Throwable ex;
1149 if ((dst = this.dst) != null &&
1150 (((a = this.src) != null && (r = a.result) != null) ||
1151 ((b = this.snd) != null && (r = b.result) != null)) &&
1152 compareAndSet(0, 1)) {
1153 if (r instanceof AltResult) {
1154 ex = ((AltResult)r).ex;
1155 t = null;
1156 }
1157 else {
1158 ex = null;
1159 t = r;
1160 }
1161 dst.internalComplete(t, ex);
1162 }
1163 }
1164 private static final long serialVersionUID = 5232453952276885070L;
1165 }
1166
1167 static final class ExceptionCompletion<T> extends Completion {
1168 final CompletableFuture<? extends T> src;
1169 final Fun<? super Throwable, ? extends T> fn;
1170 final CompletableFuture<T> dst;
1171 ExceptionCompletion(CompletableFuture<? extends T> src,
1172 Fun<? super Throwable, ? extends T> fn,
1173 CompletableFuture<T> dst) {
1174 this.src = src; this.fn = fn; this.dst = dst;
1175 }
1176 public final void run() {
1177 final CompletableFuture<? extends T> a;
1178 final Fun<? super Throwable, ? extends T> fn;
1179 final CompletableFuture<T> dst;
1180 Object r; T t = null; Throwable ex, dx = null;
1181 if ((dst = this.dst) != null &&
1182 (fn = this.fn) != null &&
1183 (a = this.src) != null &&
1184 (r = a.result) != null &&
1185 compareAndSet(0, 1)) {
1186 if ((r instanceof AltResult) &&
1187 (ex = ((AltResult)r).ex) != null) {
1188 try {
1189 t = fn.apply(ex);
1190 } catch (Throwable rex) {
1191 dx = rex;
1192 }
1193 }
1194 else {
1195 @SuppressWarnings("unchecked") T tr = (T) r;
1196 t = tr;
1197 }
1198 dst.internalComplete(t, dx);
1199 }
1200 }
1201 private static final long serialVersionUID = 5232453952276885070L;
1202 }
1203
1204 static final class ThenCopy<T> extends Completion {
1205 final CompletableFuture<?> src;
1206 final CompletableFuture<T> dst;
1207 ThenCopy(CompletableFuture<?> src,
1208 CompletableFuture<T> dst) {
1209 this.src = src; this.dst = dst;
1210 }
1211 public final void run() {
1212 final CompletableFuture<?> a;
1213 final CompletableFuture<T> dst;
1214 Object r; T t; Throwable ex;
1215 if ((dst = this.dst) != null &&
1216 (a = this.src) != null &&
1217 (r = a.result) != null &&
1218 compareAndSet(0, 1)) {
1219 if (r instanceof AltResult) {
1220 ex = ((AltResult)r).ex;
1221 t = null;
1222 }
1223 else {
1224 ex = null;
1225 @SuppressWarnings("unchecked") T tr = (T) r;
1226 t = tr;
1227 }
1228 dst.internalComplete(t, ex);
1229 }
1230 }
1231 private static final long serialVersionUID = 5232453952276885070L;
1232 }
1233
1234 // version of ThenCopy for CompletableFuture<Void> dst
1235 static final class ThenPropagate extends Completion {
1236 final CompletableFuture<?> src;
1237 final CompletableFuture<Void> dst;
1238 ThenPropagate(CompletableFuture<?> src,
1239 CompletableFuture<Void> dst) {
1240 this.src = src; this.dst = dst;
1241 }
1242 public final void run() {
1243 final CompletableFuture<?> a;
1244 final CompletableFuture<Void> dst;
1245 Object r; Throwable ex;
1246 if ((dst = this.dst) != null &&
1247 (a = this.src) != null &&
1248 (r = a.result) != null &&
1249 compareAndSet(0, 1)) {
1250 if (r instanceof AltResult)
1251 ex = ((AltResult)r).ex;
1252 else
1253 ex = null;
1254 dst.internalComplete(null, ex);
1255 }
1256 }
1257 private static final long serialVersionUID = 5232453952276885070L;
1258 }
1259
1260 static final class HandleCompletion<T,U> extends Completion {
1261 final CompletableFuture<? extends T> src;
1262 final BiFun<? super T, Throwable, ? extends U> fn;
1263 final CompletableFuture<U> dst;
1264 HandleCompletion(CompletableFuture<? extends T> src,
1265 BiFun<? super T, Throwable, ? extends U> fn,
1266 CompletableFuture<U> dst) {
1267 this.src = src; this.fn = fn; this.dst = dst;
1268 }
1269 public final void run() {
1270 final CompletableFuture<? extends T> a;
1271 final BiFun<? super T, Throwable, ? extends U> fn;
1272 final CompletableFuture<U> dst;
1273 Object r; T t; Throwable ex;
1274 if ((dst = this.dst) != null &&
1275 (fn = this.fn) != null &&
1276 (a = this.src) != null &&
1277 (r = a.result) != null &&
1278 compareAndSet(0, 1)) {
1279 if (r instanceof AltResult) {
1280 ex = ((AltResult)r).ex;
1281 t = null;
1282 }
1283 else {
1284 ex = null;
1285 @SuppressWarnings("unchecked") T tr = (T) r;
1286 t = tr;
1287 }
1288 U u = null; Throwable dx = null;
1289 try {
1290 u = fn.apply(t, ex);
1291 } catch (Throwable rex) {
1292 dx = rex;
1293 }
1294 dst.internalComplete(u, dx);
1295 }
1296 }
1297 private static final long serialVersionUID = 5232453952276885070L;
1298 }
1299
1300 static final class ThenCompose<T,U> extends Completion {
1301 final CompletableFuture<? extends T> src;
1302 final Fun<? super T, CompletableFuture<U>> fn;
1303 final CompletableFuture<U> dst;
1304 final Executor executor;
1305 ThenCompose(CompletableFuture<? extends T> src,
1306 Fun<? super T, CompletableFuture<U>> fn,
1307 CompletableFuture<U> dst,
1308 Executor executor) {
1309 this.src = src; this.fn = fn; this.dst = dst;
1310 this.executor = executor;
1311 }
1312 public final void run() {
1313 final CompletableFuture<? extends T> a;
1314 final Fun<? super T, CompletableFuture<U>> fn;
1315 final CompletableFuture<U> dst;
1316 Object r; T t; Throwable ex; Executor e;
1317 if ((dst = this.dst) != null &&
1318 (fn = this.fn) != null &&
1319 (a = this.src) != null &&
1320 (r = a.result) != null &&
1321 compareAndSet(0, 1)) {
1322 if (r instanceof AltResult) {
1323 ex = ((AltResult)r).ex;
1324 t = null;
1325 }
1326 else {
1327 ex = null;
1328 @SuppressWarnings("unchecked") T tr = (T) r;
1329 t = tr;
1330 }
1331 CompletableFuture<U> c = null;
1332 U u = null;
1333 boolean complete = false;
1334 if (ex == null) {
1335 if ((e = executor) != null)
1336 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1337 else {
1338 try {
1339 if ((c = fn.apply(t)) == null)
1340 ex = new NullPointerException();
1341 } catch (Throwable rex) {
1342 ex = rex;
1343 }
1344 }
1345 }
1346 if (c != null) {
1347 ThenCopy<U> d = null;
1348 Object s;
1349 if ((s = c.result) == null) {
1350 CompletionNode p = new CompletionNode
1351 (d = new ThenCopy<U>(c, dst));
1352 while ((s = c.result) == null) {
1353 if (UNSAFE.compareAndSwapObject
1354 (c, COMPLETIONS, p.next = c.completions, p))
1355 break;
1356 }
1357 }
1358 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1359 complete = true;
1360 if (s instanceof AltResult) {
1361 ex = ((AltResult)s).ex; // no rewrap
1362 u = null;
1363 }
1364 else {
1365 @SuppressWarnings("unchecked") U us = (U) s;
1366 u = us;
1367 }
1368 }
1369 }
1370 if (complete || ex != null)
1371 dst.internalComplete(u, ex);
1372 if (c != null)
1373 c.helpPostComplete();
1374 }
1375 }
1376 private static final long serialVersionUID = 5232453952276885070L;
1377 }
1378
1379 // public methods
1380
1381 /**
1382 * Creates a new incomplete CompletableFuture.
1383 */
1384 public CompletableFuture() {
1385 }
1386
1387 /**
1388 * Returns a new CompletableFuture that is asynchronously completed
1389 * by a task running in the {@link ForkJoinPool#commonPool()} with
1390 * the value obtained by calling the given Generator.
1391 *
1392 * @param supplier a function returning the value to be used
1393 * to complete the returned CompletableFuture
1394 * @return the new CompletableFuture
1395 */
1396 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier) {
1397 if (supplier == null) throw new NullPointerException();
1398 CompletableFuture<U> f = new CompletableFuture<U>();
1399 ForkJoinPool.commonPool().
1400 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1401 return f;
1402 }
1403
1404 /**
1405 * Returns a new CompletableFuture that is asynchronously completed
1406 * by a task running in the given executor with the value obtained
1407 * by calling the given Generator.
1408 *
1409 * @param supplier a function returning the value to be used
1410 * to complete the returned CompletableFuture
1411 * @param executor the executor to use for asynchronous execution
1412 * @return the new CompletableFuture
1413 */
1414 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier,
1415 Executor executor) {
1416 if (executor == null || supplier == null)
1417 throw new NullPointerException();
1418 CompletableFuture<U> f = new CompletableFuture<U>();
1419 executor.execute(new AsyncSupply<U>(supplier, f));
1420 return f;
1421 }
1422
1423 /**
1424 * Returns a new CompletableFuture that is asynchronously completed
1425 * by a task running in the {@link ForkJoinPool#commonPool()} after
1426 * it runs the given action.
1427 *
1428 * @param runnable the action to run before completing the
1429 * returned CompletableFuture
1430 * @return the new CompletableFuture
1431 */
1432 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1433 if (runnable == null) throw new NullPointerException();
1434 CompletableFuture<Void> f = new CompletableFuture<Void>();
1435 ForkJoinPool.commonPool().
1436 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1437 return f;
1438 }
1439
1440 /**
1441 * Returns a new CompletableFuture that is asynchronously completed
1442 * by a task running in the given executor after it runs the given
1443 * action.
1444 *
1445 * @param runnable the action to run before completing the
1446 * returned CompletableFuture
1447 * @param executor the executor to use for asynchronous execution
1448 * @return the new CompletableFuture
1449 */
1450 public static CompletableFuture<Void> runAsync(Runnable runnable,
1451 Executor executor) {
1452 if (executor == null || runnable == null)
1453 throw new NullPointerException();
1454 CompletableFuture<Void> f = new CompletableFuture<Void>();
1455 executor.execute(new AsyncRun(runnable, f));
1456 return f;
1457 }
1458
1459 /**
1460 * Returns a new CompletableFuture that is already completed with
1461 * the given value.
1462 *
1463 * @param value the value
1464 * @return the completed CompletableFuture
1465 */
1466 public static <U> CompletableFuture<U> completedFuture(U value) {
1467 CompletableFuture<U> f = new CompletableFuture<U>();
1468 f.result = (value == null) ? NIL : value;
1469 return f;
1470 }
1471
1472 /**
1473 * Returns {@code true} if completed in any fashion: normally,
1474 * exceptionally, or via cancellation.
1475 *
1476 * @return {@code true} if completed
1477 */
1478 public boolean isDone() {
1479 return result != null;
1480 }
1481
1482 /**
1483 * Waits if necessary for this future to complete, and then
1484 * returns its result.
1485 *
1486 * @return the result value
1487 * @throws CancellationException if this future was cancelled
1488 * @throws ExecutionException if this future completed exceptionally
1489 * @throws InterruptedException if the current thread was interrupted
1490 * while waiting
1491 */
1492 public T get() throws InterruptedException, ExecutionException {
1493 Object r; Throwable ex, cause;
1494 if ((r = result) == null && (r = waitingGet(true)) == null)
1495 throw new InterruptedException();
1496 if (!(r instanceof AltResult)) {
1497 @SuppressWarnings("unchecked") T tr = (T) r;
1498 return tr;
1499 }
1500 if ((ex = ((AltResult)r).ex) == null)
1501 return null;
1502 if (ex instanceof CancellationException)
1503 throw (CancellationException)ex;
1504 if ((ex instanceof CompletionException) &&
1505 (cause = ex.getCause()) != null)
1506 ex = cause;
1507 throw new ExecutionException(ex);
1508 }
1509
1510 /**
1511 * Waits if necessary for at most the given time for this future
1512 * to complete, and then returns its result, if available.
1513 *
1514 * @param timeout the maximum time to wait
1515 * @param unit the time unit of the timeout argument
1516 * @return the result value
1517 * @throws CancellationException if this future was cancelled
1518 * @throws ExecutionException if this future completed exceptionally
1519 * @throws InterruptedException if the current thread was interrupted
1520 * while waiting
1521 * @throws TimeoutException if the wait timed out
1522 */
1523 public T get(long timeout, TimeUnit unit)
1524 throws InterruptedException, ExecutionException, TimeoutException {
1525 Object r; Throwable ex, cause;
1526 long nanos = unit.toNanos(timeout);
1527 if (Thread.interrupted())
1528 throw new InterruptedException();
1529 if ((r = result) == null)
1530 r = timedAwaitDone(nanos);
1531 if (!(r instanceof AltResult)) {
1532 @SuppressWarnings("unchecked") T tr = (T) r;
1533 return tr;
1534 }
1535 if ((ex = ((AltResult)r).ex) == null)
1536 return null;
1537 if (ex instanceof CancellationException)
1538 throw (CancellationException)ex;
1539 if ((ex instanceof CompletionException) &&
1540 (cause = ex.getCause()) != null)
1541 ex = cause;
1542 throw new ExecutionException(ex);
1543 }
1544
1545 /**
1546 * Returns the result value when complete, or throws an
1547 * (unchecked) exception if completed exceptionally. To better
1548 * conform with the use of common functional forms, if a
1549 * computation involved in the completion of this
1550 * CompletableFuture threw an exception, this method throws an
1551 * (unchecked) {@link CompletionException} with the underlying
1552 * exception as its cause.
1553 *
1554 * @return the result value
1555 * @throws CancellationException if the computation was cancelled
1556 * @throws CompletionException if this future completed
1557 * exceptionally or a completion computation threw an exception
1558 */
1559 public T join() {
1560 Object r; Throwable ex;
1561 if ((r = result) == null)
1562 r = waitingGet(false);
1563 if (!(r instanceof AltResult)) {
1564 @SuppressWarnings("unchecked") T tr = (T) r;
1565 return tr;
1566 }
1567 if ((ex = ((AltResult)r).ex) == null)
1568 return null;
1569 if (ex instanceof CancellationException)
1570 throw (CancellationException)ex;
1571 if (ex instanceof CompletionException)
1572 throw (CompletionException)ex;
1573 throw new CompletionException(ex);
1574 }
1575
1576 /**
1577 * Returns the result value (or throws any encountered exception)
1578 * if completed, else returns the given valueIfAbsent.
1579 *
1580 * @param valueIfAbsent the value to return if not completed
1581 * @return the result value, if completed, else the given valueIfAbsent
1582 * @throws CancellationException if the computation was cancelled
1583 * @throws CompletionException if this future completed
1584 * exceptionally or a completion computation threw an exception
1585 */
1586 public T getNow(T valueIfAbsent) {
1587 Object r; Throwable ex;
1588 if ((r = result) == null)
1589 return valueIfAbsent;
1590 if (!(r instanceof AltResult)) {
1591 @SuppressWarnings("unchecked") T tr = (T) r;
1592 return tr;
1593 }
1594 if ((ex = ((AltResult)r).ex) == null)
1595 return null;
1596 if (ex instanceof CancellationException)
1597 throw (CancellationException)ex;
1598 if (ex instanceof CompletionException)
1599 throw (CompletionException)ex;
1600 throw new CompletionException(ex);
1601 }
1602
1603 /**
1604 * If not already completed, sets the value returned by {@link
1605 * #get()} and related methods to the given value.
1606 *
1607 * @param value the result value
1608 * @return {@code true} if this invocation caused this CompletableFuture
1609 * to transition to a completed state, else {@code false}
1610 */
1611 public boolean complete(T value) {
1612 boolean triggered = result == null &&
1613 UNSAFE.compareAndSwapObject(this, RESULT, null,
1614 value == null ? NIL : value);
1615 postComplete();
1616 return triggered;
1617 }
1618
1619 /**
1620 * If not already completed, causes invocations of {@link #get()}
1621 * and related methods to throw the given exception.
1622 *
1623 * @param ex the exception
1624 * @return {@code true} if this invocation caused this CompletableFuture
1625 * to transition to a completed state, else {@code false}
1626 */
1627 public boolean completeExceptionally(Throwable ex) {
1628 if (ex == null) throw new NullPointerException();
1629 boolean triggered = result == null &&
1630 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1631 postComplete();
1632 return triggered;
1633 }
1634
1635 /**
1636 * Returns a new CompletableFuture that is completed
1637 * when this CompletableFuture completes, with the result of the
1638 * given function of this CompletableFuture's result.
1639 *
1640 * <p>If this CompletableFuture completes exceptionally, or the
1641 * supplied function throws an exception, then the returned
1642 * CompletableFuture completes exceptionally with a
1643 * CompletionException holding the exception as its cause.
1644 *
1645 * @param fn the function to use to compute the value of
1646 * the returned CompletableFuture
1647 * @return the new CompletableFuture
1648 */
1649 public <U> CompletableFuture<U> thenApply(Fun<? super T,? extends U> fn) {
1650 return doThenApply(fn, null);
1651 }
1652
1653 /**
1654 * Returns a new CompletableFuture that is asynchronously completed
1655 * when this CompletableFuture completes, with the result of the
1656 * given function of this CompletableFuture's result from a
1657 * task running in the {@link ForkJoinPool#commonPool()}.
1658 *
1659 * <p>If this CompletableFuture completes exceptionally, or the
1660 * supplied function throws an exception, then the returned
1661 * CompletableFuture completes exceptionally with a
1662 * CompletionException holding the exception as its cause.
1663 *
1664 * @param fn the function to use to compute the value of
1665 * the returned CompletableFuture
1666 * @return the new CompletableFuture
1667 */
1668 public <U> CompletableFuture<U> thenApplyAsync
1669 (Fun<? super T,? extends U> fn) {
1670 return doThenApply(fn, ForkJoinPool.commonPool());
1671 }
1672
1673 /**
1674 * Returns a new CompletableFuture that is asynchronously completed
1675 * when this CompletableFuture completes, with the result of the
1676 * given function of this CompletableFuture's result from a
1677 * task running in the given executor.
1678 *
1679 * <p>If this CompletableFuture completes exceptionally, or the
1680 * supplied function throws an exception, then the returned
1681 * CompletableFuture completes exceptionally with a
1682 * CompletionException holding the exception as its cause.
1683 *
1684 * @param fn the function to use to compute the value of
1685 * the returned CompletableFuture
1686 * @param executor the executor to use for asynchronous execution
1687 * @return the new CompletableFuture
1688 */
1689 public <U> CompletableFuture<U> thenApplyAsync
1690 (Fun<? super T,? extends U> fn,
1691 Executor executor) {
1692 if (executor == null) throw new NullPointerException();
1693 return doThenApply(fn, executor);
1694 }
1695
1696 private <U> CompletableFuture<U> doThenApply
1697 (Fun<? super T,? extends U> fn,
1698 Executor e) {
1699 if (fn == null) throw new NullPointerException();
1700 CompletableFuture<U> dst = new CompletableFuture<U>();
1701 ThenApply<T,U> d = null;
1702 Object r;
1703 if ((r = result) == null) {
1704 CompletionNode p = new CompletionNode
1705 (d = new ThenApply<T,U>(this, fn, dst, e));
1706 while ((r = result) == null) {
1707 if (UNSAFE.compareAndSwapObject
1708 (this, COMPLETIONS, p.next = completions, p))
1709 break;
1710 }
1711 }
1712 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1713 T t; Throwable ex;
1714 if (r instanceof AltResult) {
1715 ex = ((AltResult)r).ex;
1716 t = null;
1717 }
1718 else {
1719 ex = null;
1720 @SuppressWarnings("unchecked") T tr = (T) r;
1721 t = tr;
1722 }
1723 U u = null;
1724 if (ex == null) {
1725 try {
1726 if (e != null)
1727 e.execute(new AsyncApply<T,U>(t, fn, dst));
1728 else
1729 u = fn.apply(t);
1730 } catch (Throwable rex) {
1731 ex = rex;
1732 }
1733 }
1734 if (e == null || ex != null)
1735 dst.internalComplete(u, ex);
1736 }
1737 helpPostComplete();
1738 return dst;
1739 }
1740
1741 /**
1742 * Returns a new CompletableFuture that is completed
1743 * when this CompletableFuture completes, after performing the given
1744 * action with this CompletableFuture's result.
1745 *
1746 * <p>If this CompletableFuture completes exceptionally, or the
1747 * supplied action throws an exception, then the returned
1748 * CompletableFuture completes exceptionally with a
1749 * CompletionException holding the exception as its cause.
1750 *
1751 * @param block the action to perform before completing the
1752 * returned CompletableFuture
1753 * @return the new CompletableFuture
1754 */
1755 public CompletableFuture<Void> thenAccept(Action<? super T> block) {
1756 return doThenAccept(block, null);
1757 }
1758
1759 /**
1760 * Returns a new CompletableFuture that is asynchronously completed
1761 * when this CompletableFuture completes, after performing the given
1762 * action with this CompletableFuture's result from a task running
1763 * in the {@link ForkJoinPool#commonPool()}.
1764 *
1765 * <p>If this CompletableFuture completes exceptionally, or the
1766 * supplied action throws an exception, then the returned
1767 * CompletableFuture completes exceptionally with a
1768 * CompletionException holding the exception as its cause.
1769 *
1770 * @param block the action to perform before completing the
1771 * returned CompletableFuture
1772 * @return the new CompletableFuture
1773 */
1774 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block) {
1775 return doThenAccept(block, ForkJoinPool.commonPool());
1776 }
1777
1778 /**
1779 * Returns a new CompletableFuture that is asynchronously completed
1780 * when this CompletableFuture completes, after performing the given
1781 * action with this CompletableFuture's result from a task running
1782 * in the given executor.
1783 *
1784 * <p>If this CompletableFuture completes exceptionally, or the
1785 * supplied action throws an exception, then the returned
1786 * CompletableFuture completes exceptionally with a
1787 * CompletionException holding the exception as its cause.
1788 *
1789 * @param block the action to perform before completing the
1790 * returned CompletableFuture
1791 * @param executor the executor to use for asynchronous execution
1792 * @return the new CompletableFuture
1793 */
1794 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block,
1795 Executor executor) {
1796 if (executor == null) throw new NullPointerException();
1797 return doThenAccept(block, executor);
1798 }
1799
1800 private CompletableFuture<Void> doThenAccept(Action<? super T> fn,
1801 Executor e) {
1802 if (fn == null) throw new NullPointerException();
1803 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1804 ThenAccept<T> d = null;
1805 Object r;
1806 if ((r = result) == null) {
1807 CompletionNode p = new CompletionNode
1808 (d = new ThenAccept<T>(this, fn, dst, e));
1809 while ((r = result) == null) {
1810 if (UNSAFE.compareAndSwapObject
1811 (this, COMPLETIONS, p.next = completions, p))
1812 break;
1813 }
1814 }
1815 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1816 T t; Throwable ex;
1817 if (r instanceof AltResult) {
1818 ex = ((AltResult)r).ex;
1819 t = null;
1820 }
1821 else {
1822 ex = null;
1823 @SuppressWarnings("unchecked") T tr = (T) r;
1824 t = tr;
1825 }
1826 if (ex == null) {
1827 try {
1828 if (e != null)
1829 e.execute(new AsyncAccept<T>(t, fn, dst));
1830 else
1831 fn.accept(t);
1832 } catch (Throwable rex) {
1833 ex = rex;
1834 }
1835 }
1836 if (e == null || ex != null)
1837 dst.internalComplete(null, ex);
1838 }
1839 helpPostComplete();
1840 return dst;
1841 }
1842
1843 /**
1844 * Returns a new CompletableFuture that is completed
1845 * when this CompletableFuture completes, after performing the given
1846 * action.
1847 *
1848 * <p>If this CompletableFuture completes exceptionally, or the
1849 * supplied action throws an exception, then the returned
1850 * CompletableFuture completes exceptionally with a
1851 * CompletionException holding the exception as its cause.
1852 *
1853 * @param action the action to perform before completing the
1854 * returned CompletableFuture
1855 * @return the new CompletableFuture
1856 */
1857 public CompletableFuture<Void> thenRun(Runnable action) {
1858 return doThenRun(action, null);
1859 }
1860
1861 /**
1862 * Returns a new CompletableFuture that is asynchronously completed
1863 * when this CompletableFuture completes, after performing the given
1864 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1865 *
1866 * <p>If this CompletableFuture completes exceptionally, or the
1867 * supplied action throws an exception, then the returned
1868 * CompletableFuture completes exceptionally with a
1869 * CompletionException holding the exception as its cause.
1870 *
1871 * @param action the action to perform before completing the
1872 * returned CompletableFuture
1873 * @return the new CompletableFuture
1874 */
1875 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1876 return doThenRun(action, ForkJoinPool.commonPool());
1877 }
1878
1879 /**
1880 * Returns a new CompletableFuture that is asynchronously completed
1881 * when this CompletableFuture completes, after performing the given
1882 * action from a task running in the given executor.
1883 *
1884 * <p>If this CompletableFuture completes exceptionally, or the
1885 * supplied action throws an exception, then the returned
1886 * CompletableFuture completes exceptionally with a
1887 * CompletionException holding the exception as its cause.
1888 *
1889 * @param action the action to perform before completing the
1890 * returned CompletableFuture
1891 * @param executor the executor to use for asynchronous execution
1892 * @return the new CompletableFuture
1893 */
1894 public CompletableFuture<Void> thenRunAsync(Runnable action,
1895 Executor executor) {
1896 if (executor == null) throw new NullPointerException();
1897 return doThenRun(action, executor);
1898 }
1899
1900 private CompletableFuture<Void> doThenRun(Runnable action,
1901 Executor e) {
1902 if (action == null) throw new NullPointerException();
1903 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1904 ThenRun d = null;
1905 Object r;
1906 if ((r = result) == null) {
1907 CompletionNode p = new CompletionNode
1908 (d = new ThenRun(this, action, dst, e));
1909 while ((r = result) == null) {
1910 if (UNSAFE.compareAndSwapObject
1911 (this, COMPLETIONS, p.next = completions, p))
1912 break;
1913 }
1914 }
1915 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1916 Throwable ex;
1917 if (r instanceof AltResult)
1918 ex = ((AltResult)r).ex;
1919 else
1920 ex = null;
1921 if (ex == null) {
1922 try {
1923 if (e != null)
1924 e.execute(new AsyncRun(action, dst));
1925 else
1926 action.run();
1927 } catch (Throwable rex) {
1928 ex = rex;
1929 }
1930 }
1931 if (e == null || ex != null)
1932 dst.internalComplete(null, ex);
1933 }
1934 helpPostComplete();
1935 return dst;
1936 }
1937
1938 /**
1939 * Returns a new CompletableFuture that is completed
1940 * when both this and the other given CompletableFuture complete,
1941 * with the result of the given function of the results of the two
1942 * CompletableFutures.
1943 *
1944 * <p>If this and/or the other CompletableFuture complete
1945 * exceptionally, or the supplied function throws an exception,
1946 * then the returned CompletableFuture completes exceptionally
1947 * with a CompletionException holding the exception as its cause.
1948 *
1949 * @param other the other CompletableFuture
1950 * @param fn the function to use to compute the value of
1951 * the returned CompletableFuture
1952 * @return the new CompletableFuture
1953 */
1954 public <U,V> CompletableFuture<V> thenCombine
1955 (CompletableFuture<? extends U> other,
1956 BiFun<? super T,? super U,? extends V> fn) {
1957 return doThenCombine(other, fn, null);
1958 }
1959
1960 /**
1961 * Returns a new CompletableFuture that is asynchronously completed
1962 * when both this and the other given CompletableFuture complete,
1963 * with the result of the given function of the results of the two
1964 * CompletableFutures from a task running in the
1965 * {@link ForkJoinPool#commonPool()}.
1966 *
1967 * <p>If this and/or the other CompletableFuture complete
1968 * exceptionally, or the supplied function throws an exception,
1969 * then the returned CompletableFuture completes exceptionally
1970 * with a CompletionException holding the exception as its cause.
1971 *
1972 * @param other the other CompletableFuture
1973 * @param fn the function to use to compute the value of
1974 * the returned CompletableFuture
1975 * @return the new CompletableFuture
1976 */
1977 public <U,V> CompletableFuture<V> thenCombineAsync
1978 (CompletableFuture<? extends U> other,
1979 BiFun<? super T,? super U,? extends V> fn) {
1980 return doThenCombine(other, fn, ForkJoinPool.commonPool());
1981 }
1982
1983 /**
1984 * Returns a new CompletableFuture that is asynchronously completed
1985 * when both this and the other given CompletableFuture complete,
1986 * with the result of the given function of the results of the two
1987 * CompletableFutures from a task running in the given executor.
1988 *
1989 * <p>If this and/or the other CompletableFuture complete
1990 * exceptionally, or the supplied function throws an exception,
1991 * then the returned CompletableFuture completes exceptionally
1992 * with a CompletionException holding the exception as its cause.
1993 *
1994 * @param other the other CompletableFuture
1995 * @param fn the function to use to compute the value of
1996 * the returned CompletableFuture
1997 * @param executor the executor to use for asynchronous execution
1998 * @return the new CompletableFuture
1999 */
2000 public <U,V> CompletableFuture<V> thenCombineAsync
2001 (CompletableFuture<? extends U> other,
2002 BiFun<? super T,? super U,? extends V> fn,
2003 Executor executor) {
2004 if (executor == null) throw new NullPointerException();
2005 return doThenCombine(other, fn, executor);
2006 }
2007
2008 private <U,V> CompletableFuture<V> doThenCombine
2009 (CompletableFuture<? extends U> other,
2010 BiFun<? super T,? super U,? extends V> fn,
2011 Executor e) {
2012 if (other == null || fn == null) throw new NullPointerException();
2013 CompletableFuture<V> dst = new CompletableFuture<V>();
2014 ThenCombine<T,U,V> d = null;
2015 Object r, s = null;
2016 if ((r = result) == null || (s = other.result) == null) {
2017 d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
2018 CompletionNode q = null, p = new CompletionNode(d);
2019 while ((r == null && (r = result) == null) ||
2020 (s == null && (s = other.result) == null)) {
2021 if (q != null) {
2022 if (s != null ||
2023 UNSAFE.compareAndSwapObject
2024 (other, COMPLETIONS, q.next = other.completions, q))
2025 break;
2026 }
2027 else if (r != null ||
2028 UNSAFE.compareAndSwapObject
2029 (this, COMPLETIONS, p.next = completions, p)) {
2030 if (s != null)
2031 break;
2032 q = new CompletionNode(d);
2033 }
2034 }
2035 }
2036 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2037 T t; U u; Throwable ex;
2038 if (r instanceof AltResult) {
2039 ex = ((AltResult)r).ex;
2040 t = null;
2041 }
2042 else {
2043 ex = null;
2044 @SuppressWarnings("unchecked") T tr = (T) r;
2045 t = tr;
2046 }
2047 if (ex != null)
2048 u = null;
2049 else if (s instanceof AltResult) {
2050 ex = ((AltResult)s).ex;
2051 u = null;
2052 }
2053 else {
2054 @SuppressWarnings("unchecked") U us = (U) s;
2055 u = us;
2056 }
2057 V v = null;
2058 if (ex == null) {
2059 try {
2060 if (e != null)
2061 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
2062 else
2063 v = fn.apply(t, u);
2064 } catch (Throwable rex) {
2065 ex = rex;
2066 }
2067 }
2068 if (e == null || ex != null)
2069 dst.internalComplete(v, ex);
2070 }
2071 helpPostComplete();
2072 other.helpPostComplete();
2073 return dst;
2074 }
2075
2076 /**
2077 * Returns a new CompletableFuture that is completed
2078 * when both this and the other given CompletableFuture complete,
2079 * after performing the given action with the results of the two
2080 * CompletableFutures.
2081 *
2082 * <p>If this and/or the other CompletableFuture complete
2083 * exceptionally, or the supplied action throws an exception,
2084 * then the returned CompletableFuture completes exceptionally
2085 * with a CompletionException holding the exception as its cause.
2086 *
2087 * @param other the other CompletableFuture
2088 * @param block the action to perform before completing the
2089 * returned CompletableFuture
2090 * @return the new CompletableFuture
2091 */
2092 public <U> CompletableFuture<Void> thenAcceptBoth
2093 (CompletableFuture<? extends U> other,
2094 BiAction<? super T, ? super U> block) {
2095 return doThenAcceptBoth(other, block, null);
2096 }
2097
2098 /**
2099 * Returns a new CompletableFuture that is asynchronously completed
2100 * when both this and the other given CompletableFuture complete,
2101 * after performing the given action with the results of the two
2102 * CompletableFutures from a task running in the {@link
2103 * ForkJoinPool#commonPool()}.
2104 *
2105 * <p>If this and/or the other CompletableFuture complete
2106 * exceptionally, or the supplied action throws an exception,
2107 * then the returned CompletableFuture completes exceptionally
2108 * with a CompletionException holding the exception as its cause.
2109 *
2110 * @param other the other CompletableFuture
2111 * @param block the action to perform before completing the
2112 * returned CompletableFuture
2113 * @return the new CompletableFuture
2114 */
2115 public <U> CompletableFuture<Void> thenAcceptBothAsync
2116 (CompletableFuture<? extends U> other,
2117 BiAction<? super T, ? super U> block) {
2118 return doThenAcceptBoth(other, block, ForkJoinPool.commonPool());
2119 }
2120
2121 /**
2122 * Returns a new CompletableFuture that is asynchronously completed
2123 * when both this and the other given CompletableFuture complete,
2124 * after performing the given action with the results of the two
2125 * CompletableFutures from a task running in the given executor.
2126 *
2127 * <p>If this and/or the other CompletableFuture complete
2128 * exceptionally, or the supplied action throws an exception,
2129 * then the returned CompletableFuture completes exceptionally
2130 * with a CompletionException holding the exception as its cause.
2131 *
2132 * @param other the other CompletableFuture
2133 * @param block the action to perform before completing the
2134 * returned CompletableFuture
2135 * @param executor the executor to use for asynchronous execution
2136 * @return the new CompletableFuture
2137 */
2138 public <U> CompletableFuture<Void> thenAcceptBothAsync
2139 (CompletableFuture<? extends U> other,
2140 BiAction<? super T, ? super U> block,
2141 Executor executor) {
2142 if (executor == null) throw new NullPointerException();
2143 return doThenAcceptBoth(other, block, executor);
2144 }
2145
2146 private <U> CompletableFuture<Void> doThenAcceptBoth
2147 (CompletableFuture<? extends U> other,
2148 BiAction<? super T,? super U> fn,
2149 Executor e) {
2150 if (other == null || fn == null) throw new NullPointerException();
2151 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2152 ThenAcceptBoth<T,U> d = null;
2153 Object r, s = null;
2154 if ((r = result) == null || (s = other.result) == null) {
2155 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
2156 CompletionNode q = null, p = new CompletionNode(d);
2157 while ((r == null && (r = result) == null) ||
2158 (s == null && (s = other.result) == null)) {
2159 if (q != null) {
2160 if (s != null ||
2161 UNSAFE.compareAndSwapObject
2162 (other, COMPLETIONS, q.next = other.completions, q))
2163 break;
2164 }
2165 else if (r != null ||
2166 UNSAFE.compareAndSwapObject
2167 (this, COMPLETIONS, p.next = completions, p)) {
2168 if (s != null)
2169 break;
2170 q = new CompletionNode(d);
2171 }
2172 }
2173 }
2174 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2175 T t; U u; Throwable ex;
2176 if (r instanceof AltResult) {
2177 ex = ((AltResult)r).ex;
2178 t = null;
2179 }
2180 else {
2181 ex = null;
2182 @SuppressWarnings("unchecked") T tr = (T) r;
2183 t = tr;
2184 }
2185 if (ex != null)
2186 u = null;
2187 else if (s instanceof AltResult) {
2188 ex = ((AltResult)s).ex;
2189 u = null;
2190 }
2191 else {
2192 @SuppressWarnings("unchecked") U us = (U) s;
2193 u = us;
2194 }
2195 if (ex == null) {
2196 try {
2197 if (e != null)
2198 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
2199 else
2200 fn.accept(t, u);
2201 } catch (Throwable rex) {
2202 ex = rex;
2203 }
2204 }
2205 if (e == null || ex != null)
2206 dst.internalComplete(null, ex);
2207 }
2208 helpPostComplete();
2209 other.helpPostComplete();
2210 return dst;
2211 }
2212
2213 /**
2214 * Returns a new CompletableFuture that is completed
2215 * when both this and the other given CompletableFuture complete,
2216 * after performing the given action.
2217 *
2218 * <p>If this and/or the other CompletableFuture complete
2219 * exceptionally, or the supplied action throws an exception,
2220 * then the returned CompletableFuture completes exceptionally
2221 * with a CompletionException holding the exception as its cause.
2222 *
2223 * @param other the other CompletableFuture
2224 * @param action the action to perform before completing the
2225 * returned CompletableFuture
2226 * @return the new CompletableFuture
2227 */
2228 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2229 Runnable action) {
2230 return doRunAfterBoth(other, action, null);
2231 }
2232
2233 /**
2234 * Returns a new CompletableFuture that is asynchronously completed
2235 * when both this and the other given CompletableFuture complete,
2236 * after performing the given action from a task running in the
2237 * {@link ForkJoinPool#commonPool()}.
2238 *
2239 * <p>If this and/or the other CompletableFuture complete
2240 * exceptionally, or the supplied action throws an exception,
2241 * then the returned CompletableFuture completes exceptionally
2242 * with a CompletionException holding the exception as its cause.
2243 *
2244 * @param other the other CompletableFuture
2245 * @param action the action to perform before completing the
2246 * returned CompletableFuture
2247 * @return the new CompletableFuture
2248 */
2249 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2250 Runnable action) {
2251 return doRunAfterBoth(other, action, ForkJoinPool.commonPool());
2252 }
2253
2254 /**
2255 * Returns a new CompletableFuture that is asynchronously completed
2256 * when both this and the other given CompletableFuture complete,
2257 * after performing the given action from a task running in the
2258 * given executor.
2259 *
2260 * <p>If this and/or the other CompletableFuture complete
2261 * exceptionally, or the supplied action throws an exception,
2262 * then the returned CompletableFuture completes exceptionally
2263 * with a CompletionException holding the exception as its cause.
2264 *
2265 * @param other the other CompletableFuture
2266 * @param action the action to perform before completing the
2267 * returned CompletableFuture
2268 * @param executor the executor to use for asynchronous execution
2269 * @return the new CompletableFuture
2270 */
2271 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2272 Runnable action,
2273 Executor executor) {
2274 if (executor == null) throw new NullPointerException();
2275 return doRunAfterBoth(other, action, executor);
2276 }
2277
2278 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
2279 Runnable action,
2280 Executor e) {
2281 if (other == null || action == null) throw new NullPointerException();
2282 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2283 RunAfterBoth d = null;
2284 Object r, s = null;
2285 if ((r = result) == null || (s = other.result) == null) {
2286 d = new RunAfterBoth(this, other, action, dst, e);
2287 CompletionNode q = null, p = new CompletionNode(d);
2288 while ((r == null && (r = result) == null) ||
2289 (s == null && (s = other.result) == null)) {
2290 if (q != null) {
2291 if (s != null ||
2292 UNSAFE.compareAndSwapObject
2293 (other, COMPLETIONS, q.next = other.completions, q))
2294 break;
2295 }
2296 else if (r != null ||
2297 UNSAFE.compareAndSwapObject
2298 (this, COMPLETIONS, p.next = completions, p)) {
2299 if (s != null)
2300 break;
2301 q = new CompletionNode(d);
2302 }
2303 }
2304 }
2305 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2306 Throwable ex;
2307 if (r instanceof AltResult)
2308 ex = ((AltResult)r).ex;
2309 else
2310 ex = null;
2311 if (ex == null && (s instanceof AltResult))
2312 ex = ((AltResult)s).ex;
2313 if (ex == null) {
2314 try {
2315 if (e != null)
2316 e.execute(new AsyncRun(action, dst));
2317 else
2318 action.run();
2319 } catch (Throwable rex) {
2320 ex = rex;
2321 }
2322 }
2323 if (e == null || ex != null)
2324 dst.internalComplete(null, ex);
2325 }
2326 helpPostComplete();
2327 other.helpPostComplete();
2328 return dst;
2329 }
2330
2331 /**
2332 * Returns a new CompletableFuture that is completed
2333 * when either this or the other given CompletableFuture completes,
2334 * with the result of the given function of either this or the other
2335 * CompletableFuture's result.
2336 *
2337 * <p>If this and/or the other CompletableFuture complete
2338 * exceptionally, then the returned CompletableFuture may also do so,
2339 * with a CompletionException holding one of these exceptions as its
2340 * cause. No guarantees are made about which result or exception is
2341 * used in the returned CompletableFuture. If the supplied function
2342 * throws an exception, then the returned CompletableFuture completes
2343 * exceptionally with a CompletionException holding the exception as
2344 * its cause.
2345 *
2346 * @param other the other CompletableFuture
2347 * @param fn the function to use to compute the value of
2348 * the returned CompletableFuture
2349 * @return the new CompletableFuture
2350 */
2351 public <U> CompletableFuture<U> applyToEither
2352 (CompletableFuture<? extends T> other,
2353 Fun<? super T, U> fn) {
2354 return doApplyToEither(other, fn, null);
2355 }
2356
2357 /**
2358 * Returns a new CompletableFuture that is asynchronously completed
2359 * when either this or the other given CompletableFuture completes,
2360 * with the result of the given function of either this or the other
2361 * CompletableFuture's result from a task running in the
2362 * {@link ForkJoinPool#commonPool()}.
2363 *
2364 * <p>If this and/or the other CompletableFuture complete
2365 * exceptionally, then the returned CompletableFuture may also do so,
2366 * with a CompletionException holding one of these exceptions as its
2367 * cause. No guarantees are made about which result or exception is
2368 * used in the returned CompletableFuture. If the supplied function
2369 * throws an exception, then the returned CompletableFuture completes
2370 * exceptionally with a CompletionException holding the exception as
2371 * its cause.
2372 *
2373 * @param other the other CompletableFuture
2374 * @param fn the function to use to compute the value of
2375 * the returned CompletableFuture
2376 * @return the new CompletableFuture
2377 */
2378 public <U> CompletableFuture<U> applyToEitherAsync
2379 (CompletableFuture<? extends T> other,
2380 Fun<? super T, U> fn) {
2381 return doApplyToEither(other, fn, ForkJoinPool.commonPool());
2382 }
2383
2384 /**
2385 * Returns a new CompletableFuture that is asynchronously completed
2386 * when either this or the other given CompletableFuture completes,
2387 * with the result of the given function of either this or the other
2388 * CompletableFuture's result from a task running in the
2389 * given executor.
2390 *
2391 * <p>If this and/or the other CompletableFuture complete
2392 * exceptionally, then the returned CompletableFuture may also do so,
2393 * with a CompletionException holding one of these exceptions as its
2394 * cause. No guarantees are made about which result or exception is
2395 * used in the returned CompletableFuture. If the supplied function
2396 * throws an exception, then the returned CompletableFuture completes
2397 * exceptionally with a CompletionException holding the exception as
2398 * its cause.
2399 *
2400 * @param other the other CompletableFuture
2401 * @param fn the function to use to compute the value of
2402 * the returned CompletableFuture
2403 * @param executor the executor to use for asynchronous execution
2404 * @return the new CompletableFuture
2405 */
2406 public <U> CompletableFuture<U> applyToEitherAsync
2407 (CompletableFuture<? extends T> other,
2408 Fun<? super T, U> fn,
2409 Executor executor) {
2410 if (executor == null) throw new NullPointerException();
2411 return doApplyToEither(other, fn, executor);
2412 }
2413
2414 private <U> CompletableFuture<U> doApplyToEither
2415 (CompletableFuture<? extends T> other,
2416 Fun<? super T, U> fn,
2417 Executor e) {
2418 if (other == null || fn == null) throw new NullPointerException();
2419 CompletableFuture<U> dst = new CompletableFuture<U>();
2420 ApplyToEither<T,U> d = null;
2421 Object r;
2422 if ((r = result) == null && (r = other.result) == null) {
2423 d = new ApplyToEither<T,U>(this, other, fn, dst, e);
2424 CompletionNode q = null, p = new CompletionNode(d);
2425 while ((r = result) == null && (r = other.result) == null) {
2426 if (q != null) {
2427 if (UNSAFE.compareAndSwapObject
2428 (other, COMPLETIONS, q.next = other.completions, q))
2429 break;
2430 }
2431 else if (UNSAFE.compareAndSwapObject
2432 (this, COMPLETIONS, p.next = completions, p))
2433 q = new CompletionNode(d);
2434 }
2435 }
2436 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2437 T t; Throwable ex;
2438 if (r instanceof AltResult) {
2439 ex = ((AltResult)r).ex;
2440 t = null;
2441 }
2442 else {
2443 ex = null;
2444 @SuppressWarnings("unchecked") T tr = (T) r;
2445 t = tr;
2446 }
2447 U u = null;
2448 if (ex == null) {
2449 try {
2450 if (e != null)
2451 e.execute(new AsyncApply<T,U>(t, fn, dst));
2452 else
2453 u = fn.apply(t);
2454 } catch (Throwable rex) {
2455 ex = rex;
2456 }
2457 }
2458 if (e == null || ex != null)
2459 dst.internalComplete(u, ex);
2460 }
2461 helpPostComplete();
2462 other.helpPostComplete();
2463 return dst;
2464 }
2465
2466 /**
2467 * Returns a new CompletableFuture that is completed
2468 * when either this or the other given CompletableFuture completes,
2469 * after performing the given action with the result of either this
2470 * or the other CompletableFuture's result.
2471 *
2472 * <p>If this and/or the other CompletableFuture complete
2473 * exceptionally, then the returned CompletableFuture may also do so,
2474 * with a CompletionException holding one of these exceptions as its
2475 * cause. No guarantees are made about which result or exception is
2476 * used in the returned CompletableFuture. If the supplied action
2477 * throws an exception, then the returned CompletableFuture completes
2478 * exceptionally with a CompletionException holding the exception as
2479 * its cause.
2480 *
2481 * @param other the other CompletableFuture
2482 * @param block the action to perform before completing the
2483 * returned CompletableFuture
2484 * @return the new CompletableFuture
2485 */
2486 public CompletableFuture<Void> acceptEither
2487 (CompletableFuture<? extends T> other,
2488 Action<? super T> block) {
2489 return doAcceptEither(other, block, null);
2490 }
2491
2492 /**
2493 * Returns a new CompletableFuture that is asynchronously completed
2494 * when either this or the other given CompletableFuture completes,
2495 * after performing the given action with the result of either this
2496 * or the other CompletableFuture's result from a task running in
2497 * the {@link ForkJoinPool#commonPool()}.
2498 *
2499 * <p>If this and/or the other CompletableFuture complete
2500 * exceptionally, then the returned CompletableFuture may also do so,
2501 * with a CompletionException holding one of these exceptions as its
2502 * cause. No guarantees are made about which result or exception is
2503 * used in the returned CompletableFuture. If the supplied action
2504 * throws an exception, then the returned CompletableFuture completes
2505 * exceptionally with a CompletionException holding the exception as
2506 * its cause.
2507 *
2508 * @param other the other CompletableFuture
2509 * @param block the action to perform before completing the
2510 * returned CompletableFuture
2511 * @return the new CompletableFuture
2512 */
2513 public CompletableFuture<Void> acceptEitherAsync
2514 (CompletableFuture<? extends T> other,
2515 Action<? super T> block) {
2516 return doAcceptEither(other, block, ForkJoinPool.commonPool());
2517 }
2518
2519 /**
2520 * Returns a new CompletableFuture that is asynchronously completed
2521 * when either this or the other given CompletableFuture completes,
2522 * after performing the given action with the result of either this
2523 * or the other CompletableFuture's result from a task running in
2524 * the given executor.
2525 *
2526 * <p>If this and/or the other CompletableFuture complete
2527 * exceptionally, then the returned CompletableFuture may also do so,
2528 * with a CompletionException holding one of these exceptions as its
2529 * cause. No guarantees are made about which result or exception is
2530 * used in the returned CompletableFuture. If the supplied action
2531 * throws an exception, then the returned CompletableFuture completes
2532 * exceptionally with a CompletionException holding the exception as
2533 * its cause.
2534 *
2535 * @param other the other CompletableFuture
2536 * @param block the action to perform before completing the
2537 * returned CompletableFuture
2538 * @param executor the executor to use for asynchronous execution
2539 * @return the new CompletableFuture
2540 */
2541 public CompletableFuture<Void> acceptEitherAsync
2542 (CompletableFuture<? extends T> other,
2543 Action<? super T> block,
2544 Executor executor) {
2545 if (executor == null) throw new NullPointerException();
2546 return doAcceptEither(other, block, executor);
2547 }
2548
2549 private CompletableFuture<Void> doAcceptEither
2550 (CompletableFuture<? extends T> other,
2551 Action<? super T> fn,
2552 Executor e) {
2553 if (other == null || fn == null) throw new NullPointerException();
2554 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2555 AcceptEither<T> d = null;
2556 Object r;
2557 if ((r = result) == null && (r = other.result) == null) {
2558 d = new AcceptEither<T>(this, other, fn, dst, e);
2559 CompletionNode q = null, p = new CompletionNode(d);
2560 while ((r = result) == null && (r = other.result) == null) {
2561 if (q != null) {
2562 if (UNSAFE.compareAndSwapObject
2563 (other, COMPLETIONS, q.next = other.completions, q))
2564 break;
2565 }
2566 else if (UNSAFE.compareAndSwapObject
2567 (this, COMPLETIONS, p.next = completions, p))
2568 q = new CompletionNode(d);
2569 }
2570 }
2571 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2572 T t; Throwable ex;
2573 if (r instanceof AltResult) {
2574 ex = ((AltResult)r).ex;
2575 t = null;
2576 }
2577 else {
2578 ex = null;
2579 @SuppressWarnings("unchecked") T tr = (T) r;
2580 t = tr;
2581 }
2582 if (ex == null) {
2583 try {
2584 if (e != null)
2585 e.execute(new AsyncAccept<T>(t, fn, dst));
2586 else
2587 fn.accept(t);
2588 } catch (Throwable rex) {
2589 ex = rex;
2590 }
2591 }
2592 if (e == null || ex != null)
2593 dst.internalComplete(null, ex);
2594 }
2595 helpPostComplete();
2596 other.helpPostComplete();
2597 return dst;
2598 }
2599
2600 /**
2601 * Returns a new CompletableFuture that is completed
2602 * when either this or the other given CompletableFuture completes,
2603 * after performing the given action.
2604 *
2605 * <p>If this and/or the other CompletableFuture complete
2606 * exceptionally, then the returned CompletableFuture may also do so,
2607 * with a CompletionException holding one of these exceptions as its
2608 * cause. No guarantees are made about which result or exception is
2609 * used in the returned CompletableFuture. If the supplied action
2610 * throws an exception, then the returned CompletableFuture completes
2611 * exceptionally with a CompletionException holding the exception as
2612 * its cause.
2613 *
2614 * @param other the other CompletableFuture
2615 * @param action the action to perform before completing the
2616 * returned CompletableFuture
2617 * @return the new CompletableFuture
2618 */
2619 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2620 Runnable action) {
2621 return doRunAfterEither(other, action, null);
2622 }
2623
2624 /**
2625 * Returns a new CompletableFuture that is asynchronously completed
2626 * when either this or the other given CompletableFuture completes,
2627 * after performing the given action from a task running in the
2628 * {@link ForkJoinPool#commonPool()}.
2629 *
2630 * <p>If this and/or the other CompletableFuture complete
2631 * exceptionally, then the returned CompletableFuture may also do so,
2632 * with a CompletionException holding one of these exceptions as its
2633 * cause. No guarantees are made about which result or exception is
2634 * used in the returned CompletableFuture. If the supplied action
2635 * throws an exception, then the returned CompletableFuture completes
2636 * exceptionally with a CompletionException holding the exception as
2637 * its cause.
2638 *
2639 * @param other the other CompletableFuture
2640 * @param action the action to perform before completing the
2641 * returned CompletableFuture
2642 * @return the new CompletableFuture
2643 */
2644 public CompletableFuture<Void> runAfterEitherAsync
2645 (CompletableFuture<?> other,
2646 Runnable action) {
2647 return doRunAfterEither(other, action, ForkJoinPool.commonPool());
2648 }
2649
2650 /**
2651 * Returns a new CompletableFuture that is asynchronously completed
2652 * when either this or the other given CompletableFuture completes,
2653 * after performing the given action from a task running in the
2654 * given executor.
2655 *
2656 * <p>If this and/or the other CompletableFuture complete
2657 * exceptionally, then the returned CompletableFuture may also do so,
2658 * with a CompletionException holding one of these exceptions as its
2659 * cause. No guarantees are made about which result or exception is
2660 * used in the returned CompletableFuture. If the supplied action
2661 * throws an exception, then the returned CompletableFuture completes
2662 * exceptionally with a CompletionException holding the exception as
2663 * its cause.
2664 *
2665 * @param other the other CompletableFuture
2666 * @param action the action to perform before completing the
2667 * returned CompletableFuture
2668 * @param executor the executor to use for asynchronous execution
2669 * @return the new CompletableFuture
2670 */
2671 public CompletableFuture<Void> runAfterEitherAsync
2672 (CompletableFuture<?> other,
2673 Runnable action,
2674 Executor executor) {
2675 if (executor == null) throw new NullPointerException();
2676 return doRunAfterEither(other, action, executor);
2677 }
2678
2679 private CompletableFuture<Void> doRunAfterEither
2680 (CompletableFuture<?> other,
2681 Runnable action,
2682 Executor e) {
2683 if (other == null || action == null) throw new NullPointerException();
2684 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2685 RunAfterEither d = null;
2686 Object r;
2687 if ((r = result) == null && (r = other.result) == null) {
2688 d = new RunAfterEither(this, other, action, dst, e);
2689 CompletionNode q = null, p = new CompletionNode(d);
2690 while ((r = result) == null && (r = other.result) == null) {
2691 if (q != null) {
2692 if (UNSAFE.compareAndSwapObject
2693 (other, COMPLETIONS, q.next = other.completions, q))
2694 break;
2695 }
2696 else if (UNSAFE.compareAndSwapObject
2697 (this, COMPLETIONS, p.next = completions, p))
2698 q = new CompletionNode(d);
2699 }
2700 }
2701 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2702 Throwable ex;
2703 if (r instanceof AltResult)
2704 ex = ((AltResult)r).ex;
2705 else
2706 ex = null;
2707 if (ex == null) {
2708 try {
2709 if (e != null)
2710 e.execute(new AsyncRun(action, dst));
2711 else
2712 action.run();
2713 } catch (Throwable rex) {
2714 ex = rex;
2715 }
2716 }
2717 if (e == null || ex != null)
2718 dst.internalComplete(null, ex);
2719 }
2720 helpPostComplete();
2721 other.helpPostComplete();
2722 return dst;
2723 }
2724
2725 /**
2726 * Returns a CompletableFuture that upon completion, has the same
2727 * value as produced by the given function of the result of this
2728 * CompletableFuture.
2729 *
2730 * <p>If this CompletableFuture completes exceptionally, then the
2731 * returned CompletableFuture also does so, with a
2732 * CompletionException holding this exception as its cause.
2733 * Similarly, if the computed CompletableFuture completes
2734 * exceptionally, then so does the returned CompletableFuture.
2735 *
2736 * @param fn the function returning a new CompletableFuture
2737 * @return the CompletableFuture
2738 */
2739 public <U> CompletableFuture<U> thenCompose
2740 (Fun<? super T, CompletableFuture<U>> fn) {
2741 return doThenCompose(fn, null);
2742 }
2743
2744 /**
2745 * Returns a CompletableFuture that upon completion, has the same
2746 * value as that produced asynchronously using the {@link
2747 * ForkJoinPool#commonPool()} by the given function of the result
2748 * of this CompletableFuture.
2749 *
2750 * <p>If this CompletableFuture completes exceptionally, then the
2751 * returned CompletableFuture also does so, with a
2752 * CompletionException holding this exception as its cause.
2753 * Similarly, if the computed CompletableFuture completes
2754 * exceptionally, then so does the returned CompletableFuture.
2755 *
2756 * @param fn the function returning a new CompletableFuture
2757 * @return the CompletableFuture
2758 */
2759 public <U> CompletableFuture<U> thenComposeAsync
2760 (Fun<? super T, CompletableFuture<U>> fn) {
2761 return doThenCompose(fn, ForkJoinPool.commonPool());
2762 }
2763
2764 /**
2765 * Returns a CompletableFuture that upon completion, has the same
2766 * value as that produced asynchronously using the given executor
2767 * by the given function of this CompletableFuture.
2768 *
2769 * <p>If this CompletableFuture completes exceptionally, then the
2770 * returned CompletableFuture also does so, with a
2771 * CompletionException holding this exception as its cause.
2772 * Similarly, if the computed CompletableFuture completes
2773 * exceptionally, then so does the returned CompletableFuture.
2774 *
2775 * @param fn the function returning a new CompletableFuture
2776 * @param executor the executor to use for asynchronous execution
2777 * @return the CompletableFuture
2778 */
2779 public <U> CompletableFuture<U> thenComposeAsync
2780 (Fun<? super T, CompletableFuture<U>> fn,
2781 Executor executor) {
2782 if (executor == null) throw new NullPointerException();
2783 return doThenCompose(fn, executor);
2784 }
2785
2786 private <U> CompletableFuture<U> doThenCompose
2787 (Fun<? super T, CompletableFuture<U>> fn,
2788 Executor e) {
2789 if (fn == null) throw new NullPointerException();
2790 CompletableFuture<U> dst = null;
2791 ThenCompose<T,U> d = null;
2792 Object r;
2793 if ((r = result) == null) {
2794 dst = new CompletableFuture<U>();
2795 CompletionNode p = new CompletionNode
2796 (d = new ThenCompose<T,U>(this, fn, dst, e));
2797 while ((r = result) == null) {
2798 if (UNSAFE.compareAndSwapObject
2799 (this, COMPLETIONS, p.next = completions, p))
2800 break;
2801 }
2802 }
2803 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2804 T t; Throwable ex;
2805 if (r instanceof AltResult) {
2806 ex = ((AltResult)r).ex;
2807 t = null;
2808 }
2809 else {
2810 ex = null;
2811 @SuppressWarnings("unchecked") T tr = (T) r;
2812 t = tr;
2813 }
2814 if (ex == null) {
2815 if (e != null) {
2816 if (dst == null)
2817 dst = new CompletableFuture<U>();
2818 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2819 }
2820 else {
2821 try {
2822 if ((dst = fn.apply(t)) == null)
2823 ex = new NullPointerException();
2824 } catch (Throwable rex) {
2825 ex = rex;
2826 }
2827 }
2828 }
2829 if (dst == null)
2830 dst = new CompletableFuture<U>();
2831 if (e == null || ex != null)
2832 dst.internalComplete(null, ex);
2833 }
2834 helpPostComplete();
2835 dst.helpPostComplete();
2836 return dst;
2837 }
2838
2839 /**
2840 * Returns a new CompletableFuture that is completed when this
2841 * CompletableFuture completes, with the result of the given
2842 * function of the exception triggering this CompletableFuture's
2843 * completion when it completes exceptionally; otherwise, if this
2844 * CompletableFuture completes normally, then the returned
2845 * CompletableFuture also completes normally with the same value.
2846 *
2847 * @param fn the function to use to compute the value of the
2848 * returned CompletableFuture if this CompletableFuture completed
2849 * exceptionally
2850 * @return the new CompletableFuture
2851 */
2852 public CompletableFuture<T> exceptionally
2853 (Fun<Throwable, ? extends T> fn) {
2854 if (fn == null) throw new NullPointerException();
2855 CompletableFuture<T> dst = new CompletableFuture<T>();
2856 ExceptionCompletion<T> d = null;
2857 Object r;
2858 if ((r = result) == null) {
2859 CompletionNode p =
2860 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2861 while ((r = result) == null) {
2862 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2863 p.next = completions, p))
2864 break;
2865 }
2866 }
2867 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2868 T t = null; Throwable ex, dx = null;
2869 if (r instanceof AltResult) {
2870 if ((ex = ((AltResult)r).ex) != null) {
2871 try {
2872 t = fn.apply(ex);
2873 } catch (Throwable rex) {
2874 dx = rex;
2875 }
2876 }
2877 }
2878 else {
2879 @SuppressWarnings("unchecked") T tr = (T) r;
2880 t = tr;
2881 }
2882 dst.internalComplete(t, dx);
2883 }
2884 helpPostComplete();
2885 return dst;
2886 }
2887
2888 /**
2889 * Returns a new CompletableFuture that is completed when this
2890 * CompletableFuture completes, with the result of the given
2891 * function of the result and exception of this CompletableFuture's
2892 * completion. The given function is invoked with the result (or
2893 * {@code null} if none) and the exception (or {@code null} if none)
2894 * of this CompletableFuture when complete.
2895 *
2896 * @param fn the function to use to compute the value of the
2897 * returned CompletableFuture
2898 * @return the new CompletableFuture
2899 */
2900 public <U> CompletableFuture<U> handle
2901 (BiFun<? super T, Throwable, ? extends U> fn) {
2902 if (fn == null) throw new NullPointerException();
2903 CompletableFuture<U> dst = new CompletableFuture<U>();
2904 HandleCompletion<T,U> d = null;
2905 Object r;
2906 if ((r = result) == null) {
2907 CompletionNode p =
2908 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2909 while ((r = result) == null) {
2910 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2911 p.next = completions, p))
2912 break;
2913 }
2914 }
2915 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2916 T t; Throwable ex;
2917 if (r instanceof AltResult) {
2918 ex = ((AltResult)r).ex;
2919 t = null;
2920 }
2921 else {
2922 ex = null;
2923 @SuppressWarnings("unchecked") T tr = (T) r;
2924 t = tr;
2925 }
2926 U u; Throwable dx;
2927 try {
2928 u = fn.apply(t, ex);
2929 dx = null;
2930 } catch (Throwable rex) {
2931 dx = rex;
2932 u = null;
2933 }
2934 dst.internalComplete(u, dx);
2935 }
2936 helpPostComplete();
2937 return dst;
2938 }
2939
2940
2941 /* ------------- Arbitrary-arity constructions -------------- */
2942
2943 /*
2944 * The basic plan of attack is to recursively form binary
2945 * completion trees of elements. This can be overkill for small
2946 * sets, but scales nicely. The And/All vs Or/Any forms use the
2947 * same idea, but details differ.
2948 */
2949
2950 /**
2951 * Returns a new CompletableFuture that is completed when all of
2952 * the given CompletableFutures complete. If any of the given
2953 * CompletableFutures complete exceptionally, then the returned
2954 * CompletableFuture also does so, with a CompletionException
2955 * holding this exception as its cause. Otherwise, the results,
2956 * if any, of the given CompletableFutures are not reflected in
2957 * the returned CompletableFuture, but may be obtained by
2958 * inspecting them individually. If no CompletableFutures are
2959 * provided, returns a CompletableFuture completed with the value
2960 * {@code null}.
2961 *
2962 * <p>Among the applications of this method is to await completion
2963 * of a set of independent CompletableFutures before continuing a
2964 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2965 * c3).join();}.
2966 *
2967 * @param cfs the CompletableFutures
2968 * @return a new CompletableFuture that is completed when all of the
2969 * given CompletableFutures complete
2970 * @throws NullPointerException if the array or any of its elements are
2971 * {@code null}
2972 */
2973 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2974 int len = cfs.length; // Directly handle empty and singleton cases
2975 if (len > 1)
2976 return allTree(cfs, 0, len - 1);
2977 else {
2978 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2979 CompletableFuture<?> f;
2980 if (len == 0)
2981 dst.result = NIL;
2982 else if ((f = cfs[0]) == null)
2983 throw new NullPointerException();
2984 else {
2985 ThenPropagate d = null;
2986 CompletionNode p = null;
2987 Object r;
2988 while ((r = f.result) == null) {
2989 if (d == null)
2990 d = new ThenPropagate(f, dst);
2991 else if (p == null)
2992 p = new CompletionNode(d);
2993 else if (UNSAFE.compareAndSwapObject
2994 (f, COMPLETIONS, p.next = f.completions, p))
2995 break;
2996 }
2997 if (r != null && (d == null || d.compareAndSet(0, 1)))
2998 dst.internalComplete(null, (r instanceof AltResult) ?
2999 ((AltResult)r).ex : null);
3000 f.helpPostComplete();
3001 }
3002 return dst;
3003 }
3004 }
3005
3006 /**
3007 * Recursively constructs an And'ed tree of CompletableFutures.
3008 * Called only when array known to have at least two elements.
3009 */
3010 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
3011 int lo, int hi) {
3012 CompletableFuture<?> fst, snd;
3013 int mid = (lo + hi) >>> 1;
3014 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
3015 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
3016 throw new NullPointerException();
3017 CompletableFuture<Void> dst = new CompletableFuture<Void>();
3018 AndCompletion d = null;
3019 CompletionNode p = null, q = null;
3020 Object r = null, s = null;
3021 while ((r = fst.result) == null || (s = snd.result) == null) {
3022 if (d == null)
3023 d = new AndCompletion(fst, snd, dst);
3024 else if (p == null)
3025 p = new CompletionNode(d);
3026 else if (q == null) {
3027 if (UNSAFE.compareAndSwapObject
3028 (fst, COMPLETIONS, p.next = fst.completions, p))
3029 q = new CompletionNode(d);
3030 }
3031 else if (UNSAFE.compareAndSwapObject
3032 (snd, COMPLETIONS, q.next = snd.completions, q))
3033 break;
3034 }
3035 if ((r != null || (r = fst.result) != null) &&
3036 (s != null || (s = snd.result) != null) &&
3037 (d == null || d.compareAndSet(0, 1))) {
3038 Throwable ex;
3039 if (r instanceof AltResult)
3040 ex = ((AltResult)r).ex;
3041 else
3042 ex = null;
3043 if (ex == null && (s instanceof AltResult))
3044 ex = ((AltResult)s).ex;
3045 dst.internalComplete(null, ex);
3046 }
3047 fst.helpPostComplete();
3048 snd.helpPostComplete();
3049 return dst;
3050 }
3051
3052 /**
3053 * Returns a new CompletableFuture that is completed when any of
3054 * the given CompletableFutures complete, with the same result.
3055 * Otherwise, if it completed exceptionally, the returned
3056 * CompletableFuture also does so, with a CompletionException
3057 * holding this exception as its cause. If no CompletableFutures
3058 * are provided, returns an incomplete CompletableFuture.
3059 *
3060 * @param cfs the CompletableFutures
3061 * @return a new CompletableFuture that is completed with the
3062 * result or exception of any of the given CompletableFutures when
3063 * one completes
3064 * @throws NullPointerException if the array or any of its elements are
3065 * {@code null}
3066 */
3067 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
3068 int len = cfs.length; // Same idea as allOf
3069 if (len > 1)
3070 return anyTree(cfs, 0, len - 1);
3071 else {
3072 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3073 CompletableFuture<?> f;
3074 if (len == 0)
3075 ; // skip
3076 else if ((f = cfs[0]) == null)
3077 throw new NullPointerException();
3078 else {
3079 ThenCopy<Object> d = null;
3080 CompletionNode p = null;
3081 Object r;
3082 while ((r = f.result) == null) {
3083 if (d == null)
3084 d = new ThenCopy<Object>(f, dst);
3085 else if (p == null)
3086 p = new CompletionNode(d);
3087 else if (UNSAFE.compareAndSwapObject
3088 (f, COMPLETIONS, p.next = f.completions, p))
3089 break;
3090 }
3091 if (r != null && (d == null || d.compareAndSet(0, 1))) {
3092 Throwable ex; Object t;
3093 if (r instanceof AltResult) {
3094 ex = ((AltResult)r).ex;
3095 t = null;
3096 }
3097 else {
3098 ex = null;
3099 t = r;
3100 }
3101 dst.internalComplete(t, ex);
3102 }
3103 f.helpPostComplete();
3104 }
3105 return dst;
3106 }
3107 }
3108
3109 /**
3110 * Recursively constructs an Or'ed tree of CompletableFutures.
3111 */
3112 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
3113 int lo, int hi) {
3114 CompletableFuture<?> fst, snd;
3115 int mid = (lo + hi) >>> 1;
3116 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3117 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3118 throw new NullPointerException();
3119 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3120 OrCompletion d = null;
3121 CompletionNode p = null, q = null;
3122 Object r;
3123 while ((r = fst.result) == null && (r = snd.result) == null) {
3124 if (d == null)
3125 d = new OrCompletion(fst, snd, dst);
3126 else if (p == null)
3127 p = new CompletionNode(d);
3128 else if (q == null) {
3129 if (UNSAFE.compareAndSwapObject
3130 (fst, COMPLETIONS, p.next = fst.completions, p))
3131 q = new CompletionNode(d);
3132 }
3133 else if (UNSAFE.compareAndSwapObject
3134 (snd, COMPLETIONS, q.next = snd.completions, q))
3135 break;
3136 }
3137 if ((r != null || (r = fst.result) != null ||
3138 (r = snd.result) != null) &&
3139 (d == null || d.compareAndSet(0, 1))) {
3140 Throwable ex; Object t;
3141 if (r instanceof AltResult) {
3142 ex = ((AltResult)r).ex;
3143 t = null;
3144 }
3145 else {
3146 ex = null;
3147 t = r;
3148 }
3149 dst.internalComplete(t, ex);
3150 }
3151 fst.helpPostComplete();
3152 snd.helpPostComplete();
3153 return dst;
3154 }
3155
3156 /* ------------- Control and status methods -------------- */
3157
3158 /**
3159 * If not already completed, completes this CompletableFuture with
3160 * a {@link CancellationException}. Dependent CompletableFutures
3161 * that have not already completed will also complete
3162 * exceptionally, with a {@link CompletionException} caused by
3163 * this {@code CancellationException}.
3164 *
3165 * @param mayInterruptIfRunning this value has no effect in this
3166 * implementation because interrupts are not used to control
3167 * processing.
3168 *
3169 * @return {@code true} if this task is now cancelled
3170 */
3171 public boolean cancel(boolean mayInterruptIfRunning) {
3172 boolean cancelled = (result == null) &&
3173 UNSAFE.compareAndSwapObject
3174 (this, RESULT, null, new AltResult(new CancellationException()));
3175 postComplete();
3176 return cancelled || isCancelled();
3177 }
3178
3179 /**
3180 * Returns {@code true} if this CompletableFuture was cancelled
3181 * before it completed normally.
3182 *
3183 * @return {@code true} if this CompletableFuture was cancelled
3184 * before it completed normally
3185 */
3186 public boolean isCancelled() {
3187 Object r;
3188 return ((r = result) instanceof AltResult) &&
3189 (((AltResult)r).ex instanceof CancellationException);
3190 }
3191
3192 /**
3193 * Forcibly sets or resets the value subsequently returned by
3194 * method {@link #get()} and related methods, whether or not
3195 * already completed. This method is designed for use only in
3196 * error recovery actions, and even in such situations may result
3197 * in ongoing dependent completions using established versus
3198 * overwritten outcomes.
3199 *
3200 * @param value the completion value
3201 */
3202 public void obtrudeValue(T value) {
3203 result = (value == null) ? NIL : value;
3204 postComplete();
3205 }
3206
3207 /**
3208 * Forcibly causes subsequent invocations of method {@link #get()}
3209 * and related methods to throw the given exception, whether or
3210 * not already completed. This method is designed for use only in
3211 * recovery actions, and even in such situations may result in
3212 * ongoing dependent completions using established versus
3213 * overwritten outcomes.
3214 *
3215 * @param ex the exception
3216 */
3217 public void obtrudeException(Throwable ex) {
3218 if (ex == null) throw new NullPointerException();
3219 result = new AltResult(ex);
3220 postComplete();
3221 }
3222
3223 /**
3224 * Returns the estimated number of CompletableFutures whose
3225 * completions are awaiting completion of this CompletableFuture.
3226 * This method is designed for use in monitoring system state, not
3227 * for synchronization control.
3228 *
3229 * @return the number of dependent CompletableFutures
3230 */
3231 public int getNumberOfDependents() {
3232 int count = 0;
3233 for (CompletionNode p = completions; p != null; p = p.next)
3234 ++count;
3235 return count;
3236 }
3237
3238 /**
3239 * Returns a string identifying this CompletableFuture, as well as
3240 * its completion state. The state, in brackets, contains the
3241 * String {@code "Completed Normally"} or the String {@code
3242 * "Completed Exceptionally"}, or the String {@code "Not
3243 * completed"} followed by the number of CompletableFutures
3244 * dependent upon its completion, if any.
3245 *
3246 * @return a string identifying this CompletableFuture, as well as its state
3247 */
3248 public String toString() {
3249 Object r = result;
3250 int count;
3251 return super.toString() +
3252 ((r == null) ?
3253 (((count = getNumberOfDependents()) == 0) ?
3254 "[Not completed]" :
3255 "[Not completed, " + count + " dependents]") :
3256 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3257 "[Completed exceptionally]" :
3258 "[Completed normally]"));
3259 }
3260
3261 // Unsafe mechanics
3262 private static final sun.misc.Unsafe UNSAFE;
3263 private static final long RESULT;
3264 private static final long WAITERS;
3265 private static final long COMPLETIONS;
3266 static {
3267 try {
3268 UNSAFE = getUnsafe();
3269 Class<?> k = CompletableFuture.class;
3270 RESULT = UNSAFE.objectFieldOffset
3271 (k.getDeclaredField("result"));
3272 WAITERS = UNSAFE.objectFieldOffset
3273 (k.getDeclaredField("waiters"));
3274 COMPLETIONS = UNSAFE.objectFieldOffset
3275 (k.getDeclaredField("completions"));
3276 } catch (Exception e) {
3277 throw new Error(e);
3278 }
3279 }
3280
3281 /**
3282 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
3283 * Replace with a simple call to Unsafe.getUnsafe when integrating
3284 * into a jdk.
3285 *
3286 * @return a sun.misc.Unsafe
3287 */
3288 private static sun.misc.Unsafe getUnsafe() {
3289 try {
3290 return sun.misc.Unsafe.getUnsafe();
3291 } catch (SecurityException tryReflectionInstead) {}
3292 try {
3293 return java.security.AccessController.doPrivileged
3294 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
3295 public sun.misc.Unsafe run() throws Exception {
3296 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
3297 for (java.lang.reflect.Field f : k.getDeclaredFields()) {
3298 f.setAccessible(true);
3299 Object x = f.get(null);
3300 if (k.isInstance(x))
3301 return k.cast(x);
3302 }
3303 throw new NoSuchFieldError("the Unsafe");
3304 }});
3305 } catch (java.security.PrivilegedActionException e) {
3306 throw new RuntimeException("Could not initialize intrinsics",
3307 e.getCause());
3308 }
3309 }
3310 }