root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.61
Committed: Sun Mar 17 22:24:02 2013 UTC by jsr166
Branch: MAIN
Changes since 1.60: +4 -10 lines
Log Message:
simplify handling of compose functions returning null CompletableFutures
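
The log message concerns compose functions that return a null CompletableFuture. The following is a minimal sketch of how that case looks from the caller's side, written against the public `thenCompose` and `join` methods of released `java.util.concurrent` rather than against this revision's internals. The class name `NullComposeDemo` and the method `causeOfNullCompose` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

public class NullComposeDemo {

    /**
     * Composes an already-completed future with a function that returns
     * null and reports the cause of the resulting exceptional completion.
     * (Hypothetical helper, not part of the original source.)
     */
    static Throwable causeOfNullCompose() {
        CompletableFuture<String> source = CompletableFuture.completedFuture("x");

        // A compose function that deliberately returns null instead of a future.
        Function<String, CompletableFuture<String>> returnsNull = s -> null;

        CompletableFuture<String> composed = source.thenCompose(returnsNull);
        try {
            composed.join();
            return null; // not expected: the composed future should fail
        } catch (CompletionException e) {
            // join() rethrows the CompletionException; its cause is the
            // NullPointerException recorded for the null return value.
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfNullCompose());
    }
}
```

The dependent future is expected to complete exceptionally rather than hang or complete normally, which matches the `(fr == null) ? new NullPointerException() : null` check in `AsyncCompose.exec` in the file contents.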

File Contents

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.function.Supplier;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/**
 * A {@link Future} that may be explicitly completed (setting its
 * value and status), and may include dependent functions and actions
 * that trigger upon its completion.
 *
 * <p>When two or more threads attempt to
 * {@link #complete complete},
 * {@link #completeExceptionally completeExceptionally}, or
 * {@link #cancel cancel}
 * a CompletableFuture, only one of them succeeds.
 *
 * <p>Methods are available for adding dependents based on Functions,
 * Consumers, and Runnables. The appropriate form to use depends on
 * whether actions require arguments and/or produce results. Actions
 * may also be triggered after either or both the current and another
 * CompletableFuture complete. Multiple CompletableFutures may also
 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
 * {@link #allOf(CompletableFuture...)}.
 *
 * <p>Actions supplied for dependent completions (mainly using methods
 * with prefix {@code then}) may be performed by the thread that
 * completes the current CompletableFuture, or by any other caller of
 * these methods. There are no guarantees about the order of
 * processing completions unless constrained by these methods.
 *
 * <p>Since (unlike {@link FutureTask}) this class has no direct
 * control over the computation that causes it to be completed,
 * cancellation is treated as just another form of exceptional completion.
 * Method {@link #cancel cancel} has the same effect as
 * {@code completeExceptionally(new CancellationException())}.
 *
 * <p>Upon exceptional completion (including cancellation), or when a
 * completion entails an additional computation which terminates
 * abruptly with an (unchecked) exception or error, then all of their
 * dependent completions (and their dependents in turn) generally act
 * as {@code completeExceptionally} with a {@link CompletionException}
 * holding that exception as its cause. However, the {@link
 * #exceptionally exceptionally} and {@link #handle handle}
 * completions <em>are</em> able to handle exceptional completions of
 * the CompletableFutures they depend on.
 *
 * <p>In case of exceptional completion with a CompletionException,
 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
 * {@link ExecutionException} with the same cause as held in the
 * corresponding CompletionException. However, in these cases,
 * methods {@link #join()} and {@link #getNow} throw the
 * CompletionException, which simplifies usage.
 *
 * <p>CompletableFutures themselves do not execute asynchronously.
 * However, the {@code async} methods provide commonly useful ways to
 * commence asynchronous processing, using either a given {@link
 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
 * function or action that will result in the completion of a new
 * CompletableFuture. To simplify monitoring, debugging, and tracking,
 * all generated asynchronous tasks are instances of the tagging
 * interface {@link AsynchronousCompletionTask}.
 *
 * @author Doug Lea
 * @since 1.8
 */
public class CompletableFuture<T> implements Future<T> {

    /*
     * Overview:
     *
     * 1. Non-nullness of field result (set via CAS) indicates done.
     *    An AltResult is used to box null as a result, as well as to
     *    hold exceptions. Using a single field makes completion fast
     *    and simple to detect and trigger, at the expense of a lot of
     *    encoding and decoding that infiltrates many methods. One minor
     *    simplification relies on the (static) NIL (to box null results)
     *    being the only AltResult with a null exception field, so we
     *    don't usually need explicit comparisons with NIL. The CF
     *    exception propagation mechanics surrounding decoding rely on
     *    unchecked casts of decoded results really being unchecked,
     *    where user type errors are caught at point of use, as is
     *    currently the case in Java. These are highlighted by using
     *    SuppressWarnings-annotated temporaries.
     *
     * 2. Waiters are held in a Treiber stack similar to the one used
     *    in FutureTask, Phaser, and SynchronousQueue. See their
     *    internal documentation for algorithmic details.
     *
     * 3. Completions are also kept in a list/stack, and pulled off
     *    and run when completion is triggered. (We could even use the
     *    same stack as for waiters, but would give up the potential
     *    parallelism obtained because woken waiters help release/run
     *    others -- see method postComplete). Because post-processing
     *    may race with direct calls, class Completion opportunistically
     *    extends AtomicInteger so callers can claim the action via
     *    compareAndSet(0, 1). The Completion.run methods are all
     *    written in a boringly similar uniform way (that sometimes includes
     *    unnecessary-looking checks, kept to maintain uniformity).
     *    There are enough dimensions upon which they differ that
     *    attempts to factor commonalities while maintaining efficiency
     *    require more lines of code than they would save.
     *
     * 4. The exported then/and/or methods do support a bit of
     *    factoring (see doThenApply etc). They must cope with the
     *    intrinsic races surrounding addition of a dependent action
     *    versus performing the action directly because the task is
     *    already complete. For example, a CF may not be complete upon
     *    entry, so a dependent completion is added, but by the time it
     *    is added, the target CF is complete, so must be directly
     *    executed. This is all done while avoiding unnecessary object
     *    construction in safe-bypass cases.
     */

    // preliminaries

    static final class AltResult {
        final Throwable ex; // null only for NIL
        AltResult(Throwable ex) { this.ex = ex; }
    }

    static final AltResult NIL = new AltResult(null);

    // Fields

    volatile Object result;    // Either the result or boxed AltResult
    volatile WaitNode waiters; // Treiber stack of threads blocked on get()
    volatile CompletionNode completions; // list (Treiber stack) of completions

    // Basic utilities for triggering and processing completions

    /**
     * Removes and signals all waiting threads and runs all completions.
     */
    final void postComplete() {
        WaitNode q; Thread t;
        while ((q = waiters) != null) {
            if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }

        CompletionNode h; Completion c;
        while ((h = completions) != null) {
            if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
                (c = h.completion) != null)
                c.run();
        }
    }

    /**
     * Triggers completion with the encoding of the given arguments:
     * if the exception is non-null, encodes it as a wrapped
     * CompletionException unless it is one already. Otherwise uses
     * the given result, boxed as NIL if null.
     */
    final void internalComplete(Object v, Throwable ex) {
        if (result == null)
            UNSAFE.compareAndSwapObject
                (this, RESULT, null,
                 (ex == null) ? (v == null) ? NIL : v :
                 new AltResult((ex instanceof CompletionException) ? ex :
                               new CompletionException(ex)));
        postComplete(); // help out even if not triggered
    }

    /**
     * If triggered, helps release and/or process completions.
     */
    final void helpPostComplete() {
        if (result != null)
            postComplete();
    }

    /* ------------- waiting for completions -------------- */

    /** Number of processors, for spin control */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Heuristic spin value for waitingGet() before blocking on
     * multiprocessors
     */
    static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;

    /**
     * Linked nodes to record waiting threads in a Treiber stack. See
     * other classes such as Phaser and SynchronousQueue for more
     * detailed explanation. This class implements ManagedBlocker to
     * avoid starvation when blocking actions pile up in
     * ForkJoinPools.
     */
    static final class WaitNode implements ForkJoinPool.ManagedBlocker {
        long nanos;          // wait time if timed
        final long deadline; // non-zero if timed
        volatile int interruptControl; // > 0: interruptible, < 0: interrupted
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode(boolean interruptible, long nanos, long deadline) {
            this.thread = Thread.currentThread();
            this.interruptControl = interruptible ? 1 : 0;
            this.nanos = nanos;
            this.deadline = deadline;
        }
        public boolean isReleasable() {
            if (thread == null)
                return true;
            if (Thread.interrupted()) {
                int i = interruptControl;
                interruptControl = -1;
                if (i > 0)
                    return true;
            }
            if (deadline != 0L &&
                (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
                thread = null;
                return true;
            }
            return false;
        }
        public boolean block() {
            if (isReleasable())
                return true;
            else if (deadline == 0L)
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }

    /**
     * Returns raw result after waiting, or null if interruptible and
     * interrupted.
     */
    private Object waitingGet(boolean interruptible) {
        WaitNode q = null;
        boolean queued = false;
        int spins = SPINS;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) { // suppress unpark
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        if (interruptible) {
                            removeWaiter(q);
                            return null;
                        }
                        Thread.currentThread().interrupt();
                    }
                }
                postComplete(); // help release others
                return r;
            }
            else if (spins > 0) {
                int rnd = ThreadLocalRandom.nextSecondarySeed();
                if (rnd == 0)
                    rnd = ThreadLocalRandom.current().nextInt();
                if (rnd >= 0)
                    --spins;
            }
            else if (q == null)
                q = new WaitNode(interruptible, 0L, 0L);
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (interruptible && q.interruptControl < 0) {
                removeWaiter(q);
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param nanos time to wait
     * @return raw result
     */
    private Object timedAwaitDone(long nanos)
        throws InterruptedException, TimeoutException {
        WaitNode q = null;
        boolean queued = false;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) {
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        removeWaiter(q);
                        throw new InterruptedException();
                    }
                }
                postComplete();
                return r;
            }
            else if (q == null) {
                if (nanos <= 0L)
                    throw new TimeoutException();
                long d = System.nanoTime() + nanos;
                q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
            }
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (q.interruptControl < 0) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q.nanos <= 0L) {
                if (result == null) {
                    removeWaiter(q);
                    throw new TimeoutException();
                }
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage. Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers. To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race. This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) { // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /* ------------- Async tasks -------------- */

    /**
     * A marker interface identifying asynchronous tasks produced by
     * {@code async} methods. This may be useful for monitoring,
     * debugging, and tracking asynchronous activities.
     *
     * @since 1.8
     */
    public static interface AsynchronousCompletionTask {
    }

    /** Base class can act as either FJ or plain Runnable */
    abstract static class Async extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) { }
        public final void run() { exec(); }
    }

    static final class AsyncRun extends Async {
        final Runnable fn;
        final CompletableFuture<Void> dst;
        AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.run();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncSupply<U> extends Async {
        final Supplier<U> fn;
        final CompletableFuture<U> dst;
        AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.get();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncApply<T,U> extends Async {
        final Function<? super T,? extends U> fn;
        final T arg;
        final CompletableFuture<U> dst;
        AsyncApply(T arg, Function<? super T,? extends U> fn,
                   CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.apply(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncBiApply<T,U,V> extends Async {
        final BiFunction<? super T,? super U,? extends V> fn;
        final T arg1;
        final U arg2;
        final CompletableFuture<V> dst;
        AsyncBiApply(T arg1, U arg2,
                     BiFunction<? super T,? super U,? extends V> fn,
                     CompletableFuture<V> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<V> d; V v; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    v = fn.apply(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    v = null;
                }
                d.internalComplete(v, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncAccept<T> extends Async {
        final Consumer<? super T> fn;
        final T arg;
        final CompletableFuture<Void> dst;
        AsyncAccept(T arg, Consumer<? super T> fn,
                    CompletableFuture<Void> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.accept(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncBiAccept<T,U> extends Async {
        final BiConsumer<? super T,? super U> fn;
        final T arg1;
        final U arg2;
        final CompletableFuture<Void> dst;
        AsyncBiAccept(T arg1, U arg2,
                      BiConsumer<? super T,? super U> fn,
                      CompletableFuture<Void> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.accept(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncCompose<T,U> extends Async {
        final Function<? super T, CompletableFuture<U>> fn;
        final T arg;
        final CompletableFuture<U> dst;
        AsyncCompose(T arg,
                     Function<? super T, CompletableFuture<U>> fn,
                     CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d, fr; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fr = fn.apply(arg);
                    ex = (fr == null) ? new NullPointerException() : null;
                } catch (Throwable rex) {
                    ex = rex;
                    fr = null;
                }
                if (ex != null)
                    u = null;
                else {
                    Object r = fr.result;
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        u = null;
                    }
                    else {
                        @SuppressWarnings("unchecked") U ur = (U) r;
                        u = ur;
                    }
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /* ------------- Completions -------------- */

    /**
     * Simple linked list nodes to record completions, used in
     * basically the same way as WaitNodes. (We separate nodes from
     * the Completions themselves mainly because for the And and Or
     * methods, the same Completion object resides in two lists.)
     */
    static final class CompletionNode {
        final Completion completion;
        volatile CompletionNode next;
        CompletionNode(Completion completion) { this.completion = completion; }
    }

    // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
    abstract static class Completion extends AtomicInteger implements Runnable {
    }

    static final class ApplyCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        ApplyCompletion(CompletableFuture<? extends T> src,
                        Function<? super T,? extends U> fn,
                        CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T,? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                U u = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncApply<T,U>(t, fn, dst));
                        else
                            u = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(u, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AcceptCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        AcceptCompletion(CompletableFuture<? extends T> src,
                         Consumer<? super T> fn,
                         CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class RunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        RunCompletion(CompletableFuture<? extends T> src,
                      Runnable fn,
                      CompletableFuture<Void> dst,
                      Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiApplyCompletion<T,U,V> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiFunction<? super T,? super U,? extends V> fn;
        final CompletableFuture<V> dst;
        final Executor executor;
        BiApplyCompletion(CompletableFuture<? extends T> src,
                          CompletableFuture<? extends U> snd,
                          BiFunction<? super T,? super U,? extends V> fn,
                          CompletableFuture<V> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiFunction<? super T,? super U,? extends V> fn;
            final CompletableFuture<V> dst;
            Object r, s; T t; U u; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                V v = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
                        else
                            v = fn.apply(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(v, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiAcceptCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiConsumer<? super T,? super U> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        BiAcceptCompletion(CompletableFuture<? extends T> src,
                           CompletableFuture<? extends U> snd,
                           BiConsumer<? super T,? super U> fn,
                           CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiConsumer<? super T,? super U> fn;
            final CompletableFuture<Void> dst;
            Object r, s; T t; U u; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
                        else
                            fn.accept(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        BiRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AndCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<Void> dst;
        AndCompletion(CompletableFuture<?> src,
                      CompletableFuture<?> snd,
                      CompletableFuture<Void> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrApplyCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends T> snd;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        OrApplyCompletion(CompletableFuture<? extends T> src,
                          CompletableFuture<? extends T> snd,
                          Function<? super T,? extends U> fn,
                          CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends T> b;
            final Function<? super T,? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                U u = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncApply<T,U>(t, fn, dst));
                        else
                            u = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(u, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrAcceptCompletion<T> extends Completion {
1015 final CompletableFuture<? extends T> src;
1016 final CompletableFuture<? extends T> snd;
1017 final Consumer<? super T> fn;
1018 final CompletableFuture<Void> dst;
1019 final Executor executor;
1020 OrAcceptCompletion(CompletableFuture<? extends T> src,
1021 CompletableFuture<? extends T> snd,
1022 Consumer<? super T> fn,
1023 CompletableFuture<Void> dst, Executor executor) {
1024 this.src = src; this.snd = snd;
1025 this.fn = fn; this.dst = dst;
1026 this.executor = executor;
1027 }
1028 public final void run() {
1029 final CompletableFuture<? extends T> a;
1030 final CompletableFuture<? extends T> b;
1031 final Consumer<? super T> fn;
1032 final CompletableFuture<Void> dst;
1033 Object r; T t; Throwable ex;
1034 if ((dst = this.dst) != null &&
1035 (fn = this.fn) != null &&
1036 (((a = this.src) != null && (r = a.result) != null) ||
1037 ((b = this.snd) != null && (r = b.result) != null)) &&
1038 compareAndSet(0, 1)) {
1039 if (r instanceof AltResult) {
1040 ex = ((AltResult)r).ex;
1041 t = null;
1042 }
1043 else {
1044 ex = null;
1045 @SuppressWarnings("unchecked") T tr = (T) r;
1046 t = tr;
1047 }
1048 Executor e = executor;
1049 if (ex == null) {
1050 try {
1051 if (e != null)
1052 e.execute(new AsyncAccept<T>(t, fn, dst));
1053 else
1054 fn.accept(t);
1055 } catch (Throwable rex) {
1056 ex = rex;
1057 }
1058 }
1059 if (e == null || ex != null)
1060 dst.internalComplete(null, ex);
1061 }
1062 }
1063 private static final long serialVersionUID = 5232453952276885070L;
1064 }

    static final class OrRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<?> dst;
        OrCompletion(CompletableFuture<?> src,
                     CompletableFuture<?> snd,
                     CompletableFuture<?> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<?> dst;
            Object r, t; Throwable ex;
            if ((dst = this.dst) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    t = r;
                }
                dst.internalComplete(t, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ExceptionCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super Throwable, ? extends T> fn;
        final CompletableFuture<T> dst;
        ExceptionCompletion(CompletableFuture<? extends T> src,
                            Function<? super Throwable, ? extends T> fn,
                            CompletableFuture<T> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super Throwable, ? extends T> fn;
            final CompletableFuture<T> dst;
            Object r; T t = null; Throwable ex, dx = null;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if ((r instanceof AltResult) &&
                    (ex = ((AltResult)r).ex) != null) {
                    try {
                        t = fn.apply(ex);
                    } catch (Throwable rex) {
                        dx = rex;
                    }
                }
                else {
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                dst.internalComplete(t, dx);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ThenCopy extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> dst;
        ThenCopy(CompletableFuture<?> src,
                 CompletableFuture<?> dst) {
            this.src = src; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> dst;
            Object r; Object t; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    t = r;
                }
                dst.internalComplete(t, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class HandleCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final BiFunction<? super T, Throwable, ? extends U> fn;
        final CompletableFuture<U> dst;
        HandleCompletion(CompletableFuture<? extends T> src,
                         BiFunction<? super T, Throwable, ? extends U> fn,
                         final CompletableFuture<U> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final BiFunction<? super T, Throwable, ? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                U u = null; Throwable dx = null;
                try {
                    u = fn.apply(t, ex);
                } catch (Throwable rex) {
                    dx = rex;
                }
                dst.internalComplete(u, dx);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ComposeCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super T, CompletableFuture<U>> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        ComposeCompletion(CompletableFuture<? extends T> src,
                          Function<? super T, CompletableFuture<U>> fn,
                          final CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T, CompletableFuture<U>> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex; Executor e;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                CompletableFuture<U> c = null;
                U u = null;
                boolean complete = false;
                if (ex == null) {
                    if ((e = executor) != null)
                        e.execute(new AsyncCompose<T,U>(t, fn, dst));
                    else {
                        try {
                            if ((c = fn.apply(t)) == null)
                                ex = new NullPointerException();
                        } catch (Throwable rex) {
                            ex = rex;
                        }
                    }
                }
                if (c != null) {
                    ThenCopy d = null;
                    Object s;
                    if ((s = c.result) == null) {
                        CompletionNode p = new CompletionNode
                            (d = new ThenCopy(c, dst));
                        while ((s = c.result) == null) {
                            if (UNSAFE.compareAndSwapObject
                                (c, COMPLETIONS, p.next = c.completions, p))
                                break;
                        }
                    }
                    if (s != null && (d == null || d.compareAndSet(0, 1))) {
                        complete = true;
                        if (s instanceof AltResult) {
                            ex = ((AltResult)s).ex;  // no rewrap
                            u = null;
                        }
                        else {
                            @SuppressWarnings("unchecked") U us = (U) s;
                            u = us;
                        }
                    }
                }
                if (complete || ex != null)
                    dst.internalComplete(u, ex);
                if (c != null)
                    c.helpPostComplete();
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    // public methods

    /**
     * Creates a new incomplete CompletableFuture.
     */
    public CompletableFuture() {
    }

    /**
     * Asynchronously executes in the {@link
     * ForkJoinPool#commonPool()} a task that completes the returned
     * CompletableFuture with the result of the given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        if (supplier == null) throw new NullPointerException();
        CompletableFuture<U> f = new CompletableFuture<U>();
        ForkJoinPool.commonPool().
            execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
        return f;
    }

    /**
     * Asynchronously executes using the given executor a task that
     * completes the returned CompletableFuture with the result of the
     * given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        if (executor == null || supplier == null)
            throw new NullPointerException();
        CompletableFuture<U> f = new CompletableFuture<U>();
        executor.execute(new AsyncSupply<U>(supplier, f));
        return f;
    }

    /**
     * Asynchronously executes in the {@link
     * ForkJoinPool#commonPool()} a task that runs the given action,
     * and then completes the returned CompletableFuture.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        ForkJoinPool.commonPool().
            execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
        return f;
    }

    /**
     * Asynchronously executes using the given executor a task that
     * runs the given action, and then completes the returned
     * CompletableFuture.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        if (executor == null || runnable == null)
            throw new NullPointerException();
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        executor.execute(new AsyncRun(runnable, f));
        return f;
    }
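
    /*
     * Usage sketch (editorial illustration, not part of the original
     * source). It assumes a Java 8 or later runtime; the class name
     * AsyncFactoryDemo is hypothetical. It shows the two ways to pick
     * where the task runs: the common pool, or a caller-supplied
     * executor.
     *
     *   import java.util.concurrent.CompletableFuture;
     *   import java.util.concurrent.ExecutorService;
     *   import java.util.concurrent.Executors;
     *
     *   public class AsyncFactoryDemo {
     *       public static void main(String[] args) {
     *           // Runs in ForkJoinPool.commonPool().
     *           CompletableFuture<Integer> sum =
     *               CompletableFuture.supplyAsync(() -> 1 + 2);
     *           // Runs on the given executor instead.
     *           ExecutorService pool = Executors.newSingleThreadExecutor();
     *           CompletableFuture<Void> done =
     *               CompletableFuture.runAsync(() -> { }, pool);
     *           System.out.println(sum.join());  // prints 3
     *           done.join();  // returns null once the action has run
     *           pool.shutdown();
     *       }
     *   }
     */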

    /**
     * Returns {@code true} if completed in any fashion: normally,
     * exceptionally, or via cancellation.
     *
     * @return {@code true} if completed
     */
    public boolean isDone() {
        return result != null;
    }

    /**
     * Waits if necessary for this future to complete, and then
     * returns its result.
     *
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public T get() throws InterruptedException, ExecutionException {
        Object r; Throwable ex, cause;
        if ((r = result) == null && (r = waitingGet(true)) == null)
            throw new InterruptedException();
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if ((ex instanceof CompletionException) &&
            (cause = ex.getCause()) != null)
            ex = cause;
        throw new ExecutionException(ex);
    }

    /**
     * Waits if necessary for at most the given time for this future
     * to complete, and then returns its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        Object r; Throwable ex, cause;
        long nanos = unit.toNanos(timeout);
        if (Thread.interrupted())
            throw new InterruptedException();
        if ((r = result) == null)
            r = timedAwaitDone(nanos);
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if ((ex instanceof CompletionException) &&
            (cause = ex.getCause()) != null)
            ex = cause;
        throw new ExecutionException(ex);
    }

    /**
     * Returns the result value when complete, or throws an
     * (unchecked) exception if completed exceptionally. To better
     * conform with the use of common functional forms, if a
     * computation involved in the completion of this
     * CompletableFuture threw an exception, this method throws an
     * (unchecked) {@link CompletionException} with the underlying
     * exception as its cause.
     *
     * @return the result value
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if this future completed
     * exceptionally or a completion computation threw an exception
     */
    public T join() {
        Object r; Throwable ex;
        if ((r = result) == null)
            r = waitingGet(false);
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if (ex instanceof CompletionException)
            throw (CompletionException)ex;
        throw new CompletionException(ex);
    }
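
    /*
     * Usage sketch (editorial illustration, not part of the original
     * source) contrasting get() and join() on exceptional completion.
     * It assumes a Java 8 or later runtime; the class name
     * GetVersusJoinDemo is hypothetical.
     *
     *   import java.util.concurrent.CompletableFuture;
     *   import java.util.concurrent.CompletionException;
     *   import java.util.concurrent.ExecutionException;
     *
     *   public class GetVersusJoinDemo {
     *       public static void main(String[] args) {
     *           CompletableFuture<String> f = new CompletableFuture<>();
     *           f.completeExceptionally(new IllegalStateException("boom"));
     *           try {
     *               f.get();
     *           } catch (ExecutionException e) {
     *               // Checked wrapper; the cause is the original exception.
     *               System.out.println(e.getCause().getMessage());
     *           } catch (InterruptedException e) {
     *               Thread.currentThread().interrupt();
     *           }
     *           try {
     *               f.join();
     *           } catch (CompletionException e) {
     *               // Unchecked wrapper around the same cause.
     *               System.out.println(e.getCause().getMessage());
     *           }
     *       }
     *   }
     *
     * Both handlers print "boom". join() suits lambda bodies, where a
     * checked exception cannot be thrown.
     */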

    /**
     * Returns the result value (or throws any encountered exception)
     * if completed, else returns the given valueIfAbsent.
     *
     * @param valueIfAbsent the value to return if not completed
     * @return the result value, if completed, else the given valueIfAbsent
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if this future completed
     * exceptionally or a completion computation threw an exception
     */
    public T getNow(T valueIfAbsent) {
        Object r; Throwable ex;
        if ((r = result) == null)
            return valueIfAbsent;
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if (ex instanceof CompletionException)
            throw (CompletionException)ex;
        throw new CompletionException(ex);
    }
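
    /*
     * Usage sketch (editorial illustration, not part of the original
     * source): getNow never blocks, so it can be used to poll. It
     * assumes a Java 8 or later runtime; the class name GetNowDemo is
     * hypothetical.
     *
     *   import java.util.concurrent.CompletableFuture;
     *
     *   public class GetNowDemo {
     *       public static void main(String[] args) {
     *           CompletableFuture<String> f = new CompletableFuture<>();
     *           // Not yet completed: the fallback is returned.
     *           System.out.println(f.getNow("fallback"));  // prints fallback
     *           f.complete("ready");
     *           // Completed: the fallback is ignored.
     *           System.out.println(f.getNow("fallback"));  // prints ready
     *       }
     *   }
     */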

    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean complete(T value) {
        boolean triggered = result == null &&
            UNSAFE.compareAndSwapObject(this, RESULT, null,
                                        value == null ? NIL : value);
        postComplete();
        return triggered;
    }

    /**
     * If not already completed, causes invocations of {@link #get()}
     * and related methods to throw the given exception.
     *
     * @param ex the exception
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean completeExceptionally(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        boolean triggered = result == null &&
            UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
        postComplete();
        return triggered;
    }
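
    /*
     * Usage sketch (editorial illustration, not part of the original
     * source): only the first completion attempt takes effect, which
     * the boolean return values make visible. It assumes a Java 8 or
     * later runtime; the class name FirstCompletionWinsDemo is
     * hypothetical.
     *
     *   import java.util.concurrent.CompletableFuture;
     *
     *   public class FirstCompletionWinsDemo {
     *       public static void main(String[] args) {
     *           CompletableFuture<String> f = new CompletableFuture<>();
     *           boolean first = f.complete("a");
     *           boolean second =
     *               f.completeExceptionally(new IllegalStateException());
     *           System.out.println(first);     // prints true
     *           System.out.println(second);    // prints false
     *           System.out.println(f.join());  // prints a
     *       }
     *   }
     */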

    /**
     * Returns a new CompletableFuture that is completed with
     * the result of the given function of this CompletableFuture.
     * If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so,
     * with a CompletionException holding this exception as
     * its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
        return doThenApply(fn, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} with the
     * result of the given function of this CompletableFuture. If
     * this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApplyAsync
        (Function<? super T,? extends U> fn) {
        return doThenApply(fn, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously
     * completed using the given executor with the result of the given
     * function of this CompletableFuture. If this CompletableFuture
     * completes exceptionally, then the returned CompletableFuture
     * also does so, with a CompletionException holding this exception as
     * its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApplyAsync
        (Function<? super T,? extends U> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenApply(fn, executor);
    }

    private <U> CompletableFuture<U> doThenApply
        (Function<? super T,? extends U> fn,
         Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        ApplyCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new ApplyCompletion<T,U>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncApply<T,U>(t, fn, dst));
                    else
                        u = fn.apply(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(u, ex);
        }
        helpPostComplete();
        return dst;
    }
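
    /*
     * Usage sketch (editorial illustration, not part of the original
     * source) of thenApply on normal and exceptional completion, as
     * described in the Javadoc above. It assumes a Java 8 or later
     * runtime; the class name ThenApplyDemo is hypothetical.
     *
     *   import java.util.concurrent.CompletableFuture;
     *   import java.util.concurrent.CompletionException;
     *
     *   public class ThenApplyDemo {
     *       public static void main(String[] args) {
     *           CompletableFuture<Integer> src = new CompletableFuture<>();
     *           CompletableFuture<Integer> plusOne = src.thenApply(x -> x + 1);
     *           src.complete(41);
     *           System.out.println(plusOne.join());  // prints 42
     *
     *           CompletableFuture<Integer> failed = new CompletableFuture<>();
     *           CompletableFuture<Integer> dep = failed.thenApply(x -> x + 1);
     *           failed.completeExceptionally(new IllegalStateException("boom"));
     *           try {
     *               dep.join();
     *           } catch (CompletionException e) {
     *               // The function is skipped; the source's exception
     *               // arrives as the cause.
     *               System.out.println(e.getCause().getMessage());  // prints boom
     *           }
     *       }
     *   }
     */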

    /**
     * Returns a new CompletableFuture that is completed after
     * performing the given action with this CompletableFuture's
     * result when it completes. If this CompletableFuture
     * completes exceptionally, then the returned CompletableFuture
     * also does so, with a CompletionException holding this exception as
     * its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
        return doThenAccept(block, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} after
     * performing the given action with this CompletableFuture's
     * result when it completes. If this CompletableFuture completes
     * exceptionally, then the returned CompletableFuture also does
     * so, with a CompletionException holding this exception as its
     * cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
        return doThenAccept(block, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously
     * completed using the given executor after performing the given
     * action with this CompletableFuture's result when it completes.
     * If this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
                                                   Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenAccept(block, executor);
    }

    private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
                                                 Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AcceptCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new AcceptCompletion<T>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncAccept<T>(t, fn, dst));
                    else
                        fn.accept(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed after
     * performing the given action when this CompletableFuture
     * completes. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRun(Runnable action) {
        return doThenRun(action, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} after
     * performing the given action when this CompletableFuture
     * completes. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return doThenRun(action, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is asynchronously
     * completed using the given executor after performing the given
     * action when this CompletableFuture completes. If this
     * CompletableFuture completes exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException holding
     * this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenRun(action, executor);
    }

    private CompletableFuture<Void> doThenRun(Runnable action,
                                              Executor e) {
        if (action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        RunCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new RunCompletion<T>(this, action, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        return dst;
    }
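
    /*
     * Usage sketch (editorial illustration, not part of the original
     * source) chaining thenAccept and thenRun. With the non-async
     * forms the dependents run in the thread that completes the
     * source, so a plain StringBuilder is enough here. It assumes a
     * Java 8 or later runtime; the class name ThenAcceptRunDemo is
     * hypothetical.
     *
     *   import java.util.concurrent.CompletableFuture;
     *
     *   public class ThenAcceptRunDemo {
     *       public static void main(String[] args) {
     *           StringBuilder log = new StringBuilder();
     *           CompletableFuture<String> src = new CompletableFuture<>();
     *           // Consumes the result, produces no value.
     *           CompletableFuture<Void> accepted =
     *               src.thenAccept(s -> log.append(s));
     *           // Ignores the result entirely.
     *           CompletableFuture<Void> ran =
     *               accepted.thenRun(() -> log.append("!"));
     *           src.complete("hi");
     *           ran.join();
     *           System.out.println(log);  // prints hi!
     *       }
     *   }
     */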

    /**
     * Returns a new CompletableFuture that is completed with
     * the result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombine
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn) {
        return doThenBiApply(other, fn, null);
    }

    /**
     * Returns a new CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} with
     * the result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn) {
        return doThenBiApply(other, fn, ForkJoinPool.commonPool());
    }

    /**
     * Returns a new CompletableFuture that is
     * asynchronously completed using the given executor with the
     * result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiApply(other, fn, executor);
    }

    private <U,V> CompletableFuture<V> doThenBiApply
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor e) {
        if (other == null || fn == null) throw new NullPointerException();
        CompletableFuture<V> dst = new CompletableFuture<V>();
        BiApplyCompletion<T,U,V> d = null;
        Object r, s = null;
        if ((r = result) == null || (s = other.result) == null) {
            d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
            CompletionNode q = null, p = new CompletionNode(d);
            while ((r == null && (r = result) == null) ||
                   (s == null && (s = other.result) == null)) {
                if (q != null) {
                    if (s != null ||
                        UNSAFE.compareAndSwapObject
                        (other, COMPLETIONS, q.next = other.completions, q))
                        break;
                }
                else if (r != null ||
                         UNSAFE.compareAndSwapObject
                         (this, COMPLETIONS, p.next = completions, p)) {
                    if (s != null)
                        break;
                    q = new CompletionNode(d);
                }
            }
        }
        if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
            T t; U u; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex != null)
                u = null;
            else if (s instanceof AltResult) {
                ex = ((AltResult)s).ex;
                u = null;
            }
            else {
                @SuppressWarnings("unchecked") U us = (U) s;
                u = us;
            }
            V v = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
                    else
                        v = fn.apply(t, u);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(v, ex);
        }
        helpPostComplete();
        other.helpPostComplete();
        return dst;
    }
1981
1982 /**
1983 * Returns a new CompletableFuture that is completed with
1984 * the results of this and the other given CompletableFuture when
1985 * both complete. If this and/or the other CompletableFuture
1986 * complete exceptionally, then the returned CompletableFuture
1987 * also does so, with a CompletionException holding one of these
1988 * exceptions as its cause.
1989 *
1990 * @param other the other CompletableFuture
1991 * @param block the action to perform before completing the
1992 * returned CompletableFuture
1993 * @return the new CompletableFuture
1994 */
1995 public <U> CompletableFuture<Void> thenAcceptBoth
1996 (CompletableFuture<? extends U> other,
1997 BiConsumer<? super T, ? super U> block) {
1998 return doThenBiAccept(other, block, null);
1999 }
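/*
 * Usage sketch (illustrative only, not part of the original source): a
 * minimal example of thenAcceptBoth, assuming the public API as it later
 * shipped in JDK 8. The names AcceptBothSketch and total are hypothetical.
 *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class AcceptBothSketch {
    // Multiplies the two results once both futures have completed.
    static int total(CompletableFuture<Integer> price,
                     CompletableFuture<Integer> quantity) {
        AtomicInteger sink = new AtomicInteger();
        CompletableFuture<Void> done =
            price.thenAcceptBoth(quantity, (p, q) -> sink.set(p * q));
        done.join(); // returns after the action ran, or rethrows a failure
        return sink.get();
    }

    public static void main(String[] args) {
        System.out.println(total(CompletableFuture.completedFuture(7),
                                 CompletableFuture.completedFuture(6)));
    }
}
```
 *
 * If either input completes exceptionally, the action is skipped and
 * join() throws a CompletionException holding that exception as its cause.
 */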
2000
2001 /**
2002 * Returns a new CompletableFuture that is completed
2003 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2004 * the results of this and the other given CompletableFuture when
2005 * both complete. If this and/or the other CompletableFuture
2006 * complete exceptionally, then the returned CompletableFuture
2007 * also does so, with a CompletionException holding one of these
2008 * exceptions as its cause.
2009 *
2010 * @param other the other CompletableFuture
2011 * @param block the action to perform before completing the
2012 * returned CompletableFuture
2013 * @return the new CompletableFuture
2014 */
2015 public <U> CompletableFuture<Void> thenAcceptBothAsync
2016 (CompletableFuture<? extends U> other,
2017 BiConsumer<? super T, ? super U> block) {
2018 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2019 }
2020
2021 /**
2022 * Returns a new CompletableFuture that is completed
2023 * asynchronously using the given executor with the results of
2024 * this and the other given CompletableFuture when both complete.
2025 * If this and/or the other CompletableFuture complete exceptionally,
2026 * then the returned CompletableFuture also does so, with a
2027 * CompletionException holding one of these exceptions as its cause.
2028 *
2029 * @param other the other CompletableFuture
2030 * @param block the action to perform before completing the
2031 * returned CompletableFuture
2032 * @param executor the executor to use for asynchronous execution
2033 * @return the new CompletableFuture
2034 */
2035 public <U> CompletableFuture<Void> thenAcceptBothAsync
2036 (CompletableFuture<? extends U> other,
2037 BiConsumer<? super T, ? super U> block,
2038 Executor executor) {
2039 if (executor == null) throw new NullPointerException();
2040 return doThenBiAccept(other, block, executor);
2041 }
2042
2043 private <U> CompletableFuture<Void> doThenBiAccept
2044 (CompletableFuture<? extends U> other,
2045 BiConsumer<? super T,? super U> fn,
2046 Executor e) {
2047 if (other == null || fn == null) throw new NullPointerException();
2048 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2049 BiAcceptCompletion<T,U> d = null;
2050 Object r, s = null;
2051 if ((r = result) == null || (s = other.result) == null) {
2052 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2053 CompletionNode q = null, p = new CompletionNode(d);
2054 while ((r == null && (r = result) == null) ||
2055 (s == null && (s = other.result) == null)) {
2056 if (q != null) {
2057 if (s != null ||
2058 UNSAFE.compareAndSwapObject
2059 (other, COMPLETIONS, q.next = other.completions, q))
2060 break;
2061 }
2062 else if (r != null ||
2063 UNSAFE.compareAndSwapObject
2064 (this, COMPLETIONS, p.next = completions, p)) {
2065 if (s != null)
2066 break;
2067 q = new CompletionNode(d);
2068 }
2069 }
2070 }
2071 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2072 T t; U u; Throwable ex;
2073 if (r instanceof AltResult) {
2074 ex = ((AltResult)r).ex;
2075 t = null;
2076 }
2077 else {
2078 ex = null;
2079 @SuppressWarnings("unchecked") T tr = (T) r;
2080 t = tr;
2081 }
2082 if (ex != null)
2083 u = null;
2084 else if (s instanceof AltResult) {
2085 ex = ((AltResult)s).ex;
2086 u = null;
2087 }
2088 else {
2089 @SuppressWarnings("unchecked") U us = (U) s;
2090 u = us;
2091 }
2092 if (ex == null) {
2093 try {
2094 if (e != null)
2095 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2096 else
2097 fn.accept(t, u);
2098 } catch (Throwable rex) {
2099 ex = rex;
2100 }
2101 }
2102 if (e == null || ex != null)
2103 dst.internalComplete(null, ex);
2104 }
2105 helpPostComplete();
2106 other.helpPostComplete();
2107 return dst;
2108 }
2109
2110 /**
2111 * Returns a new CompletableFuture that is completed when
2112 * this and the other given CompletableFuture both complete.
2113 * If this and/or the other CompletableFuture complete exceptionally,
2114 * then the returned CompletableFuture also does so, with a
2115 * CompletionException holding one of these exceptions as its cause.
2116 *
2117 * @param other the other CompletableFuture
2118 * @param action the action to perform before completing the
2119 * returned CompletableFuture
2120 * @return the new CompletableFuture
2121 */
2122 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2123 Runnable action) {
2124 return doThenBiRun(other, action, null);
2125 }
2126
2127 /**
2128 * Returns a new CompletableFuture that is completed
2129 * asynchronously using the {@link ForkJoinPool#commonPool()}
2130 * when this and the other given CompletableFuture both complete.
2131 * If this and/or the other CompletableFuture complete exceptionally,
2132 * then the returned CompletableFuture also does so, with a
2133 * CompletionException holding one of these exceptions as its cause.
2134 *
2135 * @param other the other CompletableFuture
2136 * @param action the action to perform before completing the
2137 * returned CompletableFuture
2138 * @return the new CompletableFuture
2139 */
2140 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2141 Runnable action) {
2142 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2143 }
2144
2145 /**
2146 * Returns a new CompletableFuture that is completed
2147 * asynchronously using the given executor when this and the
2148 * other given CompletableFuture both complete.
2149 * If this and/or the other CompletableFuture complete exceptionally,
2150 * then the returned CompletableFuture also does so, with a
2151 * CompletionException holding one of these exceptions as its cause.
2152 *
2153 * @param other the other CompletableFuture
2154 * @param action the action to perform before completing the
2155 * returned CompletableFuture
2156 * @param executor the executor to use for asynchronous execution
2157 * @return the new CompletableFuture
2158 */
2159 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2160 Runnable action,
2161 Executor executor) {
2162 if (executor == null) throw new NullPointerException();
2163 return doThenBiRun(other, action, executor);
2164 }
2165
2166 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2167 Runnable action,
2168 Executor e) {
2169 if (other == null || action == null) throw new NullPointerException();
2170 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2171 BiRunCompletion<T> d = null;
2172 Object r, s = null;
2173 if ((r = result) == null || (s = other.result) == null) {
2174 d = new BiRunCompletion<T>(this, other, action, dst, e);
2175 CompletionNode q = null, p = new CompletionNode(d);
2176 while ((r == null && (r = result) == null) ||
2177 (s == null && (s = other.result) == null)) {
2178 if (q != null) {
2179 if (s != null ||
2180 UNSAFE.compareAndSwapObject
2181 (other, COMPLETIONS, q.next = other.completions, q))
2182 break;
2183 }
2184 else if (r != null ||
2185 UNSAFE.compareAndSwapObject
2186 (this, COMPLETIONS, p.next = completions, p)) {
2187 if (s != null)
2188 break;
2189 q = new CompletionNode(d);
2190 }
2191 }
2192 }
2193 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2194 Throwable ex;
2195 if (r instanceof AltResult)
2196 ex = ((AltResult)r).ex;
2197 else
2198 ex = null;
2199 if (ex == null && (s instanceof AltResult))
2200 ex = ((AltResult)s).ex;
2201 if (ex == null) {
2202 try {
2203 if (e != null)
2204 e.execute(new AsyncRun(action, dst));
2205 else
2206 action.run();
2207 } catch (Throwable rex) {
2208 ex = rex;
2209 }
2210 }
2211 if (e == null || ex != null)
2212 dst.internalComplete(null, ex);
2213 }
2214 helpPostComplete();
2215 other.helpPostComplete();
2216 return dst;
2217 }
2218
2219 /**
2220 * Returns a new CompletableFuture that is completed with
2221 * the result of the given function of either this or the other
2222 * given CompletableFuture's result when either completes.
2223 * If this and/or the other CompletableFuture complete exceptionally,
2224 * then the returned CompletableFuture may also do so, with a
2225 * CompletionException holding one of these exceptions as its cause.
2226 * No guarantees are made about which result or exception is used
2227 * in the returned CompletableFuture.
2228 *
2229 * @param other the other CompletableFuture
2230 * @param fn the function to use to compute the value of
2231 * the returned CompletableFuture
2232 * @return the new CompletableFuture
2233 */
2234 public <U> CompletableFuture<U> applyToEither
2235 (CompletableFuture<? extends T> other,
2236 Function<? super T, U> fn) {
2237 return doOrApply(other, fn, null);
2238 }
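/*
 * Usage sketch (illustrative only, not part of the original source): a
 * minimal example of applyToEither, assuming the public API as it later
 * shipped in JDK 8. The names EitherSketch and firstUpper are hypothetical.
 * One source is left incomplete so that the outcome is deterministic; with
 * two racing sources no guarantee is made about which result is used.
 *
```java
import java.util.concurrent.CompletableFuture;

final class EitherSketch {
    // Applies the function to whichever source has a result available.
    static String firstUpper(CompletableFuture<String> a,
                             CompletableFuture<String> b) {
        return a.applyToEither(b, s -> s.toUpperCase()).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> ready =
            CompletableFuture.completedFuture("mirror");
        System.out.println(firstUpper(pending, ready));
    }
}
```
 */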
2239
2240 /**
2241 * Returns a new CompletableFuture that is completed
2242 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2243 * the result of the given function of either this or the other
2244 * given CompletableFuture's result when either completes.
2245 * If this and/or the other CompletableFuture complete exceptionally,
2246 * then the returned CompletableFuture may also do so, with a
2247 * CompletionException holding one of these exceptions as its cause.
2248 * No guarantees are made about which result or exception is used
2249 * in the returned CompletableFuture.
2250 *
2251 * @param other the other CompletableFuture
2252 * @param fn the function to use to compute the value of
2253 * the returned CompletableFuture
2254 * @return the new CompletableFuture
2255 */
2256 public <U> CompletableFuture<U> applyToEitherAsync
2257 (CompletableFuture<? extends T> other,
2258 Function<? super T, U> fn) {
2259 return doOrApply(other, fn, ForkJoinPool.commonPool());
2260 }
2261
2262 /**
2263 * Returns a new CompletableFuture that is completed
2264 * asynchronously using the given executor with the result of the
2265 * given function of either this or the other given
2266 * CompletableFuture's result when either completes. If this
2267 * and/or the other CompletableFuture complete exceptionally, then
2268 * the returned CompletableFuture may also do so, with a
2269 * CompletionException holding one of these exceptions as its cause.
2270 * No guarantees are made about which result or exception is used
2271 * in the returned CompletableFuture.
2272 *
2273 * @param other the other CompletableFuture
2274 * @param fn the function to use to compute the value of
2275 * the returned CompletableFuture
2276 * @param executor the executor to use for asynchronous execution
2277 * @return the new CompletableFuture
2278 */
2279 public <U> CompletableFuture<U> applyToEitherAsync
2280 (CompletableFuture<? extends T> other,
2281 Function<? super T, U> fn,
2282 Executor executor) {
2283 if (executor == null) throw new NullPointerException();
2284 return doOrApply(other, fn, executor);
2285 }
2286
2287 private <U> CompletableFuture<U> doOrApply
2288 (CompletableFuture<? extends T> other,
2289 Function<? super T, U> fn,
2290 Executor e) {
2291 if (other == null || fn == null) throw new NullPointerException();
2292 CompletableFuture<U> dst = new CompletableFuture<U>();
2293 OrApplyCompletion<T,U> d = null;
2294 Object r;
2295 if ((r = result) == null && (r = other.result) == null) {
2296 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2297 CompletionNode q = null, p = new CompletionNode(d);
2298 while ((r = result) == null && (r = other.result) == null) {
2299 if (q != null) {
2300 if (UNSAFE.compareAndSwapObject
2301 (other, COMPLETIONS, q.next = other.completions, q))
2302 break;
2303 }
2304 else if (UNSAFE.compareAndSwapObject
2305 (this, COMPLETIONS, p.next = completions, p))
2306 q = new CompletionNode(d);
2307 }
2308 }
2309 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2310 T t; Throwable ex;
2311 if (r instanceof AltResult) {
2312 ex = ((AltResult)r).ex;
2313 t = null;
2314 }
2315 else {
2316 ex = null;
2317 @SuppressWarnings("unchecked") T tr = (T) r;
2318 t = tr;
2319 }
2320 U u = null;
2321 if (ex == null) {
2322 try {
2323 if (e != null)
2324 e.execute(new AsyncApply<T,U>(t, fn, dst));
2325 else
2326 u = fn.apply(t);
2327 } catch (Throwable rex) {
2328 ex = rex;
2329 }
2330 }
2331 if (e == null || ex != null)
2332 dst.internalComplete(u, ex);
2333 }
2334 helpPostComplete();
2335 other.helpPostComplete();
2336 return dst;
2337 }
2338
2339 /**
2340 * Returns a new CompletableFuture that is completed after
2341 * performing the given action with the result of either this or the
2342 * other given CompletableFuture, when either completes.
2343 * If this and/or the other CompletableFuture complete exceptionally,
2344 * then the returned CompletableFuture may also do so, with a
2345 * CompletionException holding one of these exceptions as its cause.
2346 * No guarantees are made about which exception is used in the
2347 * returned CompletableFuture.
2348 *
2349 * @param other the other CompletableFuture
2350 * @param block the action to perform before completing the
2351 * returned CompletableFuture
2352 * @return the new CompletableFuture
2353 */
2354 public CompletableFuture<Void> acceptEither
2355 (CompletableFuture<? extends T> other,
2356 Consumer<? super T> block) {
2357 return doOrAccept(other, block, null);
2358 }
2359
2360 /**
2361 * Returns a new CompletableFuture that is completed
2362 * asynchronously using the {@link ForkJoinPool#commonPool()},
2363 * performing the given action with the result of either this or
2364 * the other given CompletableFuture, when either completes.
2365 * If this and/or the other CompletableFuture complete exceptionally,
2366 * then the returned CompletableFuture may also do so, with a
2367 * CompletionException holding one of these exceptions as its cause.
2368 * No guarantees are made about which exception is used in the
2369 * returned CompletableFuture.
2370 *
2371 * @param other the other CompletableFuture
2372 * @param block the action to perform before completing the
2373 * returned CompletableFuture
2374 * @return the new CompletableFuture
2375 */
2376 public CompletableFuture<Void> acceptEitherAsync
2377 (CompletableFuture<? extends T> other,
2378 Consumer<? super T> block) {
2379 return doOrAccept(other, block, ForkJoinPool.commonPool());
2380 }
2381
2382 /**
2383 * Returns a new CompletableFuture that is completed
2384 * asynchronously using the given executor, performing the given
2385 * action with the result of either this or the other given
2386 * CompletableFuture, when either completes.
2387 * If this and/or the other CompletableFuture complete exceptionally,
2388 * then the returned CompletableFuture may also do so, with a
2389 * CompletionException holding one of these exceptions as its cause.
2390 * No guarantees are made about which exception is used in the
2391 * returned CompletableFuture.
2392 *
2393 * @param other the other CompletableFuture
2394 * @param block the action to perform before completing the
2395 * returned CompletableFuture
2396 * @param executor the executor to use for asynchronous execution
2397 * @return the new CompletableFuture
2398 */
2399 public CompletableFuture<Void> acceptEitherAsync
2400 (CompletableFuture<? extends T> other,
2401 Consumer<? super T> block,
2402 Executor executor) {
2403 if (executor == null) throw new NullPointerException();
2404 return doOrAccept(other, block, executor);
2405 }
2406
2407 private CompletableFuture<Void> doOrAccept
2408 (CompletableFuture<? extends T> other,
2409 Consumer<? super T> fn,
2410 Executor e) {
2411 if (other == null || fn == null) throw new NullPointerException();
2412 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2413 OrAcceptCompletion<T> d = null;
2414 Object r;
2415 if ((r = result) == null && (r = other.result) == null) {
2416 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2417 CompletionNode q = null, p = new CompletionNode(d);
2418 while ((r = result) == null && (r = other.result) == null) {
2419 if (q != null) {
2420 if (UNSAFE.compareAndSwapObject
2421 (other, COMPLETIONS, q.next = other.completions, q))
2422 break;
2423 }
2424 else if (UNSAFE.compareAndSwapObject
2425 (this, COMPLETIONS, p.next = completions, p))
2426 q = new CompletionNode(d);
2427 }
2428 }
2429 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2430 T t; Throwable ex;
2431 if (r instanceof AltResult) {
2432 ex = ((AltResult)r).ex;
2433 t = null;
2434 }
2435 else {
2436 ex = null;
2437 @SuppressWarnings("unchecked") T tr = (T) r;
2438 t = tr;
2439 }
2440 if (ex == null) {
2441 try {
2442 if (e != null)
2443 e.execute(new AsyncAccept<T>(t, fn, dst));
2444 else
2445 fn.accept(t);
2446 } catch (Throwable rex) {
2447 ex = rex;
2448 }
2449 }
2450 if (e == null || ex != null)
2451 dst.internalComplete(null, ex);
2452 }
2453 helpPostComplete();
2454 other.helpPostComplete();
2455 return dst;
2456 }
2457
2458 /**
2459 * Returns a new CompletableFuture that is completed
2460 * after either this or the other given CompletableFuture completes.
2461 * If this and/or the other CompletableFuture complete exceptionally,
2462 * then the returned CompletableFuture may also do so, with a
2463 * CompletionException holding one of these exceptions as its cause.
2464 * No guarantees are made about which exception is used in the
2465 * returned CompletableFuture.
2466 *
2467 * @param other the other CompletableFuture
2468 * @param action the action to perform before completing the
2469 * returned CompletableFuture
2470 * @return the new CompletableFuture
2471 */
2472 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2473 Runnable action) {
2474 return doOrRun(other, action, null);
2475 }
2476
2477 /**
2478 * Returns a new CompletableFuture that is completed
2479 * asynchronously using the {@link ForkJoinPool#commonPool()}
2480 * after either this or the other given CompletableFuture completes.
2481 * If this and/or the other CompletableFuture complete exceptionally,
2482 * then the returned CompletableFuture may also do so, with a
2483 * CompletionException holding one of these exceptions as its cause.
2484 * No guarantees are made about which exception is used in the
2485 * returned CompletableFuture.
2486 *
2487 * @param other the other CompletableFuture
2488 * @param action the action to perform before completing the
2489 * returned CompletableFuture
2490 * @return the new CompletableFuture
2491 */
2492 public CompletableFuture<Void> runAfterEitherAsync
2493 (CompletableFuture<?> other,
2494 Runnable action) {
2495 return doOrRun(other, action, ForkJoinPool.commonPool());
2496 }
2497
2498 /**
2499 * Returns a new CompletableFuture that is completed
2500 * asynchronously using the given executor after this or the other
2501 * given CompletableFuture completes.
2502 * If this and/or the other CompletableFuture complete exceptionally,
2503 * then the returned CompletableFuture may also do so, with a
2504 * CompletionException holding one of these exceptions as its cause.
2505 * No guarantees are made about which exception is used in the
2506 * returned CompletableFuture.
2507 *
2508 * @param other the other CompletableFuture
2509 * @param action the action to perform before completing the
2510 * returned CompletableFuture
2511 * @param executor the executor to use for asynchronous execution
2512 * @return the new CompletableFuture
2513 */
2514 public CompletableFuture<Void> runAfterEitherAsync
2515 (CompletableFuture<?> other,
2516 Runnable action,
2517 Executor executor) {
2518 if (executor == null) throw new NullPointerException();
2519 return doOrRun(other, action, executor);
2520 }
2521
2522 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2523 Runnable action,
2524 Executor e) {
2525 if (other == null || action == null) throw new NullPointerException();
2526 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2527 OrRunCompletion<T> d = null;
2528 Object r;
2529 if ((r = result) == null && (r = other.result) == null) {
2530 d = new OrRunCompletion<T>(this, other, action, dst, e);
2531 CompletionNode q = null, p = new CompletionNode(d);
2532 while ((r = result) == null && (r = other.result) == null) {
2533 if (q != null) {
2534 if (UNSAFE.compareAndSwapObject
2535 (other, COMPLETIONS, q.next = other.completions, q))
2536 break;
2537 }
2538 else if (UNSAFE.compareAndSwapObject
2539 (this, COMPLETIONS, p.next = completions, p))
2540 q = new CompletionNode(d);
2541 }
2542 }
2543 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2544 Throwable ex;
2545 if (r instanceof AltResult)
2546 ex = ((AltResult)r).ex;
2547 else
2548 ex = null;
2549 if (ex == null) {
2550 try {
2551 if (e != null)
2552 e.execute(new AsyncRun(action, dst));
2553 else
2554 action.run();
2555 } catch (Throwable rex) {
2556 ex = rex;
2557 }
2558 }
2559 if (e == null || ex != null)
2560 dst.internalComplete(null, ex);
2561 }
2562 helpPostComplete();
2563 other.helpPostComplete();
2564 return dst;
2565 }
2566
2567 /**
2568 * Returns a CompletableFuture (or an equivalent one) produced by
2569 * the given function of the result of this CompletableFuture when
2570 * completed. If this CompletableFuture completes exceptionally,
2571 * then the returned CompletableFuture also does so, with a
2572 * CompletionException holding this exception as its cause.
2573 *
2574 * @param fn the function returning a new CompletableFuture
2575 * @return the CompletableFuture, which {@code isDone()} upon
2576 * return if it was already completed by the given function or
2577 * if an exception occurred
2578 */
2579 public <U> CompletableFuture<U> thenCompose
2580 (Function<? super T, CompletableFuture<U>> fn) {
2581 return doCompose(fn, null);
2582 }
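/*
 * Usage sketch (illustrative only, not part of the original source): a
 * minimal example of thenCompose, assuming the public API as it later
 * shipped in JDK 8. ComposeSketch, scoreFor, lookup and causeOfFailure are
 * hypothetical names; scoreFor stands in for any second asynchronous stage.
 *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ComposeSketch {
    // Hypothetical second stage: pretends to look up a score for a user id.
    static CompletableFuture<Integer> scoreFor(int userId) {
        return CompletableFuture.supplyAsync(() -> userId * 10);
    }

    // Chains the two stages without producing a nested future.
    static int lookup(CompletableFuture<Integer> userId) {
        return userId.thenCompose(id -> scoreFor(id)).join();
    }

    // Returns the cause reported when the first stage has already failed,
    // or null if the composed future unexpectedly completed normally.
    static Throwable causeOfFailure(Throwable original) {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(original);
        try {
            failed.thenCompose(id -> scoreFor(id)).join();
            return null;
        } catch (CompletionException ce) {
            return ce.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(lookup(CompletableFuture.completedFuture(4)));
    }
}
```
 *
 * When the source fails, the function is never invoked; the returned
 * future carries the original exception wrapped in a CompletionException.
 */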
2583
2584 /**
2585 * Returns a CompletableFuture (or an equivalent one) produced
2586 * asynchronously using the {@link ForkJoinPool#commonPool()} by
2587 * the given function of the result of this CompletableFuture when
2588 * completed. If this CompletableFuture completes exceptionally,
2589 * then the returned CompletableFuture also does so, with a
2590 * CompletionException holding this exception as its cause.
2591 *
2592 * @param fn the function returning a new CompletableFuture
2593 * @return the CompletableFuture, which {@code isDone()} upon
2594 * return if it was already completed by the given function or
2595 * if an exception occurred
2596 */
2597 public <U> CompletableFuture<U> thenComposeAsync
2598 (Function<? super T, CompletableFuture<U>> fn) {
2599 return doCompose(fn, ForkJoinPool.commonPool());
2600 }
2601
2602 /**
2603 * Returns a CompletableFuture (or an equivalent one) produced
2604 * asynchronously using the given executor by the given function
2605 * of the result of this CompletableFuture when completed.
2606 * If this CompletableFuture completes exceptionally, then the
2607 * returned CompletableFuture also does so, with a
2608 * CompletionException holding this exception as its cause.
2609 *
2610 * @param fn the function returning a new CompletableFuture
2611 * @param executor the executor to use for asynchronous execution
2612 * @return the CompletableFuture, which {@code isDone()} upon
2613 * return if it was already completed by the given function or
2614 * if an exception occurred
2615 */
2616 public <U> CompletableFuture<U> thenComposeAsync
2617 (Function<? super T, CompletableFuture<U>> fn,
2618 Executor executor) {
2619 if (executor == null) throw new NullPointerException();
2620 return doCompose(fn, executor);
2621 }
2622
2623 private <U> CompletableFuture<U> doCompose
2624 (Function<? super T, CompletableFuture<U>> fn,
2625 Executor e) {
2626 if (fn == null) throw new NullPointerException();
2627 CompletableFuture<U> dst = null;
2628 ComposeCompletion<T,U> d = null;
2629 Object r;
2630 if ((r = result) == null) {
2631 dst = new CompletableFuture<U>();
2632 CompletionNode p = new CompletionNode
2633 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2634 while ((r = result) == null) {
2635 if (UNSAFE.compareAndSwapObject
2636 (this, COMPLETIONS, p.next = completions, p))
2637 break;
2638 }
2639 }
2640 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2641 T t; Throwable ex;
2642 if (r instanceof AltResult) {
2643 ex = ((AltResult)r).ex;
2644 t = null;
2645 }
2646 else {
2647 ex = null;
2648 @SuppressWarnings("unchecked") T tr = (T) r;
2649 t = tr;
2650 }
2651 if (ex == null) {
2652 if (e != null) {
2653 if (dst == null)
2654 dst = new CompletableFuture<U>();
2655 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2656 }
2657 else {
2658 try {
2659 if ((dst = fn.apply(t)) == null)
2660 ex = new NullPointerException();
2661 } catch (Throwable rex) {
2662 ex = rex;
2663 }
2664 }
2665 }
2666 if (dst == null) // source failed, or fn threw or returned null
2667 dst = new CompletableFuture<U>();
2668 if (ex != null)
2669 dst.internalComplete(null, ex);
2670 }
2671 helpPostComplete();
2672 dst.helpPostComplete();
2673 return dst;
2674 }
2675
2676 /**
2677 * Returns a new CompletableFuture that is completed with
2678 * the result of the given function of the exception triggering
2679 * this CompletableFuture's completion when it completes
2680 * exceptionally; otherwise, if this CompletableFuture completes
2681 * normally, then the returned CompletableFuture also completes
2682 * normally with the same value.
2683 *
2684 * @param fn the function to use to compute the value of the
2685 * returned CompletableFuture if this CompletableFuture completed
2686 * exceptionally
2687 * @return the new CompletableFuture
2688 */
2689 public CompletableFuture<T> exceptionally
2690 (Function<Throwable, ? extends T> fn) {
2691 if (fn == null) throw new NullPointerException();
2692 CompletableFuture<T> dst = new CompletableFuture<T>();
2693 ExceptionCompletion<T> d = null;
2694 Object r;
2695 if ((r = result) == null) {
2696 CompletionNode p =
2697 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2698 while ((r = result) == null) {
2699 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2700 p.next = completions, p))
2701 break;
2702 }
2703 }
2704 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2705 T t = null; Throwable ex, dx = null;
2706 if (r instanceof AltResult) {
2707 if ((ex = ((AltResult)r).ex) != null) {
2708 try {
2709 t = fn.apply(ex);
2710 } catch (Throwable rex) {
2711 dx = rex;
2712 }
2713 }
2714 }
2715 else {
2716 @SuppressWarnings("unchecked") T tr = (T) r;
2717 t = tr;
2718 }
2719 dst.internalComplete(t, dx);
2720 }
2721 helpPostComplete();
2722 return dst;
2723 }
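/*
 * Usage sketch (illustrative only, not part of the original source): a
 * minimal example of exceptionally, assuming the public API as it later
 * shipped in JDK 8. The names RecoverSketch and withFallback are
 * hypothetical.
 *
```java
import java.util.concurrent.CompletableFuture;

final class RecoverSketch {
    // Passes a normal value through unchanged; maps a failure to a fallback.
    static String withFallback(CompletableFuture<String> source) {
        return source
            .exceptionally(ex -> "fallback: " + ex.getMessage())
            .join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(withFallback(failed));
        System.out.println(withFallback(CompletableFuture.completedFuture("ok")));
    }
}
```
 *
 * Here the future is failed directly, so the function sees the original
 * exception. When a failure propagates from an earlier dependent stage,
 * the function may instead see a CompletionException wrapping it.
 */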
2724
2725 /**
2726 * Returns a new CompletableFuture that is completed with
2727 * the result of the given function of the result and exception of
2728 * this CompletableFuture's completion when it completes. The
2729 * given function is invoked with the result (or {@code null} if
2730 * none) and the exception (or {@code null} if none) of this
2731 * CompletableFuture when complete.
2732 *
2733 * @param fn the function to use to compute the value of the
2734 * returned CompletableFuture
2735 * @return the new CompletableFuture
2736 */
2737 public <U> CompletableFuture<U> handle
2738 (BiFunction<? super T, Throwable, ? extends U> fn) {
2739 if (fn == null) throw new NullPointerException();
2740 CompletableFuture<U> dst = new CompletableFuture<U>();
2741 HandleCompletion<T,U> d = null;
2742 Object r;
2743 if ((r = result) == null) {
2744 CompletionNode p =
2745 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2746 while ((r = result) == null) {
2747 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2748 p.next = completions, p))
2749 break;
2750 }
2751 }
2752 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2753 T t; Throwable ex;
2754 if (r instanceof AltResult) {
2755 ex = ((AltResult)r).ex;
2756 t = null;
2757 }
2758 else {
2759 ex = null;
2760 @SuppressWarnings("unchecked") T tr = (T) r;
2761 t = tr;
2762 }
2763 U u; Throwable dx;
2764 try {
2765 u = fn.apply(t, ex);
2766 dx = null;
2767 } catch (Throwable rex) {
2768 dx = rex;
2769 u = null;
2770 }
2771 dst.internalComplete(u, dx);
2772 }
2773 helpPostComplete();
2774 return dst;
2775 }
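/*
 * Usage sketch (illustrative only, not part of the original source): a
 * minimal example of handle, assuming the public API as it later shipped
 * in JDK 8. The names HandleSketch and describe are hypothetical. Unlike
 * exceptionally, the function runs on both normal and exceptional
 * completion, and exactly one of its two arguments carries information.
 *
```java
import java.util.concurrent.CompletableFuture;

final class HandleSketch {
    // Turns either outcome of the source into a plain description.
    static String describe(CompletableFuture<Integer> source) {
        return source
            .handle((value, ex) -> ex == null
                ? "ok: " + value
                : "failed: " + ex.getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture(3)));
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(describe(failed));
    }
}
```
 */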
2776
2777
2778 /* ------------- Arbitrary-arity constructions -------------- */
2779
2780 /*
2781 * The basic plan of attack is to recursively form binary
2782 * completion trees of elements. This can be overkill for small
2783 * sets, but scales nicely. The And/All vs Or/Any forms use the
2784 * same idea, but details differ.
2785 */
2786
2787 /**
2788 * Returns a new CompletableFuture that is completed when all of
2789 * the given CompletableFutures complete. If any of the component
2790 * CompletableFutures complete exceptionally, then so does the
2791 * returned CompletableFuture. Otherwise, the results, if any, of
2792 * the component CompletableFutures are not reflected in the
2793 * returned CompletableFuture, but may be obtained by inspecting
2794 * them individually. If the number of components is zero, returns
2795 * a CompletableFuture completed with the value {@code null}.
2796 *
2797 * <p>Among the applications of this method is to await completion
2798 * of a set of independent CompletableFutures before continuing a
2799 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2800 * c3).join();}.
2801 *
2802 * @param cfs the CompletableFutures
2803 * @return a new CompletableFuture that is completed when all of the
2804 * given CompletableFutures complete
2805 * @throws NullPointerException if the array or any of its elements are
2806 * {@code null}
2807 */
2808 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2809 int len = cfs.length; // Directly handle empty and singleton cases
2810 if (len > 1)
2811 return allTree(cfs, 0, len - 1);
2812 else {
2813 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2814 CompletableFuture<?> f;
2815 if (len == 0)
2816 dst.result = NIL;
2817 else if ((f = cfs[0]) == null)
2818 throw new NullPointerException();
2819 else {
2820 ThenCopy d = null;
2821 CompletionNode p = null;
2822 Object r;
2823 while ((r = f.result) == null) {
2824 if (d == null)
2825 d = new ThenCopy(f, dst);
2826 else if (p == null)
2827 p = new CompletionNode(d);
2828 else if (UNSAFE.compareAndSwapObject
2829 (f, COMPLETIONS, p.next = f.completions, p))
2830 break;
2831 }
2832 if (r != null && (d == null || d.compareAndSet(0, 1)))
2833 dst.internalComplete(null, (r instanceof AltResult) ?
2834 ((AltResult)r).ex : null);
2835 f.helpPostComplete();
2836 }
2837 return dst;
2838 }
2839 }
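/*
 * Usage sketch (illustrative only, not part of the original source):
 * because allOf does not reflect component results, a caller that needs
 * them can wait on allOf and then read each component. This assumes the
 * public API as it later shipped in JDK 8; AllOfSketch and joinAll are
 * hypothetical names.
 *
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllOfSketch {
    // Waits for every future, then collects the results in list order.
    static <T> List<T> joinAll(List<CompletableFuture<T>> cfs) {
        CompletableFuture.allOf(cfs.toArray(new CompletableFuture<?>[0])).join();
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> cf : cfs)
            results.add(cf.join()); // already complete, so this does not block
        return results;
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> cfs = new ArrayList<>();
        cfs.add(CompletableFuture.supplyAsync(() -> 1));
        cfs.add(CompletableFuture.supplyAsync(() -> 2));
        cfs.add(CompletableFuture.supplyAsync(() -> 3));
        System.out.println(joinAll(cfs));
    }
}
```
 *
 * If any component fails, the first join() above throws a
 * CompletionException and no partial list is returned.
 */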
2840
2841 /**
2842 * Recursively constructs an And'ed tree of CompletableFutures.
2843 * Called only when array known to have at least two elements.
2844 */
    private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
                                                   int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AndCompletion d = null;
        CompletionNode p = null, q = null;
        Object r = null, s = null;
        while ((r = fst.result) == null || (s = snd.result) == null) {
            if (d == null)
                d = new AndCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null) &&
            (s != null || (s = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null && (s instanceof AltResult))
                ex = ((AltResult)s).ex;
            dst.internalComplete(null, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }

    /**
     * Returns a new CompletableFuture that is completed when any of
     * the component CompletableFutures completes, with the same result
     * if that component completed normally, otherwise exceptionally.
     * If the number of components is zero, returns an incomplete
     * CompletableFuture.
     *
     * @param cfs the CompletableFutures
     * @return a new CompletableFuture that is completed when any of the
     * given CompletableFutures completes
     * @throws NullPointerException if the array or any of its elements are
     * {@code null}
     */
    public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
        int len = cfs.length; // Same idea as allOf
        if (len > 1)
            return anyTree(cfs, 0, len - 1);
        else {
            CompletableFuture<?> dst = new CompletableFuture<Object>();
            CompletableFuture<?> f;
            if (len == 0)
                ; // skip
            else if ((f = cfs[0]) == null)
                throw new NullPointerException();
            else {
                ThenCopy d = null;
                CompletionNode p = null;
                Object r;
                while ((r = f.result) == null) {
                    if (d == null)
                        d = new ThenCopy(f, dst);
                    else if (p == null)
                        p = new CompletionNode(d);
                    else if (UNSAFE.compareAndSwapObject
                             (f, COMPLETIONS, p.next = f.completions, p))
                        break;
                }
                if (r != null && (d == null || d.compareAndSet(0, 1))) {
                    Throwable ex; Object t;
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        t = null;
                    }
                    else {
                        ex = null;
                        t = r;
                    }
                    dst.internalComplete(t, ex);
                }
                f.helpPostComplete();
            }
            return dst;
        }
    }

    /**
     * Recursively constructs an Or'ed tree of CompletableFutures.
     */
    private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
                                                int lo, int hi) {
        CompletableFuture<?> fst, snd;
        int mid = (lo + hi) >>> 1;
        if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
            (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        CompletableFuture<?> dst = new CompletableFuture<Object>();
        OrCompletion d = null;
        CompletionNode p = null, q = null;
        Object r;
        while ((r = fst.result) == null && (r = snd.result) == null) {
            if (d == null)
                d = new OrCompletion(fst, snd, dst);
            else if (p == null)
                p = new CompletionNode(d);
            else if (q == null) {
                if (UNSAFE.compareAndSwapObject
                    (fst, COMPLETIONS, p.next = fst.completions, p))
                    q = new CompletionNode(d);
            }
            else if (UNSAFE.compareAndSwapObject
                     (snd, COMPLETIONS, q.next = snd.completions, q))
                break;
        }
        if ((r != null || (r = fst.result) != null ||
             (r = snd.result) != null) &&
            (d == null || d.compareAndSet(0, 1))) {
            Throwable ex; Object t;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                t = r;
            }
            dst.internalComplete(t, ex);
        }
        fst.helpPostComplete();
        snd.helpPostComplete();
        return dst;
    }


    /* ------------- Control and status methods -------------- */

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = (result == null) &&
            UNSAFE.compareAndSwapObject
            (this, RESULT, null, new AltResult(new CancellationException()));
        postComplete();
        return cancelled || isCancelled();
    }

    /**
     * Returns {@code true} if this CompletableFuture was cancelled
     * before it completed normally.
     *
     * @return {@code true} if this CompletableFuture was cancelled
     * before it completed normally
     */
    public boolean isCancelled() {
        Object r;
        return ((r = result) instanceof AltResult) &&
            (((AltResult)r).ex instanceof CancellationException);
    }

    /**
     * Forcibly sets or resets the value subsequently returned by
     * method {@link #get()} and related methods, whether or not
     * already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param value the completion value
     */
    public void obtrudeValue(T value) {
        result = (value == null) ? NIL : value;
        postComplete();
    }

    /**
     * Forcibly causes subsequent invocations of method {@link #get()}
     * and related methods to throw the given exception, whether or
     * not already completed. This method is designed for use only in
     * error recovery actions, and even in such situations may result
     * in ongoing dependent completions using established versus
     * overwritten outcomes.
     *
     * @param ex the exception
     */
    public void obtrudeException(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        result = new AltResult(ex);
        postComplete();
    }

    /**
     * Returns the estimated number of CompletableFutures whose
     * completions are awaiting completion of this CompletableFuture.
     * This method is designed for use in monitoring system state, not
     * for synchronization control.
     *
     * @return the number of dependent CompletableFutures
     */
    public int getNumberOfDependents() {
        int count = 0;
        for (CompletionNode p = completions; p != null; p = p.next)
            ++count;
        return count;
    }

    /**
     * Returns a string identifying this CompletableFuture, as well as
     * its completion state. The state, in brackets, contains the
     * String {@code "Completed normally"} or the String {@code
     * "Completed exceptionally"}, or the String {@code "Not
     * completed"} followed by the number of CompletableFutures
     * dependent upon its completion, if any.
     *
     * @return a string identifying this CompletableFuture, as well as its state
     */
    public String toString() {
        Object r = result;
        int count;
        return super.toString() +
            ((r == null) ?
             (((count = getNumberOfDependents()) == 0) ?
              "[Not completed]" :
              "[Not completed, " + count + " dependents]") :
             (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
              "[Completed exceptionally]" :
              "[Completed normally]"));
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long RESULT;
    private static final long WAITERS;
    private static final long COMPLETIONS;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = CompletableFuture.class;
            RESULT = UNSAFE.objectFieldOffset
                (k.getDeclaredField("result"));
            WAITERS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
            COMPLETIONS = UNSAFE.objectFieldOffset
                (k.getDeclaredField("completions"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
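
The documented behaviour of allOf, anyOf, and cancel in the listing above can be exercised from client code. The following is a minimal sketch, not part of the source file. It assumes the java.util.concurrent.CompletableFuture class as released in JDK 8 or later, whose signatures may differ slightly from this revision (for example, the released anyOf returns CompletableFuture<Object>). The class name CombinatorSketch and its method names are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; only the CompletableFuture calls come from the JDK.
class CombinatorSketch {

    // allOf: the combined future completes only after every component has.
    static boolean allOfWaitsForEveryComponent() {
        CompletableFuture<String> c1 = new CompletableFuture<>();
        CompletableFuture<String> c2 = new CompletableFuture<>();
        CompletableFuture<Void> all = CompletableFuture.allOf(c1, c2);
        c1.complete("a");
        boolean pendingAfterFirst = !all.isDone();
        c2.complete("b");
        all.join(); // does not block: both components are already complete
        return pendingAfterFirst && all.isDone();
    }

    // anyOf: the combined future takes the outcome of the first component
    // to complete; later completions do not change it.
    static Object anyOfTakesFirstResult() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<?> any = CompletableFuture.anyOf(slow, fast);
        fast.complete("fast");
        slow.complete("slow");
        return any.join();
    }

    // With zero components, allOf is already complete ...
    static boolean emptyAllOfIsComplete() {
        return CompletableFuture.allOf().isDone();
    }

    // ... while anyOf is returned incomplete.
    static boolean emptyAnyOfIsIncomplete() {
        return !CompletableFuture.anyOf().isDone();
    }

    // cancel: the source reports isCancelled(), and a dependent completes
    // exceptionally with a CompletionException caused by the cancellation.
    static boolean cancelPropagatesToDependents() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);
        boolean cancelled = source.cancel(true); // the flag has no effect
        try {
            dependent.join();
            return false;
        } catch (CompletionException e) {
            return cancelled
                && source.isCancelled()
                && e.getCause() instanceof CancellationException;
        }
    }

    public static void main(String[] args) {
        System.out.println("allOf waits for all: " + allOfWaitsForEveryComponent());
        System.out.println("anyOf result: " + anyOfTakesFirstResult());
        System.out.println("empty allOf complete: " + emptyAllOfIsComplete());
        System.out.println("empty anyOf incomplete: " + emptyAnyOfIsIncomplete());
        System.out.println("cancel propagates: " + cancelPropagatesToDependents());
    }
}
```

Completing the futures by hand, rather than through supplyAsync, keeps the sketch deterministic: every completion happens on the calling thread, so no timing assumptions are needed.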