ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.79
Committed: Wed Mar 27 21:36:10 2013 UTC (11 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.78: +3 -3 lines
Log Message:
whitespace

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to
31 * {@link #complete complete},
32 * {@link #completeExceptionally completeExceptionally}, or
33 * {@link #cancel cancel}
34 * a CompletableFuture, only one of them succeeds.
35 *
36 * <p>Methods are available for adding dependents based on
37 * user-provided Functions, Consumers, or Runnables. The appropriate
38 * form to use depends on whether actions require arguments and/or
39 * produce results. Completion of a dependent action will trigger the
40 * completion of another CompletableFuture. Actions may also be
41 * triggered after either or both the current and another
42 * CompletableFuture complete. Multiple CompletableFutures may also
43 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
44 * {@link #allOf(CompletableFuture...)}.
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, actions supplied for dependent completions of another
48 * CompletableFuture may do so, depending on whether they are provided
49 * via one of the <em>async</em> methods (that is, methods with names
50 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
51 * methods provide a way to commence asynchronous processing of an
52 * action using either a given {@link Executor} or by default the
53 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
54 * debugging, and tracking, all generated asynchronous tasks are
55 * instances of the marker interface {@link AsynchronousCompletionTask}.
56 *
57 * <p>Actions supplied for dependent completions of <em>non-async</em>
58 * methods may be performed by the thread that completes the current
59 * CompletableFuture, or by any other caller of these methods. There
60 * are no guarantees about the order of processing completions unless
61 * constrained by these methods.
62 *
63 * <p>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional completion.
66 * Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}.
68 *
69 * <p>Upon exceptional completion (including cancellation), or when a
70 * completion entails an additional computation which terminates
71 * abruptly with an (unchecked) exception or error, then all of their
72 * dependent completions (and their dependents in turn) generally act
73 * as {@code completeExceptionally} with a {@link CompletionException}
74 * holding that exception as its cause. However, the {@link
75 * #exceptionally exceptionally} and {@link #handle handle}
76 * completions <em>are</em> able to handle exceptional completions of
77 * the CompletableFutures they depend on.
78 *
79 * <p>In case of exceptional completion with a CompletionException,
80 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
81 * {@link ExecutionException} with the same cause as held in the
82 * corresponding CompletionException. However, in these cases,
83 * methods {@link #join()} and {@link #getNow} throw the
84 * CompletionException, which simplifies usage.
85 *
86 * @author Doug Lea
87 * @since 1.8
88 */
89 public class CompletableFuture<T> implements Future<T> {
90
91 /*
92 * Overview:
93 *
94 * 1. Non-nullness of field result (set via CAS) indicates done.
95 * An AltResult is used to box null as a result, as well as to
96 * hold exceptions. Using a single field makes completion fast
97 * and simple to detect and trigger, at the expense of a lot of
98 * encoding and decoding that infiltrates many methods. One minor
99 * simplification relies on the (static) NIL (to box null results)
100 * being the only AltResult with a null exception field, so we
101 * don't usually need explicit comparisons with NIL. The CF
102 * exception propagation mechanics surrounding decoding rely on
103 * unchecked casts of decoded results really being unchecked,
104 * where user type errors are caught at point of use, as is
105 * currently the case in Java. These are highlighted by using
106 * SuppressWarnings-annotated temporaries.
107 *
108 * 2. Waiters are held in a Treiber stack similar to the one used
109 * in FutureTask, Phaser, and SynchronousQueue. See their
110 * internal documentation for algorithmic details.
111 *
112 * 3. Completions are also kept in a list/stack, and pulled off
113 * and run when completion is triggered. (We could even use the
114 * same stack as for waiters, but would give up the potential
115 * parallelism obtained because woken waiters help release/run
116 * others -- see method postComplete). Because post-processing
117 * may race with direct calls, class Completion opportunistically
118 * extends AtomicInteger so callers can claim the action via
119 * compareAndSet(0, 1). The Completion.run methods are all
120 * written in a boringly similar uniform way (that sometimes includes
121 * unnecessary-looking checks, kept to maintain uniformity).
122 * There are enough dimensions upon which they differ that
123 * attempts to factor commonalities while maintaining efficiency
124 * require more lines of code than they would save.
125 *
126 * 4. The exported then/and/or methods do support a bit of
127 * factoring (see doThenApply etc). They must cope with the
128 * intrinsic races surrounding addition of a dependent action
129 * versus performing the action directly because the task is
130 * already complete. For example, a CF may not be complete upon
131 * entry, so a dependent completion is added, but by the time it
132 * is added, the target CF is complete, so must be directly
133 * executed. This is all done while avoiding unnecessary object
134 * construction in safe-bypass cases.
135 */
136
137 // preliminaries
138
/**
 * Box stored in the {@code result} field in place of a plain value.
 * It either carries the exception of an exceptional completion or,
 * in the single case of {@link #NIL}, stands for a {@code null} result.
 */
static final class AltResult {
    final Throwable ex; // null only for NIL
    AltResult(Throwable ex) { this.ex = ex; }
}

/** The shared AltResult with a null exception field; boxes a null result. */
static final AltResult NIL = new AltResult(null);
145
146 // Fields
147
// Non-null once this future is done; written via CAS on RESULT in internalComplete.
volatile Object result; // Either the result or boxed AltResult
// Head of the wait-node stack; nodes are popped and unparked by postComplete.
volatile WaitNode waiters; // Treiber stack of threads blocked on get()
// Head of the completion stack; nodes are popped and run by postComplete.
volatile CompletionNode completions; // list (Treiber stack) of completions
151
152 // Basic utilities for triggering and processing completions
153
154 /**
155 * Removes and signals all waiting threads and runs all completions.
156 */
157 final void postComplete() {
158 WaitNode q; Thread t;
159 while ((q = waiters) != null) {
160 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
161 (t = q.thread) != null) {
162 q.thread = null;
163 LockSupport.unpark(t);
164 }
165 }
166
167 CompletionNode h; Completion c;
168 while ((h = completions) != null) {
169 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
170 (c = h.completion) != null)
171 c.run();
172 }
173 }
174
175 /**
176 * Triggers completion with the encoding of the given arguments:
177 * if the exception is non-null, encodes it as a wrapped
178 * CompletionException unless it is one already. Otherwise uses
179 * the given result, boxed as NIL if null.
180 */
181 final void internalComplete(T v, Throwable ex) {
182 if (result == null)
183 UNSAFE.compareAndSwapObject
184 (this, RESULT, null,
185 (ex == null) ? (v == null) ? NIL : v :
186 new AltResult((ex instanceof CompletionException) ? ex :
187 new CompletionException(ex)));
188 postComplete(); // help out even if not triggered
189 }
190
191 /**
192 * If triggered, helps release and/or process completions.
193 */
194 final void helpPostComplete() {
195 if (result != null)
196 postComplete();
197 }
198
199 /* ------------- waiting for completions -------------- */
200
201 /** Number of processors, for spin control */
// Sampled once when this class is initialized.
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
 * Heuristic spin value for waitingGet() before blocking on
 * multiprocessors: 256 (1 << 8) when more than one processor is
 * reported, otherwise 0 (no spinning).
 */
static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
209
210 /**
211 * Linked nodes to record waiting threads in a Treiber stack. See
212 * other classes such as Phaser and SynchronousQueue for more
213 * detailed explanation. This class implements ManagedBlocker to
214 * avoid starvation when blocking actions pile up in
215 * ForkJoinPools.
216 */
static final class WaitNode implements ForkJoinPool.ManagedBlocker {
    long nanos; // wait time if timed
    final long deadline; // non-zero if timed
    volatile int interruptControl; // > 0: interruptible, < 0: interrupted
    volatile Thread thread; // the waiting thread; nulled when released, timed out, or abandoned
    volatile WaitNode next; // link to the next node in the waiters stack

    // Records the constructing thread as the waiter.
    WaitNode(boolean interruptible, long nanos, long deadline) {
        this.thread = Thread.currentThread();
        this.interruptControl = interruptible ? 1 : 0;
        this.nanos = nanos;
        this.deadline = deadline;
    }

    // Returns true when blocking is no longer needed: the node was released,
    // an interruptible wait was interrupted, or a timed wait has expired.
    public boolean isReleasable() {
        if (thread == null)
            return true;
        if (Thread.interrupted()) {
            // Remember the interrupt (and clear the thread's flag); only an
            // interruptible node (previous value > 0) stops waiting because of it.
            int i = interruptControl;
            interruptControl = -1;
            if (i > 0)
                return true;
        }
        // Timed wait: refresh the remaining time and give up once it runs out.
        if (deadline != 0L &&
            (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
            thread = null;
            return true;
        }
        return false;
    }

    // Parks the current thread (with a timeout if timed) unless already
    // releasable, then reports whether the wait is over.
    public boolean block() {
        if (isReleasable())
            return true;
        else if (deadline == 0L)
            LockSupport.park(this);
        else if (nanos > 0L)
            LockSupport.parkNanos(this, nanos);
        return isReleasable();
    }
}
255
256 /**
257 * Returns raw result after waiting, or null if interruptible and
258 * interrupted.
259 */
private Object waitingGet(boolean interruptible) {
    WaitNode q = null;       // created lazily, only after spinning is exhausted
    boolean queued = false;  // true once q has been pushed onto the waiters stack
    int spins = SPINS;       // remaining spin budget
    // Each pass through the loop takes exactly one step: finish, spin,
    // create the node, enqueue it, abort on interrupt, or block.
    for (Object r;;) {
        if ((r = result) != null) {
            if (q != null) { // suppress unpark
                q.thread = null;
                if (q.interruptControl < 0) {
                    // An interrupt was recorded while waiting: an interruptible
                    // caller gets null even though a result is now present.
                    if (interruptible) {
                        removeWaiter(q);
                        return null;
                    }
                    // Non-interruptible caller: restore the interrupt status.
                    Thread.currentThread().interrupt();
                }
            }
            postComplete(); // help release others
            return r;
        }
        else if (spins > 0) {
            // Randomized spin: the counter only decreases on non-negative draws.
            int rnd = ThreadLocalRandom.nextSecondarySeed();
            if (rnd == 0)
                rnd = ThreadLocalRandom.current().nextInt();
            if (rnd >= 0)
                --spins;
        }
        else if (q == null)
            q = new WaitNode(interruptible, 0L, 0L); // untimed node
        else if (!queued)
            // Push q onto the head of the waiters stack; retried on CAS failure.
            queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                 q.next = waiters, q);
        else if (interruptible && q.interruptControl < 0) {
            removeWaiter(q);
            return null;
        }
        else if (q.thread != null && result == null) {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ex) {
                // Record the interrupt; the next loop pass decides how to react.
                q.interruptControl = -1;
            }
        }
    }
}
304
305 /**
306 * Awaits completion or aborts on interrupt or timeout.
307 *
308 * @param nanos time to wait
309 * @return raw result
310 */
private Object timedAwaitDone(long nanos)
    throws InterruptedException, TimeoutException {
    WaitNode q = null;       // created on the first pass that finds no result
    boolean queued = false;  // true once q has been pushed onto the waiters stack
    // Each pass through the loop takes exactly one step: finish, create the
    // node, enqueue it, abort on interrupt, abort on timeout, or block.
    for (Object r;;) {
        if ((r = result) != null) {
            if (q != null) {
                q.thread = null; // suppress unpark
                // A recorded interrupt takes precedence over the available result.
                if (q.interruptControl < 0) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
            }
            postComplete(); // help release others
            return r;
        }
        else if (q == null) {
            // Non-positive timeout and no result yet: fail without enqueueing.
            if (nanos <= 0L)
                throw new TimeoutException();
            long d = System.nanoTime() + nanos;
            q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
        }
        else if (!queued)
            // Push q onto the head of the waiters stack; retried on CAS failure.
            queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                 q.next = waiters, q);
        else if (q.interruptControl < 0) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        else if (q.nanos <= 0L) {
            // Time is up; re-check result so a just-arrived completion is
            // picked up by the next pass instead of reported as a timeout.
            if (result == null) {
                removeWaiter(q);
                throw new TimeoutException();
            }
        }
        else if (q.thread != null && result == null) {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ex) {
                // Record the interrupt; the next loop pass throws.
                q.interruptControl = -1;
            }
        }
    }
}
355
356 /**
357 * Tries to unlink a timed-out or interrupted wait node to avoid
358 * accumulating garbage. Internal nodes are simply unspliced
359 * without CAS since it is harmless if they are traversed anyway
360 * by releasers. To avoid effects of unsplicing from already
361 * removed nodes, the list is retraversed in case of an apparent
362 * race. This is slow when there are a lot of nodes, but we don't
363 * expect lists to be long enough to outweigh higher-overhead
364 * schemes.
365 */
private void removeWaiter(WaitNode node) {
    if (node != null) {
        // A null thread marks the node as dead; the sweep below unlinks
        // every dead node it meets, not just this one.
        node.thread = null;
        retry:
        for (;;) { // restart on removeWaiter race
            // pred: last live node seen so far; q: current node; s: q's successor.
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    // Dead interior node: splice it out with a plain write.
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                // Dead node at the head: must be removed with a CAS on the stack head.
                else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
                    continue retry;
            }
            break;
        }
    }
}
387
388 /* ------------- Async tasks -------------- */
389
390 /**
391 * A marker interface identifying asynchronous tasks produced by
392 * {@code async} methods. This may be useful for monitoring,
393 * debugging, and tracking asynchronous activities.
394 *
395 * @since 1.8
396 */
public static interface AsynchronousCompletionTask {
    // Marker only: declares no methods.
}
399
400 /** Base class can act as either FJ or plain Runnable */
401 abstract static class Async extends ForkJoinTask<Void>
402 implements Runnable, AsynchronousCompletionTask {
403 public final Void getRawResult() { return null; }
404 public final void setRawResult(Void v) { }
405 public final void run() { exec(); }
406 }
407
408 static final class AsyncRun extends Async {
409 final Runnable fn;
410 final CompletableFuture<Void> dst;
411 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
412 this.fn = fn; this.dst = dst;
413 }
414 public final boolean exec() {
415 CompletableFuture<Void> d; Throwable ex;
416 if ((d = this.dst) != null && d.result == null) {
417 try {
418 fn.run();
419 ex = null;
420 } catch (Throwable rex) {
421 ex = rex;
422 }
423 d.internalComplete(null, ex);
424 }
425 return true;
426 }
427 private static final long serialVersionUID = 5232453952276885070L;
428 }
429
430 static final class AsyncSupply<U> extends Async {
431 final Supplier<U> fn;
432 final CompletableFuture<U> dst;
433 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
434 this.fn = fn; this.dst = dst;
435 }
436 public final boolean exec() {
437 CompletableFuture<U> d; U u; Throwable ex;
438 if ((d = this.dst) != null && d.result == null) {
439 try {
440 u = fn.get();
441 ex = null;
442 } catch (Throwable rex) {
443 ex = rex;
444 u = null;
445 }
446 d.internalComplete(u, ex);
447 }
448 return true;
449 }
450 private static final long serialVersionUID = 5232453952276885070L;
451 }
452
453 static final class AsyncApply<T,U> extends Async {
454 final T arg;
455 final Function<? super T,? extends U> fn;
456 final CompletableFuture<U> dst;
457 AsyncApply(T arg, Function<? super T,? extends U> fn,
458 CompletableFuture<U> dst) {
459 this.arg = arg; this.fn = fn; this.dst = dst;
460 }
461 public final boolean exec() {
462 CompletableFuture<U> d; U u; Throwable ex;
463 if ((d = this.dst) != null && d.result == null) {
464 try {
465 u = fn.apply(arg);
466 ex = null;
467 } catch (Throwable rex) {
468 ex = rex;
469 u = null;
470 }
471 d.internalComplete(u, ex);
472 }
473 return true;
474 }
475 private static final long serialVersionUID = 5232453952276885070L;
476 }
477
478 static final class AsyncBiApply<T,U,V> extends Async {
479 final T arg1;
480 final U arg2;
481 final BiFunction<? super T,? super U,? extends V> fn;
482 final CompletableFuture<V> dst;
483 AsyncBiApply(T arg1, U arg2,
484 BiFunction<? super T,? super U,? extends V> fn,
485 CompletableFuture<V> dst) {
486 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
487 }
488 public final boolean exec() {
489 CompletableFuture<V> d; V v; Throwable ex;
490 if ((d = this.dst) != null && d.result == null) {
491 try {
492 v = fn.apply(arg1, arg2);
493 ex = null;
494 } catch (Throwable rex) {
495 ex = rex;
496 v = null;
497 }
498 d.internalComplete(v, ex);
499 }
500 return true;
501 }
502 private static final long serialVersionUID = 5232453952276885070L;
503 }
504
505 static final class AsyncAccept<T> extends Async {
506 final T arg;
507 final Consumer<? super T> fn;
508 final CompletableFuture<Void> dst;
509 AsyncAccept(T arg, Consumer<? super T> fn,
510 CompletableFuture<Void> dst) {
511 this.arg = arg; this.fn = fn; this.dst = dst;
512 }
513 public final boolean exec() {
514 CompletableFuture<Void> d; Throwable ex;
515 if ((d = this.dst) != null && d.result == null) {
516 try {
517 fn.accept(arg);
518 ex = null;
519 } catch (Throwable rex) {
520 ex = rex;
521 }
522 d.internalComplete(null, ex);
523 }
524 return true;
525 }
526 private static final long serialVersionUID = 5232453952276885070L;
527 }
528
529 static final class AsyncBiAccept<T,U> extends Async {
530 final T arg1;
531 final U arg2;
532 final BiConsumer<? super T,? super U> fn;
533 final CompletableFuture<Void> dst;
534 AsyncBiAccept(T arg1, U arg2,
535 BiConsumer<? super T,? super U> fn,
536 CompletableFuture<Void> dst) {
537 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
538 }
539 public final boolean exec() {
540 CompletableFuture<Void> d; Throwable ex;
541 if ((d = this.dst) != null && d.result == null) {
542 try {
543 fn.accept(arg1, arg2);
544 ex = null;
545 } catch (Throwable rex) {
546 ex = rex;
547 }
548 d.internalComplete(null, ex);
549 }
550 return true;
551 }
552 private static final long serialVersionUID = 5232453952276885070L;
553 }
554
555 static final class AsyncCompose<T,U> extends Async {
556 final T arg;
557 final Function<? super T, CompletableFuture<U>> fn;
558 final CompletableFuture<U> dst;
559 AsyncCompose(T arg,
560 Function<? super T, CompletableFuture<U>> fn,
561 CompletableFuture<U> dst) {
562 this.arg = arg; this.fn = fn; this.dst = dst;
563 }
564 public final boolean exec() {
565 CompletableFuture<U> d, fr; U u; Throwable ex;
566 if ((d = this.dst) != null && d.result == null) {
567 try {
568 fr = fn.apply(arg);
569 ex = (fr == null) ? new NullPointerException() : null;
570 } catch (Throwable rex) {
571 ex = rex;
572 fr = null;
573 }
574 if (ex != null)
575 u = null;
576 else {
577 Object r = fr.result;
578 if (r == null)
579 r = fr.waitingGet(false);
580 if (r instanceof AltResult) {
581 ex = ((AltResult)r).ex;
582 u = null;
583 }
584 else {
585 @SuppressWarnings("unchecked") U ur = (U) r;
586 u = ur;
587 }
588 }
589 d.internalComplete(u, ex);
590 }
591 return true;
592 }
593 private static final long serialVersionUID = 5232453952276885070L;
594 }
595
596 /* ------------- Completions -------------- */
597
598 /**
599 * Simple linked list nodes to record completions, used in
600 * basically the same way as WaitNodes. (We separate nodes from
601 * the Completions themselves mainly because for the And and Or
602 * methods, the same Completion object resides in two lists.)
603 */
604 static final class CompletionNode {
605 final Completion completion;
606 volatile CompletionNode next;
607 CompletionNode(Completion completion) { this.completion = completion; }
608 }
609
610 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
abstract static class Completion extends AtomicInteger implements Runnable {
    // No members of its own: subclasses supply run() and claim the right to
    // perform their action via the inherited compareAndSet(0, 1).
}
613
614 static final class ApplyCompletion<T,U> extends Completion {
615 final CompletableFuture<? extends T> src;
616 final Function<? super T,? extends U> fn;
617 final CompletableFuture<U> dst;
618 final Executor executor;
619 ApplyCompletion(CompletableFuture<? extends T> src,
620 Function<? super T,? extends U> fn,
621 CompletableFuture<U> dst, Executor executor) {
622 this.src = src; this.fn = fn; this.dst = dst;
623 this.executor = executor;
624 }
625 public final void run() {
626 final CompletableFuture<? extends T> a;
627 final Function<? super T,? extends U> fn;
628 final CompletableFuture<U> dst;
629 Object r; T t; Throwable ex;
630 if ((dst = this.dst) != null &&
631 (fn = this.fn) != null &&
632 (a = this.src) != null &&
633 (r = a.result) != null &&
634 compareAndSet(0, 1)) {
635 if (r instanceof AltResult) {
636 ex = ((AltResult)r).ex;
637 t = null;
638 }
639 else {
640 ex = null;
641 @SuppressWarnings("unchecked") T tr = (T) r;
642 t = tr;
643 }
644 Executor e = executor;
645 U u = null;
646 if (ex == null) {
647 try {
648 if (e != null)
649 e.execute(new AsyncApply<T,U>(t, fn, dst));
650 else
651 u = fn.apply(t);
652 } catch (Throwable rex) {
653 ex = rex;
654 }
655 }
656 if (e == null || ex != null)
657 dst.internalComplete(u, ex);
658 }
659 }
660 private static final long serialVersionUID = 5232453952276885070L;
661 }
662
663 static final class AcceptCompletion<T> extends Completion {
664 final CompletableFuture<? extends T> src;
665 final Consumer<? super T> fn;
666 final CompletableFuture<Void> dst;
667 final Executor executor;
668 AcceptCompletion(CompletableFuture<? extends T> src,
669 Consumer<? super T> fn,
670 CompletableFuture<Void> dst, Executor executor) {
671 this.src = src; this.fn = fn; this.dst = dst;
672 this.executor = executor;
673 }
674 public final void run() {
675 final CompletableFuture<? extends T> a;
676 final Consumer<? super T> fn;
677 final CompletableFuture<Void> dst;
678 Object r; T t; Throwable ex;
679 if ((dst = this.dst) != null &&
680 (fn = this.fn) != null &&
681 (a = this.src) != null &&
682 (r = a.result) != null &&
683 compareAndSet(0, 1)) {
684 if (r instanceof AltResult) {
685 ex = ((AltResult)r).ex;
686 t = null;
687 }
688 else {
689 ex = null;
690 @SuppressWarnings("unchecked") T tr = (T) r;
691 t = tr;
692 }
693 Executor e = executor;
694 if (ex == null) {
695 try {
696 if (e != null)
697 e.execute(new AsyncAccept<T>(t, fn, dst));
698 else
699 fn.accept(t);
700 } catch (Throwable rex) {
701 ex = rex;
702 }
703 }
704 if (e == null || ex != null)
705 dst.internalComplete(null, ex);
706 }
707 }
708 private static final long serialVersionUID = 5232453952276885070L;
709 }
710
711 static final class RunCompletion<T> extends Completion {
712 final CompletableFuture<? extends T> src;
713 final Runnable fn;
714 final CompletableFuture<Void> dst;
715 final Executor executor;
716 RunCompletion(CompletableFuture<? extends T> src,
717 Runnable fn,
718 CompletableFuture<Void> dst,
719 Executor executor) {
720 this.src = src; this.fn = fn; this.dst = dst;
721 this.executor = executor;
722 }
723 public final void run() {
724 final CompletableFuture<? extends T> a;
725 final Runnable fn;
726 final CompletableFuture<Void> dst;
727 Object r; Throwable ex;
728 if ((dst = this.dst) != null &&
729 (fn = this.fn) != null &&
730 (a = this.src) != null &&
731 (r = a.result) != null &&
732 compareAndSet(0, 1)) {
733 if (r instanceof AltResult)
734 ex = ((AltResult)r).ex;
735 else
736 ex = null;
737 Executor e = executor;
738 if (ex == null) {
739 try {
740 if (e != null)
741 e.execute(new AsyncRun(fn, dst));
742 else
743 fn.run();
744 } catch (Throwable rex) {
745 ex = rex;
746 }
747 }
748 if (e == null || ex != null)
749 dst.internalComplete(null, ex);
750 }
751 }
752 private static final long serialVersionUID = 5232453952276885070L;
753 }
754
755 static final class BiApplyCompletion<T,U,V> extends Completion {
756 final CompletableFuture<? extends T> src;
757 final CompletableFuture<? extends U> snd;
758 final BiFunction<? super T,? super U,? extends V> fn;
759 final CompletableFuture<V> dst;
760 final Executor executor;
761 BiApplyCompletion(CompletableFuture<? extends T> src,
762 CompletableFuture<? extends U> snd,
763 BiFunction<? super T,? super U,? extends V> fn,
764 CompletableFuture<V> dst, Executor executor) {
765 this.src = src; this.snd = snd;
766 this.fn = fn; this.dst = dst;
767 this.executor = executor;
768 }
769 public final void run() {
770 final CompletableFuture<? extends T> a;
771 final CompletableFuture<? extends U> b;
772 final BiFunction<? super T,? super U,? extends V> fn;
773 final CompletableFuture<V> dst;
774 Object r, s; T t; U u; Throwable ex;
775 if ((dst = this.dst) != null &&
776 (fn = this.fn) != null &&
777 (a = this.src) != null &&
778 (r = a.result) != null &&
779 (b = this.snd) != null &&
780 (s = b.result) != null &&
781 compareAndSet(0, 1)) {
782 if (r instanceof AltResult) {
783 ex = ((AltResult)r).ex;
784 t = null;
785 }
786 else {
787 ex = null;
788 @SuppressWarnings("unchecked") T tr = (T) r;
789 t = tr;
790 }
791 if (ex != null)
792 u = null;
793 else if (s instanceof AltResult) {
794 ex = ((AltResult)s).ex;
795 u = null;
796 }
797 else {
798 @SuppressWarnings("unchecked") U us = (U) s;
799 u = us;
800 }
801 Executor e = executor;
802 V v = null;
803 if (ex == null) {
804 try {
805 if (e != null)
806 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
807 else
808 v = fn.apply(t, u);
809 } catch (Throwable rex) {
810 ex = rex;
811 }
812 }
813 if (e == null || ex != null)
814 dst.internalComplete(v, ex);
815 }
816 }
817 private static final long serialVersionUID = 5232453952276885070L;
818 }
819
820 static final class BiAcceptCompletion<T,U> extends Completion {
821 final CompletableFuture<? extends T> src;
822 final CompletableFuture<? extends U> snd;
823 final BiConsumer<? super T,? super U> fn;
824 final CompletableFuture<Void> dst;
825 final Executor executor;
826 BiAcceptCompletion(CompletableFuture<? extends T> src,
827 CompletableFuture<? extends U> snd,
828 BiConsumer<? super T,? super U> fn,
829 CompletableFuture<Void> dst, Executor executor) {
830 this.src = src; this.snd = snd;
831 this.fn = fn; this.dst = dst;
832 this.executor = executor;
833 }
834 public final void run() {
835 final CompletableFuture<? extends T> a;
836 final CompletableFuture<? extends U> b;
837 final BiConsumer<? super T,? super U> fn;
838 final CompletableFuture<Void> dst;
839 Object r, s; T t; U u; Throwable ex;
840 if ((dst = this.dst) != null &&
841 (fn = this.fn) != null &&
842 (a = this.src) != null &&
843 (r = a.result) != null &&
844 (b = this.snd) != null &&
845 (s = b.result) != null &&
846 compareAndSet(0, 1)) {
847 if (r instanceof AltResult) {
848 ex = ((AltResult)r).ex;
849 t = null;
850 }
851 else {
852 ex = null;
853 @SuppressWarnings("unchecked") T tr = (T) r;
854 t = tr;
855 }
856 if (ex != null)
857 u = null;
858 else if (s instanceof AltResult) {
859 ex = ((AltResult)s).ex;
860 u = null;
861 }
862 else {
863 @SuppressWarnings("unchecked") U us = (U) s;
864 u = us;
865 }
866 Executor e = executor;
867 if (ex == null) {
868 try {
869 if (e != null)
870 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
871 else
872 fn.accept(t, u);
873 } catch (Throwable rex) {
874 ex = rex;
875 }
876 }
877 if (e == null || ex != null)
878 dst.internalComplete(null, ex);
879 }
880 }
881 private static final long serialVersionUID = 5232453952276885070L;
882 }
883
884 static final class BiRunCompletion<T> extends Completion {
885 final CompletableFuture<? extends T> src;
886 final CompletableFuture<?> snd;
887 final Runnable fn;
888 final CompletableFuture<Void> dst;
889 final Executor executor;
890 BiRunCompletion(CompletableFuture<? extends T> src,
891 CompletableFuture<?> snd,
892 Runnable fn,
893 CompletableFuture<Void> dst, Executor executor) {
894 this.src = src; this.snd = snd;
895 this.fn = fn; this.dst = dst;
896 this.executor = executor;
897 }
898 public final void run() {
899 final CompletableFuture<? extends T> a;
900 final CompletableFuture<?> b;
901 final Runnable fn;
902 final CompletableFuture<Void> dst;
903 Object r, s; Throwable ex;
904 if ((dst = this.dst) != null &&
905 (fn = this.fn) != null &&
906 (a = this.src) != null &&
907 (r = a.result) != null &&
908 (b = this.snd) != null &&
909 (s = b.result) != null &&
910 compareAndSet(0, 1)) {
911 if (r instanceof AltResult)
912 ex = ((AltResult)r).ex;
913 else
914 ex = null;
915 if (ex == null && (s instanceof AltResult))
916 ex = ((AltResult)s).ex;
917 Executor e = executor;
918 if (ex == null) {
919 try {
920 if (e != null)
921 e.execute(new AsyncRun(fn, dst));
922 else
923 fn.run();
924 } catch (Throwable rex) {
925 ex = rex;
926 }
927 }
928 if (e == null || ex != null)
929 dst.internalComplete(null, ex);
930 }
931 }
932 private static final long serialVersionUID = 5232453952276885070L;
933 }
934
935 static final class AndCompletion extends Completion {
936 final CompletableFuture<?> src;
937 final CompletableFuture<?> snd;
938 final CompletableFuture<Void> dst;
939 AndCompletion(CompletableFuture<?> src,
940 CompletableFuture<?> snd,
941 CompletableFuture<Void> dst) {
942 this.src = src; this.snd = snd; this.dst = dst;
943 }
944 public final void run() {
945 final CompletableFuture<?> a;
946 final CompletableFuture<?> b;
947 final CompletableFuture<Void> dst;
948 Object r, s; Throwable ex;
949 if ((dst = this.dst) != null &&
950 (a = this.src) != null &&
951 (r = a.result) != null &&
952 (b = this.snd) != null &&
953 (s = b.result) != null &&
954 compareAndSet(0, 1)) {
955 if (r instanceof AltResult)
956 ex = ((AltResult)r).ex;
957 else
958 ex = null;
959 if (ex == null && (s instanceof AltResult))
960 ex = ((AltResult)s).ex;
961 dst.internalComplete(null, ex);
962 }
963 }
964 private static final long serialVersionUID = 5232453952276885070L;
965 }
966
967 static final class OrApplyCompletion<T,U> extends Completion {
968 final CompletableFuture<? extends T> src;
969 final CompletableFuture<? extends T> snd;
970 final Function<? super T,? extends U> fn;
971 final CompletableFuture<U> dst;
972 final Executor executor;
973 OrApplyCompletion(CompletableFuture<? extends T> src,
974 CompletableFuture<? extends T> snd,
975 Function<? super T,? extends U> fn,
976 CompletableFuture<U> dst, Executor executor) {
977 this.src = src; this.snd = snd;
978 this.fn = fn; this.dst = dst;
979 this.executor = executor;
980 }
981 public final void run() {
982 final CompletableFuture<? extends T> a;
983 final CompletableFuture<? extends T> b;
984 final Function<? super T,? extends U> fn;
985 final CompletableFuture<U> dst;
986 Object r; T t; Throwable ex;
987 if ((dst = this.dst) != null &&
988 (fn = this.fn) != null &&
989 (((a = this.src) != null && (r = a.result) != null) ||
990 ((b = this.snd) != null && (r = b.result) != null)) &&
991 compareAndSet(0, 1)) {
992 if (r instanceof AltResult) {
993 ex = ((AltResult)r).ex;
994 t = null;
995 }
996 else {
997 ex = null;
998 @SuppressWarnings("unchecked") T tr = (T) r;
999 t = tr;
1000 }
1001 Executor e = executor;
1002 U u = null;
1003 if (ex == null) {
1004 try {
1005 if (e != null)
1006 e.execute(new AsyncApply<T,U>(t, fn, dst));
1007 else
1008 u = fn.apply(t);
1009 } catch (Throwable rex) {
1010 ex = rex;
1011 }
1012 }
1013 if (e == null || ex != null)
1014 dst.internalComplete(u, ex);
1015 }
1016 }
1017 private static final long serialVersionUID = 5232453952276885070L;
1018 }
1019
1020 static final class OrAcceptCompletion<T> extends Completion {
1021 final CompletableFuture<? extends T> src;
1022 final CompletableFuture<? extends T> snd;
1023 final Consumer<? super T> fn;
1024 final CompletableFuture<Void> dst;
1025 final Executor executor;
1026 OrAcceptCompletion(CompletableFuture<? extends T> src,
1027 CompletableFuture<? extends T> snd,
1028 Consumer<? super T> fn,
1029 CompletableFuture<Void> dst, Executor executor) {
1030 this.src = src; this.snd = snd;
1031 this.fn = fn; this.dst = dst;
1032 this.executor = executor;
1033 }
1034 public final void run() {
1035 final CompletableFuture<? extends T> a;
1036 final CompletableFuture<? extends T> b;
1037 final Consumer<? super T> fn;
1038 final CompletableFuture<Void> dst;
1039 Object r; T t; Throwable ex;
1040 if ((dst = this.dst) != null &&
1041 (fn = this.fn) != null &&
1042 (((a = this.src) != null && (r = a.result) != null) ||
1043 ((b = this.snd) != null && (r = b.result) != null)) &&
1044 compareAndSet(0, 1)) {
1045 if (r instanceof AltResult) {
1046 ex = ((AltResult)r).ex;
1047 t = null;
1048 }
1049 else {
1050 ex = null;
1051 @SuppressWarnings("unchecked") T tr = (T) r;
1052 t = tr;
1053 }
1054 Executor e = executor;
1055 if (ex == null) {
1056 try {
1057 if (e != null)
1058 e.execute(new AsyncAccept<T>(t, fn, dst));
1059 else
1060 fn.accept(t);
1061 } catch (Throwable rex) {
1062 ex = rex;
1063 }
1064 }
1065 if (e == null || ex != null)
1066 dst.internalComplete(null, ex);
1067 }
1068 }
1069 private static final long serialVersionUID = 5232453952276885070L;
1070 }
1071
1072 static final class OrRunCompletion<T> extends Completion {
1073 final CompletableFuture<? extends T> src;
1074 final CompletableFuture<?> snd;
1075 final Runnable fn;
1076 final CompletableFuture<Void> dst;
1077 final Executor executor;
1078 OrRunCompletion(CompletableFuture<? extends T> src,
1079 CompletableFuture<?> snd,
1080 Runnable fn,
1081 CompletableFuture<Void> dst, Executor executor) {
1082 this.src = src; this.snd = snd;
1083 this.fn = fn; this.dst = dst;
1084 this.executor = executor;
1085 }
1086 public final void run() {
1087 final CompletableFuture<? extends T> a;
1088 final CompletableFuture<?> b;
1089 final Runnable fn;
1090 final CompletableFuture<Void> dst;
1091 Object r; Throwable ex;
1092 if ((dst = this.dst) != null &&
1093 (fn = this.fn) != null &&
1094 (((a = this.src) != null && (r = a.result) != null) ||
1095 ((b = this.snd) != null && (r = b.result) != null)) &&
1096 compareAndSet(0, 1)) {
1097 if (r instanceof AltResult)
1098 ex = ((AltResult)r).ex;
1099 else
1100 ex = null;
1101 Executor e = executor;
1102 if (ex == null) {
1103 try {
1104 if (e != null)
1105 e.execute(new AsyncRun(fn, dst));
1106 else
1107 fn.run();
1108 } catch (Throwable rex) {
1109 ex = rex;
1110 }
1111 }
1112 if (e == null || ex != null)
1113 dst.internalComplete(null, ex);
1114 }
1115 }
1116 private static final long serialVersionUID = 5232453952276885070L;
1117 }
1118
1119 static final class OrCompletion extends Completion {
1120 final CompletableFuture<?> src;
1121 final CompletableFuture<?> snd;
1122 final CompletableFuture<Object> dst;
1123 OrCompletion(CompletableFuture<?> src,
1124 CompletableFuture<?> snd,
1125 CompletableFuture<Object> dst) {
1126 this.src = src; this.snd = snd; this.dst = dst;
1127 }
1128 public final void run() {
1129 final CompletableFuture<?> a;
1130 final CompletableFuture<?> b;
1131 final CompletableFuture<Object> dst;
1132 Object r, t; Throwable ex;
1133 if ((dst = this.dst) != null &&
1134 (((a = this.src) != null && (r = a.result) != null) ||
1135 ((b = this.snd) != null && (r = b.result) != null)) &&
1136 compareAndSet(0, 1)) {
1137 if (r instanceof AltResult) {
1138 ex = ((AltResult)r).ex;
1139 t = null;
1140 }
1141 else {
1142 ex = null;
1143 t = r;
1144 }
1145 dst.internalComplete(t, ex);
1146 }
1147 }
1148 private static final long serialVersionUID = 5232453952276885070L;
1149 }
1150
1151 static final class ExceptionCompletion<T> extends Completion {
1152 final CompletableFuture<? extends T> src;
1153 final Function<? super Throwable, ? extends T> fn;
1154 final CompletableFuture<T> dst;
1155 ExceptionCompletion(CompletableFuture<? extends T> src,
1156 Function<? super Throwable, ? extends T> fn,
1157 CompletableFuture<T> dst) {
1158 this.src = src; this.fn = fn; this.dst = dst;
1159 }
1160 public final void run() {
1161 final CompletableFuture<? extends T> a;
1162 final Function<? super Throwable, ? extends T> fn;
1163 final CompletableFuture<T> dst;
1164 Object r; T t = null; Throwable ex, dx = null;
1165 if ((dst = this.dst) != null &&
1166 (fn = this.fn) != null &&
1167 (a = this.src) != null &&
1168 (r = a.result) != null &&
1169 compareAndSet(0, 1)) {
1170 if ((r instanceof AltResult) &&
1171 (ex = ((AltResult)r).ex) != null) {
1172 try {
1173 t = fn.apply(ex);
1174 } catch (Throwable rex) {
1175 dx = rex;
1176 }
1177 }
1178 else {
1179 @SuppressWarnings("unchecked") T tr = (T) r;
1180 t = tr;
1181 }
1182 dst.internalComplete(t, dx);
1183 }
1184 }
1185 private static final long serialVersionUID = 5232453952276885070L;
1186 }
1187
1188 static final class ThenCopy<T> extends Completion {
1189 final CompletableFuture<?> src;
1190 final CompletableFuture<T> dst;
1191 ThenCopy(CompletableFuture<?> src,
1192 CompletableFuture<T> dst) {
1193 this.src = src; this.dst = dst;
1194 }
1195 public final void run() {
1196 final CompletableFuture<?> a;
1197 final CompletableFuture<T> dst;
1198 Object r; T t; Throwable ex;
1199 if ((dst = this.dst) != null &&
1200 (a = this.src) != null &&
1201 (r = a.result) != null &&
1202 compareAndSet(0, 1)) {
1203 if (r instanceof AltResult) {
1204 ex = ((AltResult)r).ex;
1205 t = null;
1206 }
1207 else {
1208 ex = null;
1209 @SuppressWarnings("unchecked") T tr = (T) r;
1210 t = tr;
1211 }
1212 dst.internalComplete(t, ex);
1213 }
1214 }
1215 private static final long serialVersionUID = 5232453952276885070L;
1216 }
1217
1218 // version of ThenCopy for CompletableFuture<Void> dst
1219 static final class ThenPropagate extends Completion {
1220 final CompletableFuture<?> src;
1221 final CompletableFuture<Void> dst;
1222 ThenPropagate(CompletableFuture<?> src,
1223 CompletableFuture<Void> dst) {
1224 this.src = src; this.dst = dst;
1225 }
1226 public final void run() {
1227 final CompletableFuture<?> a;
1228 final CompletableFuture<Void> dst;
1229 Object r; Throwable ex;
1230 if ((dst = this.dst) != null &&
1231 (a = this.src) != null &&
1232 (r = a.result) != null &&
1233 compareAndSet(0, 1)) {
1234 if (r instanceof AltResult)
1235 ex = ((AltResult)r).ex;
1236 else
1237 ex = null;
1238 dst.internalComplete(null, ex);
1239 }
1240 }
1241 private static final long serialVersionUID = 5232453952276885070L;
1242 }
1243
1244 static final class HandleCompletion<T,U> extends Completion {
1245 final CompletableFuture<? extends T> src;
1246 final BiFunction<? super T, Throwable, ? extends U> fn;
1247 final CompletableFuture<U> dst;
1248 HandleCompletion(CompletableFuture<? extends T> src,
1249 BiFunction<? super T, Throwable, ? extends U> fn,
1250 CompletableFuture<U> dst) {
1251 this.src = src; this.fn = fn; this.dst = dst;
1252 }
1253 public final void run() {
1254 final CompletableFuture<? extends T> a;
1255 final BiFunction<? super T, Throwable, ? extends U> fn;
1256 final CompletableFuture<U> dst;
1257 Object r; T t; Throwable ex;
1258 if ((dst = this.dst) != null &&
1259 (fn = this.fn) != null &&
1260 (a = this.src) != null &&
1261 (r = a.result) != null &&
1262 compareAndSet(0, 1)) {
1263 if (r instanceof AltResult) {
1264 ex = ((AltResult)r).ex;
1265 t = null;
1266 }
1267 else {
1268 ex = null;
1269 @SuppressWarnings("unchecked") T tr = (T) r;
1270 t = tr;
1271 }
1272 U u = null; Throwable dx = null;
1273 try {
1274 u = fn.apply(t, ex);
1275 } catch (Throwable rex) {
1276 dx = rex;
1277 }
1278 dst.internalComplete(u, dx);
1279 }
1280 }
1281 private static final long serialVersionUID = 5232453952276885070L;
1282 }
1283
1284 static final class ComposeCompletion<T,U> extends Completion {
1285 final CompletableFuture<? extends T> src;
1286 final Function<? super T, CompletableFuture<U>> fn;
1287 final CompletableFuture<U> dst;
1288 final Executor executor;
1289 ComposeCompletion(CompletableFuture<? extends T> src,
1290 Function<? super T, CompletableFuture<U>> fn,
1291 CompletableFuture<U> dst, Executor executor) {
1292 this.src = src; this.fn = fn; this.dst = dst;
1293 this.executor = executor;
1294 }
1295 public final void run() {
1296 final CompletableFuture<? extends T> a;
1297 final Function<? super T, CompletableFuture<U>> fn;
1298 final CompletableFuture<U> dst;
1299 Object r; T t; Throwable ex; Executor e;
1300 if ((dst = this.dst) != null &&
1301 (fn = this.fn) != null &&
1302 (a = this.src) != null &&
1303 (r = a.result) != null &&
1304 compareAndSet(0, 1)) {
1305 if (r instanceof AltResult) {
1306 ex = ((AltResult)r).ex;
1307 t = null;
1308 }
1309 else {
1310 ex = null;
1311 @SuppressWarnings("unchecked") T tr = (T) r;
1312 t = tr;
1313 }
1314 CompletableFuture<U> c = null;
1315 U u = null;
1316 boolean complete = false;
1317 if (ex == null) {
1318 if ((e = executor) != null)
1319 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1320 else {
1321 try {
1322 if ((c = fn.apply(t)) == null)
1323 ex = new NullPointerException();
1324 } catch (Throwable rex) {
1325 ex = rex;
1326 }
1327 }
1328 }
1329 if (c != null) {
1330 ThenCopy<U> d = null;
1331 Object s;
1332 if ((s = c.result) == null) {
1333 CompletionNode p = new CompletionNode
1334 (d = new ThenCopy<U>(c, dst));
1335 while ((s = c.result) == null) {
1336 if (UNSAFE.compareAndSwapObject
1337 (c, COMPLETIONS, p.next = c.completions, p))
1338 break;
1339 }
1340 }
1341 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1342 complete = true;
1343 if (s instanceof AltResult) {
1344 ex = ((AltResult)s).ex; // no rewrap
1345 u = null;
1346 }
1347 else {
1348 @SuppressWarnings("unchecked") U us = (U) s;
1349 u = us;
1350 }
1351 }
1352 }
1353 if (complete || ex != null)
1354 dst.internalComplete(u, ex);
1355 if (c != null)
1356 c.helpPostComplete();
1357 }
1358 }
1359 private static final long serialVersionUID = 5232453952276885070L;
1360 }
1361
1362 // public methods
1363
1364 /**
1365 * Creates a new incomplete CompletableFuture.
1366 */
1367 public CompletableFuture() {
1368 }
1369
1370 /**
1371 * Returns a new CompletableFuture that is asynchronously completed
1372 * by a task running in the {@link ForkJoinPool#commonPool()} with
1373 * the value obtained by calling the given Supplier.
1374 *
1375 * @param supplier a function returning the value to be used
1376 * to complete the returned CompletableFuture
1377 * @return the new CompletableFuture
1378 */
1379 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1380 if (supplier == null) throw new NullPointerException();
1381 CompletableFuture<U> f = new CompletableFuture<U>();
1382 ForkJoinPool.commonPool().
1383 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1384 return f;
1385 }
1386
1387 /**
1388 * Returns a new CompletableFuture that is asynchronously completed
1389 * by a task running in the given executor with the value obtained
1390 * by calling the given Supplier.
1391 *
1392 * @param supplier a function returning the value to be used
1393 * to complete the returned CompletableFuture
1394 * @param executor the executor to use for asynchronous execution
1395 * @return the new CompletableFuture
1396 */
1397 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1398 Executor executor) {
1399 if (executor == null || supplier == null)
1400 throw new NullPointerException();
1401 CompletableFuture<U> f = new CompletableFuture<U>();
1402 executor.execute(new AsyncSupply<U>(supplier, f));
1403 return f;
1404 }
1405
1406 /**
1407 * Returns a new CompletableFuture that is asynchronously completed
1408 * by a task running in the {@link ForkJoinPool#commonPool()} after
1409 * it runs the given action.
1410 *
1411 * @param runnable the action to run before completing the
1412 * returned CompletableFuture
1413 * @return the new CompletableFuture
1414 */
1415 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1416 if (runnable == null) throw new NullPointerException();
1417 CompletableFuture<Void> f = new CompletableFuture<Void>();
1418 ForkJoinPool.commonPool().
1419 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1420 return f;
1421 }
1422
1423 /**
1424 * Returns a new CompletableFuture that is asynchronously completed
1425 * by a task running in the given executor after it runs the given
1426 * action.
1427 *
1428 * @param runnable the action to run before completing the
1429 * returned CompletableFuture
1430 * @param executor the executor to use for asynchronous execution
1431 * @return the new CompletableFuture
1432 */
1433 public static CompletableFuture<Void> runAsync(Runnable runnable,
1434 Executor executor) {
1435 if (executor == null || runnable == null)
1436 throw new NullPointerException();
1437 CompletableFuture<Void> f = new CompletableFuture<Void>();
1438 executor.execute(new AsyncRun(runnable, f));
1439 return f;
1440 }
1441
1442 /**
1443 * Returns a new CompletableFuture that is already completed with
1444 * the given value.
1445 *
1446 * @param value the value
1447 * @return the completed CompletableFuture
1448 */
1449 public static <U> CompletableFuture<U> completedFuture(U value) {
1450 CompletableFuture<U> f = new CompletableFuture<U>();
1451 f.result = (value == null) ? NIL : value;
1452 return f;
1453 }
1454
1455 /**
1456 * Returns {@code true} if completed in any fashion: normally,
1457 * exceptionally, or via cancellation.
1458 *
1459 * @return {@code true} if completed
1460 */
1461 public boolean isDone() {
1462 return result != null;
1463 }
1464
1465 /**
1466 * Waits if necessary for this future to complete, and then
1467 * returns its result.
1468 *
1469 * @return the result value
1470 * @throws CancellationException if this future was cancelled
1471 * @throws ExecutionException if this future completed exceptionally
1472 * @throws InterruptedException if the current thread was interrupted
1473 * while waiting
1474 */
1475 public T get() throws InterruptedException, ExecutionException {
1476 Object r; Throwable ex, cause;
1477 if ((r = result) == null && (r = waitingGet(true)) == null)
1478 throw new InterruptedException();
1479 if (!(r instanceof AltResult)) {
1480 @SuppressWarnings("unchecked") T tr = (T) r;
1481 return tr;
1482 }
1483 if ((ex = ((AltResult)r).ex) == null)
1484 return null;
1485 if (ex instanceof CancellationException)
1486 throw (CancellationException)ex;
1487 if ((ex instanceof CompletionException) &&
1488 (cause = ex.getCause()) != null)
1489 ex = cause;
1490 throw new ExecutionException(ex);
1491 }
1492
1493 /**
1494 * Waits if necessary for at most the given time for this future
1495 * to complete, and then returns its result, if available.
1496 *
1497 * @param timeout the maximum time to wait
1498 * @param unit the time unit of the timeout argument
1499 * @return the result value
1500 * @throws CancellationException if this future was cancelled
1501 * @throws ExecutionException if this future completed exceptionally
1502 * @throws InterruptedException if the current thread was interrupted
1503 * while waiting
1504 * @throws TimeoutException if the wait timed out
1505 */
1506 public T get(long timeout, TimeUnit unit)
1507 throws InterruptedException, ExecutionException, TimeoutException {
1508 Object r; Throwable ex, cause;
1509 long nanos = unit.toNanos(timeout);
1510 if (Thread.interrupted())
1511 throw new InterruptedException();
1512 if ((r = result) == null)
1513 r = timedAwaitDone(nanos);
1514 if (!(r instanceof AltResult)) {
1515 @SuppressWarnings("unchecked") T tr = (T) r;
1516 return tr;
1517 }
1518 if ((ex = ((AltResult)r).ex) == null)
1519 return null;
1520 if (ex instanceof CancellationException)
1521 throw (CancellationException)ex;
1522 if ((ex instanceof CompletionException) &&
1523 (cause = ex.getCause()) != null)
1524 ex = cause;
1525 throw new ExecutionException(ex);
1526 }
1527
1528 /**
1529 * Returns the result value when complete, or throws an
1530 * (unchecked) exception if completed exceptionally. To better
1531 * conform with the use of common functional forms, if a
1532 * computation involved in the completion of this
1533 * CompletableFuture threw an exception, this method throws an
1534 * (unchecked) {@link CompletionException} with the underlying
1535 * exception as its cause.
1536 *
1537 * @return the result value
1538 * @throws CancellationException if the computation was cancelled
1539 * @throws CompletionException if this future completed
1540 * exceptionally or a completion computation threw an exception
1541 */
1542 public T join() {
1543 Object r; Throwable ex;
1544 if ((r = result) == null)
1545 r = waitingGet(false);
1546 if (!(r instanceof AltResult)) {
1547 @SuppressWarnings("unchecked") T tr = (T) r;
1548 return tr;
1549 }
1550 if ((ex = ((AltResult)r).ex) == null)
1551 return null;
1552 if (ex instanceof CancellationException)
1553 throw (CancellationException)ex;
1554 if (ex instanceof CompletionException)
1555 throw (CompletionException)ex;
1556 throw new CompletionException(ex);
1557 }
1558
1559 /**
1560 * Returns the result value (or throws any encountered exception)
1561 * if completed, else returns the given valueIfAbsent.
1562 *
1563 * @param valueIfAbsent the value to return if not completed
1564 * @return the result value, if completed, else the given valueIfAbsent
1565 * @throws CancellationException if the computation was cancelled
1566 * @throws CompletionException if this future completed
1567 * exceptionally or a completion computation threw an exception
1568 */
1569 public T getNow(T valueIfAbsent) {
1570 Object r; Throwable ex;
1571 if ((r = result) == null)
1572 return valueIfAbsent;
1573 if (!(r instanceof AltResult)) {
1574 @SuppressWarnings("unchecked") T tr = (T) r;
1575 return tr;
1576 }
1577 if ((ex = ((AltResult)r).ex) == null)
1578 return null;
1579 if (ex instanceof CancellationException)
1580 throw (CancellationException)ex;
1581 if (ex instanceof CompletionException)
1582 throw (CompletionException)ex;
1583 throw new CompletionException(ex);
1584 }
1585
1586 /**
1587 * If not already completed, sets the value returned by {@link
1588 * #get()} and related methods to the given value.
1589 *
1590 * @param value the result value
1591 * @return {@code true} if this invocation caused this CompletableFuture
1592 * to transition to a completed state, else {@code false}
1593 */
1594 public boolean complete(T value) {
1595 boolean triggered = result == null &&
1596 UNSAFE.compareAndSwapObject(this, RESULT, null,
1597 value == null ? NIL : value);
1598 postComplete();
1599 return triggered;
1600 }
1601
1602 /**
1603 * If not already completed, causes invocations of {@link #get()}
1604 * and related methods to throw the given exception.
1605 *
1606 * @param ex the exception
1607 * @return {@code true} if this invocation caused this CompletableFuture
1608 * to transition to a completed state, else {@code false}
1609 */
1610 public boolean completeExceptionally(Throwable ex) {
1611 if (ex == null) throw new NullPointerException();
1612 boolean triggered = result == null &&
1613 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1614 postComplete();
1615 return triggered;
1616 }
1617
1618 /**
1619 * Returns a new CompletableFuture that is completed
1620 * when this CompletableFuture completes, with the result of the
1621 * given function of this CompletableFuture's result.
1622 *
1623 * <p>If this CompletableFuture completes exceptionally, or the
1624 * supplied function throws an exception, then the returned
1625 * CompletableFuture completes exceptionally with a
1626 * CompletionException holding the exception as its cause.
1627 *
1628 * @param fn the function to use to compute the value of
1629 * the returned CompletableFuture
1630 * @return the new CompletableFuture
1631 */
1632 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1633 return doThenApply(fn, null);
1634 }
1635
1636 /**
1637 * Returns a new CompletableFuture that is asynchronously completed
1638 * when this CompletableFuture completes, with the result of the
1639 * given function of this CompletableFuture's result from a
1640 * task running in the {@link ForkJoinPool#commonPool()}.
1641 *
1642 * <p>If this CompletableFuture completes exceptionally, or the
1643 * supplied function throws an exception, then the returned
1644 * CompletableFuture completes exceptionally with a
1645 * CompletionException holding the exception as its cause.
1646 *
1647 * @param fn the function to use to compute the value of
1648 * the returned CompletableFuture
1649 * @return the new CompletableFuture
1650 */
1651 public <U> CompletableFuture<U> thenApplyAsync
1652 (Function<? super T,? extends U> fn) {
1653 return doThenApply(fn, ForkJoinPool.commonPool());
1654 }
1655
1656 /**
1657 * Returns a new CompletableFuture that is asynchronously completed
1658 * when this CompletableFuture completes, with the result of the
1659 * given function of this CompletableFuture's result from a
1660 * task running in the given executor.
1661 *
1662 * <p>If this CompletableFuture completes exceptionally, or the
1663 * supplied function throws an exception, then the returned
1664 * CompletableFuture completes exceptionally with a
1665 * CompletionException holding the exception as its cause.
1666 *
1667 * @param fn the function to use to compute the value of
1668 * the returned CompletableFuture
1669 * @param executor the executor to use for asynchronous execution
1670 * @return the new CompletableFuture
1671 */
1672 public <U> CompletableFuture<U> thenApplyAsync
1673 (Function<? super T,? extends U> fn,
1674 Executor executor) {
1675 if (executor == null) throw new NullPointerException();
1676 return doThenApply(fn, executor);
1677 }
1678
1679 private <U> CompletableFuture<U> doThenApply
1680 (Function<? super T,? extends U> fn,
1681 Executor e) {
1682 if (fn == null) throw new NullPointerException();
1683 CompletableFuture<U> dst = new CompletableFuture<U>();
1684 ApplyCompletion<T,U> d = null;
1685 Object r;
1686 if ((r = result) == null) {
1687 CompletionNode p = new CompletionNode
1688 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1689 while ((r = result) == null) {
1690 if (UNSAFE.compareAndSwapObject
1691 (this, COMPLETIONS, p.next = completions, p))
1692 break;
1693 }
1694 }
1695 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1696 T t; Throwable ex;
1697 if (r instanceof AltResult) {
1698 ex = ((AltResult)r).ex;
1699 t = null;
1700 }
1701 else {
1702 ex = null;
1703 @SuppressWarnings("unchecked") T tr = (T) r;
1704 t = tr;
1705 }
1706 U u = null;
1707 if (ex == null) {
1708 try {
1709 if (e != null)
1710 e.execute(new AsyncApply<T,U>(t, fn, dst));
1711 else
1712 u = fn.apply(t);
1713 } catch (Throwable rex) {
1714 ex = rex;
1715 }
1716 }
1717 if (e == null || ex != null)
1718 dst.internalComplete(u, ex);
1719 }
1720 helpPostComplete();
1721 return dst;
1722 }
1723
1724 /**
1725 * Returns a new CompletableFuture that is completed
1726 * when this CompletableFuture completes, after performing the given
1727 * action with this CompletableFuture's result.
1728 *
1729 * <p>If this CompletableFuture completes exceptionally, or the
1730 * supplied action throws an exception, then the returned
1731 * CompletableFuture completes exceptionally with a
1732 * CompletionException holding the exception as its cause.
1733 *
1734 * @param block the action to perform before completing the
1735 * returned CompletableFuture
1736 * @return the new CompletableFuture
1737 */
1738 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1739 return doThenAccept(block, null);
1740 }
1741
1742 /**
1743 * Returns a new CompletableFuture that is asynchronously completed
1744 * when this CompletableFuture completes, after performing the given
1745 * action with this CompletableFuture's result from a task running
1746 * in the {@link ForkJoinPool#commonPool()}.
1747 *
1748 * <p>If this CompletableFuture completes exceptionally, or the
1749 * supplied action throws an exception, then the returned
1750 * CompletableFuture completes exceptionally with a
1751 * CompletionException holding the exception as its cause.
1752 *
1753 * @param block the action to perform before completing the
1754 * returned CompletableFuture
1755 * @return the new CompletableFuture
1756 */
1757 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1758 return doThenAccept(block, ForkJoinPool.commonPool());
1759 }
1760
1761 /**
1762 * Returns a new CompletableFuture that is asynchronously completed
1763 * when this CompletableFuture completes, after performing the given
1764 * action with this CompletableFuture's result from a task running
1765 * in the given executor.
1766 *
1767 * <p>If this CompletableFuture completes exceptionally, or the
1768 * supplied action throws an exception, then the returned
1769 * CompletableFuture completes exceptionally with a
1770 * CompletionException holding the exception as its cause.
1771 *
1772 * @param block the action to perform before completing the
1773 * returned CompletableFuture
1774 * @param executor the executor to use for asynchronous execution
1775 * @return the new CompletableFuture
1776 */
1777 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1778 Executor executor) {
1779 if (executor == null) throw new NullPointerException();
1780 return doThenAccept(block, executor);
1781 }
1782
1783 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1784 Executor e) {
1785 if (fn == null) throw new NullPointerException();
1786 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1787 AcceptCompletion<T> d = null;
1788 Object r;
1789 if ((r = result) == null) {
1790 CompletionNode p = new CompletionNode
1791 (d = new AcceptCompletion<T>(this, fn, dst, e));
1792 while ((r = result) == null) {
1793 if (UNSAFE.compareAndSwapObject
1794 (this, COMPLETIONS, p.next = completions, p))
1795 break;
1796 }
1797 }
1798 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1799 T t; Throwable ex;
1800 if (r instanceof AltResult) {
1801 ex = ((AltResult)r).ex;
1802 t = null;
1803 }
1804 else {
1805 ex = null;
1806 @SuppressWarnings("unchecked") T tr = (T) r;
1807 t = tr;
1808 }
1809 if (ex == null) {
1810 try {
1811 if (e != null)
1812 e.execute(new AsyncAccept<T>(t, fn, dst));
1813 else
1814 fn.accept(t);
1815 } catch (Throwable rex) {
1816 ex = rex;
1817 }
1818 }
1819 if (e == null || ex != null)
1820 dst.internalComplete(null, ex);
1821 }
1822 helpPostComplete();
1823 return dst;
1824 }
1825
1826 /**
1827 * Returns a new CompletableFuture that is completed
1828 * when this CompletableFuture completes, after performing the given
1829 * action.
1830 *
1831 * <p>If this CompletableFuture completes exceptionally, or the
1832 * supplied action throws an exception, then the returned
1833 * CompletableFuture completes exceptionally with a
1834 * CompletionException holding the exception as its cause.
1835 *
1836 * @param action the action to perform before completing the
1837 * returned CompletableFuture
1838 * @return the new CompletableFuture
1839 */
1840 public CompletableFuture<Void> thenRun(Runnable action) {
1841 return doThenRun(action, null);
1842 }
1843
1844 /**
1845 * Returns a new CompletableFuture that is asynchronously completed
1846 * when this CompletableFuture completes, after performing the given
1847 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1848 *
1849 * <p>If this CompletableFuture completes exceptionally, or the
1850 * supplied action throws an exception, then the returned
1851 * CompletableFuture completes exceptionally with a
1852 * CompletionException holding the exception as its cause.
1853 *
1854 * @param action the action to perform before completing the
1855 * returned CompletableFuture
1856 * @return the new CompletableFuture
1857 */
1858 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1859 return doThenRun(action, ForkJoinPool.commonPool());
1860 }
1861
1862 /**
1863 * Returns a new CompletableFuture that is asynchronously completed
1864 * when this CompletableFuture completes, after performing the given
1865 * action from a task running in the given executor.
1866 *
1867 * <p>If this CompletableFuture completes exceptionally, or the
1868 * supplied action throws an exception, then the returned
1869 * CompletableFuture completes exceptionally with a
1870 * CompletionException holding the exception as its cause.
1871 *
1872 * @param action the action to perform before completing the
1873 * returned CompletableFuture
1874 * @param executor the executor to use for asynchronous execution
1875 * @return the new CompletableFuture
1876 */
1877 public CompletableFuture<Void> thenRunAsync(Runnable action,
1878 Executor executor) {
1879 if (executor == null) throw new NullPointerException();
1880 return doThenRun(action, executor);
1881 }
1882
1883 private CompletableFuture<Void> doThenRun(Runnable action,
1884 Executor e) {
1885 if (action == null) throw new NullPointerException();
1886 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1887 RunCompletion<T> d = null;
1888 Object r;
1889 if ((r = result) == null) {
1890 CompletionNode p = new CompletionNode
1891 (d = new RunCompletion<T>(this, action, dst, e));
1892 while ((r = result) == null) {
1893 if (UNSAFE.compareAndSwapObject
1894 (this, COMPLETIONS, p.next = completions, p))
1895 break;
1896 }
1897 }
1898 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1899 Throwable ex;
1900 if (r instanceof AltResult)
1901 ex = ((AltResult)r).ex;
1902 else
1903 ex = null;
1904 if (ex == null) {
1905 try {
1906 if (e != null)
1907 e.execute(new AsyncRun(action, dst));
1908 else
1909 action.run();
1910 } catch (Throwable rex) {
1911 ex = rex;
1912 }
1913 }
1914 if (e == null || ex != null)
1915 dst.internalComplete(null, ex);
1916 }
1917 helpPostComplete();
1918 return dst;
1919 }
1920
1921 /**
1922 * Returns a new CompletableFuture that is completed
1923 * when both this and the other given CompletableFuture complete,
1924 * with the result of the given function of the results of the two
1925 * CompletableFutures.
1926 *
1927 * <p>If this and/or the other CompletableFuture complete
1928 * exceptionally, or the supplied function throws an exception,
1929 * then the returned CompletableFuture completes exceptionally
1930 * with a CompletionException holding the exception as its cause.
1931 *
1932 * @param other the other CompletableFuture
1933 * @param fn the function to use to compute the value of
1934 * the returned CompletableFuture
1935 * @return the new CompletableFuture
1936 */
1937 public <U,V> CompletableFuture<V> thenCombine
1938 (CompletableFuture<? extends U> other,
1939 BiFunction<? super T,? super U,? extends V> fn) {
1940 return doThenBiApply(other, fn, null);
1941 }
1942
1943 /**
1944 * Returns a new CompletableFuture that is asynchronously completed
1945 * when both this and the other given CompletableFuture complete,
1946 * with the result of the given function of the results of the two
1947 * CompletableFutures from a task running in the
1948 * {@link ForkJoinPool#commonPool()}.
1949 *
1950 * <p>If this and/or the other CompletableFuture complete
1951 * exceptionally, or the supplied function throws an exception,
1952 * then the returned CompletableFuture completes exceptionally
1953 * with a CompletionException holding the exception as its cause.
1954 *
1955 * @param other the other CompletableFuture
1956 * @param fn the function to use to compute the value of
1957 * the returned CompletableFuture
1958 * @return the new CompletableFuture
1959 */
1960 public <U,V> CompletableFuture<V> thenCombineAsync
1961 (CompletableFuture<? extends U> other,
1962 BiFunction<? super T,? super U,? extends V> fn) {
1963 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1964 }
1965
1966 /**
1967 * Returns a new CompletableFuture that is asynchronously completed
1968 * when both this and the other given CompletableFuture complete,
1969 * with the result of the given function of the results of the two
1970 * CompletableFutures from a task running in the given executor.
1971 *
1972 * <p>If this and/or the other CompletableFuture complete
1973 * exceptionally, or the supplied function throws an exception,
1974 * then the returned CompletableFuture completes exceptionally
1975 * with a CompletionException holding the exception as its cause.
1976 *
1977 * @param other the other CompletableFuture
1978 * @param fn the function to use to compute the value of
1979 * the returned CompletableFuture
1980 * @param executor the executor to use for asynchronous execution
1981 * @return the new CompletableFuture
1982 */
1983 public <U,V> CompletableFuture<V> thenCombineAsync
1984 (CompletableFuture<? extends U> other,
1985 BiFunction<? super T,? super U,? extends V> fn,
1986 Executor executor) {
1987 if (executor == null) throw new NullPointerException();
1988 return doThenBiApply(other, fn, executor);
1989 }
1990
1991 private <U,V> CompletableFuture<V> doThenBiApply
1992 (CompletableFuture<? extends U> other,
1993 BiFunction<? super T,? super U,? extends V> fn,
1994 Executor e) {
1995 if (other == null || fn == null) throw new NullPointerException();
1996 CompletableFuture<V> dst = new CompletableFuture<V>();
1997 BiApplyCompletion<T,U,V> d = null;
1998 Object r, s = null;
1999 if ((r = result) == null || (s = other.result) == null) {
2000 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
2001 CompletionNode q = null, p = new CompletionNode(d);
2002 while ((r == null && (r = result) == null) ||
2003 (s == null && (s = other.result) == null)) {
2004 if (q != null) {
2005 if (s != null ||
2006 UNSAFE.compareAndSwapObject
2007 (other, COMPLETIONS, q.next = other.completions, q))
2008 break;
2009 }
2010 else if (r != null ||
2011 UNSAFE.compareAndSwapObject
2012 (this, COMPLETIONS, p.next = completions, p)) {
2013 if (s != null)
2014 break;
2015 q = new CompletionNode(d);
2016 }
2017 }
2018 }
2019 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2020 T t; U u; Throwable ex;
2021 if (r instanceof AltResult) {
2022 ex = ((AltResult)r).ex;
2023 t = null;
2024 }
2025 else {
2026 ex = null;
2027 @SuppressWarnings("unchecked") T tr = (T) r;
2028 t = tr;
2029 }
2030 if (ex != null)
2031 u = null;
2032 else if (s instanceof AltResult) {
2033 ex = ((AltResult)s).ex;
2034 u = null;
2035 }
2036 else {
2037 @SuppressWarnings("unchecked") U us = (U) s;
2038 u = us;
2039 }
2040 V v = null;
2041 if (ex == null) {
2042 try {
2043 if (e != null)
2044 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
2045 else
2046 v = fn.apply(t, u);
2047 } catch (Throwable rex) {
2048 ex = rex;
2049 }
2050 }
2051 if (e == null || ex != null)
2052 dst.internalComplete(v, ex);
2053 }
2054 helpPostComplete();
2055 other.helpPostComplete();
2056 return dst;
2057 }
2058
2059 /**
2060 * Returns a new CompletableFuture that is completed
2061 * when both this and the other given CompletableFuture complete,
2062 * after performing the given action with the results of the two
2063 * CompletableFutures.
2064 *
2065 * <p>If this and/or the other CompletableFuture complete
2066 * exceptionally, or the supplied action throws an exception,
2067 * then the returned CompletableFuture completes exceptionally
2068 * with a CompletionException holding the exception as its cause.
2069 *
2070 * @param other the other CompletableFuture
2071 * @param block the action to perform before completing the
2072 * returned CompletableFuture
2073 * @return the new CompletableFuture
2074 */
2075 public <U> CompletableFuture<Void> thenAcceptBoth
2076 (CompletableFuture<? extends U> other,
2077 BiConsumer<? super T, ? super U> block) {
2078 return doThenBiAccept(other, block, null);
2079 }
2080
2081 /**
2082 * Returns a new CompletableFuture that is asynchronously completed
2083 * when both this and the other given CompletableFuture complete,
2084 * after performing the given action with the results of the two
2085 * CompletableFutures from a task running in the {@link
2086 * ForkJoinPool#commonPool()}.
2087 *
2088 * <p>If this and/or the other CompletableFuture complete
2089 * exceptionally, or the supplied action throws an exception,
2090 * then the returned CompletableFuture completes exceptionally
2091 * with a CompletionException holding the exception as its cause.
2092 *
2093 * @param other the other CompletableFuture
2094 * @param block the action to perform before completing the
2095 * returned CompletableFuture
2096 * @return the new CompletableFuture
2097 */
2098 public <U> CompletableFuture<Void> thenAcceptBothAsync
2099 (CompletableFuture<? extends U> other,
2100 BiConsumer<? super T, ? super U> block) {
2101 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2102 }
2103
2104 /**
2105 * Returns a new CompletableFuture that is asynchronously completed
2106 * when both this and the other given CompletableFuture complete,
2107 * after performing the given action with the results of the two
2108 * CompletableFutures from a task running in the given executor.
2109 *
2110 * <p>If this and/or the other CompletableFuture complete
2111 * exceptionally, or the supplied action throws an exception,
2112 * then the returned CompletableFuture completes exceptionally
2113 * with a CompletionException holding the exception as its cause.
2114 *
2115 * @param other the other CompletableFuture
2116 * @param block the action to perform before completing the
2117 * returned CompletableFuture
2118 * @param executor the executor to use for asynchronous execution
2119 * @return the new CompletableFuture
2120 */
2121 public <U> CompletableFuture<Void> thenAcceptBothAsync
2122 (CompletableFuture<? extends U> other,
2123 BiConsumer<? super T, ? super U> block,
2124 Executor executor) {
2125 if (executor == null) throw new NullPointerException();
2126 return doThenBiAccept(other, block, executor);
2127 }
2128
2129 private <U> CompletableFuture<Void> doThenBiAccept
2130 (CompletableFuture<? extends U> other,
2131 BiConsumer<? super T,? super U> fn,
2132 Executor e) {
2133 if (other == null || fn == null) throw new NullPointerException();
2134 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2135 BiAcceptCompletion<T,U> d = null;
2136 Object r, s = null;
2137 if ((r = result) == null || (s = other.result) == null) {
2138 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2139 CompletionNode q = null, p = new CompletionNode(d);
2140 while ((r == null && (r = result) == null) ||
2141 (s == null && (s = other.result) == null)) {
2142 if (q != null) {
2143 if (s != null ||
2144 UNSAFE.compareAndSwapObject
2145 (other, COMPLETIONS, q.next = other.completions, q))
2146 break;
2147 }
2148 else if (r != null ||
2149 UNSAFE.compareAndSwapObject
2150 (this, COMPLETIONS, p.next = completions, p)) {
2151 if (s != null)
2152 break;
2153 q = new CompletionNode(d);
2154 }
2155 }
2156 }
2157 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2158 T t; U u; Throwable ex;
2159 if (r instanceof AltResult) {
2160 ex = ((AltResult)r).ex;
2161 t = null;
2162 }
2163 else {
2164 ex = null;
2165 @SuppressWarnings("unchecked") T tr = (T) r;
2166 t = tr;
2167 }
2168 if (ex != null)
2169 u = null;
2170 else if (s instanceof AltResult) {
2171 ex = ((AltResult)s).ex;
2172 u = null;
2173 }
2174 else {
2175 @SuppressWarnings("unchecked") U us = (U) s;
2176 u = us;
2177 }
2178 if (ex == null) {
2179 try {
2180 if (e != null)
2181 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2182 else
2183 fn.accept(t, u);
2184 } catch (Throwable rex) {
2185 ex = rex;
2186 }
2187 }
2188 if (e == null || ex != null)
2189 dst.internalComplete(null, ex);
2190 }
2191 helpPostComplete();
2192 other.helpPostComplete();
2193 return dst;
2194 }
2195
2196 /**
2197 * Returns a new CompletableFuture that is completed
2198 * when both this and the other given CompletableFuture complete,
2199 * after performing the given action.
2200 *
2201 * <p>If this and/or the other CompletableFuture complete
2202 * exceptionally, or the supplied action throws an exception,
2203 * then the returned CompletableFuture completes exceptionally
2204 * with a CompletionException holding the exception as its cause.
2205 *
2206 * @param other the other CompletableFuture
2207 * @param action the action to perform before completing the
2208 * returned CompletableFuture
2209 * @return the new CompletableFuture
2210 */
2211 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2212 Runnable action) {
2213 return doThenBiRun(other, action, null);
2214 }
2215
2216 /**
2217 * Returns a new CompletableFuture that is asynchronously completed
2218 * when both this and the other given CompletableFuture complete,
2219 * after performing the given action from a task running in the
2220 * {@link ForkJoinPool#commonPool()}.
2221 *
2222 * <p>If this and/or the other CompletableFuture complete
2223 * exceptionally, or the supplied action throws an exception,
2224 * then the returned CompletableFuture completes exceptionally
2225 * with a CompletionException holding the exception as its cause.
2226 *
2227 * @param other the other CompletableFuture
2228 * @param action the action to perform before completing the
2229 * returned CompletableFuture
2230 * @return the new CompletableFuture
2231 */
2232 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2233 Runnable action) {
2234 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2235 }
2236
2237 /**
2238 * Returns a new CompletableFuture that is asynchronously completed
2239 * when both this and the other given CompletableFuture complete,
2240 * after performing the given action from a task running in the
2241 * given executor.
2242 *
2243 * <p>If this and/or the other CompletableFuture complete
2244 * exceptionally, or the supplied action throws an exception,
2245 * then the returned CompletableFuture completes exceptionally
2246 * with a CompletionException holding the exception as its cause.
2247 *
2248 * @param other the other CompletableFuture
2249 * @param action the action to perform before completing the
2250 * returned CompletableFuture
2251 * @param executor the executor to use for asynchronous execution
2252 * @return the new CompletableFuture
2253 */
2254 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2255 Runnable action,
2256 Executor executor) {
2257 if (executor == null) throw new NullPointerException();
2258 return doThenBiRun(other, action, executor);
2259 }
2260
2261 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2262 Runnable action,
2263 Executor e) {
2264 if (other == null || action == null) throw new NullPointerException();
2265 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2266 BiRunCompletion<T> d = null;
2267 Object r, s = null;
2268 if ((r = result) == null || (s = other.result) == null) {
2269 d = new BiRunCompletion<T>(this, other, action, dst, e);
2270 CompletionNode q = null, p = new CompletionNode(d);
2271 while ((r == null && (r = result) == null) ||
2272 (s == null && (s = other.result) == null)) {
2273 if (q != null) {
2274 if (s != null ||
2275 UNSAFE.compareAndSwapObject
2276 (other, COMPLETIONS, q.next = other.completions, q))
2277 break;
2278 }
2279 else if (r != null ||
2280 UNSAFE.compareAndSwapObject
2281 (this, COMPLETIONS, p.next = completions, p)) {
2282 if (s != null)
2283 break;
2284 q = new CompletionNode(d);
2285 }
2286 }
2287 }
2288 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2289 Throwable ex;
2290 if (r instanceof AltResult)
2291 ex = ((AltResult)r).ex;
2292 else
2293 ex = null;
2294 if (ex == null && (s instanceof AltResult))
2295 ex = ((AltResult)s).ex;
2296 if (ex == null) {
2297 try {
2298 if (e != null)
2299 e.execute(new AsyncRun(action, dst));
2300 else
2301 action.run();
2302 } catch (Throwable rex) {
2303 ex = rex;
2304 }
2305 }
2306 if (e == null || ex != null)
2307 dst.internalComplete(null, ex);
2308 }
2309 helpPostComplete();
2310 other.helpPostComplete();
2311 return dst;
2312 }
2313
2314 /**
2315 * Returns a new CompletableFuture that is completed
2316 * when either this or the other given CompletableFuture completes,
2317 * with the result of the given function of either this or the other
2318 * CompletableFuture's result.
2319 *
2320 * <p>If this and/or the other CompletableFuture complete
2321 * exceptionally, then the returned CompletableFuture may also do so,
2322 * with a CompletionException holding one of these exceptions as its
2323 * cause. No guarantees are made about which result or exception is
2324 * used in the returned CompletableFuture. If the supplied function
2325 * throws an exception, then the returned CompletableFuture completes
2326 * exceptionally with a CompletionException holding the exception as
2327 * its cause.
2328 *
2329 * @param other the other CompletableFuture
2330 * @param fn the function to use to compute the value of
2331 * the returned CompletableFuture
2332 * @return the new CompletableFuture
2333 */
2334 public <U> CompletableFuture<U> applyToEither
2335 (CompletableFuture<? extends T> other,
2336 Function<? super T, U> fn) {
2337 return doOrApply(other, fn, null);
2338 }
2339
2340 /**
2341 * Returns a new CompletableFuture that is asynchronously completed
2342 * when either this or the other given CompletableFuture completes,
2343 * with the result of the given function of either this or the other
2344 * CompletableFuture's result from a task running in the
2345 * {@link ForkJoinPool#commonPool()}.
2346 *
2347 * <p>If this and/or the other CompletableFuture complete
2348 * exceptionally, then the returned CompletableFuture may also do so,
2349 * with a CompletionException holding one of these exceptions as its
2350 * cause. No guarantees are made about which result or exception is
2351 * used in the returned CompletableFuture. If the supplied function
2352 * throws an exception, then the returned CompletableFuture completes
2353 * exceptionally with a CompletionException holding the exception as
2354 * its cause.
2355 *
2356 * @param other the other CompletableFuture
2357 * @param fn the function to use to compute the value of
2358 * the returned CompletableFuture
2359 * @return the new CompletableFuture
2360 */
2361 public <U> CompletableFuture<U> applyToEitherAsync
2362 (CompletableFuture<? extends T> other,
2363 Function<? super T, U> fn) {
2364 return doOrApply(other, fn, ForkJoinPool.commonPool());
2365 }
2366
2367 /**
2368 * Returns a new CompletableFuture that is asynchronously completed
2369 * when either this or the other given CompletableFuture completes,
2370 * with the result of the given function of either this or the other
2371 * CompletableFuture's result from a task running in the
2372 * given executor.
2373 *
2374 * <p>If this and/or the other CompletableFuture complete
2375 * exceptionally, then the returned CompletableFuture may also do so,
2376 * with a CompletionException holding one of these exceptions as its
2377 * cause. No guarantees are made about which result or exception is
2378 * used in the returned CompletableFuture. If the supplied function
2379 * throws an exception, then the returned CompletableFuture completes
2380 * exceptionally with a CompletionException holding the exception as
2381 * its cause.
2382 *
2383 * @param other the other CompletableFuture
2384 * @param fn the function to use to compute the value of
2385 * the returned CompletableFuture
2386 * @param executor the executor to use for asynchronous execution
2387 * @return the new CompletableFuture
2388 */
2389 public <U> CompletableFuture<U> applyToEitherAsync
2390 (CompletableFuture<? extends T> other,
2391 Function<? super T, U> fn,
2392 Executor executor) {
2393 if (executor == null) throw new NullPointerException();
2394 return doOrApply(other, fn, executor);
2395 }
2396
2397 private <U> CompletableFuture<U> doOrApply
2398 (CompletableFuture<? extends T> other,
2399 Function<? super T, U> fn,
2400 Executor e) {
2401 if (other == null || fn == null) throw new NullPointerException();
2402 CompletableFuture<U> dst = new CompletableFuture<U>();
2403 OrApplyCompletion<T,U> d = null;
2404 Object r;
2405 if ((r = result) == null && (r = other.result) == null) {
2406 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2407 CompletionNode q = null, p = new CompletionNode(d);
2408 while ((r = result) == null && (r = other.result) == null) {
2409 if (q != null) {
2410 if (UNSAFE.compareAndSwapObject
2411 (other, COMPLETIONS, q.next = other.completions, q))
2412 break;
2413 }
2414 else if (UNSAFE.compareAndSwapObject
2415 (this, COMPLETIONS, p.next = completions, p))
2416 q = new CompletionNode(d);
2417 }
2418 }
2419 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2420 T t; Throwable ex;
2421 if (r instanceof AltResult) {
2422 ex = ((AltResult)r).ex;
2423 t = null;
2424 }
2425 else {
2426 ex = null;
2427 @SuppressWarnings("unchecked") T tr = (T) r;
2428 t = tr;
2429 }
2430 U u = null;
2431 if (ex == null) {
2432 try {
2433 if (e != null)
2434 e.execute(new AsyncApply<T,U>(t, fn, dst));
2435 else
2436 u = fn.apply(t);
2437 } catch (Throwable rex) {
2438 ex = rex;
2439 }
2440 }
2441 if (e == null || ex != null)
2442 dst.internalComplete(u, ex);
2443 }
2444 helpPostComplete();
2445 other.helpPostComplete();
2446 return dst;
2447 }
2448
2449 /**
2450 * Returns a new CompletableFuture that is completed
2451 * when either this or the other given CompletableFuture completes,
2452 * after performing the given action with the result of either this
2453 * or the other CompletableFuture's result.
2454 *
2455 * <p>If this and/or the other CompletableFuture complete
2456 * exceptionally, then the returned CompletableFuture may also do so,
2457 * with a CompletionException holding one of these exceptions as its
2458 * cause. No guarantees are made about which result or exception is
2459 * used in the returned CompletableFuture. If the supplied action
2460 * throws an exception, then the returned CompletableFuture completes
2461 * exceptionally with a CompletionException holding the exception as
2462 * its cause.
2463 *
2464 * @param other the other CompletableFuture
2465 * @param block the action to perform before completing the
2466 * returned CompletableFuture
2467 * @return the new CompletableFuture
2468 */
2469 public CompletableFuture<Void> acceptEither
2470 (CompletableFuture<? extends T> other,
2471 Consumer<? super T> block) {
2472 return doOrAccept(other, block, null);
2473 }
2474
2475 /**
2476 * Returns a new CompletableFuture that is asynchronously completed
2477 * when either this or the other given CompletableFuture completes,
2478 * after performing the given action with the result of either this
2479 * or the other CompletableFuture's result from a task running in
2480 * the {@link ForkJoinPool#commonPool()}.
2481 *
2482 * <p>If this and/or the other CompletableFuture complete
2483 * exceptionally, then the returned CompletableFuture may also do so,
2484 * with a CompletionException holding one of these exceptions as its
2485 * cause. No guarantees are made about which result or exception is
2486 * used in the returned CompletableFuture. If the supplied action
2487 * throws an exception, then the returned CompletableFuture completes
2488 * exceptionally with a CompletionException holding the exception as
2489 * its cause.
2490 *
2491 * @param other the other CompletableFuture
2492 * @param block the action to perform before completing the
2493 * returned CompletableFuture
2494 * @return the new CompletableFuture
2495 */
2496 public CompletableFuture<Void> acceptEitherAsync
2497 (CompletableFuture<? extends T> other,
2498 Consumer<? super T> block) {
2499 return doOrAccept(other, block, ForkJoinPool.commonPool());
2500 }
2501
2502 /**
2503 * Returns a new CompletableFuture that is asynchronously completed
2504 * when either this or the other given CompletableFuture completes,
2505 * after performing the given action with the result of either this
2506 * or the other CompletableFuture's result from a task running in
2507 * the given executor.
2508 *
2509 * <p>If this and/or the other CompletableFuture complete
2510 * exceptionally, then the returned CompletableFuture may also do so,
2511 * with a CompletionException holding one of these exceptions as its
2512 * cause. No guarantees are made about which result or exception is
2513 * used in the returned CompletableFuture. If the supplied action
2514 * throws an exception, then the returned CompletableFuture completes
2515 * exceptionally with a CompletionException holding the exception as
2516 * its cause.
2517 *
2518 * @param other the other CompletableFuture
2519 * @param block the action to perform before completing the
2520 * returned CompletableFuture
2521 * @param executor the executor to use for asynchronous execution
2522 * @return the new CompletableFuture
2523 */
2524 public CompletableFuture<Void> acceptEitherAsync
2525 (CompletableFuture<? extends T> other,
2526 Consumer<? super T> block,
2527 Executor executor) {
2528 if (executor == null) throw new NullPointerException();
2529 return doOrAccept(other, block, executor);
2530 }
2531
2532 private CompletableFuture<Void> doOrAccept
2533 (CompletableFuture<? extends T> other,
2534 Consumer<? super T> fn,
2535 Executor e) {
2536 if (other == null || fn == null) throw new NullPointerException();
2537 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2538 OrAcceptCompletion<T> d = null;
2539 Object r;
2540 if ((r = result) == null && (r = other.result) == null) {
2541 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2542 CompletionNode q = null, p = new CompletionNode(d);
2543 while ((r = result) == null && (r = other.result) == null) {
2544 if (q != null) {
2545 if (UNSAFE.compareAndSwapObject
2546 (other, COMPLETIONS, q.next = other.completions, q))
2547 break;
2548 }
2549 else if (UNSAFE.compareAndSwapObject
2550 (this, COMPLETIONS, p.next = completions, p))
2551 q = new CompletionNode(d);
2552 }
2553 }
2554 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2555 T t; Throwable ex;
2556 if (r instanceof AltResult) {
2557 ex = ((AltResult)r).ex;
2558 t = null;
2559 }
2560 else {
2561 ex = null;
2562 @SuppressWarnings("unchecked") T tr = (T) r;
2563 t = tr;
2564 }
2565 if (ex == null) {
2566 try {
2567 if (e != null)
2568 e.execute(new AsyncAccept<T>(t, fn, dst));
2569 else
2570 fn.accept(t);
2571 } catch (Throwable rex) {
2572 ex = rex;
2573 }
2574 }
2575 if (e == null || ex != null)
2576 dst.internalComplete(null, ex);
2577 }
2578 helpPostComplete();
2579 other.helpPostComplete();
2580 return dst;
2581 }
2582
2583 /**
2584 * Returns a new CompletableFuture that is completed
2585 * when either this or the other given CompletableFuture completes,
2586 * after performing the given action.
2587 *
2588 * <p>If this and/or the other CompletableFuture complete
2589 * exceptionally, then the returned CompletableFuture may also do so,
2590 * with a CompletionException holding one of these exceptions as its
2591 * cause. No guarantees are made about which result or exception is
2592 * used in the returned CompletableFuture. If the supplied action
2593 * throws an exception, then the returned CompletableFuture completes
2594 * exceptionally with a CompletionException holding the exception as
2595 * its cause.
2596 *
2597 * @param other the other CompletableFuture
2598 * @param action the action to perform before completing the
2599 * returned CompletableFuture
2600 * @return the new CompletableFuture
2601 */
2602 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2603 Runnable action) {
2604 return doOrRun(other, action, null);
2605 }
2606
2607 /**
2608 * Returns a new CompletableFuture that is asynchronously completed
2609 * when either this or the other given CompletableFuture completes,
2610 * after performing the given action from a task running in the
2611 * {@link ForkJoinPool#commonPool()}.
2612 *
2613 * <p>If this and/or the other CompletableFuture complete
2614 * exceptionally, then the returned CompletableFuture may also do so,
2615 * with a CompletionException holding one of these exceptions as its
2616 * cause. No guarantees are made about which result or exception is
2617 * used in the returned CompletableFuture. If the supplied action
2618 * throws an exception, then the returned CompletableFuture completes
2619 * exceptionally with a CompletionException holding the exception as
2620 * its cause.
2621 *
2622 * @param other the other CompletableFuture
2623 * @param action the action to perform before completing the
2624 * returned CompletableFuture
2625 * @return the new CompletableFuture
2626 */
2627 public CompletableFuture<Void> runAfterEitherAsync
2628 (CompletableFuture<?> other,
2629 Runnable action) {
2630 return doOrRun(other, action, ForkJoinPool.commonPool());
2631 }
2632
2633 /**
2634 * Returns a new CompletableFuture that is asynchronously completed
2635 * when either this or the other given CompletableFuture completes,
2636 * after performing the given action from a task running in the
2637 * given executor.
2638 *
2639 * <p>If this and/or the other CompletableFuture complete
2640 * exceptionally, then the returned CompletableFuture may also do so,
2641 * with a CompletionException holding one of these exceptions as its
2642 * cause. No guarantees are made about which result or exception is
2643 * used in the returned CompletableFuture. If the supplied action
2644 * throws an exception, then the returned CompletableFuture completes
2645 * exceptionally with a CompletionException holding the exception as
2646 * its cause.
2647 *
2648 * @param other the other CompletableFuture
2649 * @param action the action to perform before completing the
2650 * returned CompletableFuture
2651 * @param executor the executor to use for asynchronous execution
2652 * @return the new CompletableFuture
2653 */
2654 public CompletableFuture<Void> runAfterEitherAsync
2655 (CompletableFuture<?> other,
2656 Runnable action,
2657 Executor executor) {
2658 if (executor == null) throw new NullPointerException();
2659 return doOrRun(other, action, executor);
2660 }
2661
2662 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2663 Runnable action,
2664 Executor e) {
2665 if (other == null || action == null) throw new NullPointerException();
2666 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2667 OrRunCompletion<T> d = null;
2668 Object r;
2669 if ((r = result) == null && (r = other.result) == null) {
2670 d = new OrRunCompletion<T>(this, other, action, dst, e);
2671 CompletionNode q = null, p = new CompletionNode(d);
2672 while ((r = result) == null && (r = other.result) == null) {
2673 if (q != null) {
2674 if (UNSAFE.compareAndSwapObject
2675 (other, COMPLETIONS, q.next = other.completions, q))
2676 break;
2677 }
2678 else if (UNSAFE.compareAndSwapObject
2679 (this, COMPLETIONS, p.next = completions, p))
2680 q = new CompletionNode(d);
2681 }
2682 }
2683 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2684 Throwable ex;
2685 if (r instanceof AltResult)
2686 ex = ((AltResult)r).ex;
2687 else
2688 ex = null;
2689 if (ex == null) {
2690 try {
2691 if (e != null)
2692 e.execute(new AsyncRun(action, dst));
2693 else
2694 action.run();
2695 } catch (Throwable rex) {
2696 ex = rex;
2697 }
2698 }
2699 if (e == null || ex != null)
2700 dst.internalComplete(null, ex);
2701 }
2702 helpPostComplete();
2703 other.helpPostComplete();
2704 return dst;
2705 }
2706
2707 /**
2708 * Returns a CompletableFuture that upon completion, has the same
2709 * value as produced by the given function of the result of this
2710 * CompletableFuture.
2711 *
2712 * <p>If this CompletableFuture completes exceptionally, then the
2713 * returned CompletableFuture also does so, with a
2714 * CompletionException holding this exception as its cause.
2715 * Similarly, if the computed CompletableFuture completes
2716 * exceptionally, then so does the returned CompletableFuture.
2717 *
2718 * @param fn the function returning a new CompletableFuture
2719 * @return the CompletableFuture
2720 */
2721 public <U> CompletableFuture<U> thenCompose
2722 (Function<? super T, CompletableFuture<U>> fn) {
2723 return doCompose(fn, null);
2724 }
2725
2726 /**
2727 * Returns a CompletableFuture that upon completion, has the same
2728 * value as that produced asynchronously using the {@link
2729 * ForkJoinPool#commonPool()} by the given function of the result
2730 * of this CompletableFuture.
2731 *
2732 * <p>If this CompletableFuture completes exceptionally, then the
2733 * returned CompletableFuture also does so, with a
2734 * CompletionException holding this exception as its cause.
2735 * Similarly, if the computed CompletableFuture completes
2736 * exceptionally, then so does the returned CompletableFuture.
2737 *
2738 * @param fn the function returning a new CompletableFuture
2739 * @return the CompletableFuture
2740 */
2741 public <U> CompletableFuture<U> thenComposeAsync
2742 (Function<? super T, CompletableFuture<U>> fn) {
2743 return doCompose(fn, ForkJoinPool.commonPool());
2744 }
2745
2746 /**
2747 * Returns a CompletableFuture that upon completion, has the same
2748 * value as that produced asynchronously using the given executor
2749 * by the given function of this CompletableFuture.
2750 *
2751 * <p>If this CompletableFuture completes exceptionally, then the
2752 * returned CompletableFuture also does so, with a
2753 * CompletionException holding this exception as its cause.
2754 * Similarly, if the computed CompletableFuture completes
2755 * exceptionally, then so does the returned CompletableFuture.
2756 *
2757 * @param fn the function returning a new CompletableFuture
2758 * @param executor the executor to use for asynchronous execution
2759 * @return the CompletableFuture
2760 */
2761 public <U> CompletableFuture<U> thenComposeAsync
2762 (Function<? super T, CompletableFuture<U>> fn,
2763 Executor executor) {
2764 if (executor == null) throw new NullPointerException();
2765 return doCompose(fn, executor);
2766 }
2767
2768 private <U> CompletableFuture<U> doCompose
2769 (Function<? super T, CompletableFuture<U>> fn,
2770 Executor e) {
2771 if (fn == null) throw new NullPointerException();
2772 CompletableFuture<U> dst = null;
2773 ComposeCompletion<T,U> d = null;
2774 Object r;
2775 if ((r = result) == null) {
2776 dst = new CompletableFuture<U>();
2777 CompletionNode p = new CompletionNode
2778 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2779 while ((r = result) == null) {
2780 if (UNSAFE.compareAndSwapObject
2781 (this, COMPLETIONS, p.next = completions, p))
2782 break;
2783 }
2784 }
2785 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2786 T t; Throwable ex;
2787 if (r instanceof AltResult) {
2788 ex = ((AltResult)r).ex;
2789 t = null;
2790 }
2791 else {
2792 ex = null;
2793 @SuppressWarnings("unchecked") T tr = (T) r;
2794 t = tr;
2795 }
2796 if (ex == null) {
2797 if (e != null) {
2798 if (dst == null)
2799 dst = new CompletableFuture<U>();
2800 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2801 }
2802 else {
2803 try {
2804 if ((dst = fn.apply(t)) == null)
2805 ex = new NullPointerException();
2806 } catch (Throwable rex) {
2807 ex = rex;
2808 }
2809 if (dst == null)
2810 dst = new CompletableFuture<U>();
2811 }
2812 }
2813 if (e == null && ex != null)
2814 dst.internalComplete(null, ex);
2815 }
2816 helpPostComplete();
2817 dst.helpPostComplete();
2818 return dst;
2819 }
2820
2821 /**
2822 * Returns a new CompletableFuture that is completed when this
2823 * CompletableFuture completes, with the result of the given
2824 * function of the exception triggering this CompletableFuture's
2825 * completion when it completes exceptionally; otherwise, if this
2826 * CompletableFuture completes normally, then the returned
2827 * CompletableFuture also completes normally with the same value.
2828 *
2829 * @param fn the function to use to compute the value of the
2830 * returned CompletableFuture if this CompletableFuture completed
2831 * exceptionally
2832 * @return the new CompletableFuture
2833 */
2834 public CompletableFuture<T> exceptionally
2835 (Function<Throwable, ? extends T> fn) {
2836 if (fn == null) throw new NullPointerException();
2837 CompletableFuture<T> dst = new CompletableFuture<T>();
2838 ExceptionCompletion<T> d = null;
2839 Object r;
2840 if ((r = result) == null) {
2841 CompletionNode p =
2842 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2843 while ((r = result) == null) {
2844 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2845 p.next = completions, p))
2846 break;
2847 }
2848 }
2849 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2850 T t = null; Throwable ex, dx = null;
2851 if (r instanceof AltResult) {
2852 if ((ex = ((AltResult)r).ex) != null) {
2853 try {
2854 t = fn.apply(ex);
2855 } catch (Throwable rex) {
2856 dx = rex;
2857 }
2858 }
2859 }
2860 else {
2861 @SuppressWarnings("unchecked") T tr = (T) r;
2862 t = tr;
2863 }
2864 dst.internalComplete(t, dx);
2865 }
2866 helpPostComplete();
2867 return dst;
2868 }
2869
2870 /**
2871 * Returns a new CompletableFuture that is completed when this
2872 * CompletableFuture completes, with the result of the given
2873 * function of the result and exception of this CompletableFuture's
2874 * completion. The given function is invoked with the result (or
2875 * {@code null} if none) and the exception (or {@code null} if none)
2876 * of this CompletableFuture when complete.
2877 *
2878 * @param fn the function to use to compute the value of the
2879 * returned CompletableFuture
2880 * @return the new CompletableFuture
2881 */
2882 public <U> CompletableFuture<U> handle
2883 (BiFunction<? super T, Throwable, ? extends U> fn) {
2884 if (fn == null) throw new NullPointerException();
2885 CompletableFuture<U> dst = new CompletableFuture<U>();
2886 HandleCompletion<T,U> d = null;
2887 Object r;
2888 if ((r = result) == null) {
2889 CompletionNode p =
2890 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2891 while ((r = result) == null) {
2892 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2893 p.next = completions, p))
2894 break;
2895 }
2896 }
2897 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2898 T t; Throwable ex;
2899 if (r instanceof AltResult) {
2900 ex = ((AltResult)r).ex;
2901 t = null;
2902 }
2903 else {
2904 ex = null;
2905 @SuppressWarnings("unchecked") T tr = (T) r;
2906 t = tr;
2907 }
2908 U u; Throwable dx;
2909 try {
2910 u = fn.apply(t, ex);
2911 dx = null;
2912 } catch (Throwable rex) {
2913 dx = rex;
2914 u = null;
2915 }
2916 dst.internalComplete(u, dx);
2917 }
2918 helpPostComplete();
2919 return dst;
2920 }
2921
2922
2923 /* ------------- Arbitrary-arity constructions -------------- */
2924
2925 /*
2926 * The basic plan of attack is to recursively form binary
2927 * completion trees of elements. This can be overkill for small
2928 * sets, but scales nicely. The And/All vs Or/Any forms use the
2929 * same idea, but details differ.
2930 */
2931
2932 /**
2933 * Returns a new CompletableFuture that is completed when all of
2934 * the given CompletableFutures complete. If any of the given
2935 * CompletableFutures complete exceptionally, then the returned
2936 * CompletableFuture also does so, with a CompletionException
2937 * holding this exception as its cause. Otherwise, the results,
2938 * if any, of the given CompletableFutures are not reflected in
2939 * the returned CompletableFuture, but may be obtained by
2940 * inspecting them individually. If no CompletableFutures are
2941 * provided, returns a CompletableFuture completed with the value
2942 * {@code null}.
2943 *
2944 * <p>Among the applications of this method is to await completion
2945 * of a set of independent CompletableFutures before continuing a
2946 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2947 * c3).join();}.
2948 *
2949 * @param cfs the CompletableFutures
2950 * @return a new CompletableFuture that is completed when all of the
2951 * given CompletableFutures complete
2952 * @throws NullPointerException if the array or any of its elements are
2953 * {@code null}
2954 */
2955 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2956 int len = cfs.length; // Directly handle empty and singleton cases
2957 if (len > 1)
2958 return allTree(cfs, 0, len - 1);
2959 else {
2960 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2961 CompletableFuture<?> f;
2962 if (len == 0)
2963 dst.result = NIL;
2964 else if ((f = cfs[0]) == null)
2965 throw new NullPointerException();
2966 else {
2967 ThenPropagate d = null;
2968 CompletionNode p = null;
2969 Object r;
2970 while ((r = f.result) == null) {
2971 if (d == null)
2972 d = new ThenPropagate(f, dst);
2973 else if (p == null)
2974 p = new CompletionNode(d);
2975 else if (UNSAFE.compareAndSwapObject
2976 (f, COMPLETIONS, p.next = f.completions, p))
2977 break;
2978 }
2979 if (r != null && (d == null || d.compareAndSet(0, 1)))
2980 dst.internalComplete(null, (r instanceof AltResult) ?
2981 ((AltResult)r).ex : null);
2982 f.helpPostComplete();
2983 }
2984 return dst;
2985 }
2986 }
2987
2988 /**
2989 * Recursively constructs an And'ed tree of CompletableFutures.
2990 * Called only when array known to have at least two elements.
2991 */
2992 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2993 int lo, int hi) {
2994 CompletableFuture<?> fst, snd;
2995 int mid = (lo + hi) >>> 1;
2996 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2997 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2998 throw new NullPointerException();
2999 CompletableFuture<Void> dst = new CompletableFuture<Void>();
3000 AndCompletion d = null;
3001 CompletionNode p = null, q = null;
3002 Object r = null, s = null;
3003 while ((r = fst.result) == null || (s = snd.result) == null) {
3004 if (d == null)
3005 d = new AndCompletion(fst, snd, dst);
3006 else if (p == null)
3007 p = new CompletionNode(d);
3008 else if (q == null) {
3009 if (UNSAFE.compareAndSwapObject
3010 (fst, COMPLETIONS, p.next = fst.completions, p))
3011 q = new CompletionNode(d);
3012 }
3013 else if (UNSAFE.compareAndSwapObject
3014 (snd, COMPLETIONS, q.next = snd.completions, q))
3015 break;
3016 }
3017 if ((r != null || (r = fst.result) != null) &&
3018 (s != null || (s = snd.result) != null) &&
3019 (d == null || d.compareAndSet(0, 1))) {
3020 Throwable ex;
3021 if (r instanceof AltResult)
3022 ex = ((AltResult)r).ex;
3023 else
3024 ex = null;
3025 if (ex == null && (s instanceof AltResult))
3026 ex = ((AltResult)s).ex;
3027 dst.internalComplete(null, ex);
3028 }
3029 fst.helpPostComplete();
3030 snd.helpPostComplete();
3031 return dst;
3032 }
3033
3034 /**
3035 * Returns a new CompletableFuture that is completed when any of
3036 * the given CompletableFutures complete, with the same result.
3037 * Otherwise, if it completed exceptionally, the returned
3038 * CompletableFuture also does so, with a CompletionException
3039 * holding this exception as its cause. If no CompletableFutures
3040 * are provided, returns an incomplete CompletableFuture.
3041 *
3042 * @param cfs the CompletableFutures
3043 * @return a new CompletableFuture that is completed with the
3044 * result or exception of any of the given CompletableFutures when
3045 * one completes
3046 * @throws NullPointerException if the array or any of its elements are
3047 * {@code null}
3048 */
3049 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
3050 int len = cfs.length; // Same idea as allOf
3051 if (len > 1)
3052 return anyTree(cfs, 0, len - 1);
3053 else {
3054 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3055 CompletableFuture<?> f;
3056 if (len == 0)
3057 ; // skip
3058 else if ((f = cfs[0]) == null)
3059 throw new NullPointerException();
3060 else {
3061 ThenCopy<Object> d = null;
3062 CompletionNode p = null;
3063 Object r;
3064 while ((r = f.result) == null) {
3065 if (d == null)
3066 d = new ThenCopy<Object>(f, dst);
3067 else if (p == null)
3068 p = new CompletionNode(d);
3069 else if (UNSAFE.compareAndSwapObject
3070 (f, COMPLETIONS, p.next = f.completions, p))
3071 break;
3072 }
3073 if (r != null && (d == null || d.compareAndSet(0, 1))) {
3074 Throwable ex; Object t;
3075 if (r instanceof AltResult) {
3076 ex = ((AltResult)r).ex;
3077 t = null;
3078 }
3079 else {
3080 ex = null;
3081 t = r;
3082 }
3083 dst.internalComplete(t, ex);
3084 }
3085 f.helpPostComplete();
3086 }
3087 return dst;
3088 }
3089 }
3090
3091 /**
3092 * Recursively constructs an Or'ed tree of CompletableFutures.
3093 */
3094 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
3095 int lo, int hi) {
3096 CompletableFuture<?> fst, snd;
3097 int mid = (lo + hi) >>> 1;
3098 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3099 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3100 throw new NullPointerException();
3101 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3102 OrCompletion d = null;
3103 CompletionNode p = null, q = null;
3104 Object r;
3105 while ((r = fst.result) == null && (r = snd.result) == null) {
3106 if (d == null)
3107 d = new OrCompletion(fst, snd, dst);
3108 else if (p == null)
3109 p = new CompletionNode(d);
3110 else if (q == null) {
3111 if (UNSAFE.compareAndSwapObject
3112 (fst, COMPLETIONS, p.next = fst.completions, p))
3113 q = new CompletionNode(d);
3114 }
3115 else if (UNSAFE.compareAndSwapObject
3116 (snd, COMPLETIONS, q.next = snd.completions, q))
3117 break;
3118 }
3119 if ((r != null || (r = fst.result) != null ||
3120 (r = snd.result) != null) &&
3121 (d == null || d.compareAndSet(0, 1))) {
3122 Throwable ex; Object t;
3123 if (r instanceof AltResult) {
3124 ex = ((AltResult)r).ex;
3125 t = null;
3126 }
3127 else {
3128 ex = null;
3129 t = r;
3130 }
3131 dst.internalComplete(t, ex);
3132 }
3133 fst.helpPostComplete();
3134 snd.helpPostComplete();
3135 return dst;
3136 }
3137
3138 /* ------------- Control and status methods -------------- */
3139
3140 /**
3141 * If not already completed, completes this CompletableFuture with
3142 * a {@link CancellationException}. Dependent CompletableFutures
3143 * that have not already completed will also complete
3144 * exceptionally, with a {@link CompletionException} caused by
3145 * this {@code CancellationException}.
3146 *
3147 * @param mayInterruptIfRunning this value has no effect in this
3148 * implementation because interrupts are not used to control
3149 * processing.
3150 *
3151 * @return {@code true} if this task is now cancelled
3152 */
3153 public boolean cancel(boolean mayInterruptIfRunning) {
3154 boolean cancelled = (result == null) &&
3155 UNSAFE.compareAndSwapObject
3156 (this, RESULT, null, new AltResult(new CancellationException()));
3157 postComplete();
3158 return cancelled || isCancelled();
3159 }
3160
3161 /**
3162 * Returns {@code true} if this CompletableFuture was cancelled
3163 * before it completed normally.
3164 *
3165 * @return {@code true} if this CompletableFuture was cancelled
3166 * before it completed normally
3167 */
3168 public boolean isCancelled() {
3169 Object r;
3170 return ((r = result) instanceof AltResult) &&
3171 (((AltResult)r).ex instanceof CancellationException);
3172 }
3173
3174 /**
3175 * Forcibly sets or resets the value subsequently returned by
3176 * method {@link #get()} and related methods, whether or not
3177 * already completed. This method is designed for use only in
3178 * error recovery actions, and even in such situations may result
3179 * in ongoing dependent completions using established versus
3180 * overwritten outcomes.
3181 *
3182 * @param value the completion value
3183 */
3184 public void obtrudeValue(T value) {
3185 result = (value == null) ? NIL : value;
3186 postComplete();
3187 }
3188
3189 /**
3190 * Forcibly causes subsequent invocations of method {@link #get()}
3191 * and related methods to throw the given exception, whether or
3192 * not already completed. This method is designed for use only in
3193 * recovery actions, and even in such situations may result in
3194 * ongoing dependent completions using established versus
3195 * overwritten outcomes.
3196 *
3197 * @param ex the exception
3198 */
3199 public void obtrudeException(Throwable ex) {
3200 if (ex == null) throw new NullPointerException();
3201 result = new AltResult(ex);
3202 postComplete();
3203 }
3204
3205 /**
3206 * Returns the estimated number of CompletableFutures whose
3207 * completions are awaiting completion of this CompletableFuture.
3208 * This method is designed for use in monitoring system state, not
3209 * for synchronization control.
3210 *
3211 * @return the number of dependent CompletableFutures
3212 */
3213 public int getNumberOfDependents() {
3214 int count = 0;
3215 for (CompletionNode p = completions; p != null; p = p.next)
3216 ++count;
3217 return count;
3218 }
3219
3220 /**
3221 * Returns a string identifying this CompletableFuture, as well as
3222 * its completion state. The state, in brackets, contains the
3223 * String {@code "Completed Normally"} or the String {@code
3224 * "Completed Exceptionally"}, or the String {@code "Not
3225 * completed"} followed by the number of CompletableFutures
3226 * dependent upon its completion, if any.
3227 *
3228 * @return a string identifying this CompletableFuture, as well as its state
3229 */
3230 public String toString() {
3231 Object r = result;
3232 int count;
3233 return super.toString() +
3234 ((r == null) ?
3235 (((count = getNumberOfDependents()) == 0) ?
3236 "[Not completed]" :
3237 "[Not completed, " + count + " dependents]") :
3238 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3239 "[Completed exceptionally]" :
3240 "[Completed normally]"));
3241 }
3242
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions" used for CAS operations.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        final sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
        final Class<?> self = CompletableFuture.class;
        final long resultOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("result"));
        final long waitersOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("waiters"));
        final long completionsOffset =
            unsafe.objectFieldOffset(self.getDeclaredField("completions"));
        UNSAFE = unsafe;
        RESULT = resultOffset;
        WAITERS = waitersOffset;
        COMPLETIONS = completionsOffset;
    } catch (Exception e) {
        // Any lookup failure aborts class initialization.
        throw new Error(e);
    }
}
3262 }