ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/jsr166e/CompletableFuture.java
Revision: 1.19
Committed: Sun Jul 14 19:55:05 2013 UTC (10 years, 10 months ago) by jsr166
Branch: MAIN
Changes since 1.18: +0 -1 lines
Log Message:
backport jsr166e to run on jdk6; backport all applicable tck tests from tck to tck-jsr166e

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package jsr166e;
8 import java.util.concurrent.Future;
9 import java.util.concurrent.FutureTask;
10 import java.util.concurrent.TimeUnit;
11 import java.util.concurrent.Executor;
12 import java.util.concurrent.ExecutionException;
13 import java.util.concurrent.TimeoutException;
14 import java.util.concurrent.CancellationException;
15 import java.util.concurrent.atomic.AtomicInteger;
16 import java.util.concurrent.locks.LockSupport;
17
18 /**
19 * A {@link Future} that may be explicitly completed (setting its
20 * value and status), and may include dependent functions and actions
21 * that trigger upon its completion.
22 *
23 * <p>When two or more threads attempt to
24 * {@link #complete complete},
25 * {@link #completeExceptionally completeExceptionally}, or
26 * {@link #cancel cancel}
27 * a CompletableFuture, only one of them succeeds.
28 *
29 * <p>Methods are available for adding dependents based on
30 * user-provided Functions, Actions, or Runnables. The appropriate
31 * form to use depends on whether actions require arguments and/or
32 * produce results. Completion of a dependent action will trigger the
33 * completion of another CompletableFuture. Actions may also be
34 * triggered after either or both the current and another
35 * CompletableFuture complete. Multiple CompletableFutures may also
36 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
37 * {@link #allOf(CompletableFuture...)}.
38 *
39 * <p>CompletableFutures themselves do not execute asynchronously.
40 * However, actions supplied for dependent completions of another
41 * CompletableFuture may do so, depending on whether they are provided
42 * via one of the <em>async</em> methods (that is, methods with names
43 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
44 * methods provide a way to commence asynchronous processing of an
45 * action using either a given {@link Executor} or by default the
46 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
47 * debugging, and tracking, all generated asynchronous tasks are
48 * instances of the marker interface {@link AsynchronousCompletionTask}.
49 *
50 * <p>Actions supplied for dependent completions of <em>non-async</em>
51 * methods may be performed by the thread that completes the current
52 * CompletableFuture, or by any other caller of these methods. There
53 * are no guarantees about the order of processing completions unless
54 * constrained by these methods.
55 *
56 * <p>Since (unlike {@link FutureTask}) this class has no direct
57 * control over the computation that causes it to be completed,
58 * cancellation is treated as just another form of exceptional completion.
59 * Method {@link #cancel cancel} has the same effect as
60 * {@code completeExceptionally(new CancellationException())}.
61 *
62 * <p>Upon exceptional completion (including cancellation), or when a
63 * completion entails an additional computation which terminates
64 * abruptly with an (unchecked) exception or error, then all of their
65 * dependent completions (and their dependents in turn) generally act
66 * as {@code completeExceptionally} with a {@link CompletionException}
67 * holding that exception as its cause. However, the {@link
68 * #exceptionally exceptionally} and {@link #handle handle}
69 * completions <em>are</em> able to handle exceptional completions of
70 * the CompletableFutures they depend on.
71 *
72 * <p>In case of exceptional completion with a CompletionException,
73 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
74 * {@link ExecutionException} with the same cause as held in the
75 * corresponding CompletionException. However, in these cases,
76 * methods {@link #join()} and {@link #getNow} throw the
77 * CompletionException, which simplifies usage.
78 *
79 * <p>Arguments used to pass a completion result (that is, for parameters
80 * of type {@code T}) may be null, but passing a null value for any other
81 * parameter will result in a {@link NullPointerException} being thrown.
82 *
83 * @author Doug Lea
84 */
85 public class CompletableFuture<T> implements Future<T> {
86 // jsr166e nested interfaces
87
/**
 * A void operation that consumes a single argument.
 *
 * @param <A> the type of the argument
 */
public interface Action<A> {
    /**
     * Performs this action on the given argument.
     *
     * @param a the argument to consume
     */
    void accept(A a);
}
/**
 * A void operation that consumes two arguments.
 *
 * @param <A> the type of the first argument
 * @param <B> the type of the second argument
 */
public interface BiAction<A,B> {
    /**
     * Performs this action on the given pair of arguments.
     *
     * @param a the first argument
     * @param b the second argument
     */
    void accept(A a, B b);
}
/**
 * A function mapping a single argument to a result.
 *
 * @param <A> the type of the argument
 * @param <T> the type of the result
 */
public interface Fun<A,T> {
    /**
     * Applies this function to the given argument.
     *
     * @param a the argument
     * @return the function result
     */
    T apply(A a);
}
/**
 * A function mapping two arguments to a result.
 *
 * @param <A> the type of the first argument
 * @param <B> the type of the second argument
 * @param <T> the type of the result
 */
public interface BiFun<A,B,T> {
    /**
     * Applies this function to the given pair of arguments.
     *
     * @param a the first argument
     * @param b the second argument
     * @return the function result
     */
    T apply(A a, B b);
}
/**
 * A function of no arguments that produces a result.
 *
 * @param <T> the type of the result
 */
public interface Generator<T> {
    /**
     * Produces a result.
     *
     * @return the generated value
     */
    T get();
}
98
99
100 /*
101 * Overview:
102 *
103 * 1. Non-nullness of field result (set via CAS) indicates done.
104 * An AltResult is used to box null as a result, as well as to
105 * hold exceptions. Using a single field makes completion fast
106 * and simple to detect and trigger, at the expense of a lot of
107 * encoding and decoding that infiltrates many methods. One minor
108 * simplification relies on the (static) NIL (to box null results)
109 * being the only AltResult with a null exception field, so we
110 * don't usually need explicit comparisons with NIL. The CF
111 * exception propagation mechanics surrounding decoding rely on
112 * unchecked casts of decoded results really being unchecked,
113 * where user type errors are caught at point of use, as is
114 * currently the case in Java. These are highlighted by using
115 * SuppressWarnings-annotated temporaries.
116 *
117 * 2. Waiters are held in a Treiber stack similar to the one used
118 * in FutureTask, Phaser, and SynchronousQueue. See their
119 * internal documentation for algorithmic details.
120 *
121 * 3. Completions are also kept in a list/stack, and pulled off
122 * and run when completion is triggered. (We could even use the
123 * same stack as for waiters, but would give up the potential
124 * parallelism obtained because woken waiters help release/run
125 * others -- see method postComplete). Because post-processing
126 * may race with direct calls, class Completion opportunistically
127 * extends AtomicInteger so callers can claim the action via
128 * compareAndSet(0, 1). The Completion.run methods are all
129 * written in a boringly similar uniform way (that sometimes includes
130 * unnecessary-looking checks, kept to maintain uniformity).
131 * There are enough dimensions upon which they differ that
132 * attempts to factor commonalities while maintaining efficiency
133 * require more lines of code than they would save.
134 *
135 * 4. The exported then/and/or methods do support a bit of
136 * factoring (see doThenApply etc). They must cope with the
137 * intrinsic races surrounding addition of a dependent action
138 * versus performing the action directly because the task is
139 * already complete. For example, a CF may not be complete upon
140 * entry, so a dependent completion is added, but by the time it
141 * is added, the target CF is complete, so must be directly
142 * executed. This is all done while avoiding unnecessary object
143 * construction in safe-bypass cases.
144 */
145
146 // preliminaries
147
148 static final class AltResult {
149 final Throwable ex; // null only for NIL
150 AltResult(Throwable ex) { this.ex = ex; }
151 }
152
/**
 * Shared AltResult used to box a null result. It is constructed with a
 * null exception, which distinguishes it from AltResults that record
 * an exceptional completion.
 */
static final AltResult NIL = new AltResult(null);

// Fields

// A null value of this field means "not yet completed"; it is set via CAS
// (see internalComplete) and never holds a raw null once completed.
volatile Object result;    // Either the result or boxed AltResult
// Head of the stack of WaitNodes; popped and unparked by postComplete.
volatile WaitNode waiters; // Treiber stack of threads blocked on get()
// Head of the stack of dependent actions; popped and run by postComplete.
volatile CompletionNode completions; // list (Treiber stack) of completions
160
161 // Basic utilities for triggering and processing completions
162
163 /**
164 * Removes and signals all waiting threads and runs all completions.
165 */
166 final void postComplete() {
167 WaitNode q; Thread t;
168 while ((q = waiters) != null) {
169 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
170 (t = q.thread) != null) {
171 q.thread = null;
172 LockSupport.unpark(t);
173 }
174 }
175
176 CompletionNode h; Completion c;
177 while ((h = completions) != null) {
178 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
179 (c = h.completion) != null)
180 c.run();
181 }
182 }
183
184 /**
185 * Triggers completion with the encoding of the given arguments:
186 * if the exception is non-null, encodes it as a wrapped
187 * CompletionException unless it is one already. Otherwise uses
188 * the given result, boxed as NIL if null.
189 */
190 final void internalComplete(T v, Throwable ex) {
191 if (result == null)
192 UNSAFE.compareAndSwapObject
193 (this, RESULT, null,
194 (ex == null) ? (v == null) ? NIL : v :
195 new AltResult((ex instanceof CompletionException) ? ex :
196 new CompletionException(ex)));
197 postComplete(); // help out even if not triggered
198 }
199
200 /**
201 * If triggered, helps release and/or process completions.
202 */
203 final void helpPostComplete() {
204 if (result != null)
205 postComplete();
206 }
207
208 /* ------------- waiting for completions -------------- */
209
210 /** Number of processors, for spin control */
211 static final int NCPU = Runtime.getRuntime().availableProcessors();
212
213 /**
214 * Heuristic spin value for waitingGet() before blocking on
215 * multiprocessors
216 */
217 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
218
/**
 * Linked nodes to record waiting threads in a Treiber stack. See
 * other classes such as Phaser and SynchronousQueue for more
 * detailed explanation. This class implements ManagedBlocker to
 * avoid starvation when blocking actions pile up in
 * ForkJoinPools.
 *
 * <p>A node is created by the waiting thread itself (the constructor
 * records {@code Thread.currentThread()}). A null {@code thread}
 * field marks the node as released or cancelled.
 */
static final class WaitNode implements ForkJoinPool.ManagedBlocker {
    long nanos;          // wait time if timed
    final long deadline; // non-zero if timed
    volatile int interruptControl; // > 0: interruptible, < 0: interrupted
    volatile Thread thread;
    volatile WaitNode next;

    /**
     * Creates a node for the calling thread.
     *
     * @param interruptible whether an interrupt should release the waiter
     * @param nanos remaining wait time; meaningful only when timed
     * @param deadline value compared against System.nanoTime(); zero means untimed
     */
    WaitNode(boolean interruptible, long nanos, long deadline) {
        this.thread = Thread.currentThread();
        this.interruptControl = interruptible ? 1 : 0;
        this.nanos = nanos;
        this.deadline = deadline;
    }

    /**
     * Returns true if blocking is unnecessary: the node was released,
     * an interruptible waiter was interrupted, or a timed wait elapsed.
     */
    public boolean isReleasable() {
        if (thread == null)
            return true;
        if (Thread.interrupted()) {
            // Clears the thread's interrupt status and records it in the
            // node; only a previously interruptible waiter is released.
            int i = interruptControl;
            interruptControl = -1;
            if (i > 0)
                return true;
        }
        // Timed waits: refresh the remaining time and give up once it
        // is used up, clearing thread so the node counts as cancelled.
        if (deadline != 0L &&
            (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
            thread = null;
            return true;
        }
        return false;
    }

    /**
     * Parks the current thread (indefinitely when untimed, otherwise
     * for the remaining nanos) unless already releasable, then reports
     * whether the node is releasable after waking.
     */
    public boolean block() {
        if (isReleasable())
            return true;
        else if (deadline == 0L)
            LockSupport.park(this);
        else if (nanos > 0L)
            LockSupport.parkNanos(this, nanos);
        return isReleasable();
    }
}
264
/**
 * Returns raw result after waiting, or null if interruptible and
 * interrupted.
 *
 * <p>Each loop iteration performs at most one step, in this order of
 * priority: return if done, spin, create a wait node, enqueue it,
 * abort on interrupt, or block.
 *
 * @param interruptible whether an interrupt aborts the wait
 * @return the raw (possibly AltResult-boxed) result, or null if
 *         interruptible and interrupted
 */
private Object waitingGet(boolean interruptible) {
    WaitNode q = null;
    boolean queued = false;
    int spins = SPINS;
    for (Object r;;) {
        if ((r = result) != null) {
            if (q != null) { // suppress unpark
                q.thread = null;
                if (q.interruptControl < 0) {
                    // An interrupt was recorded while waiting.
                    if (interruptible) {
                        removeWaiter(q);
                        return null;
                    }
                    // Not interruptible: restore the interrupt status
                    // and still return the result.
                    Thread.currentThread().interrupt();
                }
            }
            postComplete(); // help release others
            return r;
        }
        else if (spins > 0) {
            // Randomized spinning: the budget shrinks only when the
            // random draw is non-negative.
            int rnd = ThreadLocalRandom.current().nextInt();
            if (rnd >= 0)
                --spins;
        }
        else if (q == null)
            q = new WaitNode(interruptible, 0L, 0L); // untimed node
        else if (!queued)
            // Push q onto the waiter stack; retried on the next pass
            // if the CAS fails.
            queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                 q.next = waiters, q);
        else if (interruptible && q.interruptControl < 0) {
            removeWaiter(q);
            return null;
        }
        else if (q.thread != null && result == null) {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ex) {
                // Record the interrupt in the node; it is acted on by
                // the checks above on a later iteration.
                q.interruptControl = -1;
            }
        }
    }
}
311
/**
 * Awaits completion or aborts on interrupt or timeout.
 *
 * <p>Note that a recorded interrupt takes precedence: if the wait node
 * has seen an interrupt, InterruptedException is thrown even when a
 * result has become available.
 *
 * @param nanos time to wait
 * @return raw result
 * @throws InterruptedException if an interrupt was recorded while waiting
 * @throws TimeoutException if {@code nanos} is not positive on entry
 *         with no result yet, or the wait time elapses without a result
 */
