root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.75
Committed: Sat Mar 23 10:58:57 2013 UTC by dl
Branch: MAIN
Changes since 1.74: +61 -46 lines
Log Message:
Fix signature mismatch between allOf and allTree; same for any

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.function.Supplier;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/**
 * A {@link Future} that may be explicitly completed (setting its
 * value and status), and may include dependent functions and actions
 * that trigger upon its completion.
 *
 * <p>When two or more threads attempt to
 * {@link #complete complete},
 * {@link #completeExceptionally completeExceptionally}, or
 * {@link #cancel cancel}
 * a CompletableFuture, only one of them succeeds.
 *
 * <p>Methods are available for adding dependents based on
 * user-provided Functions, Consumers, or Runnables. The appropriate
 * form to use depends on whether actions require arguments and/or
 * produce results. Completion of a dependent action will trigger the
 * completion of another CompletableFuture. Actions may also be
 * triggered after either or both the current and another
 * CompletableFuture complete. Multiple CompletableFutures may also
 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
 * {@link #allOf(CompletableFuture...)}.
 *
 * <p>CompletableFutures themselves do not execute asynchronously.
 * However, actions supplied for dependent completions of another
 * CompletableFuture may do so, depending on whether they are provided
 * via one of the <em>async</em> methods (that is, methods with names
 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
 * methods provide a way to commence asynchronous processing of an
 * action using either a given {@link Executor} or by default the
 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
 * debugging, and tracking, all generated asynchronous tasks are
 * instances of the marker interface {@link AsynchronousCompletionTask}.
 *
 * <p>Actions supplied for dependent completions of <em>non-async</em>
 * methods may be performed by the thread that completes the current
 * CompletableFuture, or by any other caller of these methods. There
 * are no guarantees about the order of processing completions unless
 * constrained by these methods.
 *
 * <p>Since (unlike {@link FutureTask}) this class has no direct
 * control over the computation that causes it to be completed,
 * cancellation is treated as just another form of exceptional completion.
 * Method {@link #cancel cancel} has the same effect as
 * {@code completeExceptionally(new CancellationException())}.
 *
 * <p>Upon exceptional completion (including cancellation), or when a
 * completion entails an additional computation which terminates
 * abruptly with an (unchecked) exception or error, then all of their
 * dependent completions (and their dependents in turn) generally act
 * as {@code completeExceptionally} with a {@link CompletionException}
 * holding that exception as its cause. However, the {@link
 * #exceptionally exceptionally} and {@link #handle handle}
 * completions <em>are</em> able to handle exceptional completions of
 * the CompletableFutures they depend on.
 *
 * <p>In case of exceptional completion with a CompletionException,
 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
 * {@link ExecutionException} with the same cause as held in the
 * corresponding CompletionException. However, in these cases,
 * methods {@link #join()} and {@link #getNow} throw the
 * CompletionException, which simplifies usage.
 *
 * @author Doug Lea
 * @since 1.8
 */
public class CompletableFuture<T> implements Future<T> {

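    /*
     * Editor's illustration, not part of the original source. The class
     * documentation above states that only one competing completion
     * attempt succeeds, and that get() and join() report exceptional
     * completion differently. The sketch below exercises both points
     * against the released java.util.concurrent.CompletableFuture API,
     * which may differ in detail from this revision. The class name
     * CompletionSemanticsDemo and its helper methods are hypothetical.
     *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

final class CompletionSemanticsDemo {
    // True if the first completion attempt wins and the later one is rejected.
    static boolean firstCompletionWins() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("first");
        boolean second = f.completeExceptionally(new IllegalStateException("late"));
        return first && !second && "first".equals(f.join());
    }

    // Describes what get() throws for an exceptionally completed future.
    static String thrownByGet() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        try {
            f.get();
            return "none";
        } catch (ExecutionException e) {
            return "ExecutionException:" + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Describes what join() throws for the same situation.
    static String thrownByJoin() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        try {
            f.join();
            return "none";
        } catch (CompletionException e) {
            return "CompletionException:" + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstCompletionWins());
        System.out.println(thrownByGet());
        System.out.println(thrownByJoin());
    }
}
```
     *
     * Because join() throws an unchecked exception, it tends to be the
     * more convenient choice inside lambdas, where checked exceptions
     * from get() would have to be caught in place.
     */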
    /*
     * Overview:
     *
     * 1. Non-nullness of field result (set via CAS) indicates done.
     * An AltResult is used to box null as a result, as well as to
     * hold exceptions. Using a single field makes completion fast
     * and simple to detect and trigger, at the expense of a lot of
     * encoding and decoding that infiltrates many methods. One minor
     * simplification relies on the (static) NIL (to box null results)
     * being the only AltResult with a null exception field, so we
     * don't usually need explicit comparisons with NIL. The CF
     * exception propagation mechanics surrounding decoding rely on
     * unchecked casts of decoded results really being unchecked,
     * where user type errors are caught at point of use, as is
     * currently the case in Java. These are highlighted by using
     * SuppressWarnings-annotated temporaries.
     *
     * 2. Waiters are held in a Treiber stack similar to the one used
     * in FutureTask, Phaser, and SynchronousQueue. See their
     * internal documentation for algorithmic details.
     *
     * 3. Completions are also kept in a list/stack, and pulled off
     * and run when completion is triggered. (We could even use the
     * same stack as for waiters, but would give up the potential
     * parallelism obtained because woken waiters help release/run
     * others -- see method postComplete). Because post-processing
     * may race with direct calls, class Completion opportunistically
     * extends AtomicInteger so callers can claim the action via
     * compareAndSet(0, 1). The Completion.run methods are all
     * written in a boringly similar, uniform way (that sometimes
     * includes unnecessary-looking checks, kept to maintain
     * uniformity). There are enough dimensions upon which they
     * differ that attempts to factor commonalities while maintaining
     * efficiency require more lines of code than they would save.
     *
     * 4. The exported then/and/or methods do support a bit of
     * factoring (see doThenApply etc). They must cope with the
     * intrinsic races surrounding addition of a dependent action
     * versus performing the action directly because the task is
     * already complete. For example, a CF may not be complete upon
     * entry, so a dependent completion is added, but by the time it
     * is added, the target CF is complete, so must be directly
     * executed. This is all done while avoiding unnecessary object
     * construction in safe-bypass cases.
     */

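    /*
     * Editor's illustration, not part of the original source. Item 1 of
     * the overview above (one CAS-set field, with a box for null and for
     * exceptions) can be sketched in isolation. This version uses
     * AtomicReference instead of Unsafe and is only an approximation of
     * the idea; ResultCell, Alt, and all method names are hypothetical.
     *
```java
import java.util.concurrent.atomic.AtomicReference;

final class ResultCell<T> {
    // Boxes either a null result (ex == null) or a failure (ex != null).
    private static final class Alt {
        final Throwable ex;
        Alt(Throwable ex) { this.ex = ex; }
    }

    // The only Alt with a null exception: stands in for a null result.
    private static final Alt NIL = new Alt(null);

    // null means "not done"; anything else is the value or an Alt box.
    private final AtomicReference<Object> result = new AtomicReference<>();

    boolean complete(T value) {
        return result.compareAndSet(null, (value == null) ? NIL : value);
    }

    boolean fail(Throwable ex) {
        if (ex == null)
            throw new NullPointerException();
        return result.compareAndSet(null, new Alt(ex));
    }

    boolean isDone() {
        return result.get() != null;
    }

    // Decodes the field: a boxed null yields null, a boxed failure is rethrown.
    T getNow() {
        Object r = result.get();
        if (r == null)
            throw new IllegalStateException("not done");
        if (r instanceof Alt) {
            Throwable ex = ((Alt) r).ex;
            if (ex != null)
                throw new RuntimeException(ex);
            return null;
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }

    public static void main(String[] args) {
        ResultCell<String> cell = new ResultCell<>();
        System.out.println(cell.complete(null));  // first attempt succeeds
        System.out.println(cell.complete("late")); // second attempt is rejected
        System.out.println(cell.getNow());
    }
}
```
     *
     * The point of the single field is that "done" is one volatile read
     * and completion is one CAS; the cost is the decoding step that
     * every reader has to repeat.
     */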
    // preliminaries

    static final class AltResult {
        final Throwable ex; // null only for NIL
        AltResult(Throwable ex) { this.ex = ex; }
    }

    static final AltResult NIL = new AltResult(null);

    // Fields

    volatile Object result;    // Either the result or boxed AltResult
    volatile WaitNode waiters; // Treiber stack of threads blocked on get()
    volatile CompletionNode completions; // list (Treiber stack) of completions

    // Basic utilities for triggering and processing completions

    /**
     * Removes and signals all waiting threads and runs all completions.
     */
    final void postComplete() {
        WaitNode q; Thread t;
        while ((q = waiters) != null) {
            if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }

        CompletionNode h; Completion c;
        while ((h = completions) != null) {
            if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
                (c = h.completion) != null)
                c.run();
        }
    }

    /**
     * Triggers completion with the encoding of the given arguments:
     * if the exception is non-null, encodes it as a wrapped
     * CompletionException unless it is one already. Otherwise uses
     * the given result, boxed as NIL if null.
     */
    final void internalComplete(T v, Throwable ex) {
        if (result == null)
            UNSAFE.compareAndSwapObject
                (this, RESULT, null,
                 (ex == null) ? (v == null) ? NIL : v :
                 new AltResult((ex instanceof CompletionException) ? ex :
                               new CompletionException(ex)));
        postComplete(); // help out even if not triggered
    }

    /**
     * If triggered, helps release and/or process completions.
     */
    final void helpPostComplete() {
        if (result != null)
            postComplete();
    }

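    /*
     * Editor's illustration, not part of the original source.
     * postComplete above drains two Treiber stacks by repeatedly CASing
     * the head to its successor, so that several helping threads can pop
     * concurrently. A minimal standalone sketch of that push/drain
     * pattern follows, using AtomicReference in place of Unsafe. The
     * names TreiberStack, Node, push, and drain are hypothetical.
     *
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class TreiberStack<E> {
    private static final class Node<E> {
        final E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head = new AtomicReference<>();

    // Push: link the new node to the current head, then CAS it in.
    void push(E item) {
        Node<E> n = new Node<>(item);
        do {
            n.next = head.get();
        } while (!head.compareAndSet(n.next, n));
    }

    // Drain: pop nodes one at a time. A failed CAS only means another
    // thread pushed or popped first, so the head is reread and retried.
    List<E> drain() {
        List<E> out = new ArrayList<>();
        Node<E> h;
        while ((h = head.get()) != null) {
            if (head.compareAndSet(h, h.next))
                out.add(h.item);
        }
        return out;
    }

    public static void main(String[] args) {
        TreiberStack<Integer> stack = new TreiberStack<>();
        stack.push(1);
        stack.push(2);
        stack.push(3);
        System.out.println(stack.drain()); // most recently pushed first
    }
}
```
     *
     * As in postComplete, a thread that loses the CAS does no harm: the
     * winner takes responsibility for that node and the loser moves on
     * to whatever is now at the head.
     */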
    /* ------------- waiting for completions -------------- */

    /** Number of processors, for spin control */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Heuristic spin value for waitingGet() before blocking on
     * multiprocessors
     */
    static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;

    /**
     * Linked nodes to record waiting threads in a Treiber stack. See
     * other classes such as Phaser and SynchronousQueue for more
     * detailed explanation. This class implements ManagedBlocker to
     * avoid starvation when blocking actions pile up in
     * ForkJoinPools.
     */
    static final class WaitNode implements ForkJoinPool.ManagedBlocker {
        long nanos;          // wait time if timed
        final long deadline; // non-zero if timed
        volatile int interruptControl; // > 0: interruptible, < 0: interrupted
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode(boolean interruptible, long nanos, long deadline) {
            this.thread = Thread.currentThread();
            this.interruptControl = interruptible ? 1 : 0;
            this.nanos = nanos;
            this.deadline = deadline;
        }
        public boolean isReleasable() {
            if (thread == null)
                return true;
            if (Thread.interrupted()) {
                int i = interruptControl;
                interruptControl = -1;
                if (i > 0)
                    return true;
            }
            if (deadline != 0L &&
                (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
                thread = null;
                return true;
            }
            return false;
        }
        public boolean block() {
            if (isReleasable())
                return true;
            else if (deadline == 0L)
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }

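    /*
     * Editor's illustration, not part of the original source. WaitNode
     * above implements ForkJoinPool.ManagedBlocker so that a blocked
     * worker can be compensated for by the pool. The same contract
     * (isReleasable says whether blocking is unnecessary, block does the
     * waiting) is shown below for a CountDownLatch. LatchBlocker and
     * awaitManaged are hypothetical names chosen for this sketch.
     *
```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    // True when no blocking is needed because the latch is already open.
    @Override
    public boolean isReleasable() {
        return latch.getCount() == 0;
    }

    // Blocks until the latch opens; returning true means "done blocking".
    @Override
    public boolean block() throws InterruptedException {
        latch.await();
        return true;
    }

    // Waits for the latch through managedBlock, which also works when
    // the caller is not a ForkJoinPool worker thread.
    static boolean awaitManaged(CountDownLatch latch) {
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        System.out.println(awaitManaged(latch));
    }
}
```
     *
     * WaitNode folds interrupt and deadline checks into isReleasable as
     * well, which this sketch leaves out for brevity.
     */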
    /**
     * Returns raw result after waiting, or null if interruptible and
     * interrupted.
     */
    private Object waitingGet(boolean interruptible) {
        WaitNode q = null;
        boolean queued = false;
        int spins = SPINS;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) { // suppress unpark
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        if (interruptible) {
                            removeWaiter(q);
                            return null;
                        }
                        Thread.currentThread().interrupt();
                    }
                }
                postComplete(); // help release others
                return r;
            }
            else if (spins > 0) {
                int rnd = ThreadLocalRandom.nextSecondarySeed();
                if (rnd == 0)
                    rnd = ThreadLocalRandom.current().nextInt();
                if (rnd >= 0)
                    --spins;
            }
            else if (q == null)
                q = new WaitNode(interruptible, 0L, 0L);
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (interruptible && q.interruptControl < 0) {
                removeWaiter(q);
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param nanos time to wait
     * @return raw result
     */
    private Object timedAwaitDone(long nanos)
        throws InterruptedException, TimeoutException {
        WaitNode q = null;
        boolean queued = false;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) {
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        removeWaiter(q);
                        throw new InterruptedException();
                    }
                }
                postComplete();
                return r;
            }
            else if (q == null) {
                if (nanos <= 0L)
                    throw new TimeoutException();
                long d = System.nanoTime() + nanos;
                q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
            }
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (q.interruptControl < 0) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q.nanos <= 0L) {
                if (result == null) {
                    removeWaiter(q);
                    throw new TimeoutException();
                }
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage. Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers. To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race. This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) { // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /* ------------- Async tasks -------------- */

    /**
     * A marker interface identifying asynchronous tasks produced by
     * {@code async} methods. This may be useful for monitoring,
     * debugging, and tracking asynchronous activities.
     *
     * @since 1.8
     */
    public static interface AsynchronousCompletionTask {
    }

    /** Base class can act as either FJ or plain Runnable */
    abstract static class Async extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) { }
        public final void run() { exec(); }
    }

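    /*
     * Editor's illustration, not part of the original source. The marker
     * interface above lets an Executor recognise tasks that originate
     * from the async methods. The sketch below counts them with a custom
     * executor, using the released java.util.concurrent API (which may
     * differ from this revision). MarkerCheckingExecutor and runPipeline
     * are hypothetical names; tasks are run inline only to keep the
     * example deterministic.
     *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

final class MarkerCheckingExecutor implements Executor {
    final AtomicInteger total = new AtomicInteger();
    final AtomicInteger marked = new AtomicInteger();

    @Override
    public void execute(Runnable task) {
        total.incrementAndGet();
        // Tasks created by the async methods carry the marker interface.
        if (task instanceof CompletableFuture.AsynchronousCompletionTask)
            marked.incrementAndGet();
        task.run(); // run in the caller thread; a real executor would hand off
    }

    // Runs a two-stage async pipeline through a fresh executor and returns it.
    static MarkerCheckingExecutor runPipeline() {
        MarkerCheckingExecutor ex = new MarkerCheckingExecutor();
        int v = CompletableFuture.supplyAsync(() -> 20, ex)
            .thenApplyAsync(x -> x + 1, ex)
            .join();
        if (v != 21)
            throw new AssertionError(v);
        return ex;
    }

    public static void main(String[] args) {
        MarkerCheckingExecutor ex = runPipeline();
        System.out.println(ex.marked.get() + " of " + ex.total.get()
                           + " submitted tasks carry the marker");
    }
}
```
     *
     * A monitoring executor could use the same instanceof test to tag
     * or log completion-related work separately from other tasks.
     */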
    static final class AsyncRun extends Async {
        final Runnable fn;
        final CompletableFuture<Void> dst;
        AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.run();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncSupply<U> extends Async {
        final Supplier<U> fn;
        final CompletableFuture<U> dst;
        AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.get();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncApply<T,U> extends Async {
        final T arg;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        AsyncApply(T arg, Function<? super T,? extends U> fn,
                   CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.apply(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncBiApply<T,U,V> extends Async {
        final T arg1;
        final U arg2;
        final BiFunction<? super T,? super U,? extends V> fn;
        final CompletableFuture<V> dst;
        AsyncBiApply(T arg1, U arg2,
                     BiFunction<? super T,? super U,? extends V> fn,
                     CompletableFuture<V> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<V> d; V v; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    v = fn.apply(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    v = null;
                }
                d.internalComplete(v, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncAccept<T> extends Async {
        final T arg;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        AsyncAccept(T arg, Consumer<? super T> fn,
                    CompletableFuture<Void> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.accept(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncBiAccept<T,U> extends Async {
        final T arg1;
        final U arg2;
        final BiConsumer<? super T,? super U> fn;
        final CompletableFuture<Void> dst;
        AsyncBiAccept(T arg1, U arg2,
                      BiConsumer<? super T,? super U> fn,
                      CompletableFuture<Void> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.accept(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncCompose<T,U> extends Async {
        final T arg;
        final Function<? super T, CompletableFuture<U>> fn;
        final CompletableFuture<U> dst;
        AsyncCompose(T arg,
                     Function<? super T, CompletableFuture<U>> fn,
                     CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d, fr; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fr = fn.apply(arg);
                    ex = (fr == null) ? new NullPointerException() : null;
                } catch (Throwable rex) {
                    ex = rex;
                    fr = null;
                }
                if (ex != null)
                    u = null;
                else {
                    Object r = fr.result;
                    if (r == null)
                        r = fr.waitingGet(false);
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        u = null;
                    }
                    else {
                        @SuppressWarnings("unchecked") U ur = (U) r;
                        u = ur;
                    }
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

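    /*
     * Editor's illustration, not part of the original source.
     * AsyncCompose above applies a function that itself returns a
     * CompletableFuture and then completes dst with that inner future's
     * outcome, whether a value or an exception. From the caller's side
     * this corresponds to thenCompose in the released API, sketched
     * below. ComposeDemo and its methods are hypothetical names.
     *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ComposeDemo {
    // The inner future's value becomes the value of the composed future.
    static int composedValue() {
        CompletableFuture<Integer> base = CompletableFuture.completedFuture(20);
        return base.thenCompose(x -> CompletableFuture.completedFuture(x + 1))
                   .join();
    }

    // The inner future's failure becomes the failure of the composed future.
    static String composedFailureCause() {
        CompletableFuture<Integer> base = CompletableFuture.completedFuture(20);
        CompletableFuture<Integer> composed = base.thenCompose(x -> {
            CompletableFuture<Integer> inner = new CompletableFuture<>();
            inner.completeExceptionally(new IllegalStateException("inner failed"));
            return inner;
        });
        try {
            composed.join();
            return "none";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(composedValue());
        System.out.println(composedFailureCause());
    }
}
```
     *
     * Compared with thenApply, composing avoids ending up with a
     * nested future of a future when the function is itself
     * asynchronous.
     */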
    /* ------------- Completions -------------- */

    /**
     * Simple linked list nodes to record completions, used in
     * basically the same way as WaitNodes. (We separate nodes from
     * the Completions themselves mainly because for the And and Or
     * methods, the same Completion object resides in two lists.)
     */
    static final class CompletionNode {
        final Completion completion;
        volatile CompletionNode next;
        CompletionNode(Completion completion) { this.completion = completion; }
    }

    // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
    abstract static class Completion extends AtomicInteger implements Runnable {
    }

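    /*
     * Editor's illustration, not part of the original source. Completion
     * above extends AtomicInteger so that whichever thread first
     * succeeds at compareAndSet(0, 1) runs the action and every other
     * caller backs off. The standalone sketch below shows that claim
     * pattern under contention. OnceAction and runsUnderContention are
     * hypothetical names.
     *
```java
import java.util.concurrent.atomic.AtomicInteger;

final class OnceAction extends AtomicInteger implements Runnable {
    private static final long serialVersionUID = 1L;
    private final transient Runnable action;

    OnceAction(Runnable action) { this.action = action; }

    // Only the caller that moves the state from 0 to 1 runs the action.
    @Override
    public void run() {
        if (compareAndSet(0, 1))
            action.run();
    }

    // Calls run() from several threads and reports how often the action ran.
    static int runsUnderContention(int nThreads) {
        AtomicInteger runs = new AtomicInteger();
        OnceAction once = new OnceAction(runs::incrementAndGet);
        Thread[] threads = new Thread[nThreads];
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new Thread(once);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runsUnderContention(8));
    }
}
```
     *
     * Reusing the object's own integer state for the claim avoids a
     * separate flag field or lock per completion.
     */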
    static final class ApplyCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        ApplyCompletion(CompletableFuture<? extends T> src,
                        Function<? super T,? extends U> fn,
                        CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T,? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                U u = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncApply<T,U>(t, fn, dst));
                        else
                            u = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(u, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AcceptCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        AcceptCompletion(CompletableFuture<? extends T> src,
                         Consumer<? super T> fn,
                         CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class RunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        RunCompletion(CompletableFuture<? extends T> src,
                      Runnable fn,
                      CompletableFuture<Void> dst,
                      Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiApplyCompletion<T,U,V> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiFunction<? super T,? super U,? extends V> fn;
        final CompletableFuture<V> dst;
        final Executor executor;
        BiApplyCompletion(CompletableFuture<? extends T> src,
                          CompletableFuture<? extends U> snd,
                          BiFunction<? super T,? super U,? extends V> fn,
                          CompletableFuture<V> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiFunction<? super T,? super U,? extends V> fn;
            final CompletableFuture<V> dst;
            Object r, s; T t; U u; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                V v = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
                        else
                            v = fn.apply(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(v, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiAcceptCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiConsumer<? super T,? super U> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        BiAcceptCompletion(CompletableFuture<? extends T> src,
                           CompletableFuture<? extends U> snd,
                           BiConsumer<? super T,? super U> fn,
                           CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiConsumer<? super T,? super U> fn;
            final CompletableFuture<Void> dst;
            Object r, s; T t; U u; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
                        else
                            fn.accept(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        BiRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AndCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<Void> dst;
        AndCompletion(CompletableFuture<?> src,
                      CompletableFuture<?> snd,
                      CompletableFuture<Void> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

967 static final class OrApplyCompletion<T,U> extends Completion {
968 final CompletableFuture<? extends T> src;
969 final CompletableFuture<? extends T> snd;
970 final Function<? super T,? extends U> fn;
971 final CompletableFuture<U> dst;
972 final Executor executor;
973 OrApplyCompletion(CompletableFuture<? extends T> src,
974 CompletableFuture<? extends T> snd,
975 Function<? super T,? extends U> fn,
976 CompletableFuture<U> dst, Executor executor) {
977 this.src = src; this.snd = snd;
978 this.fn = fn; this.dst = dst;
979 this.executor = executor;
980 }
981 public final void run() {
982 final CompletableFuture<? extends T> a;
983 final CompletableFuture<? extends T> b;
984 final Function<? super T,? extends U> fn;
985 final CompletableFuture<U> dst;
986 Object r; T t; Throwable ex;
987 if ((dst = this.dst) != null &&
988 (fn = this.fn) != null &&
989 (((a = this.src) != null && (r = a.result) != null) ||
990 ((b = this.snd) != null && (r = b.result) != null)) &&
991 compareAndSet(0, 1)) {
992 if (r instanceof AltResult) {
993 ex = ((AltResult)r).ex;
994 t = null;
995 }
996 else {
997 ex = null;
998 @SuppressWarnings("unchecked") T tr = (T) r;
999 t = tr;
1000 }
1001 Executor e = executor;
1002 U u = null;
1003 if (ex == null) {
1004 try {
1005 if (e != null)
1006 e.execute(new AsyncApply<T,U>(t, fn, dst));
1007 else
1008 u = fn.apply(t);
1009 } catch (Throwable rex) {
1010 ex = rex;
1011 }
1012 }
1013 if (e == null || ex != null)
1014 dst.internalComplete(u, ex);
1015 }
1016 }
1017 private static final long serialVersionUID = 5232453952276885070L;
1018 }

    static final class OrAcceptCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends T> snd;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrAcceptCompletion(CompletableFuture<? extends T> src,
                           CompletableFuture<? extends T> snd,
                           Consumer<? super T> fn,
                           CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends T> b;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<Void> dst;
        OrCompletion(CompletableFuture<?> src,
                     CompletableFuture<?> snd,
                     CompletableFuture<Void> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ExceptionCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super Throwable, ? extends T> fn;
        final CompletableFuture<T> dst;
        ExceptionCompletion(CompletableFuture<? extends T> src,
                            Function<? super Throwable, ? extends T> fn,
                            CompletableFuture<T> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super Throwable, ? extends T> fn;
            final CompletableFuture<T> dst;
            Object r; T t = null; Throwable ex, dx = null;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if ((r instanceof AltResult) &&
                    (ex = ((AltResult)r).ex) != null) {
                    try {
                        t = fn.apply(ex);
                    } catch (Throwable rex) {
                        dx = rex;
                    }
                }
                else {
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                dst.internalComplete(t, dx);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ThenCopy<T> extends Completion {
        final CompletableFuture<T> src;
        final CompletableFuture<T> dst;
        ThenCopy(CompletableFuture<T> src,
                 CompletableFuture<T> dst) {
            this.src = src; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<T> a;
            final CompletableFuture<T> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                dst.internalComplete(t, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    // version of ThenCopy for CompletableFuture<Void> dst
    static final class ThenPropagate extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<Void> dst;
        ThenPropagate(CompletableFuture<?> src,
                      CompletableFuture<Void> dst) {
            this.src = src; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class HandleCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final BiFunction<? super T, Throwable, ? extends U> fn;
        final CompletableFuture<U> dst;
        HandleCompletion(CompletableFuture<? extends T> src,
                         BiFunction<? super T, Throwable, ? extends U> fn,
                         CompletableFuture<U> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final BiFunction<? super T, Throwable, ? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                U u = null; Throwable dx = null;
                try {
                    u = fn.apply(t, ex);
                } catch (Throwable rex) {
                    dx = rex;
                }
                dst.internalComplete(u, dx);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ComposeCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super T, CompletableFuture<U>> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        ComposeCompletion(CompletableFuture<? extends T> src,
                          Function<? super T, CompletableFuture<U>> fn,
                          CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T, CompletableFuture<U>> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex; Executor e;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                CompletableFuture<U> c = null;
                U u = null;
                boolean complete = false;
                if (ex == null) {
                    if ((e = executor) != null)
                        e.execute(new AsyncCompose<T,U>(t, fn, dst));
                    else {
                        try {
                            if ((c = fn.apply(t)) == null)
                                ex = new NullPointerException();
                        } catch (Throwable rex) {
                            ex = rex;
                        }
                    }
                }
                if (c != null) {
                    ThenCopy<U> d = null;
                    Object s;
                    if ((s = c.result) == null) {
                        CompletionNode p = new CompletionNode
                            (d = new ThenCopy<U>(c, dst));
                        while ((s = c.result) == null) {
                            if (UNSAFE.compareAndSwapObject
                                (c, COMPLETIONS, p.next = c.completions, p))
                                break;
                        }
                    }
                    if (s != null && (d == null || d.compareAndSet(0, 1))) {
                        complete = true;
                        if (s instanceof AltResult) {
                            ex = ((AltResult)s).ex;  // no rewrap
                            u = null;
                        }
                        else {
                            @SuppressWarnings("unchecked") U us = (U) s;
                            u = us;
                        }
                    }
                }
                if (complete || ex != null)
                    dst.internalComplete(u, ex);
                if (c != null)
                    c.helpPostComplete();
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    // public methods

    /**
     * Creates a new incomplete CompletableFuture.
     */
    public CompletableFuture() {
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} with
     * the value obtained by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        if (supplier == null) throw new NullPointerException();
        CompletableFuture<U> f = new CompletableFuture<U>();
        ForkJoinPool.commonPool().
            execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
        return f;
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor with the value obtained
     * by calling the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        if (executor == null || supplier == null)
            throw new NullPointerException();
        CompletableFuture<U> f = new CompletableFuture<U>();
        executor.execute(new AsyncSupply<U>(supplier, f));
        return f;
    }
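
    // Usage sketch for supplyAsync (editorial illustration, not part of
    // the original source; the variable names are hypothetical). It
    // assumes the supplier returns normally, in which case join()
    // yields the supplied value:
    //
    //   CompletableFuture<Integer> f =
    //       CompletableFuture.supplyAsync(() -> 6 * 7);
    //   int n = f.join();   // 42
    //
    // Passing an Executor as the second argument runs the supplier
    // there instead of in ForkJoinPool.commonPool().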

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the {@link ForkJoinPool#commonPool()} after
     * it runs the given action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        ForkJoinPool.commonPool().
            execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
        return f;
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * by a task running in the given executor after it runs the given
     * action.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        if (executor == null || runnable == null)
            throw new NullPointerException();
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        executor.execute(new AsyncRun(runnable, f));
        return f;
    }

    /**
     * Returns {@code true} if completed in any fashion: normally,
     * exceptionally, or via cancellation.
     *
     * @return {@code true} if completed
     */
    public boolean isDone() {
        return result != null;
    }

    /**
     * Waits if necessary for this future to complete, and then
     * returns its result.
     *
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public T get() throws InterruptedException, ExecutionException {
        Object r; Throwable ex, cause;
        if ((r = result) == null && (r = waitingGet(true)) == null)
            throw new InterruptedException();
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if ((ex instanceof CompletionException) &&
            (cause = ex.getCause()) != null)
            ex = cause;
        throw new ExecutionException(ex);
    }

    /**
     * Waits if necessary for at most the given time for this future
     * to complete, and then returns its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        Object r; Throwable ex, cause;
        long nanos = unit.toNanos(timeout);
        if (Thread.interrupted())
            throw new InterruptedException();
        if ((r = result) == null)
            r = timedAwaitDone(nanos);
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if ((ex instanceof CompletionException) &&
            (cause = ex.getCause()) != null)
            ex = cause;
        throw new ExecutionException(ex);
    }

    /**
     * Returns the result value when complete, or throws an
     * (unchecked) exception if completed exceptionally. To better
     * conform with the use of common functional forms, if a
     * computation involved in the completion of this
     * CompletableFuture threw an exception, this method throws an
     * (unchecked) {@link CompletionException} with the underlying
     * exception as its cause.
     *
     * @return the result value
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if this future completed
     * exceptionally or a completion computation threw an exception
     */
    public T join() {
        Object r; Throwable ex;
        if ((r = result) == null)
            r = waitingGet(false);
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if (ex instanceof CompletionException)
            throw (CompletionException)ex;
        throw new CompletionException(ex);
    }
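
    // Usage sketch contrasting get() and join() (editorial illustration,
    // not part of the original source; names are hypothetical). Both
    // report the same underlying failure, but get() wraps it in a
    // checked ExecutionException and join() in an unchecked
    // CompletionException:
    //
    //   CompletableFuture<String> f = new CompletableFuture<String>();
    //   f.completeExceptionally(new IllegalStateException("boom"));
    //   try {
    //       f.join();
    //   } catch (CompletionException e) {
    //       // e.getCause() is the IllegalStateException
    //   }
    //   try {
    //       f.get();
    //   } catch (ExecutionException e) {
    //       // e.getCause() is the same IllegalStateException
    //   } catch (InterruptedException e) {
    //       Thread.currentThread().interrupt();
    //   }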

    /**
     * Returns the result value (or throws any encountered exception)
     * if completed, else returns the given valueIfAbsent.
     *
     * @param valueIfAbsent the value to return if not completed
     * @return the result value, if completed, else the given valueIfAbsent
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if this future completed
     * exceptionally or a completion computation threw an exception
     */
    public T getNow(T valueIfAbsent) {
        Object r; Throwable ex;
        if ((r = result) == null)
            return valueIfAbsent;
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if (ex instanceof CompletionException)
            throw (CompletionException)ex;
        throw new CompletionException(ex);
    }
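
    // Usage sketch for getNow (editorial illustration, not part of the
    // original source; names are hypothetical). The fallback is only
    // returned while the future is still incomplete, and getNow never
    // blocks:
    //
    //   CompletableFuture<String> f = new CompletableFuture<String>();
    //   String a = f.getNow("fallback");   // "fallback"
    //   f.complete("done");
    //   String b = f.getNow("fallback");   // "done"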

    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean complete(T value) {
        boolean triggered = result == null &&
            UNSAFE.compareAndSwapObject(this, RESULT, null,
                                        value == null ? NIL : value);
        postComplete();
        return triggered;
    }

    /**
     * If not already completed, causes invocations of {@link #get()}
     * and related methods to throw the given exception.
     *
     * @param ex the exception
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean completeExceptionally(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        boolean triggered = result == null &&
            UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
        postComplete();
        return triggered;
    }
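
    // Usage sketch for complete and completeExceptionally (editorial
    // illustration, not part of the original source; names are
    // hypothetical). Only the first completion attempt takes effect;
    // later attempts return false and leave the result unchanged:
    //
    //   CompletableFuture<String> f = new CompletableFuture<String>();
    //   boolean first = f.complete("a");                  // true
    //   boolean second = f.completeExceptionally(
    //       new IllegalStateException("too late"));       // false
    //   String v = f.join();                              // "a"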

    /**
     * Returns a new CompletableFuture that is completed
     * when this CompletableFuture completes, with the result of the
     * given function of this CompletableFuture's result.
     *
     * <p>If this CompletableFuture completes exceptionally, or the
     * supplied function throws an exception, then the returned
     * CompletableFuture completes exceptionally with a
     * CompletionException holding the exception as its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
        return doThenApply(fn, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when this CompletableFuture completes, with the result of the
     * given function of this CompletableFuture's result from a
     * task running in the {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this CompletableFuture completes exceptionally, or the
     * supplied function throws an exception, then the returned
     * CompletableFuture completes exceptionally with a
     * CompletionException holding the exception as its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApplyAsync
        (Function<? super T,? extends U> fn) {
        return doThenApply(fn, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when this CompletableFuture completes, with the result of the
     * given function of this CompletableFuture's result from a
     * task running in the given executor.
     *
     * <p>If this CompletableFuture completes exceptionally, or the
     * supplied function throws an exception, then the returned
     * CompletableFuture completes exceptionally with a
     * CompletionException holding the exception as its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApplyAsync
        (Function<? super T,? extends U> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenApply(fn, executor);
    }

    private <U> CompletableFuture<U> doThenApply
        (Function<? super T,? extends U> fn,
         Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        ApplyCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new ApplyCompletion<T,U>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncApply<T,U>(t, fn, dst));
                    else
                        u = fn.apply(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(u, ex);
        }
        helpPostComplete();
        return dst;
    }
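
    // Usage sketch for thenApply (editorial illustration, not part of
    // the original source; names are hypothetical). The dependent
    // future stays incomplete until the source completes; with the
    // non-async form the function then runs in the completing thread:
    //
    //   CompletableFuture<Integer> src = new CompletableFuture<Integer>();
    //   CompletableFuture<String> dst = src.thenApply(n -> "n=" + n);
    //   boolean before = dst.isDone();   // false
    //   src.complete(7);
    //   String s = dst.join();           // "n=7"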

    /**
     * Returns a new CompletableFuture that is completed
     * when this CompletableFuture completes, after performing the given
     * action with this CompletableFuture's result.
     *
     * <p>If this CompletableFuture completes exceptionally, or the
     * supplied action throws an exception, then the returned
     * CompletableFuture completes exceptionally with a
     * CompletionException holding the exception as its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
        return doThenAccept(block, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when this CompletableFuture completes, after performing the given
     * action with this CompletableFuture's result from a task running
     * in the {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this CompletableFuture completes exceptionally, or the
     * supplied action throws an exception, then the returned
     * CompletableFuture completes exceptionally with a
     * CompletionException holding the exception as its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
        return doThenAccept(block, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when this CompletableFuture completes, after performing the given
     * action with this CompletableFuture's result from a task running
     * in the given executor.
     *
     * <p>If this CompletableFuture completes exceptionally, or the
     * supplied action throws an exception, then the returned
     * CompletableFuture completes exceptionally with a
     * CompletionException holding the exception as its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
                                                   Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenAccept(block, executor);
    }

    private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
                                                 Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AcceptCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new AcceptCompletion<T>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncAccept<T>(t, fn, dst));
                    else
                        fn.accept(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        return dst;
    }
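
    // Usage sketch for thenAccept (editorial illustration, not part of
    // the original source; names are hypothetical). The consumer
    // receives the source's value, and the returned future carries no
    // value of its own:
    //
    //   CompletableFuture<String> src = new CompletableFuture<String>();
    //   StringBuilder log = new StringBuilder();
    //   CompletableFuture<Void> done = src.thenAccept(s -> log.append(s));
    //   src.complete("hello");
    //   done.join();   // returns null; log now holds "hello"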

    /**
     * Returns a new CompletableFuture that is completed
     * when this CompletableFuture completes, after performing the given
     * action.
     *
     * <p>If this CompletableFuture completes exceptionally, or the
     * supplied action throws an exception, then the returned
     * CompletableFuture completes exceptionally with a
     * CompletionException holding the exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRun(Runnable action) {
        return doThenRun(action, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when this CompletableFuture completes, after performing the given
     * action from a task running in the {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this CompletableFuture completes exceptionally, or the
     * supplied action throws an exception, then the returned
     * CompletableFuture completes exceptionally with a
     * CompletionException holding the exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return doThenRun(action, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when this CompletableFuture completes, after performing the given
     * action from a task running in the given executor.
     *
     * <p>If this CompletableFuture completes exceptionally, or the
     * supplied action throws an exception, then the returned
     * CompletableFuture completes exceptionally with a
     * CompletionException holding the exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenRun(action, executor);
    }

    private CompletableFuture<Void> doThenRun(Runnable action,
                                              Executor e) {
        if (action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        RunCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new RunCompletion<T>(this, action, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        return dst;
    }
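
    // Usage sketch for thenRun (editorial illustration, not part of the
    // original source; names are hypothetical). The action ignores the
    // source's value and runs once the source completes normally; if
    // the source completes exceptionally the action is skipped and the
    // returned future completes exceptionally instead:
    //
    //   CompletableFuture<String> src = new CompletableFuture<String>();
    //   AtomicInteger runs = new AtomicInteger();
    //   CompletableFuture<Void> done =
    //       src.thenRun(() -> runs.incrementAndGet());
    //   src.complete("ignored");
    //   done.join();   // runs.get() == 1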

    /**
     * Returns a new CompletableFuture that is completed
     * when both this and the other given CompletableFuture complete,
     * with the result of the given function of the results of the two
     * CompletableFutures.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied function throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombine
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn) {
        return doThenBiApply(other, fn, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * with the result of the given function of the results of the two
     * CompletableFutures from a task running in the
     * {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied function throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn) {
        return doThenBiApply(other, fn, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * with the result of the given function of the results of the two
     * CompletableFutures from a task running in the given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied function throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiApply(other, fn, executor);
    }

    private <U,V> CompletableFuture<V> doThenBiApply
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<V> dst = new CompletableFuture<V>();
        BiApplyCompletion<T,U,V> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            T t; U u; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex != null)
                u = null;
            else if (s instanceof AltResult) {
                ex = ((AltResult)s).ex;
                u = null;
            }
            else {
                @SuppressWarnings("unchecked") U us = (U) s;
                u = us;
            }
            V v = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
                    else
                        v = fn.apply(t, u);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(v, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
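
    /*
     * Illustrative sketch (editorial addition, not part of the original
     * source). In doThenBiApply the same completion object d is wrapped in
     * two CompletionNodes, one pushed on each source, so more than one path
     * may try to fire it; the d.compareAndSet(0, 1) test lets only one of
     * them proceed. The classes OnceAction and ClaimOnceSketch below are
     * hypothetical stand-ins for that claim-once idea; the real completion
     * classes are defined elsewhere in this file and may differ.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a completion that can be reached from two places.
class OnceAction extends AtomicInteger implements Runnable {
    private final Runnable body;

    OnceAction(Runnable body) { this.body = body; }

    // Only the caller that wins the 0 -> 1 transition runs the body.
    @Override public void run() {
        if (compareAndSet(0, 1))
            body.run();
    }
}

class ClaimOnceSketch {
    // Fires the same action from two "sources" and reports how often the body ran.
    static int fireFromBothSources() {
        AtomicInteger runs = new AtomicInteger();
        OnceAction action = new OnceAction(runs::incrementAndGet);
        action.run(); // e.g. triggered when the first source completes
        action.run(); // e.g. triggered by the second source; loses the claim
        return runs.get();
    }
}
```

     * In this sketch the body runs once no matter how many callers reach
     * run(), which is the property the real code relies on.
     */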

    /**
     * Returns a new CompletableFuture that is completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action with the results of the two
     * CompletableFutures.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBoth
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block) {
        return doThenBiAccept(other, block, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action with the results of the two
     * CompletableFutures from a task running in the {@link
     * ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBothAsync
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block) {
        return doThenBiAccept(other, block, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action with the results of the two
     * CompletableFutures from a task running in the given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<Void> thenAcceptBothAsync
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T, ? super U> block,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiAccept(other, block, executor);
    }
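
    /*
     * Usage sketch (editorial illustration, not part of the original source).
     * The executor overload above hands the action to the supplied Executor
     * instead of the common pool. The class AcceptBothAsyncSketch and the
     * counting executor are hypothetical; the executor runs each task on the
     * calling thread only to keep the sketch deterministic. A JDK 8 or later
     * runtime is assumed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class AcceptBothAsyncSketch {
    // Returns {text seen by the action, number of tasks handed to the executor}.
    static Object[] runOnExecutor() {
        AtomicInteger submitted = new AtomicInteger();
        // Hypothetical executor: counts tasks, then runs them on the calling thread.
        Executor counting = task -> { submitted.incrementAndGet(); task.run(); };

        CompletableFuture<String> name = new CompletableFuture<>();
        CompletableFuture<Integer> age = new CompletableFuture<>();
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> done =
            name.thenAcceptBothAsync(age, (n, a) -> seen.set(n + ":" + a), counting);
        name.complete("ada");
        age.complete(36);
        done.join(); // the action has run by the time this returns
        return new Object[] { seen.get(), submitted.get() };
    }
}
```

     * A real application would more likely pass a thread pool here; passing
     * null is rejected with a NullPointerException, as the method body shows.
     */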

    private <U> CompletableFuture<Void> doThenBiAccept
        (CompletableFuture<? extends U> other,
         BiConsumer<? super T,? super U> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        BiAcceptCompletion<T,U> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            T t; U u; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex != null)
                u = null;
            else if (s instanceof AltResult) {
                ex = ((AltResult)s).ex;
                u = null;
            }
            else {
                @SuppressWarnings("unchecked") U us = (U) s;
                u = us;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
                    else
                        fn.accept(t, u);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
                                                Runnable action) {
        return doThenBiRun(other, action, null);
    }
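
    /*
     * Usage sketch (editorial illustration, not part of the original source).
     * It shows the "both" condition described above: the action stays idle
     * after the first source completes and runs once the second one does.
     * The class RunAfterBothSketch is hypothetical, and a JDK 8 or later
     * runtime is assumed. Everything happens on one thread, so the
     * non-async action runs inside the second complete() call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAfterBothSketch {
    // Returns {ran after the first completion?, ran after the second completion?}.
    static boolean[] observe() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        AtomicBoolean ran = new AtomicBoolean();
        a.runAfterBoth(b, () -> ran.set(true));
        a.complete("a");
        boolean afterFirst = ran.get();   // only one source is complete
        b.complete(1);
        boolean afterSecond = ran.get();  // both sources are complete
        return new boolean[] { afterFirst, afterSecond };
    }
}
```

     * The result types of the two sources are unrelated here, which works
     * because the other parameter is declared as a wildcard.
     */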

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action from a task running in the
     * {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
                                                     Runnable action) {
        return doThenBiRun(other, action, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when both this and the other given CompletableFuture complete,
     * after performing the given action from a task running in the
     * given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, or the supplied action throws an exception,
     * then the returned CompletableFuture completes exceptionally
     * with a CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
                                                     Runnable action,
                                                     Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiRun(other, action, executor);
    }

    private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
                                                Runnable action,
                                                Executor e) {
        if (other == null || action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        BiRunCompletion<T> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiRunCompletion<T>(this, other, action, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null && (s instanceof AltResult))
                ex = ((AltResult)s).ex;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed
     * when either this or the other given CompletableFuture completes,
     * with the result of the given function of either this or the other
     * CompletableFuture's result.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause. No guarantees are made about which result or exception is
     * used in the returned CompletableFuture. If the supplied function
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEither
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn) {
        return doOrApply(other, fn, null);
    }
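
    /*
     * Usage sketch (editorial illustration, not part of the original source).
     * It shows the "either" condition described above by leaving one source
     * pending, so that only the other can supply the function's input. The
     * class ApplyToEitherSketch is hypothetical, and a JDK 8 or later
     * runtime is assumed.

```java
import java.util.concurrent.CompletableFuture;

class ApplyToEitherSketch {
    // Applies the function to whichever source completes; here only "backup" does.
    static String firstToFinish() {
        CompletableFuture<String> primary = new CompletableFuture<>(); // left pending
        CompletableFuture<String> backup = new CompletableFuture<>();
        CompletableFuture<String> shout =
            primary.applyToEither(backup, s -> s.toUpperCase());
        backup.complete("backup");
        return shout.join();
    }
}
```

     * When both sources complete, the Javadoc makes no promise about which
     * result is used, so callers should not depend on a particular winner.
     */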

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * with the result of the given function of either this or the other
     * CompletableFuture's result from a task running in the
     * {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause. No guarantees are made about which result or exception is
     * used in the returned CompletableFuture. If the supplied function
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEitherAsync
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn) {
        return doOrApply(other, fn, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * with the result of the given function of either this or the other
     * CompletableFuture's result from a task running in the
     * given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause. No guarantees are made about which result or exception is
     * used in the returned CompletableFuture. If the supplied function
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> applyToEitherAsync
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doOrApply(other, fn, executor);
    }

    private <U> CompletableFuture<U> doOrApply
        (CompletableFuture<? extends T> other,
         Function<? super T, U> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        OrApplyCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncApply<T,U>(t, fn, dst));
                    else
                        u = fn.apply(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(u, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action with the result of either this
     * or the other CompletableFuture.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause. No guarantees are made about which result or exception is
     * used in the returned CompletableFuture. If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEither
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block) {
        return doOrAccept(other, block, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action with the result of either this
     * or the other CompletableFuture from a task running in
     * the {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause. No guarantees are made about which result or exception is
     * used in the returned CompletableFuture. If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEitherAsync
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block) {
        return doOrAccept(other, block, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action with the result of either this
     * or the other CompletableFuture from a task running in
     * the given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause. No guarantees are made about which result or exception is
     * used in the returned CompletableFuture. If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> acceptEitherAsync
        (CompletableFuture<? extends T> other,
         Consumer<? super T> block,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doOrAccept(other, block, executor);
    }

    private CompletableFuture<Void> doOrAccept
        (CompletableFuture<? extends T> other,
         Consumer<? super T> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        OrAcceptCompletion<T> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncAccept<T>(t, fn, dst));
                    else
                        fn.accept(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause. No guarantees are made about which result or exception is
     * used in the returned CompletableFuture. If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
                                                  Runnable action) {
        return doOrRun(other, action, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action from a task running in the
     * {@link ForkJoinPool#commonPool()}.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause. No guarantees are made about which result or exception is
     * used in the returned CompletableFuture. If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterEitherAsync
        (CompletableFuture<?> other,
         Runnable action) {
        return doOrRun(other, action, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously completed
     * when either this or the other given CompletableFuture completes,
     * after performing the given action from a task running in the
     * given executor.
     *
     * <p>If this and/or the other CompletableFuture complete
     * exceptionally, then the returned CompletableFuture may also do so,
     * with a CompletionException holding one of these exceptions as its
     * cause. No guarantees are made about which result or exception is
     * used in the returned CompletableFuture. If the supplied action
     * throws an exception, then the returned CompletableFuture completes
     * exceptionally with a CompletionException holding the exception as
     * its cause.
     *
     * @param other the other CompletableFuture
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> runAfterEitherAsync
        (CompletableFuture<?> other,
         Runnable action,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doOrRun(other, action, executor);
    }

    private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
                                            Runnable action,
                                            Executor e) {
        if (other == null || action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        OrRunCompletion<T> d = null;
        Object r;
        if ((r = result) == null && (r = other.result) == null) {
            d = new OrRunCompletion<T>(this, other, action, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r = result) == null && (r = other.result) == null) {
                if (q != null) {
                    if (UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p))
                    q = new CompletionNode(d);
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }

    /**
     * Returns a CompletableFuture that, upon completion, has the same
     * value as that produced by the given function of the result of this
     * CompletableFuture.
     *
     * <p>If this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     * Similarly, if the computed CompletableFuture completes
     * exceptionally, then so does the returned CompletableFuture.
     *
     * @param fn the function returning a new CompletableFuture
     * @return the CompletableFuture
     */
    public <U> CompletableFuture<U> thenCompose
        (Function<? super T, CompletableFuture<U>> fn) {
        return doCompose(fn, null);
    }
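
    /*
     * Usage sketch (editorial illustration, not part of the original source).
     * It shows the flattening described above: the function itself returns a
     * CompletableFuture, and the caller receives a single-level future whose
     * outcome mirrors the computed one. The class ThenComposeSketch and its
     * lookupName helper are hypothetical, and a JDK 8 or later runtime is
     * assumed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ThenComposeSketch {
    // Hypothetical second stage: returns a future rather than a plain value.
    static CompletableFuture<String> lookupName(int id) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (id < 0)
            f.completeExceptionally(new IllegalArgumentException("negative id"));
        else
            f.complete("user-" + id);
        return f;
    }

    // thenCompose yields CompletableFuture<String>, not a nested future.
    static String nameFor(int id) {
        CompletableFuture<Integer> idFuture = new CompletableFuture<>();
        CompletableFuture<String> name = idFuture.thenCompose(i -> lookupName(i));
        idFuture.complete(id);
        return name.join();
    }

    // Returns the underlying cause when the computed future fails, else null.
    static Throwable failureCause(int id) {
        try {
            nameFor(id);
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }
}
```

     * With thenApply in place of thenCompose, the same lambda would produce
     * a future of a future, which is the case this method exists to avoid.
     */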

    /**
     * Returns a CompletableFuture that, upon completion, has the same
     * value as that produced asynchronously using the {@link
     * ForkJoinPool#commonPool()} by the given function of the result
     * of this CompletableFuture.
     *
     * <p>If this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     * Similarly, if the computed CompletableFuture completes
     * exceptionally, then so does the returned CompletableFuture.
     *
     * @param fn the function returning a new CompletableFuture
     * @return the CompletableFuture
     */
    public <U> CompletableFuture<U> thenComposeAsync
        (Function<? super T, CompletableFuture<U>> fn) {
        return doCompose(fn, ForkJoinPool.commonPool());
    }

    /**
     * Returns a CompletableFuture that, upon completion, has the same
     * value as that produced asynchronously using the given executor
     * by the given function of the result of this CompletableFuture.
     *
     * <p>If this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     * Similarly, if the computed CompletableFuture completes
     * exceptionally, then so does the returned CompletableFuture.
     *
     * @param fn the function returning a new CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the CompletableFuture
     */
    public <U> CompletableFuture<U> thenComposeAsync
        (Function<? super T, CompletableFuture<U>> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doCompose(fn, executor);
    }

    private <U> CompletableFuture<U> doCompose
        (Function<? super T, CompletableFuture<U>> fn,
         Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = null;
        ComposeCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            dst = new CompletableFuture<U>();
            CompletionNode p = new CompletionNode
                (d = new ComposeCompletion<T,U>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                if (e != null) {
                    if (dst == null)
                        dst = new CompletableFuture<U>();
                    e.execute(new AsyncCompose<T,U>(t, fn, dst));
                }
                else {
                    try {
                        if ((dst = fn.apply(t)) == null)
                            ex = new NullPointerException();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
            }
            // dst is still null if this future was already complete and
            // either completed exceptionally or fn returned null or threw.
            if (dst == null)
                dst = new CompletableFuture<U>();
            // Propagate any exception, including in the async case, where
            // no task was submitted and dst would otherwise never complete.
            if (ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        dst.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed when this
     * CompletableFuture completes, with the result of the given
     * function of the exception triggering this CompletableFuture's
     * completion when it completes exceptionally; otherwise, if this
     * CompletableFuture completes normally, then the returned
     * CompletableFuture also completes normally with the same value.
     *
     * @param fn the function to use to compute the value of the
     * returned CompletableFuture if this CompletableFuture completed
     * exceptionally
     * @return the new CompletableFuture
     */
    public CompletableFuture<T> exceptionally
        (Function<Throwable, ? extends T> fn) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<T> dst = new CompletableFuture<T>();
        ExceptionCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p =
                new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
                                                p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t = null; Throwable ex, dx = null;
            if (r instanceof AltResult) {
                if ((ex = ((AltResult)r).ex) != null) {
                    try {
                        t = fn.apply(ex);
                    } catch (Throwable rex) {
                        dx = rex;
                    }
                }
            }
            else {
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            dst.internalComplete(t, dx);
        }
        helpPostComplete();
        return dst;
    }
2852
2853 /**
2854 * Returns a new CompletableFuture that is completed when this
2855 * CompletableFuture completes, with the result of the given
2856 * function of the result and exception of this CompletableFuture's
2857 * completion. The given function is invoked with the result (or
2858 * {@code null} if none) and the exception (or {@code null} if none)
2859 * of this CompletableFuture when complete.
2860 *
2861 * @param fn the function to use to compute the value of the
2862 * returned CompletableFuture
2863 * @return the new CompletableFuture
2864 */
2865 public <U> CompletableFuture<U> handle
2866 (BiFunction<? super T, Throwable, ? extends U> fn) {
2867 if (fn == null) throw new NullPointerException();
2868 CompletableFuture<U> dst = new CompletableFuture<U>();
2869 HandleCompletion<T,U> d = null;
2870 Object r;
2871 if ((r = result) == null) {
2872 CompletionNode p =
2873 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2874 while ((r = result) == null) {
2875 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2876 p.next = completions, p))
2877 break;
2878 }
2879 }
2880 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2881 T t; Throwable ex;
2882 if (r instanceof AltResult) {
2883 ex = ((AltResult)r).ex;
2884 t = null;
2885 }
2886 else {
2887 ex = null;
2888 @SuppressWarnings("unchecked") T tr = (T) r;
2889 t = tr;
2890 }
2891 U u; Throwable dx;
2892 try {
2893 u = fn.apply(t, ex);
2894 dx = null;
2895 } catch (Throwable rex) {
2896 dx = rex;
2897 u = null;
2898 }
2899 dst.internalComplete(u, dx);
2900 }
2901 helpPostComplete();
2902 return dst;
2903 }
2904
2905
2906 /* ------------- Arbitrary-arity constructions -------------- */
2907
2908 /*
2909 * The basic plan of attack is to recursively form binary
2910 * completion trees of elements. This can be overkill for small
2911 * sets, but scales nicely. The And/All vs Or/Any forms use the
2912 * same idea, but details differ.
2913 */
2914
2915 /**
2916 * Returns a new CompletableFuture that is completed when all of
2917 * the given CompletableFutures complete. If any of the given
2918 * CompletableFutures complete exceptionally, then the returned
2919 * CompletableFuture also does so, with a CompletionException
2920 * holding this exception as its cause. Otherwise, the results,
2921 * if any, of the given CompletableFutures are not reflected in
2922 * the returned CompletableFuture, but may be obtained by
2923 * inspecting them individually. If no CompletableFutures are
2924 * provided, returns a CompletableFuture completed with the value
2925 * {@code null}.
2926 *
2927 * <p>Among the applications of this method is to await completion
2928 * of a set of independent CompletableFutures before continuing a
2929 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2930 * c3).join();}.
2931 *
2932 * @param cfs the CompletableFutures
2933 * @return a new CompletableFuture that is completed when all of the
2934 * given CompletableFutures complete
2935 * @throws NullPointerException if the array or any of its elements are
2936 * {@code null}
2937 */
2938 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2939 int len = cfs.length; // Directly handle empty and singleton cases
2940 if (len > 1)
2941 return allTree(cfs, 0, len - 1);
2942 else {
2943 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2944 CompletableFuture<?> f;
2945 if (len == 0)
2946 dst.result = NIL;
2947 else if ((f = cfs[0]) == null)
2948 throw new NullPointerException();
2949 else {
2950 ThenPropagate d = null;
2951 CompletionNode p = null;
2952 Object r;
2953 while ((r = f.result) == null) {
2954 if (d == null)
2955 d = new ThenPropagate(f, dst);
2956 else if (p == null)
2957 p = new CompletionNode(d);
2958 else if (UNSAFE.compareAndSwapObject
2959 (f, COMPLETIONS, p.next = f.completions, p))
2960 break;
2961 }
2962 if (r != null && (d == null || d.compareAndSet(0, 1)))
2963 dst.internalComplete(null, (r instanceof AltResult) ?
2964 ((AltResult)r).ex : null);
2965 f.helpPostComplete();
2966 }
2967 return dst;
2968 }
2969 }
2970
2971 /**
2972 * Recursively constructs an And'ed tree of CompletableFutures.
2973 * Called only when array known to have at least two elements.
2974 */
2975 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2976 int lo, int hi) {
2977 CompletableFuture<?> fst, snd;
2978 int mid = (lo + hi) >>> 1;
2979 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2980 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2981 throw new NullPointerException();
2982 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2983 AndCompletion d = null;
2984 CompletionNode p = null, q = null;
2985 Object r = null, s = null;
2986 while ((r = fst.result) == null || (s = snd.result) == null) {
2987 if (d == null)
2988 d = new AndCompletion(fst, snd, dst);
2989 else if (p == null)
2990 p = new CompletionNode(d);
2991 else if (q == null) {
2992 if (UNSAFE.compareAndSwapObject
2993 (fst, COMPLETIONS, p.next = fst.completions, p))
2994 q = new CompletionNode(d);
2995 }
2996 else if (UNSAFE.compareAndSwapObject
2997 (snd, COMPLETIONS, q.next = snd.completions, q))
2998 break;
2999 }
3000 if ((r != null || (r = fst.result) != null) &&
3001 (s != null || (s = snd.result) != null) &&
3002 (d == null || d.compareAndSet(0, 1))) {
3003 Throwable ex;
3004 if (r instanceof AltResult)
3005 ex = ((AltResult)r).ex;
3006 else
3007 ex = null;
3008 if (ex == null && (s instanceof AltResult))
3009 ex = ((AltResult)s).ex;
3010 dst.internalComplete(null, ex);
3011 }
3012 fst.helpPostComplete();
3013 snd.helpPostComplete();
3014 return dst;
3015 }
3016
3017 /**
3018 * Returns a new CompletableFuture that is completed when any of the
3019 * given CompletableFutures complete, with the same result if it
3020 * completed normally. Otherwise, if it completed exceptionally,
3021 * the returned CompletableFuture also does so, with a
3022 * CompletionException holding this exception as its cause. If no
3023 * CompletableFutures are provided, returns an incomplete
3024 * CompletableFuture.
3025 *
3026 * @param cfs the CompletableFutures
3027 * @return a new CompletableFuture that is completed when any of the
3028 * given CompletableFutures complete
3029 * @throws NullPointerException if the array or any of its elements are
3030 * {@code null}
3031 */
3032 public static CompletableFuture<Void> anyOf(CompletableFuture<?>... cfs) {
3033 int len = cfs.length; // Same idea as allOf
3034 if (len > 1)
3035 return anyTree(cfs, 0, len - 1);
3036 else {
3037 CompletableFuture<Void> dst = new CompletableFuture<Void>();
3038 CompletableFuture<?> f;
3039 if (len == 0)
3040 ; // skip
3041 else if ((f = cfs[0]) == null)
3042 throw new NullPointerException();
3043 else {
3044 ThenPropagate d = null;
3045 CompletionNode p = null;
3046 Object r;
3047 while ((r = f.result) == null) {
3048 if (d == null)
3049 d = new ThenPropagate(f, dst);
3050 else if (p == null)
3051 p = new CompletionNode(d);
3052 else if (UNSAFE.compareAndSwapObject
3053 (f, COMPLETIONS, p.next = f.completions, p))
3054 break;
3055 }
3056 if (r != null && (d == null || d.compareAndSet(0, 1))) {
3057 // Propagate the source's exception, if any
3058 Throwable ex = null;
3059 if (r instanceof AltResult)
3060 ex = ((AltResult)r).ex;
3061 dst.internalComplete(null, ex);
3062 }
3063 f.helpPostComplete();
3064 }
3065 return dst;
3066 }
3067 }
3068
3069 /**
3070 * Recursively constructs an Or'ed tree of CompletableFutures.
3071 */
3072 private static CompletableFuture<Void> anyTree(CompletableFuture<?>[] cfs,
3073 int lo, int hi) {
3074 CompletableFuture<?> fst, snd;
3075 int mid = (lo + hi) >>> 1;
3076 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3077 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3078 throw new NullPointerException();
3079 CompletableFuture<Void> dst = new CompletableFuture<Void>();
3080 OrCompletion d = null;
3081 CompletionNode p = null, q = null;
3082 Object r;
3083 while ((r = fst.result) == null && (r = snd.result) == null) {
3084 if (d == null)
3085 d = new OrCompletion(fst, snd, dst);
3086 else if (p == null)
3087 p = new CompletionNode(d);
3088 else if (q == null) {
3089 if (UNSAFE.compareAndSwapObject
3090 (fst, COMPLETIONS, p.next = fst.completions, p))
3091 q = new CompletionNode(d);
3092 }
3093 else if (UNSAFE.compareAndSwapObject
3094 (snd, COMPLETIONS, q.next = snd.completions, q))
3095 break;
3096 }
3097 if ((r != null || (r = fst.result) != null ||
3098 (r = snd.result) != null) &&
3099 (d == null || d.compareAndSet(0, 1))) {
3100 Throwable ex;
3101 if (r instanceof AltResult) {
3102 ex = ((AltResult)r).ex;
3103 }
3104 else
3105 ex = null;
3106 dst.internalComplete(null, ex);
3107 }
3108 fst.helpPostComplete();
3109 snd.helpPostComplete();
3110 return dst;
3111 }
3112
3113
3114 /* ------------- Control and status methods -------------- */
3115
3116 /**
3117 * If not already completed, completes this CompletableFuture with
3118 * a {@link CancellationException}. Dependent CompletableFutures
3119 * that have not already completed will also complete
3120 * exceptionally, with a {@link CompletionException} caused by
3121 * this {@code CancellationException}.
3122 *
3123 * @param mayInterruptIfRunning this value has no effect in this
3124 * implementation because interrupts are not used to control
3125 * processing.
3126 *
3127 * @return {@code true} if this task is now cancelled
3128 */
3129 public boolean cancel(boolean mayInterruptIfRunning) {
3130 boolean cancelled = (result == null) &&
3131 UNSAFE.compareAndSwapObject
3132 (this, RESULT, null, new AltResult(new CancellationException()));
3133 postComplete();
3134 return cancelled || isCancelled();
3135 }
3136
3137 /**
3138 * Returns {@code true} if this CompletableFuture was cancelled
3139 * before it completed normally.
3140 *
3141 * @return {@code true} if this CompletableFuture was cancelled
3142 * before it completed normally
3143 */
3144 public boolean isCancelled() {
3145 Object r;
3146 return ((r = result) instanceof AltResult) &&
3147 (((AltResult)r).ex instanceof CancellationException);
3148 }
3149
3150 /**
3151 * Forcibly sets or resets the value subsequently returned by
3152 * method {@link #get()} and related methods, whether or not
3153 * already completed. This method is designed for use only in
3154 * error recovery actions, and even in such situations may result
3155 * in ongoing dependent completions using established versus
3156 * overwritten outcomes.
3157 *
3158 * @param value the completion value
3159 */
3160 public void obtrudeValue(T value) {
3161 result = (value == null) ? NIL : value;
3162 postComplete();
3163 }
3164
3165 /**
3166 * Forcibly causes subsequent invocations of method {@link #get()}
3167 * and related methods to throw the given exception, whether or
3168 * not already completed. This method is designed for use only in
3169 * error recovery actions, and even in such situations may result in
3170 * ongoing dependent completions using established versus
3171 * overwritten outcomes.
3172 *
3173 * @param ex the exception
3174 */
3175 public void obtrudeException(Throwable ex) {
3176 if (ex == null) throw new NullPointerException();
3177 result = new AltResult(ex);
3178 postComplete();
3179 }
3180
3181 /**
3182 * Returns the estimated number of CompletableFutures whose
3183 * completions are awaiting completion of this CompletableFuture.
3184 * This method is designed for use in monitoring system state, not
3185 * for synchronization control.
3186 *
3187 * @return the number of dependent CompletableFutures
3188 */
3189 public int getNumberOfDependents() {
3190 int count = 0;
3191 for (CompletionNode p = completions; p != null; p = p.next)
3192 ++count;
3193 return count;
3194 }
3195
3196 /**
3197 * Returns a string identifying this CompletableFuture, as well as
3198 * its completion state. The state, in brackets, contains the
3199 * String {@code "Completed normally"} or the String {@code
3200 * "Completed exceptionally"}, or the String {@code "Not
3201 * completed"} followed by the number of CompletableFutures
3202 * dependent upon its completion, if any.
3203 *
3204 * @return a string identifying this CompletableFuture, as well as its state
3205 */
3206 public String toString() {
3207 Object r = result;
3208 int count;
3209 return super.toString() +
3210 ((r == null) ?
3211 (((count = getNumberOfDependents()) == 0) ?
3212 "[Not completed]" :
3213 "[Not completed, " + count + " dependents]") :
3214 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3215 "[Completed exceptionally]" :
3216 "[Completed normally]"));
3217 }
3218
3219 // Unsafe mechanics
3220 private static final sun.misc.Unsafe UNSAFE;
3221 private static final long RESULT;
3222 private static final long WAITERS;
3223 private static final long COMPLETIONS;
3224 static {
3225 try {
3226 UNSAFE = sun.misc.Unsafe.getUnsafe();
3227 Class<?> k = CompletableFuture.class;
3228 RESULT = UNSAFE.objectFieldOffset
3229 (k.getDeclaredField("result"));
3230 WAITERS = UNSAFE.objectFieldOffset
3231 (k.getDeclaredField("waiters"));
3232 COMPLETIONS = UNSAFE.objectFieldOffset
3233 (k.getDeclaredField("completions"));
3234 } catch (Exception e) {
3235 throw new Error(e);
3236 }
3237 }
3238 }
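
The composition, recovery, and aggregation methods in the listing above (thenCompose, exceptionally, handle, allOf) can be exercised with a short program. The following is a minimal sketch written against the public API documented in this file, not part of the original source. The class name CompletableFutureSketch and its helper methods are hypothetical and exist only for illustration, and the sketch assumes a Java 8 or later compiler for the lambda syntax.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; none of these helper names appear in the listing.
class CompletableFutureSketch {

    // exceptionally: substitute a fallback value when the source failed;
    // a normally completed source passes its value through unchanged.
    static String recover(CompletableFuture<String> source) {
        return source.exceptionally(ex -> "fallback").join();
    }

    // handle: the function receives (value, null) on normal completion
    // and (null, exception) on exceptional completion.
    static String describe(CompletableFuture<Integer> source) {
        return source
            .handle((value, ex) -> ex == null ? "ok:" + value : "failed")
            .join();
    }

    // thenCompose: the function itself returns a CompletableFuture, and
    // the future returned by thenCompose completes with that outcome.
    static int increment(CompletableFuture<Integer> source) {
        return source.thenCompose(v -> {
            CompletableFuture<Integer> next = new CompletableFuture<>();
            next.complete(v + 1);
            return next;
        }).join();
    }

    // allOf: wait for every input, then read each result individually,
    // since the aggregate future does not carry the inputs' values.
    static int sum(CompletableFuture<Integer> a, CompletableFuture<Integer> b) {
        CompletableFuture.allOf(a, b).join();
        return a.join() + b.join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(recover(failed));

        CompletableFuture<Integer> twenty = new CompletableFuture<>();
        twenty.complete(20);
        CompletableFuture<Integer> one = new CompletableFuture<>();
        one.complete(1);

        System.out.println(describe(twenty));
        System.out.println(increment(twenty));
        System.out.println(sum(twenty, one));
    }
}
```

Every future in the sketch is completed before a dependent is attached, so each call takes the already-completed path and runs the function in the calling thread. Attaching the dependent first and completing the source afterwards should give the same results through the registered-completion path.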