private Object timedAwaitDone(long nanos)
    throws InterruptedException, TimeoutException {
    WaitNode q = null;
    boolean queued = false;
    for (Object r;;) {
        if ((r = result) != null) {
            if (q != null) {
                q.thread = null; // suppress a later unpark of this node
                if (q.interruptControl < 0) {
                    removeWaiter(q);
                    throw new InterruptedException();
                }
            }
            postComplete(); // help release other waiters and completions
            return r;
        }
        else if (q == null) {
            if (nanos <= 0L)
                throw new TimeoutException();
            long d = System.nanoTime() + nanos;
            // WaitNode treats a zero deadline as "untimed", so a computed
            // deadline of exactly zero is nudged to 1.
            q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
        }
        else if (!queued)
            // Push q onto the waiter stack; retried on the next pass
            // if the CAS fails.
            queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                 q.next = waiters, q);
        else if (q.interruptControl < 0) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        else if (q.nanos <= 0L) {
            // Time is used up; re-check result so a completion that
            // raced with the timeout is returned by the next iteration.
            if (result == null) {
                removeWaiter(q);
                throw new TimeoutException();
            }
        }
        else if (q.thread != null && result == null) {
            try {
                ForkJoinPool.managedBlock(q);
            } catch (InterruptedException ex) {
                // Record the interrupt; it is thrown by the checks above.
                q.interruptControl = -1;
            }
        }
    }
}
362
/**
 * Tries to unlink a timed-out or interrupted wait node to avoid
 * accumulating garbage. Internal nodes are simply unspliced
 * without CAS since it is harmless if they are traversed anyway
 * by releasers. To avoid effects of unsplicing from already
 * removed nodes, the list is retraversed in case of an apparent
 * race. This is slow when there are a lot of nodes, but we don't
 * expect lists to be long enough to outweigh higher-overhead
 * schemes.
 *
 * <p>The traversal unlinks every node whose {@code thread} is null,
 * not only the given one.
 *
 * @param node the node to cancel; ignored if null
 */
private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null; // mark as cancelled so the sweep below drops it
        retry:
        for (;;) { // restart on removeWaiter race
            // pred: last node seen that is still live; q: current; s: successor
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    // Cancelled interior node: splice it out with a plain write.
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                // Cancelled node at the head: it must be popped with CAS.
                else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
                    continue retry;
            }
            break;
        }
    }
}
394
395 /* ------------- Async tasks -------------- */
396
/**
 * A marker interface identifying asynchronous tasks produced by
 * {@code async} methods. This may be useful for monitoring,
 * debugging, and tracking asynchronous activities.
 *
 * <p>The interface declares no methods. As a member interface it is
 * implicitly static, so no explicit {@code static} modifier is needed.
 *
 * @since 1.8
 */
public interface AsynchronousCompletionTask {
}
406
407 /** Base class can act as either FJ or plain Runnable */
408 abstract static class Async extends ForkJoinTask<Void>
409 implements Runnable, AsynchronousCompletionTask {
410 public final Void getRawResult() { return null; }
411 public final void setRawResult(Void v) { }
412 public final void run() { exec(); }
413 }
414
415 static final class AsyncRun extends Async {
416 final Runnable fn;
417 final CompletableFuture<Void> dst;
418 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
419 this.fn = fn; this.dst = dst;
420 }
421 public final boolean exec() {
422 CompletableFuture<Void> d; Throwable ex;
423 if ((d = this.dst) != null && d.result == null) {
424 try {
425 fn.run();
426 ex = null;
427 } catch (Throwable rex) {
428 ex = rex;
429 }
430 d.internalComplete(null, ex);
431 }
432 return true;
433 }
434 private static final long serialVersionUID = 5232453952276885070L;
435 }
436
437 static final class AsyncSupply<U> extends Async {
438 final Generator<U> fn;
439 final CompletableFuture<U> dst;
440 AsyncSupply(Generator<U> fn, CompletableFuture<U> dst) {
441 this.fn = fn; this.dst = dst;
442 }
443 public final boolean exec() {
444 CompletableFuture<U> d; U u; Throwable ex;
445 if ((d = this.dst) != null && d.result == null) {
446 try {
447 u = fn.get();
448 ex = null;
449 } catch (Throwable rex) {
450 ex = rex;
451 u = null;
452 }
453 d.internalComplete(u, ex);
454 }
455 return true;
456 }
457 private static final long serialVersionUID = 5232453952276885070L;
458 }
459
460 static final class AsyncApply<T,U> extends Async {
461 final T arg;
462 final Fun<? super T,? extends U> fn;
463 final CompletableFuture<U> dst;
464 AsyncApply(T arg, Fun<? super T,? extends U> fn,
465 CompletableFuture<U> dst) {
466 this.arg = arg; this.fn = fn; this.dst = dst;
467 }
468 public final boolean exec() {
469 CompletableFuture<U> d; U u; Throwable ex;
470 if ((d = this.dst) != null && d.result == null) {
471 try {
472 u = fn.apply(arg);
473 ex = null;
474 } catch (Throwable rex) {
475 ex = rex;
476 u = null;
477 }
478 d.internalComplete(u, ex);
479 }
480 return true;
481 }
482 private static final long serialVersionUID = 5232453952276885070L;
483 }
484
485 static final class AsyncCombine<T,U,V> extends Async {
486 final T arg1;
487 final U arg2;
488 final BiFun<? super T,? super U,? extends V> fn;
489 final CompletableFuture<V> dst;
490 AsyncCombine(T arg1, U arg2,
491 BiFun<? super T,? super U,? extends V> fn,
492 CompletableFuture<V> dst) {
493 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
494 }
495 public final boolean exec() {
496 CompletableFuture<V> d; V v; Throwable ex;
497 if ((d = this.dst) != null && d.result == null) {
498 try {
499 v = fn.apply(arg1, arg2);
500 ex = null;
501 } catch (Throwable rex) {
502 ex = rex;
503 v = null;
504 }
505 d.internalComplete(v, ex);
506 }
507 return true;
508 }
509 private static final long serialVersionUID = 5232453952276885070L;
510 }
511
512 static final class AsyncAccept<T> extends Async {
513 final T arg;
514 final Action<? super T> fn;
515 final CompletableFuture<Void> dst;
516 AsyncAccept(T arg, Action<? super T> fn,
517 CompletableFuture<Void> dst) {
518 this.arg = arg; this.fn = fn; this.dst = dst;
519 }
520 public final boolean exec() {
521 CompletableFuture<Void> d; Throwable ex;
522 if ((d = this.dst) != null && d.result == null) {
523 try {
524 fn.accept(arg);
525 ex = null;
526 } catch (Throwable rex) {
527 ex = rex;
528 }
529 d.internalComplete(null, ex);
530 }
531 return true;
532 }
533 private static final long serialVersionUID = 5232453952276885070L;
534 }
535
536 static final class AsyncAcceptBoth<T,U> extends Async {
537 final T arg1;
538 final U arg2;
539 final BiAction<? super T,? super U> fn;
540 final CompletableFuture<Void> dst;
541 AsyncAcceptBoth(T arg1, U arg2,
542 BiAction<? super T,? super U> fn,
543 CompletableFuture<Void> dst) {
544 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
545 }
546 public final boolean exec() {
547 CompletableFuture<Void> d; Throwable ex;
548 if ((d = this.dst) != null && d.result == null) {
549 try {
550 fn.accept(arg1, arg2);
551 ex = null;
552 } catch (Throwable rex) {
553 ex = rex;
554 }
555 d.internalComplete(null, ex);
556 }
557 return true;
558 }
559 private static final long serialVersionUID = 5232453952276885070L;
560 }
561
562 static final class AsyncCompose<T,U> extends Async {
563 final T arg;
564 final Fun<? super T, CompletableFuture<U>> fn;
565 final CompletableFuture<U> dst;
566 AsyncCompose(T arg,
567 Fun<? super T, CompletableFuture<U>> fn,
568 CompletableFuture<U> dst) {
569 this.arg = arg; this.fn = fn; this.dst = dst;
570 }
571 public final boolean exec() {
572 CompletableFuture<U> d, fr; U u; Throwable ex;
573 if ((d = this.dst) != null && d.result == null) {
574 try {
575 fr = fn.apply(arg);
576 ex = (fr == null) ? new NullPointerException() : null;
577 } catch (Throwable rex) {
578 ex = rex;
579 fr = null;
580 }
581 if (ex != null)
582 u = null;
583 else {
584 Object r = fr.result;
585 if (r == null)
586 r = fr.waitingGet(false);
587 if (r instanceof AltResult) {
588 ex = ((AltResult)r).ex;
589 u = null;
590 }
591 else {
592 @SuppressWarnings("unchecked") U ur = (U) r;
593 u = ur;
594 }
595 }
596 d.internalComplete(u, ex);
597 }
598 return true;
599 }
600 private static final long serialVersionUID = 5232453952276885070L;
601 }
602
603 /* ------------- Completions -------------- */
604
605 /**
606 * Simple linked list nodes to record completions, used in
607 * basically the same way as WaitNodes. (We separate nodes from
608 * the Completions themselves mainly because for the And and Or
609 * methods, the same Completion object resides in two lists.)
610 */
611 static final class CompletionNode {
612 final Completion completion;
613 volatile CompletionNode next;
614 CompletionNode(Completion completion) { this.completion = completion; }
615 }
616
617 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
618 abstract static class Completion extends AtomicInteger implements Runnable {
619 }
620
621 static final class ThenApply<T,U> extends Completion {
622 final CompletableFuture<? extends T> src;
623 final Fun<? super T,? extends U> fn;
624 final CompletableFuture<U> dst;
625 final Executor executor;
626 ThenApply(CompletableFuture<? extends T> src,
627 Fun<? super T,? extends U> fn,
628 CompletableFuture<U> dst,
629 Executor executor) {
630 this.src = src; this.fn = fn; this.dst = dst;
631 this.executor = executor;
632 }
633 public final void run() {
634 final CompletableFuture<? extends T> a;
635 final Fun<? super T,? extends U> fn;
636 final CompletableFuture<U> dst;
637 Object r; T t; Throwable ex;
638 if ((dst = this.dst) != null &&
639 (fn = this.fn) != null &&
640 (a = this.src) != null &&
641 (r = a.result) != null &&
642 compareAndSet(0, 1)) {
643 if (r instanceof AltResult) {
644 ex = ((AltResult)r).ex;
645 t = null;
646 }
647 else {
648 ex = null;
649 @SuppressWarnings("unchecked") T tr = (T) r;
650 t = tr;
651 }
652 Executor e = executor;
653 U u = null;
654 if (ex == null) {
655 try {
656 if (e != null)
657 e.execute(new AsyncApply<T,U>(t, fn, dst));
658 else
659 u = fn.apply(t);
660 } catch (Throwable rex) {
661 ex = rex;
662 }
663 }
664 if (e == null || ex != null)
665 dst.internalComplete(u, ex);
666 }
667 }
668 private static final long serialVersionUID = 5232453952276885070L;
669 }
670
671 static final class ThenAccept<T> extends Completion {
672 final CompletableFuture<? extends T> src;
673 final Action<? super T> fn;
674 final CompletableFuture<Void> dst;
675 final Executor executor;
676 ThenAccept(CompletableFuture<? extends T> src,
677 Action<? super T> fn,
678 CompletableFuture<Void> dst,
679 Executor executor) {
680 this.src = src; this.fn = fn; this.dst = dst;
681 this.executor = executor;
682 }
683 public final void run() {
684 final CompletableFuture<? extends T> a;
685 final Action<? super T> fn;
686 final CompletableFuture<Void> dst;
687 Object r; T t; Throwable ex;
688 if ((dst = this.dst) != null &&
689 (fn = this.fn) != null &&
690 (a = this.src) != null &&
691 (r = a.result) != null &&
692 compareAndSet(0, 1)) {
693 if (r instanceof AltResult) {
694 ex = ((AltResult)r).ex;
695 t = null;
696 }
697 else {
698 ex = null;
699 @SuppressWarnings("unchecked") T tr = (T) r;
700 t = tr;
701 }
702 Executor e = executor;
703 if (ex == null) {
704 try {
705 if (e != null)
706 e.execute(new AsyncAccept<T>(t, fn, dst));
707 else
708 fn.accept(t);
709 } catch (Throwable rex) {
710 ex = rex;
711 }
712 }
713 if (e == null || ex != null)
714 dst.internalComplete(null, ex);
715 }
716 }
717 private static final long serialVersionUID = 5232453952276885070L;
718 }
719
720 static final class ThenRun extends Completion {
721 final CompletableFuture<?> src;
722 final Runnable fn;
723 final CompletableFuture<Void> dst;
724 final Executor executor;
725 ThenRun(CompletableFuture<?> src,
726 Runnable fn,
727 CompletableFuture<Void> dst,
728 Executor executor) {
729 this.src = src; this.fn = fn; this.dst = dst;
730 this.executor = executor;
731 }
732 public final void run() {
733 final CompletableFuture<?> a;
734 final Runnable fn;
735 final CompletableFuture<Void> dst;
736 Object r; Throwable ex;
737 if ((dst = this.dst) != null &&
738 (fn = this.fn) != null &&
739 (a = this.src) != null &&
740 (r = a.result) != null &&
741 compareAndSet(0, 1)) {
742 if (r instanceof AltResult)
743 ex = ((AltResult)r).ex;
744 else
745 ex = null;
746 Executor e = executor;
747 if (ex == null) {
748 try {
749 if (e != null)
750 e.execute(new AsyncRun(fn, dst));
751 else
752 fn.run();
753 } catch (Throwable rex) {
754 ex = rex;
755 }
756 }
757 if (e == null || ex != null)
758 dst.internalComplete(null, ex);
759 }
760 }
761 private static final long serialVersionUID = 5232453952276885070L;
762 }
763
764 static final class ThenCombine<T,U,V> extends Completion {
765 final CompletableFuture<? extends T> src;
766 final CompletableFuture<? extends U> snd;
767 final BiFun<? super T,? super U,? extends V> fn;
768 final CompletableFuture<V> dst;
769 final Executor executor;
770 ThenCombine(CompletableFuture<? extends T> src,
771 CompletableFuture<? extends U> snd,
772 BiFun<? super T,? super U,? extends V> fn,
773 CompletableFuture<V> dst,
774 Executor executor) {
775 this.src = src; this.snd = snd;
776 this.fn = fn; this.dst = dst;
777 this.executor = executor;
778 }
779 public final void run() {
780 final CompletableFuture<? extends T> a;
781 final CompletableFuture<? extends U> b;
782 final BiFun<? super T,? super U,? extends V> fn;
783 final CompletableFuture<V> dst;
784 Object r, s; T t; U u; Throwable ex;
785 if ((dst = this.dst) != null &&
786 (fn = this.fn) != null &&
787 (a = this.src) != null &&
788 (r = a.result) != null &&
789 (b = this.snd) != null &&
790 (s = b.result) != null &&
791 compareAndSet(0, 1)) {
792 if (r instanceof AltResult) {
793 ex = ((AltResult)r).ex;
794 t = null;
795 }
796 else {
797 ex = null;
798 @SuppressWarnings("unchecked") T tr = (T) r;
799 t = tr;
800 }
801 if (ex != null)
802 u = null;
803 else if (s instanceof AltResult) {
804 ex = ((AltResult)s).ex;
805 u = null;
806 }
807 else {
808 @SuppressWarnings("unchecked") U us = (U) s;
809 u = us;
810 }
811 Executor e = executor;
812 V v = null;
813 if (ex == null) {
814 try {
815 if (e != null)
816 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
817 else
818 v = fn.apply(t, u);
819 } catch (Throwable rex) {
820 ex = rex;
821 }
822 }
823 if (e == null || ex != null)
824 dst.internalComplete(v, ex);
825 }
826 }
827 private static final long serialVersionUID = 5232453952276885070L;
828 }
829
830 static final class ThenAcceptBoth<T,U> extends Completion {
831 final CompletableFuture<? extends T> src;
832 final CompletableFuture<? extends U> snd;
833 final BiAction<? super T,? super U> fn;
834 final CompletableFuture<Void> dst;
835 final Executor executor;
836 ThenAcceptBoth(CompletableFuture<? extends T> src,
837 CompletableFuture<? extends U> snd,
838 BiAction<? super T,? super U> fn,
839 CompletableFuture<Void> dst,
840 Executor executor) {
841 this.src = src; this.snd = snd;
842 this.fn = fn; this.dst = dst;
843 this.executor = executor;
844 }
845 public final void run() {
846 final CompletableFuture<? extends T> a;
847 final CompletableFuture<? extends U> b;
848 final BiAction<? super T,? super U> fn;
849 final CompletableFuture<Void> dst;
850 Object r, s; T t; U u; Throwable ex;
851 if ((dst = this.dst) != null &&
852 (fn = this.fn) != null &&
853 (a = this.src) != null &&
854 (r = a.result) != null &&
855 (b = this.snd) != null &&
856 (s = b.result) != null &&
857 compareAndSet(0, 1)) {
858 if (r instanceof AltResult) {
859 ex = ((AltResult)r).ex;
860 t = null;
861 }
862 else {
863 ex = null;
864 @SuppressWarnings("unchecked") T tr = (T) r;
865 t = tr;
866 }
867 if (ex != null)
868 u = null;
869 else if (s instanceof AltResult) {
870 ex = ((AltResult)s).ex;
871 u = null;
872 }
873 else {
874 @SuppressWarnings("unchecked") U us = (U) s;
875 u = us;
876 }
877 Executor e = executor;
878 if (ex == null) {
879 try {
880 if (e != null)
881 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
882 else
883 fn.accept(t, u);
884 } catch (Throwable rex) {
885 ex = rex;
886 }
887 }
888 if (e == null || ex != null)
889 dst.internalComplete(null, ex);
890 }
891 }
892 private static final long serialVersionUID = 5232453952276885070L;
893 }
894
895 static final class RunAfterBoth extends Completion {
896 final CompletableFuture<?> src;
897 final CompletableFuture<?> snd;
898 final Runnable fn;
899 final CompletableFuture<Void> dst;
900 final Executor executor;
901 RunAfterBoth(CompletableFuture<?> src,
902 CompletableFuture<?> snd,
903 Runnable fn,
904 CompletableFuture<Void> dst,
905 Executor executor) {
906 this.src = src; this.snd = snd;
907 this.fn = fn; this.dst = dst;
908 this.executor = executor;
909 }
910 public final void run() {
911 final CompletableFuture<?> a;
912 final CompletableFuture<?> b;
913 final Runnable fn;
914 final CompletableFuture<Void> dst;
915 Object r, s; Throwable ex;
916 if ((dst = this.dst) != null &&
917 (fn = this.fn) != null &&
918 (a = this.src) != null &&
919 (r = a.result) != null &&
920 (b = this.snd) != null &&
921 (s = b.result) != null &&
922 compareAndSet(0, 1)) {
923 if (r instanceof AltResult)
924 ex = ((AltResult)r).ex;
925 else
926 ex = null;
927 if (ex == null && (s instanceof AltResult))
928 ex = ((AltResult)s).ex;
929 Executor e = executor;
930 if (ex == null) {
931 try {
932 if (e != null)
933 e.execute(new AsyncRun(fn, dst));
934 else
935 fn.run();
936 } catch (Throwable rex) {
937 ex = rex;
938 }
939 }
940 if (e == null || ex != null)
941 dst.internalComplete(null, ex);
942 }
943 }
944 private static final long serialVersionUID = 5232453952276885070L;
945 }
946
947 static final class AndCompletion extends Completion {
948 final CompletableFuture<?> src;
949 final CompletableFuture<?> snd;
950 final CompletableFuture<Void> dst;
951 AndCompletion(CompletableFuture<?> src,
952 CompletableFuture<?> snd,
953 CompletableFuture<Void> dst) {
954 this.src = src; this.snd = snd; this.dst = dst;
955 }
956 public final void run() {
957 final CompletableFuture<?> a;
958 final CompletableFuture<?> b;
959 final CompletableFuture<Void> dst;
960 Object r, s; Throwable ex;
961 if ((dst = this.dst) != null &&
962 (a = this.src) != null &&
963 (r = a.result) != null &&
964 (b = this.snd) != null &&
965 (s = b.result) != null &&
966 compareAndSet(0, 1)) {
967 if (r instanceof AltResult)
968 ex = ((AltResult)r).ex;
969 else
970 ex = null;
971 if (ex == null && (s instanceof AltResult))
972 ex = ((AltResult)s).ex;
973 dst.internalComplete(null, ex);
974 }
975 }
976 private static final long serialVersionUID = 5232453952276885070L;
977 }
978
979 static final class ApplyToEither<T,U> extends Completion {
980 final CompletableFuture<? extends T> src;
981 final CompletableFuture<? extends T> snd;
982 final Fun<? super T,? extends U> fn;
983 final CompletableFuture<U> dst;
984 final Executor executor;
985 ApplyToEither(CompletableFuture<? extends T> src,
986 CompletableFuture<? extends T> snd,
987 Fun<? super T,? extends U> fn,
988 CompletableFuture<U> dst,
989 Executor executor) {
990 this.src = src; this.snd = snd;
991 this.fn = fn; this.dst = dst;
992 this.executor = executor;
993 }
994 public final void run() {
995 final CompletableFuture<? extends T> a;
996 final CompletableFuture<? extends T> b;
997 final Fun<? super T,? extends U> fn;
998 final CompletableFuture<U> dst;
999 Object r; T t; Throwable ex;
1000 if ((dst = this.dst) != null &&
1001 (fn = this.fn) != null &&
1002 (((a = this.src) != null && (r = a.result) != null) ||
1003 ((b = this.snd) != null && (r = b.result) != null)) &&
1004 compareAndSet(0, 1)) {
1005 if (r instanceof AltResult) {
1006 ex = ((AltResult)r).ex;
1007 t = null;
1008 }
1009 else {
1010 ex = null;
1011 @SuppressWarnings("unchecked") T tr = (T) r;
1012 t = tr;
1013 }
1014 Executor e = executor;
1015 U u = null;
1016 if (ex == null) {
1017 try {
1018 if (e != null)
1019 e.execute(new AsyncApply<T,U>(t, fn, dst));
1020 else
1021 u = fn.apply(t);
1022 } catch (Throwable rex) {
1023 ex = rex;
1024 }
1025 }
1026 if (e == null || ex != null)
1027 dst.internalComplete(u, ex);
1028 }
1029 }
1030 private static final long serialVersionUID = 5232453952276885070L;
1031 }
1032
1033 static final class AcceptEither<T> extends Completion {
1034 final CompletableFuture<? extends T> src;
1035 final CompletableFuture<? extends T> snd;
1036 final Action<? super T> fn;
1037 final CompletableFuture<Void> dst;
1038 final Executor executor;
1039 AcceptEither(CompletableFuture<? extends T> src,
1040 CompletableFuture<? extends T> snd,
1041 Action<? super T> fn,
1042 CompletableFuture<Void> dst,
1043 Executor executor) {
1044 this.src = src; this.snd = snd;
1045 this.fn = fn; this.dst = dst;
1046 this.executor = executor;
1047 }
1048 public final void run() {
1049 final CompletableFuture<? extends T> a;
1050 final CompletableFuture<? extends T> b;
1051 final Action<? super T> fn;
1052 final CompletableFuture<Void> dst;
1053 Object r; T t; Throwable ex;
1054 if ((dst = this.dst) != null &&
1055 (fn = this.fn) != null &&
1056 (((a = this.src) != null && (r = a.result) != null) ||
1057 ((b = this.snd) != null && (r = b.result) != null)) &&
1058 compareAndSet(0, 1)) {
1059 if (r instanceof AltResult) {
1060 ex = ((AltResult)r).ex;
1061 t = null;
1062 }
1063 else {
1064 ex = null;
1065 @SuppressWarnings("unchecked") T tr = (T) r;
1066 t = tr;
1067 }
1068 Executor e = executor;
1069 if (ex == null) {
1070 try {
1071 if (e != null)
1072 e.execute(new AsyncAccept<T>(t, fn, dst));
1073 else
1074 fn.accept(t);
1075 } catch (Throwable rex) {
1076 ex = rex;
1077 }
1078 }
1079 if (e == null || ex != null)
1080 dst.internalComplete(null, ex);
1081 }
1082 }
1083 private static final long serialVersionUID = 5232453952276885070L;
1084 }
1085
1086 static final class RunAfterEither extends Completion {
1087 final CompletableFuture<?> src;
1088 final CompletableFuture<?> snd;
1089 final Runnable fn;
1090 final CompletableFuture<Void> dst;
1091 final Executor executor;
1092 RunAfterEither(CompletableFuture<?> src,
1093 CompletableFuture<?> snd,
1094 Runnable fn,
1095 CompletableFuture<Void> dst,
1096 Executor executor) {
1097 this.src = src; this.snd = snd;
1098 this.fn = fn; this.dst = dst;
1099 this.executor = executor;
1100 }
1101 public final void run() {
1102 final CompletableFuture<?> a;
1103 final CompletableFuture<?> b;
1104 final Runnable fn;
1105 final CompletableFuture<Void> dst;
1106 Object r; Throwable ex;
1107 if ((dst = this.dst) != null &&
1108 (fn = this.fn) != null &&
1109 (((a = this.src) != null && (r = a.result) != null) ||
1110 ((b = this.snd) != null && (r = b.result) != null)) &&
1111 compareAndSet(0, 1)) {
1112 if (r instanceof AltResult)
1113 ex = ((AltResult)r).ex;
1114 else
1115 ex = null;
1116 Executor e = executor;
1117 if (ex == null) {
1118 try {
1119 if (e != null)
1120 e.execute(new AsyncRun(fn, dst));
1121 else
1122 fn.run();
1123 } catch (Throwable rex) {
1124 ex = rex;
1125 }
1126 }
1127 if (e == null || ex != null)
1128 dst.internalComplete(null, ex);
1129 }
1130 }
1131 private static final long serialVersionUID = 5232453952276885070L;
1132 }
1133
1134 static final class OrCompletion extends Completion {
1135 final CompletableFuture<?> src;
1136 final CompletableFuture<?> snd;
1137 final CompletableFuture<Object> dst;
1138 OrCompletion(CompletableFuture<?> src,
1139 CompletableFuture<?> snd,
1140 CompletableFuture<Object> dst) {
1141 this.src = src; this.snd = snd; this.dst = dst;
1142 }
1143 public final void run() {
1144 final CompletableFuture<?> a;
1145 final CompletableFuture<?> b;
1146 final CompletableFuture<Object> dst;
1147 Object r, t; Throwable ex;
1148 if ((dst = this.dst) != null &&
1149 (((a = this.src) != null && (r = a.result) != null) ||
1150 ((b = this.snd) != null && (r = b.result) != null)) &&
1151 compareAndSet(0, 1)) {
1152 if (r instanceof AltResult) {
1153 ex = ((AltResult)r).ex;
1154 t = null;
1155 }
1156 else {
1157 ex = null;
1158 t = r;
1159 }
1160 dst.internalComplete(t, ex);
1161 }
1162 }
1163 private static final long serialVersionUID = 5232453952276885070L;
1164 }
1165
1166 static final class ExceptionCompletion<T> extends Completion {
1167 final CompletableFuture<? extends T> src;
1168 final Fun<? super Throwable, ? extends T> fn;
1169 final CompletableFuture<T> dst;
1170 ExceptionCompletion(CompletableFuture<? extends T> src,
1171 Fun<? super Throwable, ? extends T> fn,
1172 CompletableFuture<T> dst) {
1173 this.src = src; this.fn = fn; this.dst = dst;
1174 }
1175 public final void run() {
1176 final CompletableFuture<? extends T> a;
1177 final Fun<? super Throwable, ? extends T> fn;
1178 final CompletableFuture<T> dst;
1179 Object r; T t = null; Throwable ex, dx = null;
1180 if ((dst = this.dst) != null &&
1181 (fn = this.fn) != null &&
1182 (a = this.src) != null &&
1183 (r = a.result) != null &&
1184 compareAndSet(0, 1)) {
1185 if ((r instanceof AltResult) &&
1186 (ex = ((AltResult)r).ex) != null) {
1187 try {
1188 t = fn.apply(ex);
1189 } catch (Throwable rex) {
1190 dx = rex;
1191 }
1192 }
1193 else {
1194 @SuppressWarnings("unchecked") T tr = (T) r;
1195 t = tr;
1196 }
1197 dst.internalComplete(t, dx);
1198 }
1199 }
1200 private static final long serialVersionUID = 5232453952276885070L;
1201 }
1202
1203 static final class ThenCopy<T> extends Completion {
1204 final CompletableFuture<?> src;
1205 final CompletableFuture<T> dst;
1206 ThenCopy(CompletableFuture<?> src,
1207 CompletableFuture<T> dst) {
1208 this.src = src; this.dst = dst;
1209 }
1210 public final void run() {
1211 final CompletableFuture<?> a;
1212 final CompletableFuture<T> dst;
1213 Object r; T t; Throwable ex;
1214 if ((dst = this.dst) != null &&
1215 (a = this.src) != null &&
1216 (r = a.result) != null &&
1217 compareAndSet(0, 1)) {
1218 if (r instanceof AltResult) {
1219 ex = ((AltResult)r).ex;
1220 t = null;
1221 }
1222 else {
1223 ex = null;
1224 @SuppressWarnings("unchecked") T tr = (T) r;
1225 t = tr;
1226 }
1227 dst.internalComplete(t, ex);
1228 }
1229 }
1230 private static final long serialVersionUID = 5232453952276885070L;
1231 }
1232
1233 // version of ThenCopy for CompletableFuture<Void> dst
1234 static final class ThenPropagate extends Completion {
1235 final CompletableFuture<?> src;
1236 final CompletableFuture<Void> dst;
1237 ThenPropagate(CompletableFuture<?> src,
1238 CompletableFuture<Void> dst) {
1239 this.src = src; this.dst = dst;
1240 }
1241 public final void run() {
1242 final CompletableFuture<?> a;
1243 final CompletableFuture<Void> dst;
1244 Object r; Throwable ex;
1245 if ((dst = this.dst) != null &&
1246 (a = this.src) != null &&
1247 (r = a.result) != null &&
1248 compareAndSet(0, 1)) {
1249 if (r instanceof AltResult)
1250 ex = ((AltResult)r).ex;
1251 else
1252 ex = null;
1253 dst.internalComplete(null, ex);
1254 }
1255 }
1256 private static final long serialVersionUID = 5232453952276885070L;
1257 }
1258
1259 static final class HandleCompletion<T,U> extends Completion {
1260 final CompletableFuture<? extends T> src;
1261 final BiFun<? super T, Throwable, ? extends U> fn;
1262 final CompletableFuture<U> dst;
1263 HandleCompletion(CompletableFuture<? extends T> src,
1264 BiFun<? super T, Throwable, ? extends U> fn,
1265 CompletableFuture<U> dst) {
1266 this.src = src; this.fn = fn; this.dst = dst;
1267 }
1268 public final void run() {
1269 final CompletableFuture<? extends T> a;
1270 final BiFun<? super T, Throwable, ? extends U> fn;
1271 final CompletableFuture<U> dst;
1272 Object r; T t; Throwable ex;
1273 if ((dst = this.dst) != null &&
1274 (fn = this.fn) != null &&
1275 (a = this.src) != null &&
1276 (r = a.result) != null &&
1277 compareAndSet(0, 1)) {
1278 if (r instanceof AltResult) {
1279 ex = ((AltResult)r).ex;
1280 t = null;
1281 }
1282 else {
1283 ex = null;
1284 @SuppressWarnings("unchecked") T tr = (T) r;
1285 t = tr;
1286 }
1287 U u = null; Throwable dx = null;
1288 try {
1289 u = fn.apply(t, ex);
1290 } catch (Throwable rex) {
1291 dx = rex;
1292 }
1293 dst.internalComplete(u, dx);
1294 }
1295 }
1296 private static final long serialVersionUID = 5232453952276885070L;
1297 }
1298
1299 static final class ThenCompose<T,U> extends Completion {
1300 final CompletableFuture<? extends T> src;
1301 final Fun<? super T, CompletableFuture<U>> fn;
1302 final CompletableFuture<U> dst;
1303 final Executor executor;
1304 ThenCompose(CompletableFuture<? extends T> src,
1305 Fun<? super T, CompletableFuture<U>> fn,
1306 CompletableFuture<U> dst,
1307 Executor executor) {
1308 this.src = src; this.fn = fn; this.dst = dst;
1309 this.executor = executor;
1310 }
1311 public final void run() {
1312 final CompletableFuture<? extends T> a;
1313 final Fun<? super T, CompletableFuture<U>> fn;
1314 final CompletableFuture<U> dst;
1315 Object r; T t; Throwable ex; Executor e;
1316 if ((dst = this.dst) != null &&
1317 (fn = this.fn) != null &&
1318 (a = this.src) != null &&
1319 (r = a.result) != null &&
1320 compareAndSet(0, 1)) {
1321 if (r instanceof AltResult) {
1322 ex = ((AltResult)r).ex;
1323 t = null;
1324 }
1325 else {
1326 ex = null;
1327 @SuppressWarnings("unchecked") T tr = (T) r;
1328 t = tr;
1329 }
1330 CompletableFuture<U> c = null;
1331 U u = null;
1332 boolean complete = false;
1333 if (ex == null) {
1334 if ((e = executor) != null)
1335 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1336 else {
1337 try {
1338 if ((c = fn.apply(t)) == null)
1339 ex = new NullPointerException();
1340 } catch (Throwable rex) {
1341 ex = rex;
1342 }
1343 }
1344 }
1345 if (c != null) {
1346 ThenCopy<U> d = null;
1347 Object s;
1348 if ((s = c.result) == null) {
1349 CompletionNode p = new CompletionNode
1350 (d = new ThenCopy<U>(c, dst));
1351 while ((s = c.result) == null) {
1352 if (UNSAFE.compareAndSwapObject
1353 (c, COMPLETIONS, p.next = c.completions, p))
1354 break;
1355 }
1356 }
1357 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1358 complete = true;
1359 if (s instanceof AltResult) {
1360 ex = ((AltResult)s).ex; // no rewrap
1361 u = null;
1362 }
1363 else {
1364 @SuppressWarnings("unchecked") U us = (U) s;
1365 u = us;
1366 }
1367 }
1368 }
1369 if (complete || ex != null)
1370 dst.internalComplete(u, ex);
1371 if (c != null)
1372 c.helpPostComplete();
1373 }
1374 }
1375 private static final long serialVersionUID = 5232453952276885070L;
1376 }
1377
1378 // public methods
1379
1380 /**
1381 * Creates a new incomplete CompletableFuture.
1382 */
1383 public CompletableFuture() {
1384 }
1385
1386 /**
1387 * Returns a new CompletableFuture that is asynchronously completed
1388 * by a task running in the {@link ForkJoinPool#commonPool()} with
1389 * the value obtained by calling the given Generator.
1390 *
1391 * @param supplier a function returning the value to be used
1392 * to complete the returned CompletableFuture
1393 * @return the new CompletableFuture
1394 */
1395 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier) {
1396 if (supplier == null) throw new NullPointerException();
1397 CompletableFuture<U> f = new CompletableFuture<U>();
1398 ForkJoinPool.commonPool().
1399 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1400 return f;
1401 }
1402
1403 /**
1404 * Returns a new CompletableFuture that is asynchronously completed
1405 * by a task running in the given executor with the value obtained
1406 * by calling the given Generator.
1407 *
1408 * @param supplier a function returning the value to be used
1409 * to complete the returned CompletableFuture
1410 * @param executor the executor to use for asynchronous execution
1411 * @return the new CompletableFuture
1412 */
1413 public static <U> CompletableFuture<U> supplyAsync(Generator<U> supplier,
1414 Executor executor) {
1415 if (executor == null || supplier == null)
1416 throw new NullPointerException();
1417 CompletableFuture<U> f = new CompletableFuture<U>();
1418 executor.execute(new AsyncSupply<U>(supplier, f));
1419 return f;
1420 }
1421
1422 /**
1423 * Returns a new CompletableFuture that is asynchronously completed
1424 * by a task running in the {@link ForkJoinPool#commonPool()} after
1425 * it runs the given action.
1426 *
1427 * @param runnable the action to run before completing the
1428 * returned CompletableFuture
1429 * @return the new CompletableFuture
1430 */
1431 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1432 if (runnable == null) throw new NullPointerException();
1433 CompletableFuture<Void> f = new CompletableFuture<Void>();
1434 ForkJoinPool.commonPool().
1435 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1436 return f;
1437 }
1438
1439 /**
1440 * Returns a new CompletableFuture that is asynchronously completed
1441 * by a task running in the given executor after it runs the given
1442 * action.
1443 *
1444 * @param runnable the action to run before completing the
1445 * returned CompletableFuture
1446 * @param executor the executor to use for asynchronous execution
1447 * @return the new CompletableFuture
1448 */
1449 public static CompletableFuture<Void> runAsync(Runnable runnable,
1450 Executor executor) {
1451 if (executor == null || runnable == null)
1452 throw new NullPointerException();
1453 CompletableFuture<Void> f = new CompletableFuture<Void>();
1454 executor.execute(new AsyncRun(runnable, f));
1455 return f;
1456 }
1457
1458 /**
1459 * Returns a new CompletableFuture that is already completed with
1460 * the given value.
1461 *
1462 * @param value the value
1463 * @return the completed CompletableFuture
1464 */
1465 public static <U> CompletableFuture<U> completedFuture(U value) {
1466 CompletableFuture<U> f = new CompletableFuture<U>();
1467 f.result = (value == null) ? NIL : value;
1468 return f;
1469 }
1470
1471 /**
1472 * Returns {@code true} if completed in any fashion: normally,
1473 * exceptionally, or via cancellation.
1474 *
1475 * @return {@code true} if completed
1476 */
1477 public boolean isDone() {
1478 return result != null;
1479 }
1480
1481 /**
1482 * Waits if necessary for this future to complete, and then
1483 * returns its result.
1484 *
1485 * @return the result value
1486 * @throws CancellationException if this future was cancelled
1487 * @throws ExecutionException if this future completed exceptionally
1488 * @throws InterruptedException if the current thread was interrupted
1489 * while waiting
1490 */
1491 public T get() throws InterruptedException, ExecutionException {
1492 Object r; Throwable ex, cause;
1493 if ((r = result) == null && (r = waitingGet(true)) == null)
1494 throw new InterruptedException();
1495 if (!(r instanceof AltResult)) {
1496 @SuppressWarnings("unchecked") T tr = (T) r;
1497 return tr;
1498 }
1499 if ((ex = ((AltResult)r).ex) == null)
1500 return null;
1501 if (ex instanceof CancellationException)
1502 throw (CancellationException)ex;
1503 if ((ex instanceof CompletionException) &&
1504 (cause = ex.getCause()) != null)
1505 ex = cause;
1506 throw new ExecutionException(ex);
1507 }
1508
1509 /**
1510 * Waits if necessary for at most the given time for this future
1511 * to complete, and then returns its result, if available.
1512 *
1513 * @param timeout the maximum time to wait
1514 * @param unit the time unit of the timeout argument
1515 * @return the result value
1516 * @throws CancellationException if this future was cancelled
1517 * @throws ExecutionException if this future completed exceptionally
1518 * @throws InterruptedException if the current thread was interrupted
1519 * while waiting
1520 * @throws TimeoutException if the wait timed out
1521 */
1522 public T get(long timeout, TimeUnit unit)
1523 throws InterruptedException, ExecutionException, TimeoutException {
1524 Object r; Throwable ex, cause;
1525 long nanos = unit.toNanos(timeout);
1526 if (Thread.interrupted())
1527 throw new InterruptedException();
1528 if ((r = result) == null)
1529 r = timedAwaitDone(nanos);
1530 if (!(r instanceof AltResult)) {
1531 @SuppressWarnings("unchecked") T tr = (T) r;
1532 return tr;
1533 }
1534 if ((ex = ((AltResult)r).ex) == null)
1535 return null;
1536 if (ex instanceof CancellationException)
1537 throw (CancellationException)ex;
1538 if ((ex instanceof CompletionException) &&
1539 (cause = ex.getCause()) != null)
1540 ex = cause;
1541 throw new ExecutionException(ex);
1542 }
1543
1544 /**
1545 * Returns the result value when complete, or throws an
1546 * (unchecked) exception if completed exceptionally. To better
1547 * conform with the use of common functional forms, if a
1548 * computation involved in the completion of this
1549 * CompletableFuture threw an exception, this method throws an
1550 * (unchecked) {@link CompletionException} with the underlying
1551 * exception as its cause.
1552 *
1553 * @return the result value
1554 * @throws CancellationException if the computation was cancelled
1555 * @throws CompletionException if this future completed
1556 * exceptionally or a completion computation threw an exception
1557 */
1558 public T join() {
1559 Object r; Throwable ex;
1560 if ((r = result) == null)
1561 r = waitingGet(false);
1562 if (!(r instanceof AltResult)) {
1563 @SuppressWarnings("unchecked") T tr = (T) r;
1564 return tr;
1565 }
1566 if ((ex = ((AltResult)r).ex) == null)
1567 return null;
1568 if (ex instanceof CancellationException)
1569 throw (CancellationException)ex;
1570 if (ex instanceof CompletionException)
1571 throw (CompletionException)ex;
1572 throw new CompletionException(ex);
1573 }
1574
1575 /**
1576 * Returns the result value (or throws any encountered exception)
1577 * if completed, else returns the given valueIfAbsent.
1578 *
1579 * @param valueIfAbsent the value to return if not completed
1580 * @return the result value, if completed, else the given valueIfAbsent
1581 * @throws CancellationException if the computation was cancelled
1582 * @throws CompletionException if this future completed
1583 * exceptionally or a completion computation threw an exception
1584 */
1585 public T getNow(T valueIfAbsent) {
1586 Object r; Throwable ex;
1587 if ((r = result) == null)
1588 return valueIfAbsent;
1589 if (!(r instanceof AltResult)) {
1590 @SuppressWarnings("unchecked") T tr = (T) r;
1591 return tr;
1592 }
1593 if ((ex = ((AltResult)r).ex) == null)
1594 return null;
1595 if (ex instanceof CancellationException)
1596 throw (CancellationException)ex;
1597 if (ex instanceof CompletionException)
1598 throw (CompletionException)ex;
1599 throw new CompletionException(ex);
1600 }
1601
1602 /**
1603 * If not already completed, sets the value returned by {@link
1604 * #get()} and related methods to the given value.
1605 *
1606 * @param value the result value
1607 * @return {@code true} if this invocation caused this CompletableFuture
1608 * to transition to a completed state, else {@code false}
1609 */
1610 public boolean complete(T value) {
1611 boolean triggered = result == null &&
1612 UNSAFE.compareAndSwapObject(this, RESULT, null,
1613 value == null ? NIL : value);
1614 postComplete();
1615 return triggered;
1616 }
1617
1618 /**
1619 * If not already completed, causes invocations of {@link #get()}
1620 * and related methods to throw the given exception.
1621 *
1622 * @param ex the exception
1623 * @return {@code true} if this invocation caused this CompletableFuture
1624 * to transition to a completed state, else {@code false}
1625 */
1626 public boolean completeExceptionally(Throwable ex) {
1627 if (ex == null) throw new NullPointerException();
1628 boolean triggered = result == null &&
1629 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1630 postComplete();
1631 return triggered;
1632 }
1633
1634 /**
1635 * Returns a new CompletableFuture that is completed
1636 * when this CompletableFuture completes, with the result of the
1637 * given function of this CompletableFuture's result.
1638 *
1639 * <p>If this CompletableFuture completes exceptionally, or the
1640 * supplied function throws an exception, then the returned
1641 * CompletableFuture completes exceptionally with a
1642 * CompletionException holding the exception as its cause.
1643 *
1644 * @param fn the function to use to compute the value of
1645 * the returned CompletableFuture
1646 * @return the new CompletableFuture
1647 */
1648 public <U> CompletableFuture<U> thenApply(Fun<? super T,? extends U> fn) {
1649 return doThenApply(fn, null);
1650 }
1651
1652 /**
1653 * Returns a new CompletableFuture that is asynchronously completed
1654 * when this CompletableFuture completes, with the result of the
1655 * given function of this CompletableFuture's result from a
1656 * task running in the {@link ForkJoinPool#commonPool()}.
1657 *
1658 * <p>If this CompletableFuture completes exceptionally, or the
1659 * supplied function throws an exception, then the returned
1660 * CompletableFuture completes exceptionally with a
1661 * CompletionException holding the exception as its cause.
1662 *
1663 * @param fn the function to use to compute the value of
1664 * the returned CompletableFuture
1665 * @return the new CompletableFuture
1666 */
1667 public <U> CompletableFuture<U> thenApplyAsync
1668 (Fun<? super T,? extends U> fn) {
1669 return doThenApply(fn, ForkJoinPool.commonPool());
1670 }
1671
1672 /**
1673 * Returns a new CompletableFuture that is asynchronously completed
1674 * when this CompletableFuture completes, with the result of the
1675 * given function of this CompletableFuture's result from a
1676 * task running in the given executor.
1677 *
1678 * <p>If this CompletableFuture completes exceptionally, or the
1679 * supplied function throws an exception, then the returned
1680 * CompletableFuture completes exceptionally with a
1681 * CompletionException holding the exception as its cause.
1682 *
1683 * @param fn the function to use to compute the value of
1684 * the returned CompletableFuture
1685 * @param executor the executor to use for asynchronous execution
1686 * @return the new CompletableFuture
1687 */
1688 public <U> CompletableFuture<U> thenApplyAsync
1689 (Fun<? super T,? extends U> fn,
1690 Executor executor) {
1691 if (executor == null) throw new NullPointerException();
1692 return doThenApply(fn, executor);
1693 }
1694
1695 private <U> CompletableFuture<U> doThenApply
1696 (Fun<? super T,? extends U> fn,
1697 Executor e) {
1698 if (fn == null) throw new NullPointerException();
1699 CompletableFuture<U> dst = new CompletableFuture<U>();
1700 ThenApply<T,U> d = null;
1701 Object r;
1702 if ((r = result) == null) {
1703 CompletionNode p = new CompletionNode
1704 (d = new ThenApply<T,U>(this, fn, dst, e));
1705 while ((r = result) == null) {
1706 if (UNSAFE.compareAndSwapObject
1707 (this, COMPLETIONS, p.next = completions, p))
1708 break;
1709 }
1710 }
1711 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1712 T t; Throwable ex;
1713 if (r instanceof AltResult) {
1714 ex = ((AltResult)r).ex;
1715 t = null;
1716 }
1717 else {
1718 ex = null;
1719 @SuppressWarnings("unchecked") T tr = (T) r;
1720 t = tr;
1721 }
1722 U u = null;
1723 if (ex == null) {
1724 try {
1725 if (e != null)
1726 e.execute(new AsyncApply<T,U>(t, fn, dst));
1727 else
1728 u = fn.apply(t);
1729 } catch (Throwable rex) {
1730 ex = rex;
1731 }
1732 }
1733 if (e == null || ex != null)
1734 dst.internalComplete(u, ex);
1735 }
1736 helpPostComplete();
1737 return dst;
1738 }
1739
1740 /**
1741 * Returns a new CompletableFuture that is completed
1742 * when this CompletableFuture completes, after performing the given
1743 * action with this CompletableFuture's result.
1744 *
1745 * <p>If this CompletableFuture completes exceptionally, or the
1746 * supplied action throws an exception, then the returned
1747 * CompletableFuture completes exceptionally with a
1748 * CompletionException holding the exception as its cause.
1749 *
1750 * @param block the action to perform before completing the
1751 * returned CompletableFuture
1752 * @return the new CompletableFuture
1753 */
1754 public CompletableFuture<Void> thenAccept(Action<? super T> block) {
1755 return doThenAccept(block, null);
1756 }
1757
1758 /**
1759 * Returns a new CompletableFuture that is asynchronously completed
1760 * when this CompletableFuture completes, after performing the given
1761 * action with this CompletableFuture's result from a task running
1762 * in the {@link ForkJoinPool#commonPool()}.
1763 *
1764 * <p>If this CompletableFuture completes exceptionally, or the
1765 * supplied action throws an exception, then the returned
1766 * CompletableFuture completes exceptionally with a
1767 * CompletionException holding the exception as its cause.
1768 *
1769 * @param block the action to perform before completing the
1770 * returned CompletableFuture
1771 * @return the new CompletableFuture
1772 */
1773 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block) {
1774 return doThenAccept(block, ForkJoinPool.commonPool());
1775 }
1776
1777 /**
1778 * Returns a new CompletableFuture that is asynchronously completed
1779 * when this CompletableFuture completes, after performing the given
1780 * action with this CompletableFuture's result from a task running
1781 * in the given executor.
1782 *
1783 * <p>If this CompletableFuture completes exceptionally, or the
1784 * supplied action throws an exception, then the returned
1785 * CompletableFuture completes exceptionally with a
1786 * CompletionException holding the exception as its cause.
1787 *
1788 * @param block the action to perform before completing the
1789 * returned CompletableFuture
1790 * @param executor the executor to use for asynchronous execution
1791 * @return the new CompletableFuture
1792 */
1793 public CompletableFuture<Void> thenAcceptAsync(Action<? super T> block,
1794 Executor executor) {
1795 if (executor == null) throw new NullPointerException();
1796 return doThenAccept(block, executor);
1797 }
1798
1799 private CompletableFuture<Void> doThenAccept(Action<? super T> fn,
1800 Executor e) {
1801 if (fn == null) throw new NullPointerException();
1802 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1803 ThenAccept<T> d = null;
1804 Object r;
1805 if ((r = result) == null) {
1806 CompletionNode p = new CompletionNode
1807 (d = new ThenAccept<T>(this, fn, dst, e));
1808 while ((r = result) == null) {
1809 if (UNSAFE.compareAndSwapObject
1810 (this, COMPLETIONS, p.next = completions, p))
1811 break;
1812 }
1813 }
1814 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1815 T t; Throwable ex;
1816 if (r instanceof AltResult) {
1817 ex = ((AltResult)r).ex;
1818 t = null;
1819 }
1820 else {
1821 ex = null;
1822 @SuppressWarnings("unchecked") T tr = (T) r;
1823 t = tr;
1824 }
1825 if (ex == null) {
1826 try {
1827 if (e != null)
1828 e.execute(new AsyncAccept<T>(t, fn, dst));
1829 else
1830 fn.accept(t);
1831 } catch (Throwable rex) {
1832 ex = rex;
1833 }
1834 }
1835 if (e == null || ex != null)
1836 dst.internalComplete(null, ex);
1837 }
1838 helpPostComplete();
1839 return dst;
1840 }
1841
1842 /**
1843 * Returns a new CompletableFuture that is completed
1844 * when this CompletableFuture completes, after performing the given
1845 * action.
1846 *
1847 * <p>If this CompletableFuture completes exceptionally, or the
1848 * supplied action throws an exception, then the returned
1849 * CompletableFuture completes exceptionally with a
1850 * CompletionException holding the exception as its cause.
1851 *
1852 * @param action the action to perform before completing the
1853 * returned CompletableFuture
1854 * @return the new CompletableFuture
1855 */
1856 public CompletableFuture<Void> thenRun(Runnable action) {
1857 return doThenRun(action, null);
1858 }
1859
1860 /**
1861 * Returns a new CompletableFuture that is asynchronously completed
1862 * when this CompletableFuture completes, after performing the given
1863 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1864 *
1865 * <p>If this CompletableFuture completes exceptionally, or the
1866 * supplied action throws an exception, then the returned
1867 * CompletableFuture completes exceptionally with a
1868 * CompletionException holding the exception as its cause.
1869 *
1870 * @param action the action to perform before completing the
1871 * returned CompletableFuture
1872 * @return the new CompletableFuture
1873 */
1874 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1875 return doThenRun(action, ForkJoinPool.commonPool());
1876 }
1877
1878 /**
1879 * Returns a new CompletableFuture that is asynchronously completed
1880 * when this CompletableFuture completes, after performing the given
1881 * action from a task running in the given executor.
1882 *
1883 * <p>If this CompletableFuture completes exceptionally, or the
1884 * supplied action throws an exception, then the returned
1885 * CompletableFuture completes exceptionally with a
1886 * CompletionException holding the exception as its cause.
1887 *
1888 * @param action the action to perform before completing the
1889 * returned CompletableFuture
1890 * @param executor the executor to use for asynchronous execution
1891 * @return the new CompletableFuture
1892 */
1893 public CompletableFuture<Void> thenRunAsync(Runnable action,
1894 Executor executor) {
1895 if (executor == null) throw new NullPointerException();
1896 return doThenRun(action, executor);
1897 }
1898
1899 private CompletableFuture<Void> doThenRun(Runnable action,
1900 Executor e) {
1901 if (action == null) throw new NullPointerException();
1902 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1903 ThenRun d = null;
1904 Object r;
1905 if ((r = result) == null) {
1906 CompletionNode p = new CompletionNode
1907 (d = new ThenRun(this, action, dst, e));
1908 while ((r = result) == null) {
1909 if (UNSAFE.compareAndSwapObject
1910 (this, COMPLETIONS, p.next = completions, p))
1911 break;
1912 }
1913 }
1914 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1915 Throwable ex;
1916 if (r instanceof AltResult)
1917 ex = ((AltResult)r).ex;
1918 else
1919 ex = null;
1920 if (ex == null) {
1921 try {
1922 if (e != null)
1923 e.execute(new AsyncRun(action, dst));
1924 else
1925 action.run();
1926 } catch (Throwable rex) {
1927 ex = rex;
1928 }
1929 }
1930 if (e == null || ex != null)
1931 dst.internalComplete(null, ex);
1932 }
1933 helpPostComplete();
1934 return dst;
1935 }
1936
1937 /**
1938 * Returns a new CompletableFuture that is completed
1939 * when both this and the other given CompletableFuture complete,
1940 * with the result of the given function of the results of the two
1941 * CompletableFutures.
1942 *
1943 * <p>If this and/or the other CompletableFuture complete
1944 * exceptionally, or the supplied function throws an exception,
1945 * then the returned CompletableFuture completes exceptionally
1946 * with a CompletionException holding the exception as its cause.
1947 *
1948 * @param other the other CompletableFuture
1949 * @param fn the function to use to compute the value of
1950 * the returned CompletableFuture
1951 * @return the new CompletableFuture
1952 */
1953 public <U,V> CompletableFuture<V> thenCombine
1954 (CompletableFuture<? extends U> other,
1955 BiFun<? super T,? super U,? extends V> fn) {
1956 return doThenCombine(other, fn, null);
1957 }
1958
1959 /**
1960 * Returns a new CompletableFuture that is asynchronously completed
1961 * when both this and the other given CompletableFuture complete,
1962 * with the result of the given function of the results of the two
1963 * CompletableFutures from a task running in the
1964 * {@link ForkJoinPool#commonPool()}.
1965 *
1966 * <p>If this and/or the other CompletableFuture complete
1967 * exceptionally, or the supplied function throws an exception,
1968 * then the returned CompletableFuture completes exceptionally
1969 * with a CompletionException holding the exception as its cause.
1970 *
1971 * @param other the other CompletableFuture
1972 * @param fn the function to use to compute the value of
1973 * the returned CompletableFuture
1974 * @return the new CompletableFuture
1975 */
1976 public <U,V> CompletableFuture<V> thenCombineAsync
1977 (CompletableFuture<? extends U> other,
1978 BiFun<? super T,? super U,? extends V> fn) {
1979 return doThenCombine(other, fn, ForkJoinPool.commonPool());
1980 }
1981
1982 /**
1983 * Returns a new CompletableFuture that is asynchronously completed
1984 * when both this and the other given CompletableFuture complete,
1985 * with the result of the given function of the results of the two
1986 * CompletableFutures from a task running in the given executor.
1987 *
1988 * <p>If this and/or the other CompletableFuture complete
1989 * exceptionally, or the supplied function throws an exception,
1990 * then the returned CompletableFuture completes exceptionally
1991 * with a CompletionException holding the exception as its cause.
1992 *
1993 * @param other the other CompletableFuture
1994 * @param fn the function to use to compute the value of
1995 * the returned CompletableFuture
1996 * @param executor the executor to use for asynchronous execution
1997 * @return the new CompletableFuture
1998 */
1999 public <U,V> CompletableFuture<V> thenCombineAsync
2000 (CompletableFuture<? extends U> other,
2001 BiFun<? super T,? super U,? extends V> fn,
2002 Executor executor) {
2003 if (executor == null) throw new NullPointerException();
2004 return doThenCombine(other, fn, executor);
2005 }
2006
2007 private <U,V> CompletableFuture<V> doThenCombine
2008 (CompletableFuture<? extends U> other,
2009 BiFun<? super T,? super U,? extends V> fn,
2010 Executor e) {
2011 if (other == null || fn == null) throw new NullPointerException();
2012 CompletableFuture<V> dst = new CompletableFuture<V>();
2013 ThenCombine<T,U,V> d = null;
2014 Object r, s = null;
2015 if ((r = result) == null || (s = other.result) == null) {
2016 d = new ThenCombine<T,U,V>(this, other, fn, dst, e);
2017 CompletionNode q = null, p = new CompletionNode(d);
2018 while ((r == null && (r = result) == null) ||
2019 (s == null && (s = other.result) == null)) {
2020 if (q != null) {
2021 if (s != null ||
2022 UNSAFE.compareAndSwapObject
2023 (other, COMPLETIONS, q.next = other.completions, q))
2024 break;
2025 }
2026 else if (r != null ||
2027 UNSAFE.compareAndSwapObject
2028 (this, COMPLETIONS, p.next = completions, p)) {
2029 if (s != null)
2030 break;
2031 q = new CompletionNode(d);
2032 }
2033 }
2034 }
2035 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2036 T t; U u; Throwable ex;
2037 if (r instanceof AltResult) {
2038 ex = ((AltResult)r).ex;
2039 t = null;
2040 }
2041 else {
2042 ex = null;
2043 @SuppressWarnings("unchecked") T tr = (T) r;
2044 t = tr;
2045 }
2046 if (ex != null)
2047 u = null;
2048 else if (s instanceof AltResult) {
2049 ex = ((AltResult)s).ex;
2050 u = null;
2051 }
2052 else {
2053 @SuppressWarnings("unchecked") U us = (U) s;
2054 u = us;
2055 }
2056 V v = null;
2057 if (ex == null) {
2058 try {
2059 if (e != null)
2060 e.execute(new AsyncCombine<T,U,V>(t, u, fn, dst));
2061 else
2062 v = fn.apply(t, u);
2063 } catch (Throwable rex) {
2064 ex = rex;
2065 }
2066 }
2067 if (e == null || ex != null)
2068 dst.internalComplete(v, ex);
2069 }
2070 helpPostComplete();
2071 other.helpPostComplete();
2072 return dst;
2073 }
2074
2075 /**
2076 * Returns a new CompletableFuture that is completed
2077 * when both this and the other given CompletableFuture complete,
2078 * after performing the given action with the results of the two
2079 * CompletableFutures.
2080 *
2081 * <p>If this and/or the other CompletableFuture complete
2082 * exceptionally, or the supplied action throws an exception,
2083 * then the returned CompletableFuture completes exceptionally
2084 * with a CompletionException holding the exception as its cause.
2085 *
2086 * @param other the other CompletableFuture
2087 * @param block the action to perform before completing the
2088 * returned CompletableFuture
2089 * @return the new CompletableFuture
2090 */
2091 public <U> CompletableFuture<Void> thenAcceptBoth
2092 (CompletableFuture<? extends U> other,
2093 BiAction<? super T, ? super U> block) {
2094 return doThenAcceptBoth(other, block, null);
2095 }
2096
2097 /**
2098 * Returns a new CompletableFuture that is asynchronously completed
2099 * when both this and the other given CompletableFuture complete,
2100 * after performing the given action with the results of the two
2101 * CompletableFutures from a task running in the {@link
2102 * ForkJoinPool#commonPool()}.
2103 *
2104 * <p>If this and/or the other CompletableFuture complete
2105 * exceptionally, or the supplied action throws an exception,
2106 * then the returned CompletableFuture completes exceptionally
2107 * with a CompletionException holding the exception as its cause.
2108 *
2109 * @param other the other CompletableFuture
2110 * @param block the action to perform before completing the
2111 * returned CompletableFuture
2112 * @return the new CompletableFuture
2113 */
2114 public <U> CompletableFuture<Void> thenAcceptBothAsync
2115 (CompletableFuture<? extends U> other,
2116 BiAction<? super T, ? super U> block) {
2117 return doThenAcceptBoth(other, block, ForkJoinPool.commonPool());
2118 }
2119
2120 /**
2121 * Returns a new CompletableFuture that is asynchronously completed
2122 * when both this and the other given CompletableFuture complete,
2123 * after performing the given action with the results of the two
2124 * CompletableFutures from a task running in the given executor.
2125 *
2126 * <p>If this and/or the other CompletableFuture complete
2127 * exceptionally, or the supplied action throws an exception,
2128 * then the returned CompletableFuture completes exceptionally
2129 * with a CompletionException holding the exception as its cause.
2130 *
2131 * @param other the other CompletableFuture
2132 * @param block the action to perform before completing the
2133 * returned CompletableFuture
2134 * @param executor the executor to use for asynchronous execution
2135 * @return the new CompletableFuture
2136 */
2137 public <U> CompletableFuture<Void> thenAcceptBothAsync
2138 (CompletableFuture<? extends U> other,
2139 BiAction<? super T, ? super U> block,
2140 Executor executor) {
2141 if (executor == null) throw new NullPointerException();
2142 return doThenAcceptBoth(other, block, executor);
2143 }
2144
2145 private <U> CompletableFuture<Void> doThenAcceptBoth
2146 (CompletableFuture<? extends U> other,
2147 BiAction<? super T,? super U> fn,
2148 Executor e) {
2149 if (other == null || fn == null) throw new NullPointerException();
2150 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2151 ThenAcceptBoth<T,U> d = null;
2152 Object r, s = null;
2153 if ((r = result) == null || (s = other.result) == null) {
2154 d = new ThenAcceptBoth<T,U>(this, other, fn, dst, e);
2155 CompletionNode q = null, p = new CompletionNode(d);
2156 while ((r == null && (r = result) == null) ||
2157 (s == null && (s = other.result) == null)) {
2158 if (q != null) {
2159 if (s != null ||
2160 UNSAFE.compareAndSwapObject
2161 (other, COMPLETIONS, q.next = other.completions, q))
2162 break;
2163 }
2164 else if (r != null ||
2165 UNSAFE.compareAndSwapObject
2166 (this, COMPLETIONS, p.next = completions, p)) {
2167 if (s != null)
2168 break;
2169 q = new CompletionNode(d);
2170 }
2171 }
2172 }
2173 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2174 T t; U u; Throwable ex;
2175 if (r instanceof AltResult) {
2176 ex = ((AltResult)r).ex;
2177 t = null;
2178 }
2179 else {
2180 ex = null;
2181 @SuppressWarnings("unchecked") T tr = (T) r;
2182 t = tr;
2183 }
2184 if (ex != null)
2185 u = null;
2186 else if (s instanceof AltResult) {
2187 ex = ((AltResult)s).ex;
2188 u = null;
2189 }
2190 else {
2191 @SuppressWarnings("unchecked") U us = (U) s;
2192 u = us;
2193 }
2194 if (ex == null) {
2195 try {
2196 if (e != null)
2197 e.execute(new AsyncAcceptBoth<T,U>(t, u, fn, dst));
2198 else
2199 fn.accept(t, u);
2200 } catch (Throwable rex) {
2201 ex = rex;
2202 }
2203 }
2204 if (e == null || ex != null)
2205 dst.internalComplete(null, ex);
2206 }
2207 helpPostComplete();
2208 other.helpPostComplete();
2209 return dst;
2210 }
2211
2212 /**
2213 * Returns a new CompletableFuture that is completed
2214 * when both this and the other given CompletableFuture complete,
2215 * after performing the given action.
2216 *
2217 * <p>If this and/or the other CompletableFuture complete
2218 * exceptionally, or the supplied action throws an exception,
2219 * then the returned CompletableFuture completes exceptionally
2220 * with a CompletionException holding the exception as its cause.
2221 *
2222 * @param other the other CompletableFuture
2223 * @param action the action to perform before completing the
2224 * returned CompletableFuture
2225 * @return the new CompletableFuture
2226 */
2227 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2228 Runnable action) {
2229 return doRunAfterBoth(other, action, null);
2230 }
2231
2232 /**
2233 * Returns a new CompletableFuture that is asynchronously completed
2234 * when both this and the other given CompletableFuture complete,
2235 * after performing the given action from a task running in the
2236 * {@link ForkJoinPool#commonPool()}.
2237 *
2238 * <p>If this and/or the other CompletableFuture complete
2239 * exceptionally, or the supplied action throws an exception,
2240 * then the returned CompletableFuture completes exceptionally
2241 * with a CompletionException holding the exception as its cause.
2242 *
2243 * @param other the other CompletableFuture
2244 * @param action the action to perform before completing the
2245 * returned CompletableFuture
2246 * @return the new CompletableFuture
2247 */
2248 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2249 Runnable action) {
2250 return doRunAfterBoth(other, action, ForkJoinPool.commonPool());
2251 }
2252
2253 /**
2254 * Returns a new CompletableFuture that is asynchronously completed
2255 * when both this and the other given CompletableFuture complete,
2256 * after performing the given action from a task running in the
2257 * given executor.
2258 *
2259 * <p>If this and/or the other CompletableFuture complete
2260 * exceptionally, or the supplied action throws an exception,
2261 * then the returned CompletableFuture completes exceptionally
2262 * with a CompletionException holding the exception as its cause.
2263 *
2264 * @param other the other CompletableFuture
2265 * @param action the action to perform before completing the
2266 * returned CompletableFuture
2267 * @param executor the executor to use for asynchronous execution
2268 * @return the new CompletableFuture
2269 */
2270 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2271 Runnable action,
2272 Executor executor) {
2273 if (executor == null) throw new NullPointerException();
2274 return doRunAfterBoth(other, action, executor);
2275 }
2276
2277 private CompletableFuture<Void> doRunAfterBoth(CompletableFuture<?> other,
2278 Runnable action,
2279 Executor e) {
2280 if (other == null || action == null) throw new NullPointerException();
2281 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2282 RunAfterBoth d = null;
2283 Object r, s = null;
2284 if ((r = result) == null || (s = other.result) == null) {
2285 d = new RunAfterBoth(this, other, action, dst, e);
2286 CompletionNode q = null, p = new CompletionNode(d);
2287 while ((r == null && (r = result) == null) ||
2288 (s == null && (s = other.result) == null)) {
2289 if (q != null) {
2290 if (s != null ||
2291 UNSAFE.compareAndSwapObject
2292 (other, COMPLETIONS, q.next = other.completions, q))
2293 break;
2294 }
2295 else if (r != null ||
2296 UNSAFE.compareAndSwapObject
2297 (this, COMPLETIONS, p.next = completions, p)) {
2298 if (s != null)
2299 break;
2300 q = new CompletionNode(d);
2301 }
2302 }
2303 }
2304 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2305 Throwable ex;
2306 if (r instanceof AltResult)
2307 ex = ((AltResult)r).ex;
2308 else
2309 ex = null;
2310 if (ex == null && (s instanceof AltResult))
2311 ex = ((AltResult)s).ex;
2312 if (ex == null) {
2313 try {
2314 if (e != null)
2315 e.execute(new AsyncRun(action, dst));
2316 else
2317 action.run();
2318 } catch (Throwable rex) {
2319 ex = rex;
2320 }
2321 }
2322 if (e == null || ex != null)
2323 dst.internalComplete(null, ex);
2324 }
2325 helpPostComplete();
2326 other.helpPostComplete();
2327 return dst;
2328 }
2329
2330 /**
2331 * Returns a new CompletableFuture that is completed
2332 * when either this or the other given CompletableFuture completes,
2333 * with the result of the given function of either this or the other
2334 * CompletableFuture's result.
2335 *
2336 * <p>If this and/or the other CompletableFuture complete
2337 * exceptionally, then the returned CompletableFuture may also do so,
2338 * with a CompletionException holding one of these exceptions as its
2339 * cause. No guarantees are made about which result or exception is
2340 * used in the returned CompletableFuture. If the supplied function
2341 * throws an exception, then the returned CompletableFuture completes
2342 * exceptionally with a CompletionException holding the exception as
2343 * its cause.
2344 *
2345 * @param other the other CompletableFuture
2346 * @param fn the function to use to compute the value of
2347 * the returned CompletableFuture
2348 * @return the new CompletableFuture
2349 */
2350 public <U> CompletableFuture<U> applyToEither
2351 (CompletableFuture<? extends T> other,
2352 Fun<? super T, U> fn) {
2353 return doApplyToEither(other, fn, null);
2354 }
2355
2356 /**
2357 * Returns a new CompletableFuture that is asynchronously completed
2358 * when either this or the other given CompletableFuture completes,
2359 * with the result of the given function of either this or the other
2360 * CompletableFuture's result from a task running in the
2361 * {@link ForkJoinPool#commonPool()}.
2362 *
2363 * <p>If this and/or the other CompletableFuture complete
2364 * exceptionally, then the returned CompletableFuture may also do so,
2365 * with a CompletionException holding one of these exceptions as its
2366 * cause. No guarantees are made about which result or exception is
2367 * used in the returned CompletableFuture. If the supplied function
2368 * throws an exception, then the returned CompletableFuture completes
2369 * exceptionally with a CompletionException holding the exception as
2370 * its cause.
2371 *
2372 * @param other the other CompletableFuture
2373 * @param fn the function to use to compute the value of
2374 * the returned CompletableFuture
2375 * @return the new CompletableFuture
2376 */
2377 public <U> CompletableFuture<U> applyToEitherAsync
2378 (CompletableFuture<? extends T> other,
2379 Fun<? super T, U> fn) {
2380 return doApplyToEither(other, fn, ForkJoinPool.commonPool());
2381 }
2382
2383 /**
2384 * Returns a new CompletableFuture that is asynchronously completed
2385 * when either this or the other given CompletableFuture completes,
2386 * with the result of the given function of either this or the other
2387 * CompletableFuture's result from a task running in the
2388 * given executor.
2389 *
2390 * <p>If this and/or the other CompletableFuture complete
2391 * exceptionally, then the returned CompletableFuture may also do so,
2392 * with a CompletionException holding one of these exceptions as its
2393 * cause. No guarantees are made about which result or exception is
2394 * used in the returned CompletableFuture. If the supplied function
2395 * throws an exception, then the returned CompletableFuture completes
2396 * exceptionally with a CompletionException holding the exception as
2397 * its cause.
2398 *
2399 * @param other the other CompletableFuture
2400 * @param fn the function to use to compute the value of
2401 * the returned CompletableFuture
2402 * @param executor the executor to use for asynchronous execution
2403 * @return the new CompletableFuture
2404 */
2405 public <U> CompletableFuture<U> applyToEitherAsync
2406 (CompletableFuture<? extends T> other,
2407 Fun<? super T, U> fn,
2408 Executor executor) {
2409 if (executor == null) throw new NullPointerException();
2410 return doApplyToEither(other, fn, executor);
2411 }
2412
2413 private <U> CompletableFuture<U> doApplyToEither
2414 (CompletableFuture<? extends T> other,
2415 Fun<? super T, U> fn,
2416 Executor e) {
2417 if (other == null || fn == null) throw new NullPointerException();
2418 CompletableFuture<U> dst = new CompletableFuture<U>();
2419 ApplyToEither<T,U> d = null;
2420 Object r;
2421 if ((r = result) == null && (r = other.result) == null) {
2422 d = new ApplyToEither<T,U>(this, other, fn, dst, e);
2423 CompletionNode q = null, p = new CompletionNode(d);
2424 while ((r = result) == null && (r = other.result) == null) {
2425 if (q != null) {
2426 if (UNSAFE.compareAndSwapObject
2427 (other, COMPLETIONS, q.next = other.completions, q))
2428 break;
2429 }
2430 else if (UNSAFE.compareAndSwapObject
2431 (this, COMPLETIONS, p.next = completions, p))
2432 q = new CompletionNode(d);
2433 }
2434 }
2435 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2436 T t; Throwable ex;
2437 if (r instanceof AltResult) {
2438 ex = ((AltResult)r).ex;
2439 t = null;
2440 }
2441 else {
2442 ex = null;
2443 @SuppressWarnings("unchecked") T tr = (T) r;
2444 t = tr;
2445 }
2446 U u = null;
2447 if (ex == null) {
2448 try {
2449 if (e != null)
2450 e.execute(new AsyncApply<T,U>(t, fn, dst));
2451 else
2452 u = fn.apply(t);
2453 } catch (Throwable rex) {
2454 ex = rex;
2455 }
2456 }
2457 if (e == null || ex != null)
2458 dst.internalComplete(u, ex);
2459 }
2460 helpPostComplete();
2461 other.helpPostComplete();
2462 return dst;
2463 }
2464
2465 /**
2466 * Returns a new CompletableFuture that is completed
2467 * when either this or the other given CompletableFuture completes,
2468 * after performing the given action with the result of either this
2469 * or the other CompletableFuture's result.
2470 *
2471 * <p>If this and/or the other CompletableFuture complete
2472 * exceptionally, then the returned CompletableFuture may also do so,
2473 * with a CompletionException holding one of these exceptions as its
2474 * cause. No guarantees are made about which result or exception is
2475 * used in the returned CompletableFuture. If the supplied action
2476 * throws an exception, then the returned CompletableFuture completes
2477 * exceptionally with a CompletionException holding the exception as
2478 * its cause.
2479 *
2480 * @param other the other CompletableFuture
2481 * @param block the action to perform before completing the
2482 * returned CompletableFuture
2483 * @return the new CompletableFuture
2484 */
2485 public CompletableFuture<Void> acceptEither
2486 (CompletableFuture<? extends T> other,
2487 Action<? super T> block) {
2488 return doAcceptEither(other, block, null);
2489 }
2490
2491 /**
2492 * Returns a new CompletableFuture that is asynchronously completed
2493 * when either this or the other given CompletableFuture completes,
2494 * after performing the given action with the result of either this
2495 * or the other CompletableFuture's result from a task running in
2496 * the {@link ForkJoinPool#commonPool()}.
2497 *
2498 * <p>If this and/or the other CompletableFuture complete
2499 * exceptionally, then the returned CompletableFuture may also do so,
2500 * with a CompletionException holding one of these exceptions as its
2501 * cause. No guarantees are made about which result or exception is
2502 * used in the returned CompletableFuture. If the supplied action
2503 * throws an exception, then the returned CompletableFuture completes
2504 * exceptionally with a CompletionException holding the exception as
2505 * its cause.
2506 *
2507 * @param other the other CompletableFuture
2508 * @param block the action to perform before completing the
2509 * returned CompletableFuture
2510 * @return the new CompletableFuture
2511 */
2512 public CompletableFuture<Void> acceptEitherAsync
2513 (CompletableFuture<? extends T> other,
2514 Action<? super T> block) {
2515 return doAcceptEither(other, block, ForkJoinPool.commonPool());
2516 }
2517
2518 /**
2519 * Returns a new CompletableFuture that is asynchronously completed
2520 * when either this or the other given CompletableFuture completes,
2521 * after performing the given action with the result of either this
2522 * or the other CompletableFuture's result from a task running in
2523 * the given executor.
2524 *
2525 * <p>If this and/or the other CompletableFuture complete
2526 * exceptionally, then the returned CompletableFuture may also do so,
2527 * with a CompletionException holding one of these exceptions as its
2528 * cause. No guarantees are made about which result or exception is
2529 * used in the returned CompletableFuture. If the supplied action
2530 * throws an exception, then the returned CompletableFuture completes
2531 * exceptionally with a CompletionException holding the exception as
2532 * its cause.
2533 *
2534 * @param other the other CompletableFuture
2535 * @param block the action to perform before completing the
2536 * returned CompletableFuture
2537 * @param executor the executor to use for asynchronous execution
2538 * @return the new CompletableFuture
2539 */
2540 public CompletableFuture<Void> acceptEitherAsync
2541 (CompletableFuture<? extends T> other,
2542 Action<? super T> block,
2543 Executor executor) {
2544 if (executor == null) throw new NullPointerException();
2545 return doAcceptEither(other, block, executor);
2546 }
2547
2548 private CompletableFuture<Void> doAcceptEither
2549 (CompletableFuture<? extends T> other,
2550 Action<? super T> fn,
2551 Executor e) {
2552 if (other == null || fn == null) throw new NullPointerException();
2553 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2554 AcceptEither<T> d = null;
2555 Object r;
2556 if ((r = result) == null && (r = other.result) == null) {
2557 d = new AcceptEither<T>(this, other, fn, dst, e);
2558 CompletionNode q = null, p = new CompletionNode(d);
2559 while ((r = result) == null && (r = other.result) == null) {
2560 if (q != null) {
2561 if (UNSAFE.compareAndSwapObject
2562 (other, COMPLETIONS, q.next = other.completions, q))
2563 break;
2564 }
2565 else if (UNSAFE.compareAndSwapObject
2566 (this, COMPLETIONS, p.next = completions, p))
2567 q = new CompletionNode(d);
2568 }
2569 }
2570 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2571 T t; Throwable ex;
2572 if (r instanceof AltResult) {
2573 ex = ((AltResult)r).ex;
2574 t = null;
2575 }
2576 else {
2577 ex = null;
2578 @SuppressWarnings("unchecked") T tr = (T) r;
2579 t = tr;
2580 }
2581 if (ex == null) {
2582 try {
2583 if (e != null)
2584 e.execute(new AsyncAccept<T>(t, fn, dst));
2585 else
2586 fn.accept(t);
2587 } catch (Throwable rex) {
2588 ex = rex;
2589 }
2590 }
2591 if (e == null || ex != null)
2592 dst.internalComplete(null, ex);
2593 }
2594 helpPostComplete();
2595 other.helpPostComplete();
2596 return dst;
2597 }
2598
2599 /**
2600 * Returns a new CompletableFuture that is completed
2601 * when either this or the other given CompletableFuture completes,
2602 * after performing the given action.
2603 *
2604 * <p>If this and/or the other CompletableFuture complete
2605 * exceptionally, then the returned CompletableFuture may also do so,
2606 * with a CompletionException holding one of these exceptions as its
2607 * cause. No guarantees are made about which result or exception is
2608 * used in the returned CompletableFuture. If the supplied action
2609 * throws an exception, then the returned CompletableFuture completes
2610 * exceptionally with a CompletionException holding the exception as
2611 * its cause.
2612 *
2613 * @param other the other CompletableFuture
2614 * @param action the action to perform before completing the
2615 * returned CompletableFuture
2616 * @return the new CompletableFuture
2617 */
2618 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2619 Runnable action) {
2620 return doRunAfterEither(other, action, null);
2621 }
2622
2623 /**
2624 * Returns a new CompletableFuture that is asynchronously completed
2625 * when either this or the other given CompletableFuture completes,
2626 * after performing the given action from a task running in the
2627 * {@link ForkJoinPool#commonPool()}.
2628 *
2629 * <p>If this and/or the other CompletableFuture complete
2630 * exceptionally, then the returned CompletableFuture may also do so,
2631 * with a CompletionException holding one of these exceptions as its
2632 * cause. No guarantees are made about which result or exception is
2633 * used in the returned CompletableFuture. If the supplied action
2634 * throws an exception, then the returned CompletableFuture completes
2635 * exceptionally with a CompletionException holding the exception as
2636 * its cause.
2637 *
2638 * @param other the other CompletableFuture
2639 * @param action the action to perform before completing the
2640 * returned CompletableFuture
2641 * @return the new CompletableFuture
2642 */
2643 public CompletableFuture<Void> runAfterEitherAsync
2644 (CompletableFuture<?> other,
2645 Runnable action) {
2646 return doRunAfterEither(other, action, ForkJoinPool.commonPool());
2647 }
2648
2649 /**
2650 * Returns a new CompletableFuture that is asynchronously completed
2651 * when either this or the other given CompletableFuture completes,
2652 * after performing the given action from a task running in the
2653 * given executor.
2654 *
2655 * <p>If this and/or the other CompletableFuture complete
2656 * exceptionally, then the returned CompletableFuture may also do so,
2657 * with a CompletionException holding one of these exceptions as its
2658 * cause. No guarantees are made about which result or exception is
2659 * used in the returned CompletableFuture. If the supplied action
2660 * throws an exception, then the returned CompletableFuture completes
2661 * exceptionally with a CompletionException holding the exception as
2662 * its cause.
2663 *
2664 * @param other the other CompletableFuture
2665 * @param action the action to perform before completing the
2666 * returned CompletableFuture
2667 * @param executor the executor to use for asynchronous execution
2668 * @return the new CompletableFuture
2669 */
2670 public CompletableFuture<Void> runAfterEitherAsync
2671 (CompletableFuture<?> other,
2672 Runnable action,
2673 Executor executor) {
2674 if (executor == null) throw new NullPointerException();
2675 return doRunAfterEither(other, action, executor);
2676 }
2677
2678 private CompletableFuture<Void> doRunAfterEither
2679 (CompletableFuture<?> other,
2680 Runnable action,
2681 Executor e) {
2682 if (other == null || action == null) throw new NullPointerException();
2683 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2684 RunAfterEither d = null;
2685 Object r;
2686 if ((r = result) == null && (r = other.result) == null) {
2687 d = new RunAfterEither(this, other, action, dst, e);
2688 CompletionNode q = null, p = new CompletionNode(d);
2689 while ((r = result) == null && (r = other.result) == null) {
2690 if (q != null) {
2691 if (UNSAFE.compareAndSwapObject
2692 (other, COMPLETIONS, q.next = other.completions, q))
2693 break;
2694 }
2695 else if (UNSAFE.compareAndSwapObject
2696 (this, COMPLETIONS, p.next = completions, p))
2697 q = new CompletionNode(d);
2698 }
2699 }
2700 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2701 Throwable ex;
2702 if (r instanceof AltResult)
2703 ex = ((AltResult)r).ex;
2704 else
2705 ex = null;
2706 if (ex == null) {
2707 try {
2708 if (e != null)
2709 e.execute(new AsyncRun(action, dst));
2710 else
2711 action.run();
2712 } catch (Throwable rex) {
2713 ex = rex;
2714 }
2715 }
2716 if (e == null || ex != null)
2717 dst.internalComplete(null, ex);
2718 }
2719 helpPostComplete();
2720 other.helpPostComplete();
2721 return dst;
2722 }
2723
2724 /**
2725 * Returns a CompletableFuture that upon completion, has the same
2726 * value as produced by the given function of the result of this
2727 * CompletableFuture.
2728 *
2729 * <p>If this CompletableFuture completes exceptionally, then the
2730 * returned CompletableFuture also does so, with a
2731 * CompletionException holding this exception as its cause.
2732 * Similarly, if the computed CompletableFuture completes
2733 * exceptionally, then so does the returned CompletableFuture.
2734 *
2735 * @param fn the function returning a new CompletableFuture
2736 * @return the CompletableFuture
2737 */
2738 public <U> CompletableFuture<U> thenCompose
2739 (Fun<? super T, CompletableFuture<U>> fn) {
2740 return doThenCompose(fn, null);
2741 }
2742
2743 /**
2744 * Returns a CompletableFuture that upon completion, has the same
2745 * value as that produced asynchronously using the {@link
2746 * ForkJoinPool#commonPool()} by the given function of the result
2747 * of this CompletableFuture.
2748 *
2749 * <p>If this CompletableFuture completes exceptionally, then the
2750 * returned CompletableFuture also does so, with a
2751 * CompletionException holding this exception as its cause.
2752 * Similarly, if the computed CompletableFuture completes
2753 * exceptionally, then so does the returned CompletableFuture.
2754 *
2755 * @param fn the function returning a new CompletableFuture
2756 * @return the CompletableFuture
2757 */
2758 public <U> CompletableFuture<U> thenComposeAsync
2759 (Fun<? super T, CompletableFuture<U>> fn) {
2760 return doThenCompose(fn, ForkJoinPool.commonPool());
2761 }
2762
2763 /**
2764 * Returns a CompletableFuture that upon completion, has the same
2765 * value as that produced asynchronously using the given executor
2766 * by the given function of this CompletableFuture.
2767 *
2768 * <p>If this CompletableFuture completes exceptionally, then the
2769 * returned CompletableFuture also does so, with a
2770 * CompletionException holding this exception as its cause.
2771 * Similarly, if the computed CompletableFuture completes
2772 * exceptionally, then so does the returned CompletableFuture.
2773 *
2774 * @param fn the function returning a new CompletableFuture
2775 * @param executor the executor to use for asynchronous execution
2776 * @return the CompletableFuture
2777 */
2778 public <U> CompletableFuture<U> thenComposeAsync
2779 (Fun<? super T, CompletableFuture<U>> fn,
2780 Executor executor) {
2781 if (executor == null) throw new NullPointerException();
2782 return doThenCompose(fn, executor);
2783 }
2784
2785 private <U> CompletableFuture<U> doThenCompose
2786 (Fun<? super T, CompletableFuture<U>> fn,
2787 Executor e) {
2788 if (fn == null) throw new NullPointerException();
2789 CompletableFuture<U> dst = null;
2790 ThenCompose<T,U> d = null;
2791 Object r;
2792 if ((r = result) == null) {
2793 dst = new CompletableFuture<U>();
2794 CompletionNode p = new CompletionNode
2795 (d = new ThenCompose<T,U>(this, fn, dst, e));
2796 while ((r = result) == null) {
2797 if (UNSAFE.compareAndSwapObject
2798 (this, COMPLETIONS, p.next = completions, p))
2799 break;
2800 }
2801 }
2802 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2803 T t; Throwable ex;
2804 if (r instanceof AltResult) {
2805 ex = ((AltResult)r).ex;
2806 t = null;
2807 }
2808 else {
2809 ex = null;
2810 @SuppressWarnings("unchecked") T tr = (T) r;
2811 t = tr;
2812 }
2813 if (ex == null) {
2814 if (e != null) {
2815 if (dst == null)
2816 dst = new CompletableFuture<U>();
2817 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2818 }
2819 else {
2820 try {
2821 if ((dst = fn.apply(t)) == null)
2822 ex = new NullPointerException();
2823 } catch (Throwable rex) {
2824 ex = rex;
2825 }
2826 }
2827 }
2828 if (dst == null)
2829 dst = new CompletableFuture<U>();
2830 if (e == null || ex != null)
2831 dst.internalComplete(null, ex);
2832 }
2833 helpPostComplete();
2834 dst.helpPostComplete();
2835 return dst;
2836 }
2837
2838 /**
2839 * Returns a new CompletableFuture that is completed when this
2840 * CompletableFuture completes, with the result of the given
2841 * function of the exception triggering this CompletableFuture's
2842 * completion when it completes exceptionally; otherwise, if this
2843 * CompletableFuture completes normally, then the returned
2844 * CompletableFuture also completes normally with the same value.
2845 *
2846 * @param fn the function to use to compute the value of the
2847 * returned CompletableFuture if this CompletableFuture completed
2848 * exceptionally
2849 * @return the new CompletableFuture
2850 */
2851 public CompletableFuture<T> exceptionally
2852 (Fun<Throwable, ? extends T> fn) {
2853 if (fn == null) throw new NullPointerException();
2854 CompletableFuture<T> dst = new CompletableFuture<T>();
2855 ExceptionCompletion<T> d = null;
2856 Object r;
2857 if ((r = result) == null) {
2858 CompletionNode p =
2859 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2860 while ((r = result) == null) {
2861 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2862 p.next = completions, p))
2863 break;
2864 }
2865 }
2866 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2867 T t = null; Throwable ex, dx = null;
2868 if (r instanceof AltResult) {
2869 if ((ex = ((AltResult)r).ex) != null) {
2870 try {
2871 t = fn.apply(ex);
2872 } catch (Throwable rex) {
2873 dx = rex;
2874 }
2875 }
2876 }
2877 else {
2878 @SuppressWarnings("unchecked") T tr = (T) r;
2879 t = tr;
2880 }
2881 dst.internalComplete(t, dx);
2882 }
2883 helpPostComplete();
2884 return dst;
2885 }
2886
2887 /**
2888 * Returns a new CompletableFuture that is completed when this
2889 * CompletableFuture completes, with the result of the given
2890 * function of the result and exception of this CompletableFuture's
2891 * completion. The given function is invoked with the result (or
2892 * {@code null} if none) and the exception (or {@code null} if none)
2893 * of this CompletableFuture when complete.
2894 *
2895 * @param fn the function to use to compute the value of the
2896 * returned CompletableFuture
2897 * @return the new CompletableFuture
2898 */
2899 public <U> CompletableFuture<U> handle
2900 (BiFun<? super T, Throwable, ? extends U> fn) {
2901 if (fn == null) throw new NullPointerException();
2902 CompletableFuture<U> dst = new CompletableFuture<U>();
2903 HandleCompletion<T,U> d = null;
2904 Object r;
2905 if ((r = result) == null) {
2906 CompletionNode p =
2907 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2908 while ((r = result) == null) {
2909 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2910 p.next = completions, p))
2911 break;
2912 }
2913 }
2914 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2915 T t; Throwable ex;
2916 if (r instanceof AltResult) {
2917 ex = ((AltResult)r).ex;
2918 t = null;
2919 }
2920 else {
2921 ex = null;
2922 @SuppressWarnings("unchecked") T tr = (T) r;
2923 t = tr;
2924 }
2925 U u; Throwable dx;
2926 try {
2927 u = fn.apply(t, ex);
2928 dx = null;
2929 } catch (Throwable rex) {
2930 dx = rex;
2931 u = null;
2932 }
2933 dst.internalComplete(u, dx);
2934 }
2935 helpPostComplete();
2936 return dst;
2937 }
2938
2939
2940 /* ------------- Arbitrary-arity constructions -------------- */
2941
2942 /*
2943 * The basic plan of attack is to recursively form binary
2944 * completion trees of elements. This can be overkill for small
2945 * sets, but scales nicely. The And/All vs Or/Any forms use the
2946 * same idea, but details differ.
2947 */
2948
2949 /**
2950 * Returns a new CompletableFuture that is completed when all of
2951 * the given CompletableFutures complete. If any of the given
2952 * CompletableFutures complete exceptionally, then the returned
2953 * CompletableFuture also does so, with a CompletionException
2954 * holding this exception as its cause. Otherwise, the results,
2955 * if any, of the given CompletableFutures are not reflected in
2956 * the returned CompletableFuture, but may be obtained by
2957 * inspecting them individually. If no CompletableFutures are
2958 * provided, returns a CompletableFuture completed with the value
2959 * {@code null}.
2960 *
2961 * <p>Among the applications of this method is to await completion
2962 * of a set of independent CompletableFutures before continuing a
2963 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2964 * c3).join();}.
2965 *
2966 * @param cfs the CompletableFutures
2967 * @return a new CompletableFuture that is completed when all of the
2968 * given CompletableFutures complete
2969 * @throws NullPointerException if the array or any of its elements are
2970 * {@code null}
2971 */
2972 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2973 int len = cfs.length; // Directly handle empty and singleton cases
2974 if (len > 1)
2975 return allTree(cfs, 0, len - 1);
2976 else {
2977 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2978 CompletableFuture<?> f;
2979 if (len == 0)
2980 dst.result = NIL;
2981 else if ((f = cfs[0]) == null)
2982 throw new NullPointerException();
2983 else {
2984 ThenPropagate d = null;
2985 CompletionNode p = null;
2986 Object r;
2987 while ((r = f.result) == null) {
2988 if (d == null)
2989 d = new ThenPropagate(f, dst);
2990 else if (p == null)
2991 p = new CompletionNode(d);
2992 else if (UNSAFE.compareAndSwapObject
2993 (f, COMPLETIONS, p.next = f.completions, p))
2994 break;
2995 }
2996 if (r != null && (d == null || d.compareAndSet(0, 1)))
2997 dst.internalComplete(null, (r instanceof AltResult) ?
2998 ((AltResult)r).ex : null);
2999 f.helpPostComplete();
3000 }
3001 return dst;
3002 }
3003 }
3004
3005 /**
3006 * Recursively constructs an And'ed tree of CompletableFutures.
3007 * Called only when array known to have at least two elements.
3008 */
3009 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
3010 int lo, int hi) {
3011 CompletableFuture<?> fst, snd;
3012 int mid = (lo + hi) >>> 1;
3013 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
3014 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
3015 throw new NullPointerException();
3016 CompletableFuture<Void> dst = new CompletableFuture<Void>();
3017 AndCompletion d = null;
3018 CompletionNode p = null, q = null;
3019 Object r = null, s = null;
3020 while ((r = fst.result) == null || (s = snd.result) == null) {
3021 if (d == null)
3022 d = new AndCompletion(fst, snd, dst);
3023 else if (p == null)
3024 p = new CompletionNode(d);
3025 else if (q == null) {
3026 if (UNSAFE.compareAndSwapObject
3027 (fst, COMPLETIONS, p.next = fst.completions, p))
3028 q = new CompletionNode(d);
3029 }
3030 else if (UNSAFE.compareAndSwapObject
3031 (snd, COMPLETIONS, q.next = snd.completions, q))
3032 break;
3033 }
3034 if ((r != null || (r = fst.result) != null) &&
3035 (s != null || (s = snd.result) != null) &&
3036 (d == null || d.compareAndSet(0, 1))) {
3037 Throwable ex;
3038 if (r instanceof AltResult)
3039 ex = ((AltResult)r).ex;
3040 else
3041 ex = null;
3042 if (ex == null && (s instanceof AltResult))
3043 ex = ((AltResult)s).ex;
3044 dst.internalComplete(null, ex);
3045 }
3046 fst.helpPostComplete();
3047 snd.helpPostComplete();
3048 return dst;
3049 }
3050
3051 /**
3052 * Returns a new CompletableFuture that is completed when any of
3053 * the given CompletableFutures complete, with the same result.
3054 * Otherwise, if it completed exceptionally, the returned
3055 * CompletableFuture also does so, with a CompletionException
3056 * holding this exception as its cause. If no CompletableFutures
3057 * are provided, returns an incomplete CompletableFuture.
3058 *
3059 * @param cfs the CompletableFutures
3060 * @return a new CompletableFuture that is completed with the
3061 * result or exception of any of the given CompletableFutures when
3062 * one completes
3063 * @throws NullPointerException if the array or any of its elements are
3064 * {@code null}
3065 */
3066 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
3067 int len = cfs.length; // Same idea as allOf
3068 if (len > 1)
3069 return anyTree(cfs, 0, len - 1);
3070 else {
3071 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3072 CompletableFuture<?> f;
3073 if (len == 0)
3074 ; // skip
3075 else if ((f = cfs[0]) == null)
3076 throw new NullPointerException();
3077 else {
3078 ThenCopy<Object> d = null;
3079 CompletionNode p = null;
3080 Object r;
3081 while ((r = f.result) == null) {
3082 if (d == null)
3083 d = new ThenCopy<Object>(f, dst);
3084 else if (p == null)
3085 p = new CompletionNode(d);
3086 else if (UNSAFE.compareAndSwapObject
3087 (f, COMPLETIONS, p.next = f.completions, p))
3088 break;
3089 }
3090 if (r != null && (d == null || d.compareAndSet(0, 1))) {
3091 Throwable ex; Object t;
3092 if (r instanceof AltResult) {
3093 ex = ((AltResult)r).ex;
3094 t = null;
3095 }
3096 else {
3097 ex = null;
3098 t = r;
3099 }
3100 dst.internalComplete(t, ex);
3101 }
3102 f.helpPostComplete();
3103 }
3104 return dst;
3105 }
3106 }
3107
3108 /**
3109 * Recursively constructs an Or'ed tree of CompletableFutures.
3110 */
3111 private static CompletableFuture<Object> anyTree(CompletableFuture<?>[] cfs,
3112 int lo, int hi) {
3113 CompletableFuture<?> fst, snd;
3114 int mid = (lo + hi) >>> 1;
3115 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3116 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3117 throw new NullPointerException();
3118 CompletableFuture<Object> dst = new CompletableFuture<Object>();
3119 OrCompletion d = null;
3120 CompletionNode p = null, q = null;
3121 Object r;
3122 while ((r = fst.result) == null && (r = snd.result) == null) {
3123 if (d == null)
3124 d = new OrCompletion(fst, snd, dst);
3125 else if (p == null)
3126 p = new CompletionNode(d);
3127 else if (q == null) {
3128 if (UNSAFE.compareAndSwapObject
3129 (fst, COMPLETIONS, p.next = fst.completions, p))
3130 q = new CompletionNode(d);
3131 }
3132 else if (UNSAFE.compareAndSwapObject
3133 (snd, COMPLETIONS, q.next = snd.completions, q))
3134 break;
3135 }
3136 if ((r != null || (r = fst.result) != null ||
3137 (r = snd.result) != null) &&
3138 (d == null || d.compareAndSet(0, 1))) {
3139 Throwable ex; Object t;
3140 if (r instanceof AltResult) {
3141 ex = ((AltResult)r).ex;
3142 t = null;
3143 }
3144 else {
3145 ex = null;
3146 t = r;
3147 }
3148 dst.internalComplete(t, ex);
3149 }
3150 fst.helpPostComplete();
3151 snd.helpPostComplete();
3152 return dst;
3153 }
3154
3155 /* ------------- Control and status methods -------------- */
3156
3157 /**
3158 * If not already completed, completes this CompletableFuture with
3159 * a {@link CancellationException}. Dependent CompletableFutures
3160 * that have not already completed will also complete
3161 * exceptionally, with a {@link CompletionException} caused by
3162 * this {@code CancellationException}.
3163 *
3164 * @param mayInterruptIfRunning this value has no effect in this
3165 * implementation because interrupts are not used to control
3166 * processing.
3167 *
3168 * @return {@code true} if this task is now cancelled
3169 */
3170 public boolean cancel(boolean mayInterruptIfRunning) {
3171 boolean cancelled = (result == null) &&
3172 UNSAFE.compareAndSwapObject
3173 (this, RESULT, null, new AltResult(new CancellationException()));
3174 postComplete();
3175 return cancelled || isCancelled();
3176 }
3177
3178 /**
3179 * Returns {@code true} if this CompletableFuture was cancelled
3180 * before it completed normally.
3181 *
3182 * @return {@code true} if this CompletableFuture was cancelled
3183 * before it completed normally
3184 */
3185 public boolean isCancelled() {
3186 Object r;
3187 return ((r = result) instanceof AltResult) &&
3188 (((AltResult)r).ex instanceof CancellationException);
3189 }
3190
3191 /**
3192 * Forcibly sets or resets the value subsequently returned by
3193 * method {@link #get()} and related methods, whether or not
3194 * already completed. This method is designed for use only in
3195 * error recovery actions, and even in such situations may result
3196 * in ongoing dependent completions using established versus
3197 * overwritten outcomes.
3198 *
3199 * @param value the completion value
3200 */
3201 public void obtrudeValue(T value) {
3202 result = (value == null) ? NIL : value;
3203 postComplete();
3204 }
3205
3206 /**
3207 * Forcibly causes subsequent invocations of method {@link #get()}
3208 * and related methods to throw the given exception, whether or
3209 * not already completed. This method is designed for use only in
3210 * recovery actions, and even in such situations may result in
3211 * ongoing dependent completions using established versus
3212 * overwritten outcomes.
3213 *
3214 * @param ex the exception
3215 */
3216 public void obtrudeException(Throwable ex) {
3217 if (ex == null) throw new NullPointerException();
3218 result = new AltResult(ex);
3219 postComplete();
3220 }
3221
3222 /**
3223 * Returns the estimated number of CompletableFutures whose
3224 * completions are awaiting completion of this CompletableFuture.
3225 * This method is designed for use in monitoring system state, not
3226 * for synchronization control.
3227 *
3228 * @return the number of dependent CompletableFutures
3229 */
3230 public int getNumberOfDependents() {
3231 int count = 0;
3232 for (CompletionNode p = completions; p != null; p = p.next)
3233 ++count;
3234 return count;
3235 }
3236
3237 /**
3238 * Returns a string identifying this CompletableFuture, as well as
3239 * its completion state. The state, in brackets, contains the
3240 * String {@code "Completed Normally"} or the String {@code
3241 * "Completed Exceptionally"}, or the String {@code "Not
3242 * completed"} followed by the number of CompletableFutures
3243 * dependent upon its completion, if any.
3244 *
3245 * @return a string identifying this CompletableFuture, as well as its state
3246 */
3247 public String toString() {
3248 Object r = result;
3249 int count;
3250 return super.toString() +
3251 ((r == null) ?
3252 (((count = getNumberOfDependents()) == 0) ?
3253 "[Not completed]" :
3254 "[Not completed, " + count + " dependents]") :
3255 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3256 "[Completed exceptionally]" :
3257 "[Completed normally]"));
3258 }
3259
// Unsafe mechanics: the Unsafe instance plus the raw offsets of the
// result, waiters and completions fields of this class.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        final sun.misc.Unsafe unsafe = getUnsafe();
        UNSAFE = unsafe;
        final Class<?> owner = CompletableFuture.class;
        RESULT = unsafe.objectFieldOffset(
            owner.getDeclaredField("result"));
        WAITERS = unsafe.objectFieldOffset(
            owner.getDeclaredField("waiters"));
        COMPLETIONS = unsafe.objectFieldOffset(
            owner.getDeclaredField("completions"));
    } catch (Exception failure) {
        // Any lookup failure aborts class initialization.
        throw new Error(failure);
    }
}
3279
3280 /**
3281 * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
3282 * Replace with a simple call to Unsafe.getUnsafe when integrating
3283 * into a jdk.
3284 *
3285 * @return a sun.misc.Unsafe
3286 */
3287 private static sun.misc.Unsafe getUnsafe() {
3288 try {
3289 return sun.misc.Unsafe.getUnsafe();
3290 } catch (SecurityException tryReflectionInstead) {}
3291 try {
3292 return java.security.AccessController.doPrivileged
3293 (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
3294 public sun.misc.Unsafe run() throws Exception {
3295 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
3296 for (java.lang.reflect.Field f : k.getDeclaredFields()) {
3297 f.setAccessible(true);
3298 Object x = f.get(null);
3299 if (k.isInstance(x))
3300 return k.cast(x);
3301 }
3302 throw new NoSuchFieldError("the Unsafe");
3303 }});
3304 } catch (java.security.PrivilegedActionException e) {
3305 throw new RuntimeException("Could not initialize intrinsics",
3306 e.getCause());
3307 }
3308 }
3309 }