root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.55
Committed: Sat Feb 16 21:25:55 2013 UTC (11 years, 3 months ago) by jsr166
Branch: MAIN
Changes since 1.54: +29 -21 lines
Log Message:
improve javadocs for cancellation and exceptional completion
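
The Javadoc touched by this revision describes how cancellation and exceptional completion surface to callers. The following is a minimal sketch of that behavior, assuming the released `java.util.concurrent.CompletableFuture` API of Java 8 or later rather than this exact revision. The class name `ExceptionalCompletionDemo` and the helpers `joinFailure` and `getFailure` are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class; not part of the jsr166 sources.
final class ExceptionalCompletionDemo {

    /** Names the exception type that join() throws, or "none" on normal completion. */
    static String joinFailure(CompletableFuture<?> f) {
        try {
            f.join();
            return "none";
        } catch (CancellationException | CompletionException e) {
            return e.getClass().getSimpleName();
        }
    }

    /** Names the exception type that get() throws, or "none" on normal completion. */
    static String getFailure(CompletableFuture<?> f) {
        try {
            f.get();
            return "none";
        } catch (ExecutionException | CancellationException e) {
            return e.getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "InterruptedException";
        }
    }

    public static void main(String[] args) {
        // A source future fails; its dependent completes exceptionally too.
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Integer> dep = src.thenApply(String::length);
        src.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(getFailure(dep));   // checked wrapper reported by get()
        System.out.println(joinFailure(dep));  // unchecked wrapper reported by join()

        // exceptionally() is one of the completions that can handle the failure.
        CompletableFuture<Integer> recovered = dep.exceptionally(t -> -1);
        System.out.println(recovered.join());

        // Cancellation is treated as another form of exceptional completion.
        CompletableFuture<String> cancelled = new CompletableFuture<>();
        cancelled.cancel(true);
        System.out.println(cancelled.isCancelled());
        System.out.println(joinFailure(cancelled));
    }
}
```

On a Java 8 or later runtime this is expected to print `ExecutionException`, `CompletionException`, `-1`, `true`, and `CancellationException`, one per line.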

File Contents

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.function.Supplier;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/**
 * A {@link Future} that may be explicitly completed (setting its
 * value and status), and may include dependent functions and actions
 * that trigger upon its completion.
 *
 * <p>When two or more threads attempt to
 * {@link #complete complete},
 * {@link #completeExceptionally completeExceptionally}, or
 * {@link #cancel cancel}
 * a CompletableFuture, only one of them succeeds.
 *
 * <p>Methods are available for adding dependents based on Functions,
 * Consumers, and Runnables. The appropriate form to use depends on
 * whether actions require arguments and/or produce results. Actions
 * may also be triggered after either or both the current and another
 * CompletableFuture complete. Multiple CompletableFutures may also
 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
 * {@link #allOf(CompletableFuture...)}.
 *
 * <p>Actions supplied for dependent completions (mainly using methods
 * with prefix {@code then}) may be performed by the thread that
 * completes the current CompletableFuture, or by any other caller of
 * these methods. There are no guarantees about the order of
 * processing completions unless constrained by these methods.
 *
 * <p>Since (unlike {@link FutureTask}) this class has no direct
 * control over the computation that causes it to be completed,
 * cancellation is treated as just another form of exceptional completion.
 * Method {@link #cancel cancel} has the same effect as
 * {@code completeExceptionally(new CancellationException())}.
 *
 * <p>Upon exceptional completion (including cancellation), or when a
 * completion entails an additional computation that terminates
 * abruptly with an (unchecked) exception or error, all dependent
 * completions (and their dependents in turn) generally act
 * as {@code completeExceptionally} with a {@link CompletionException}
 * holding that exception as its cause. However, the {@link
 * #exceptionally exceptionally} and {@link #handle handle}
 * completions <em>are</em> able to handle exceptional completions of
 * the CompletableFutures they depend on.
 *
 * <p>In case of exceptional completion with a CompletionException,
 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
 * {@link ExecutionException} with the same cause as held in the
 * corresponding CompletionException. However, in these cases,
 * methods {@link #join()} and {@link #getNow} throw the
 * CompletionException, which simplifies usage.
 *
 * <p>CompletableFutures themselves do not execute asynchronously.
 * However, the {@code async} methods provide commonly useful ways to
 * commence asynchronous processing, using either a given {@link
 * Executor} or by default the {@link ForkJoinPool#commonPool()}, of a
 * function or action that will result in the completion of a new
 * CompletableFuture. To simplify monitoring, debugging, and tracking,
 * all generated asynchronous tasks are instances of the tagging
 * interface {@link AsynchronousCompletionTask}.
 *
 * @author Doug Lea
 * @since 1.8
 */
public class CompletableFuture<T> implements Future<T> {

    /*
     * Overview:
     *
     * 1. Non-nullness of field result (set via CAS) indicates done.
     * An AltResult is used to box null as a result, as well as to
     * hold exceptions. Using a single field makes completion fast
     * and simple to detect and trigger, at the expense of a lot of
     * encoding and decoding that infiltrates many methods. One minor
     * simplification relies on the (static) NIL (to box null results)
     * being the only AltResult with a null exception field, so we
     * don't usually need explicit comparisons with NIL. The CF
     * exception propagation mechanics surrounding decoding rely on
     * unchecked casts of decoded results really being unchecked,
     * where user type errors are caught at point of use, as is
     * currently the case in Java. These are highlighted by using
     * SuppressWarnings-annotated temporaries.
     *
     * 2. Waiters are held in a Treiber stack similar to the one used
     * in FutureTask, Phaser, and SynchronousQueue. See their
     * internal documentation for algorithmic details.
     *
     * 3. Completions are also kept in a list/stack, and pulled off
     * and run when completion is triggered. (We could even use the
     * same stack as for waiters, but would give up the potential
     * parallelism obtained because woken waiters help release/run
     * others -- see method postComplete). Because post-processing
     * may race with direct calls, class Completion opportunistically
     * extends AtomicInteger so callers can claim the action via
     * compareAndSet(0, 1). The Completion.run methods are all
     * written in a boringly similar uniform way (that sometimes includes
     * unnecessary-looking checks, kept to maintain uniformity).
     * There are enough dimensions upon which they differ that
     * attempts to factor commonalities while maintaining efficiency
     * require more lines of code than they would save.
     *
     * 4. The exported then/and/or methods do support a bit of
     * factoring (see doThenApply etc). They must cope with the
     * intrinsic races surrounding addition of a dependent action
     * versus performing the action directly because the task is
     * already complete. For example, a CF may not be complete upon
     * entry, so a dependent completion is added, but by the time it
     * is added, the target CF is complete, so must be directly
     * executed. This is all done while avoiding unnecessary object
     * construction in safe-bypass cases.
     */

    // preliminaries

    static final class AltResult {
        final Throwable ex; // null only for NIL
        AltResult(Throwable ex) { this.ex = ex; }
    }

    static final AltResult NIL = new AltResult(null);

    // Fields

    volatile Object result; // Either the result or boxed AltResult
    volatile WaitNode waiters; // Treiber stack of threads blocked on get()
    volatile CompletionNode completions; // list (Treiber stack) of completions

    // Basic utilities for triggering and processing completions

    /**
     * Removes and signals all waiting threads and runs all completions.
     */
    final void postComplete() {
        WaitNode q; Thread t;
        while ((q = waiters) != null) {
            if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }

        CompletionNode h; Completion c;
        while ((h = completions) != null) {
            if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
                (c = h.completion) != null)
                c.run();
        }
    }

    /**
     * Triggers completion with the encoding of the given arguments:
     * if the exception is non-null, encodes it as a wrapped
     * CompletionException unless it is one already. Otherwise uses
     * the given result, boxed as NIL if null.
     */
    final void internalComplete(Object v, Throwable ex) {
        if (result == null)
            UNSAFE.compareAndSwapObject
                (this, RESULT, null,
                 (ex == null) ? (v == null) ? NIL : v :
                 new AltResult((ex instanceof CompletionException) ? ex :
                               new CompletionException(ex)));
        postComplete(); // help out even if not triggered
    }

    /**
     * If triggered, helps release and/or process completions.
     */
    final void helpPostComplete() {
        if (result != null)
            postComplete();
    }

    /* ------------- waiting for completions -------------- */

    /** Number of processors, for spin control */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Heuristic spin value for waitingGet() before blocking on
     * multiprocessors
     */
    static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;

    /**
     * Linked nodes to record waiting threads in a Treiber stack. See
     * other classes such as Phaser and SynchronousQueue for more
     * detailed explanation. This class implements ManagedBlocker to
     * avoid starvation when blocking actions pile up in
     * ForkJoinPools.
     */
    static final class WaitNode implements ForkJoinPool.ManagedBlocker {
        long nanos; // wait time if timed
        final long deadline; // non-zero if timed
        volatile int interruptControl; // > 0: interruptible, < 0: interrupted
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode(boolean interruptible, long nanos, long deadline) {
            this.thread = Thread.currentThread();
            this.interruptControl = interruptible ? 1 : 0;
            this.nanos = nanos;
            this.deadline = deadline;
        }
        public boolean isReleasable() {
            if (thread == null)
                return true;
            if (Thread.interrupted()) {
                int i = interruptControl;
                interruptControl = -1;
                if (i > 0)
                    return true;
            }
            if (deadline != 0L &&
                (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
                thread = null;
                return true;
            }
            return false;
        }
        public boolean block() {
            if (isReleasable())
                return true;
            else if (deadline == 0L)
                LockSupport.park(this);
            else if (nanos > 0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }

    /**
     * Returns raw result after waiting, or null if interruptible and
     * interrupted.
     */
    private Object waitingGet(boolean interruptible) {
        WaitNode q = null;
        boolean queued = false;
        int spins = SPINS;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) { // suppress unpark
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        if (interruptible) {
                            removeWaiter(q);
                            return null;
                        }
                        Thread.currentThread().interrupt();
                    }
                }
                postComplete(); // help release others
                return r;
            }
            else if (spins > 0) {
                int rnd = ThreadLocalRandom.nextSecondarySeed();
                if (rnd == 0)
                    rnd = ThreadLocalRandom.current().nextInt();
                if (rnd >= 0)
                    --spins;
            }
            else if (q == null)
                q = new WaitNode(interruptible, 0L, 0L);
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (interruptible && q.interruptControl < 0) {
                removeWaiter(q);
                return null;
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param nanos time to wait
     * @return raw result
     */
    private Object timedAwaitDone(long nanos)
        throws InterruptedException, TimeoutException {
        WaitNode q = null;
        boolean queued = false;
        for (Object r;;) {
            if ((r = result) != null) {
                if (q != null) {
                    q.thread = null;
                    if (q.interruptControl < 0) {
                        removeWaiter(q);
                        throw new InterruptedException();
                    }
                }
                postComplete();
                return r;
            }
            else if (q == null) {
                if (nanos <= 0L)
                    throw new TimeoutException();
                long d = System.nanoTime() + nanos;
                q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
            }
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, WAITERS,
                                                     q.next = waiters, q);
            else if (q.interruptControl < 0) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q.nanos <= 0L) {
                if (result == null) {
                    removeWaiter(q);
                    throw new TimeoutException();
                }
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ex) {
                    q.interruptControl = -1;
                }
            }
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage. Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers. To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race. This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) { // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    /* ------------- Async tasks -------------- */

    /**
     * A tagging interface identifying asynchronous tasks produced by
     * {@code async} methods. This may be useful for monitoring,
     * debugging, and tracking asynchronous activities.
     */
    public static interface AsynchronousCompletionTask {
    }

    /** Base class can act as either FJ or plain Runnable */
    abstract static class Async extends ForkJoinTask<Void>
        implements Runnable, AsynchronousCompletionTask {
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) { }
        public final void run() { exec(); }
    }

    static final class AsyncRun extends Async {
        final Runnable fn;
        final CompletableFuture<Void> dst;
        AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.run();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncSupply<U> extends Async {
        final Supplier<U> fn;
        final CompletableFuture<U> dst;
        AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
            this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.get();
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncApply<T,U> extends Async {
        final Function<? super T,? extends U> fn;
        final T arg;
        final CompletableFuture<U> dst;
        AsyncApply(T arg, Function<? super T,? extends U> fn,
                   CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    u = fn.apply(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    u = null;
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncBiApply<T,U,V> extends Async {
        final BiFunction<? super T,? super U,? extends V> fn;
        final T arg1;
        final U arg2;
        final CompletableFuture<V> dst;
        AsyncBiApply(T arg1, U arg2,
                     BiFunction<? super T,? super U,? extends V> fn,
                     CompletableFuture<V> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<V> d; V v; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    v = fn.apply(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    v = null;
                }
                d.internalComplete(v, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncAccept<T> extends Async {
        final Consumer<? super T> fn;
        final T arg;
        final CompletableFuture<Void> dst;
        AsyncAccept(T arg, Consumer<? super T> fn,
                    CompletableFuture<Void> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.accept(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncBiAccept<T,U> extends Async {
        final BiConsumer<? super T,? super U> fn;
        final T arg1;
        final U arg2;
        final CompletableFuture<Void> dst;
        AsyncBiAccept(T arg1, U arg2,
                      BiConsumer<? super T,? super U> fn,
                      CompletableFuture<Void> dst) {
            this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<Void> d; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fn.accept(arg1, arg2);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                }
                d.internalComplete(null, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AsyncCompose<T,U> extends Async {
        final Function<? super T, CompletableFuture<U>> fn;
        final T arg;
        final CompletableFuture<U> dst;
        AsyncCompose(T arg,
                     Function<? super T, CompletableFuture<U>> fn,
                     CompletableFuture<U> dst) {
            this.arg = arg; this.fn = fn; this.dst = dst;
        }
        public final boolean exec() {
            CompletableFuture<U> d, fr; U u; Throwable ex;
            if ((d = this.dst) != null && d.result == null) {
                try {
                    fr = fn.apply(arg);
                    ex = null;
                } catch (Throwable rex) {
                    ex = rex;
                    fr = null;
                }
                if (ex != null)
                    u = null;
                else if (fr == null) {
                    ex = new NullPointerException();
                    u = null;
                }
                else {
                    Object r = fr.result;
                    if (r instanceof AltResult) {
                        ex = ((AltResult)r).ex;
                        u = null;
                    }
                    else {
                        @SuppressWarnings("unchecked") U ur = (U) r;
                        u = ur;
                    }
                }
                d.internalComplete(u, ex);
            }
            return true;
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    /* ------------- Completions -------------- */

    /**
     * Simple linked list nodes to record completions, used in
     * basically the same way as WaitNodes. (We separate nodes from
     * the Completions themselves mainly because for the And and Or
     * methods, the same Completion object resides in two lists.)
     */
    static final class CompletionNode {
        final Completion completion;
        volatile CompletionNode next;
        CompletionNode(Completion completion) { this.completion = completion; }
    }

    // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
    abstract static class Completion extends AtomicInteger implements Runnable {
    }

    static final class ApplyCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        ApplyCompletion(CompletableFuture<? extends T> src,
                        Function<? super T,? extends U> fn,
                        CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T,? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                U u = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncApply<T,U>(t, fn, dst));
                        else
                            u = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(u, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AcceptCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        AcceptCompletion(CompletableFuture<? extends T> src,
                         Consumer<? super T> fn,
                         CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class RunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        RunCompletion(CompletableFuture<? extends T> src,
                      Runnable fn,
                      CompletableFuture<Void> dst,
                      Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiApplyCompletion<T,U,V> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiFunction<? super T,? super U,? extends V> fn;
        final CompletableFuture<V> dst;
        final Executor executor;
        BiApplyCompletion(CompletableFuture<? extends T> src,
                          CompletableFuture<? extends U> snd,
                          BiFunction<? super T,? super U,? extends V> fn,
                          CompletableFuture<V> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiFunction<? super T,? super U,? extends V> fn;
            final CompletableFuture<V> dst;
            Object r, s; T t; U u; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                V v = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
                        else
                            v = fn.apply(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(v, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiAcceptCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends U> snd;
        final BiConsumer<? super T,? super U> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        BiAcceptCompletion(CompletableFuture<? extends T> src,
                           CompletableFuture<? extends U> snd,
                           BiConsumer<? super T,? super U> fn,
                           CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends U> b;
            final BiConsumer<? super T,? super U> fn;
            final CompletableFuture<Void> dst;
            Object r, s; T t; U u; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                if (ex != null)
                    u = null;
                else if (s instanceof AltResult) {
                    ex = ((AltResult)s).ex;
                    u = null;
                }
                else {
                    @SuppressWarnings("unchecked") U us = (U) s;
                    u = us;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
                        else
                            fn.accept(t, u);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class BiRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        BiRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class AndCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<Void> dst;
        AndCompletion(CompletableFuture<?> src,
                      CompletableFuture<?> snd,
                      CompletableFuture<Void> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<Void> dst;
            Object r, s; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                (b = this.snd) != null &&
                (s = b.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                if (ex == null && (s instanceof AltResult))
                    ex = ((AltResult)s).ex;
                dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrApplyCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends T> snd;
        final Function<? super T,? extends U> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        OrApplyCompletion(CompletableFuture<? extends T> src,
                          CompletableFuture<? extends T> snd,
                          Function<? super T,? extends U> fn,
                          CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends T> b;
            final Function<? super T,? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                U u = null;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncApply<T,U>(t, fn, dst));
                        else
                            u = fn.apply(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(u, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrAcceptCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<? extends T> snd;
        final Consumer<? super T> fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrAcceptCompletion(CompletableFuture<? extends T> src,
                           CompletableFuture<? extends T> snd,
                           Consumer<? super T> fn,
                           CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<? extends T> b;
            final Consumer<? super T> fn;
            final CompletableFuture<Void> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncAccept<T>(t, fn, dst));
                        else
                            fn.accept(t);
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrRunCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final CompletableFuture<?> snd;
        final Runnable fn;
        final CompletableFuture<Void> dst;
        final Executor executor;
        OrRunCompletion(CompletableFuture<? extends T> src,
                        CompletableFuture<?> snd,
                        Runnable fn,
                        CompletableFuture<Void> dst, Executor executor) {
            this.src = src; this.snd = snd;
            this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final CompletableFuture<?> b;
            final Runnable fn;
            final CompletableFuture<Void> dst;
            Object r; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult)
                    ex = ((AltResult)r).ex;
                else
                    ex = null;
                Executor e = executor;
                if (ex == null) {
                    try {
                        if (e != null)
                            e.execute(new AsyncRun(fn, dst));
                        else
                            fn.run();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (e == null || ex != null)
                    dst.internalComplete(null, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class OrCompletion extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> snd;
        final CompletableFuture<?> dst;
        OrCompletion(CompletableFuture<?> src,
                     CompletableFuture<?> snd,
                     CompletableFuture<?> dst) {
            this.src = src; this.snd = snd; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> b;
            final CompletableFuture<?> dst;
            Object r, t; Throwable ex;
            if ((dst = this.dst) != null &&
                (((a = this.src) != null && (r = a.result) != null) ||
                 ((b = this.snd) != null && (r = b.result) != null)) &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    t = r;
                }
                dst.internalComplete(t, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ExceptionCompletion<T> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super Throwable, ? extends T> fn;
        final CompletableFuture<T> dst;
        ExceptionCompletion(CompletableFuture<? extends T> src,
                            Function<? super Throwable, ? extends T> fn,
                            CompletableFuture<T> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super Throwable, ? extends T> fn;
            final CompletableFuture<T> dst;
            Object r; T t = null; Throwable ex, dx = null;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if ((r instanceof AltResult) &&
                    (ex = ((AltResult)r).ex) != null) {
                    try {
                        t = fn.apply(ex);
                    } catch (Throwable rex) {
                        dx = rex;
                    }
                }
                else {
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                dst.internalComplete(t, dx);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ThenCopy extends Completion {
        final CompletableFuture<?> src;
        final CompletableFuture<?> dst;
        ThenCopy(CompletableFuture<?> src,
                 CompletableFuture<?> dst) {
            this.src = src; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<?> a;
            final CompletableFuture<?> dst;
            Object r; Object t; Throwable ex;
            if ((dst = this.dst) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    t = r;
                }
                dst.internalComplete(t, ex);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class HandleCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final BiFunction<? super T, Throwable, ? extends U> fn;
        final CompletableFuture<U> dst;
        HandleCompletion(CompletableFuture<? extends T> src,
                         BiFunction<? super T, Throwable, ? extends U> fn,
                         final CompletableFuture<U> dst) {
            this.src = src; this.fn = fn; this.dst = dst;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final BiFunction<? super T, Throwable, ? extends U> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                U u = null; Throwable dx = null;
                try {
                    u = fn.apply(t, ex);
                } catch (Throwable rex) {
                    dx = rex;
                }
                dst.internalComplete(u, dx);
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    static final class ComposeCompletion<T,U> extends Completion {
        final CompletableFuture<? extends T> src;
        final Function<? super T, CompletableFuture<U>> fn;
        final CompletableFuture<U> dst;
        final Executor executor;
        ComposeCompletion(CompletableFuture<? extends T> src,
                          Function<? super T, CompletableFuture<U>> fn,
                          final CompletableFuture<U> dst, Executor executor) {
            this.src = src; this.fn = fn; this.dst = dst;
            this.executor = executor;
        }
        public final void run() {
            final CompletableFuture<? extends T> a;
            final Function<? super T, CompletableFuture<U>> fn;
            final CompletableFuture<U> dst;
            Object r; T t; Throwable ex; Executor e;
            if ((dst = this.dst) != null &&
                (fn = this.fn) != null &&
                (a = this.src) != null &&
                (r = a.result) != null &&
                compareAndSet(0, 1)) {
                if (r instanceof AltResult) {
                    ex = ((AltResult)r).ex;
                    t = null;
                }
                else {
                    ex = null;
                    @SuppressWarnings("unchecked") T tr = (T) r;
                    t = tr;
                }
                CompletableFuture<U> c = null;
                U u = null;
                boolean complete = false;
                if (ex == null) {
                    // Keep execute() inside the try so that a failure to
                    // submit the task (for example a rejected execution)
                    // completes dst exceptionally, as the other
                    // completion classes in this file do.
                    try {
                        if ((e = executor) != null)
                            e.execute(new AsyncCompose<T,U>(t, fn, dst));
                        else if ((c = fn.apply(t)) == null)
                            ex = new NullPointerException();
                    } catch (Throwable rex) {
                        ex = rex;
                    }
                }
                if (c != null) {
                    ThenCopy d = null;
                    Object s;
                    if ((s = c.result) == null) {
                        CompletionNode p = new CompletionNode
                            (d = new ThenCopy(c, dst));
                        while ((s = c.result) == null) {
                            if (UNSAFE.compareAndSwapObject
                                (c, COMPLETIONS, p.next = c.completions, p))
                                break;
                        }
                    }
                    if (s != null && (d == null || d.compareAndSet(0, 1))) {
                        complete = true;
                        if (s instanceof AltResult) {
                            ex = ((AltResult)s).ex;  // no rewrap
                            u = null;
                        }
                        else {
                            @SuppressWarnings("unchecked") U us = (U) s;
                            u = us;
                        }
                    }
                }
                if (complete || ex != null)
                    dst.internalComplete(u, ex);
                if (c != null)
                    c.helpPostComplete();
            }
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }

    // public methods

    /**
     * Creates a new incomplete CompletableFuture.
     */
    public CompletableFuture() {
    }

    /**
     * Asynchronously executes in the {@link
     * ForkJoinPool#commonPool()} a task that completes the returned
     * CompletableFuture with the result of the given Supplier.
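     *
     * <p>A minimal usage sketch; the lambda and its value are
     * illustrative only and not part of this class:
     * <pre> {@code
     * CompletableFuture<String> f =
     *     CompletableFuture.supplyAsync(() -> "hello");
     * String s = f.join(); // waits for the task, then returns "hello"
     * }</pre>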
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @return the CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        if (supplier == null) throw new NullPointerException();
        CompletableFuture<U> f = new CompletableFuture<U>();
        ForkJoinPool.commonPool().
            execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
        return f;
    }

    /**
     * Asynchronously executes using the given executor a task that
     * completes the returned CompletableFuture with the result of the
     * given Supplier.
     *
     * @param supplier a function returning the value to be used
     * to complete the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the CompletableFuture
     */
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor) {
        if (executor == null || supplier == null)
            throw new NullPointerException();
        CompletableFuture<U> f = new CompletableFuture<U>();
        executor.execute(new AsyncSupply<U>(supplier, f));
        return f;
    }

    /**
     * Asynchronously executes in the {@link
     * ForkJoinPool#commonPool()} a task that runs the given action,
     * and then completes the returned CompletableFuture.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @return the CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        if (runnable == null) throw new NullPointerException();
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        ForkJoinPool.commonPool().
            execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
        return f;
    }

    /**
     * Asynchronously executes using the given executor a task that
     * runs the given action, and then completes the returned
     * CompletableFuture.
     *
     * @param runnable the action to run before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the CompletableFuture
     */
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        if (executor == null || runnable == null)
            throw new NullPointerException();
        CompletableFuture<Void> f = new CompletableFuture<Void>();
        executor.execute(new AsyncRun(runnable, f));
        return f;
    }

    /**
     * Returns {@code true} if completed in any fashion: normally,
     * exceptionally, or via cancellation.
     *
     * @return {@code true} if completed
     */
    public boolean isDone() {
        return result != null;
    }

    /**
     * Waits if necessary for this future to complete, and then
     * returns its result.
     *
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    public T get() throws InterruptedException, ExecutionException {
        Object r; Throwable ex, cause;
        if ((r = result) == null && (r = waitingGet(true)) == null)
            throw new InterruptedException();
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if ((ex instanceof CompletionException) &&
            (cause = ex.getCause()) != null)
            ex = cause;
        throw new ExecutionException(ex);
    }

    /**
     * Waits if necessary for at most the given time for this future
     * to complete, and then returns its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        Object r; Throwable ex, cause;
        long nanos = unit.toNanos(timeout);
        if (Thread.interrupted())
            throw new InterruptedException();
        if ((r = result) == null)
            r = timedAwaitDone(nanos);
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if ((ex instanceof CompletionException) &&
            (cause = ex.getCause()) != null)
            ex = cause;
        throw new ExecutionException(ex);
    }

    /**
     * Returns the result value when complete, or throws an
     * (unchecked) exception if completed exceptionally. To better
     * conform with the use of common functional forms, if a
     * computation involved in the completion of this
     * CompletableFuture threw an exception, this method throws an
     * (unchecked) {@link CompletionException} with the underlying
     * exception as its cause.
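     *
     * <p>A minimal sketch of the wrapping described above; the
     * exception type and message are illustrative only:
     * <pre> {@code
     * CompletableFuture<String> f = new CompletableFuture<>();
     * f.completeExceptionally(new IllegalStateException("boom"));
     * try {
     *     f.join();
     * } catch (CompletionException e) {
     *     // e.getCause() is the IllegalStateException passed above
     * }
     * }</pre>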
     *
     * @return the result value
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if this future completed
     * exceptionally or a completion computation threw an exception
     */
    public T join() {
        Object r; Throwable ex;
        if ((r = result) == null)
            r = waitingGet(false);
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if (ex instanceof CompletionException)
            throw (CompletionException)ex;
        throw new CompletionException(ex);
    }

    /**
     * Returns the result value (or throws any encountered exception)
     * if completed, else returns the given valueIfAbsent.
     *
     * @param valueIfAbsent the value to return if not completed
     * @return the result value, if completed, else the given valueIfAbsent
     * @throws CancellationException if the computation was cancelled
     * @throws CompletionException if this future completed
     * exceptionally or a completion computation threw an exception
     */
    public T getNow(T valueIfAbsent) {
        Object r; Throwable ex;
        if ((r = result) == null)
            return valueIfAbsent;
        if (!(r instanceof AltResult)) {
            @SuppressWarnings("unchecked") T tr = (T) r;
            return tr;
        }
        if ((ex = ((AltResult)r).ex) == null)
            return null;
        if (ex instanceof CancellationException)
            throw (CancellationException)ex;
        if (ex instanceof CompletionException)
            throw (CompletionException)ex;
        throw new CompletionException(ex);
    }

    /**
     * If not already completed, sets the value returned by {@link
     * #get()} and related methods to the given value.
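     *
     * <p>A minimal sketch of the "first completion wins" behavior;
     * the values are illustrative only:
     * <pre> {@code
     * CompletableFuture<Integer> f = new CompletableFuture<>();
     * boolean first = f.complete(1);   // true: this call completed f
     * boolean second = f.complete(2);  // false: f keeps the value 1
     * }</pre>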
     *
     * @param value the result value
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean complete(T value) {
        boolean triggered = result == null &&
            UNSAFE.compareAndSwapObject(this, RESULT, null,
                                        value == null ? NIL : value);
        postComplete();
        return triggered;
    }

    /**
     * If not already completed, causes invocations of {@link #get()}
     * and related methods to throw the given exception.
     *
     * @param ex the exception
     * @return {@code true} if this invocation caused this CompletableFuture
     * to transition to a completed state, else {@code false}
     */
    public boolean completeExceptionally(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        boolean triggered = result == null &&
            UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
        postComplete();
        return triggered;
    }

    /**
     * Creates and returns a CompletableFuture that is completed with
     * the result of the given function of this CompletableFuture.
     * If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so,
     * with a CompletionException holding this exception as
     * its cause.
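     *
     * <p>A minimal usage sketch; the lambda and the value are
     * illustrative only:
     * <pre> {@code
     * CompletableFuture<Integer> f = new CompletableFuture<>();
     * CompletableFuture<String> g = f.thenApply(x -> "value " + x);
     * f.complete(42);  // g is now completed with "value 42"
     * }</pre>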
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
        return doThenApply(fn, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} with the
     * result of the given function of this CompletableFuture. If
     * this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApplyAsync
        (Function<? super T,? extends U> fn) {
        return doThenApply(fn, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the given executor with the result of the given
     * function of this CompletableFuture. If this CompletableFuture
     * completes exceptionally, then the returned CompletableFuture
     * also does so, with a CompletionException holding this exception as
     * its cause.
     *
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U> CompletableFuture<U> thenApplyAsync
        (Function<? super T,? extends U> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenApply(fn, executor);
    }

    private <U> CompletableFuture<U> doThenApply
        (Function<? super T,? extends U> fn,
         Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<U> dst = new CompletableFuture<U>();
        ApplyCompletion<T,U> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new ApplyCompletion<T,U>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            U u = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncApply<T,U>(t, fn, dst));
                    else
                        u = fn.apply(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(u, ex);
        }
        helpPostComplete();
        return dst;
    }

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action with this CompletableFuture's
     * result when it completes. If this CompletableFuture
     * completes exceptionally, then the returned CompletableFuture
     * also does so, with a CompletionException holding this exception as
     * its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
        return doThenAccept(block, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} after
     * performing the given action with this CompletableFuture's
     * result when it completes. If this CompletableFuture completes
     * exceptionally, then the returned CompletableFuture also does so,
     * with a CompletionException holding this exception as its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
        return doThenAccept(block, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the given executor after performing the given
     * action with this CompletableFuture's result when it completes.
     * If this CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param block the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
                                                   Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenAccept(block, executor);
    }

    private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
                                                 Executor e) {
        if (fn == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        AcceptCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new AcceptCompletion<T>(this, fn, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            T t; Throwable ex;
            if (r instanceof AltResult) {
                ex = ((AltResult)r).ex;
                t = null;
            }
            else {
                ex = null;
                @SuppressWarnings("unchecked") T tr = (T) r;
                t = tr;
            }
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncAccept<T>(t, fn, dst));
                    else
                        fn.accept(t);
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        return dst;
    }

    /**
     * Creates and returns a CompletableFuture that is completed after
     * performing the given action when this CompletableFuture
     * completes. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRun(Runnable action) {
        return doThenRun(action, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} after
     * performing the given action when this CompletableFuture
     * completes. If this CompletableFuture completes exceptionally,
     * then the returned CompletableFuture also does so, with a
     * CompletionException holding this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return doThenRun(action, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the given executor after performing the given
     * action when this CompletableFuture completes. If this
     * CompletableFuture completes exceptionally, then the returned
     * CompletableFuture also does so, with a CompletionException holding
     * this exception as its cause.
     *
     * @param action the action to perform before completing the
     * returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenRun(action, executor);
    }

    private CompletableFuture<Void> doThenRun(Runnable action,
                                              Executor e) {
        if (action == null) throw new NullPointerException();
        CompletableFuture<Void> dst = new CompletableFuture<Void>();
        RunCompletion<T> d = null;
        Object r;
        if ((r = result) == null) {
            CompletionNode p = new CompletionNode
                (d = new RunCompletion<T>(this, action, dst, e));
            while ((r = result) == null) {
                if (UNSAFE.compareAndSwapObject
                    (this, COMPLETIONS, p.next = completions, p))
                    break;
            }
        }
        if (r != null && (d == null || d.compareAndSet(0, 1))) {
            Throwable ex;
            if (r instanceof AltResult)
                ex = ((AltResult)r).ex;
            else
                ex = null;
            if (ex == null) {
                try {
                    if (e != null)
                        e.execute(new AsyncRun(action, dst));
                    else
                        action.run();
                } catch (Throwable rex) {
                    ex = rex;
                }
            }
            if (e == null || ex != null)
                dst.internalComplete(null, ex);
        }
        helpPostComplete();
        return dst;
    }

    /**
     * Creates and returns a CompletableFuture that is completed with
     * the result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
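     *
     * <p>A minimal usage sketch; the suppliers and the combining
     * function are illustrative only:
     * <pre> {@code
     * CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 2);
     * CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 3);
     * CompletableFuture<Integer> sum = a.thenCombine(b, (x, y) -> x + y);
     * // sum.join() returns 5 once both a and b have completed
     * }</pre>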
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombine
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn) {
        return doThenBiApply(other, fn, null);
    }

    /**
     * Creates and returns a CompletableFuture that is asynchronously
     * completed using the {@link ForkJoinPool#commonPool()} with
     * the result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn) {
        return doThenBiApply(other, fn, ForkJoinPool.commonPool());
    }

    /**
     * Creates and returns a CompletableFuture that is
     * asynchronously completed using the given executor with the
     * result of the given function of this and the other given
     * CompletableFuture's results when both complete. If this or
     * the other CompletableFuture completes exceptionally, then the
     * returned CompletableFuture also does so, with a
     * CompletionException holding the exception as its cause.
     *
     * @param other the other CompletableFuture
     * @param fn the function to use to compute the value of
     * the returned CompletableFuture
     * @param executor the executor to use for asynchronous execution
     * @return the new CompletableFuture
     */
    public <U,V> CompletableFuture<V> thenCombineAsync
        (CompletableFuture<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn,
         Executor executor) {
        if (executor == null) throw new NullPointerException();
        return doThenBiApply(other, fn, executor);
    }

1916 private <U,V> CompletableFuture<V> doThenBiApply
1917 (CompletableFuture<? extends U> other,
1918 BiFunction<? super T,? super U,? extends V> fn,
1919 Executor e) {
1920 if (other == null || fn == null) throw new NullPointerException();
1921 CompletableFuture<V> dst = new CompletableFuture<V>();
1922 BiApplyCompletion<T,U,V> d = null;
1923 Object r, s = null;
1924 if ((r = result) == null || (s = other.result) == null) {
1925 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1926 CompletionNode q = null, p = new CompletionNode(d);
1927 while ((r == null && (r = result) == null) ||
1928 (s == null && (s = other.result) == null)) {
1929 if (q != null) {
1930 if (s != null ||
1931 UNSAFE.compareAndSwapObject
1932 (other, COMPLETIONS, q.next = other.completions, q))
1933 break;
1934 }
1935 else if (r != null ||
1936 UNSAFE.compareAndSwapObject
1937 (this, COMPLETIONS, p.next = completions, p)) {
1938 if (s != null)
1939 break;
1940 q = new CompletionNode(d);
1941 }
1942 }
1943 }
1944 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1945 T t; U u; Throwable ex;
1946 if (r instanceof AltResult) {
1947 ex = ((AltResult)r).ex;
1948 t = null;
1949 }
1950 else {
1951 ex = null;
1952 @SuppressWarnings("unchecked") T tr = (T) r;
1953 t = tr;
1954 }
1955 if (ex != null)
1956 u = null;
1957 else if (s instanceof AltResult) {
1958 ex = ((AltResult)s).ex;
1959 u = null;
1960 }
1961 else {
1962 @SuppressWarnings("unchecked") U us = (U) s;
1963 u = us;
1964 }
1965 V v = null;
1966 if (ex == null) {
1967 try {
1968 if (e != null)
1969 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1970 else
1971 v = fn.apply(t, u);
1972 } catch (Throwable rex) {
1973 ex = rex;
1974 }
1975 }
1976 if (e == null || ex != null)
1977 dst.internalComplete(v, ex);
1978 }
1979 helpPostComplete();
1980 other.helpPostComplete();
1981 return dst;
1982 }
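/*
 * Usage sketch (not part of the original source): a minimal caller-side
 * example of thenCombine. It assumes the CompletableFuture API as later
 * released in JDK 8, which accepts the same call shape; the names
 * ThenCombineSketch, sumOfBoth and failurePropagates are hypothetical.
 *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ThenCombineSketch {
    // Combines the results of two independent futures once both are done.
    static int sumOfBoth(int a, int b) {
        CompletableFuture<Integer> left = CompletableFuture.supplyAsync(() -> a);
        CompletableFuture<Integer> right = CompletableFuture.supplyAsync(() -> b);
        return left.thenCombine(right, (x, y) -> x + y).join();
    }

    // Returns true if a failed source makes the combined future fail too.
    static boolean failurePropagates() {
        CompletableFuture<Integer> left = new CompletableFuture<>();
        left.completeExceptionally(new IllegalStateException("boom"));
        CompletableFuture<Integer> right = CompletableFuture.completedFuture(1);
        try {
            left.thenCombine(right, (x, y) -> x + y).join();
            return false;
        } catch (CompletionException ex) {
            return ex.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfBoth(2, 3));
        System.out.println(failurePropagates());
    }
}
```
 * The function runs only once both sources have a result; a failure in
 * either source surfaces from join() as a CompletionException whose
 * cause is the original exception.
 */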
1983
1984 /**
1985 * Creates and returns a CompletableFuture that is completed after
1986 * performing the given action with the results of this and the
1987 * other given CompletableFuture when both complete. If this and/or
1988 * the other CompletableFuture complete exceptionally, then the
1989 * returned CompletableFuture also does so, with a CompletionException
1990 * holding one of these exceptions as its cause.
1991 *
1992 * @param other the other CompletableFuture
1993 * @param block the action to perform before completing the
1994 * returned CompletableFuture
1995 * @return the new CompletableFuture
1996 */
1997 public <U> CompletableFuture<Void> thenAcceptBoth
1998 (CompletableFuture<? extends U> other,
1999 BiConsumer<? super T, ? super U> block) {
2000 return doThenBiAccept(other, block, null);
2001 }
2002
2003 /**
2004 * Creates and returns a CompletableFuture that is completed after
2005 * performing the given action asynchronously, using the
2006 * {@link ForkJoinPool#commonPool()}, with the results of this and the
2007 * other given CompletableFuture when both complete. If this and/or
2008 * the other CompletableFuture complete exceptionally, then the
2009 * returned CompletableFuture also does so, with a CompletionException
2010 * holding one of these exceptions as its cause.
2011 *
2012 * @param other the other CompletableFuture
2013 * @param block the action to perform before completing the
2014 * returned CompletableFuture
2015 * @return the new CompletableFuture
2016 */
2017 public <U> CompletableFuture<Void> thenAcceptBothAsync
2018 (CompletableFuture<? extends U> other,
2019 BiConsumer<? super T, ? super U> block) {
2020 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2021 }
2022
2023 /**
2024 * Creates and returns a CompletableFuture that is completed after performing
2025 * the given action asynchronously, using the given executor, with the
2026 * results of this and the other given CompletableFuture when both complete.
2027 * If this and/or the other CompletableFuture complete exceptionally,
2028 * then the returned CompletableFuture also does so, with a
2029 * CompletionException holding one of these exceptions as its cause.
2030 *
2031 * @param other the other CompletableFuture
2032 * @param block the action to perform before completing the
2033 * returned CompletableFuture
2034 * @param executor the executor to use for asynchronous execution
2035 * @return the new CompletableFuture
2036 */
2037 public <U> CompletableFuture<Void> thenAcceptBothAsync
2038 (CompletableFuture<? extends U> other,
2039 BiConsumer<? super T, ? super U> block,
2040 Executor executor) {
2041 if (executor == null) throw new NullPointerException();
2042 return doThenBiAccept(other, block, executor);
2043 }
2044
2045 private <U> CompletableFuture<Void> doThenBiAccept
2046 (CompletableFuture<? extends U> other,
2047 BiConsumer<? super T,? super U> fn,
2048 Executor e) {
2049 if (other == null || fn == null) throw new NullPointerException();
2050 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2051 BiAcceptCompletion<T,U> d = null;
2052 Object r, s = null;
2053 if ((r = result) == null || (s = other.result) == null) {
2054 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2055 CompletionNode q = null, p = new CompletionNode(d);
2056 while ((r == null && (r = result) == null) ||
2057 (s == null && (s = other.result) == null)) {
2058 if (q != null) {
2059 if (s != null ||
2060 UNSAFE.compareAndSwapObject
2061 (other, COMPLETIONS, q.next = other.completions, q))
2062 break;
2063 }
2064 else if (r != null ||
2065 UNSAFE.compareAndSwapObject
2066 (this, COMPLETIONS, p.next = completions, p)) {
2067 if (s != null)
2068 break;
2069 q = new CompletionNode(d);
2070 }
2071 }
2072 }
2073 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2074 T t; U u; Throwable ex;
2075 if (r instanceof AltResult) {
2076 ex = ((AltResult)r).ex;
2077 t = null;
2078 }
2079 else {
2080 ex = null;
2081 @SuppressWarnings("unchecked") T tr = (T) r;
2082 t = tr;
2083 }
2084 if (ex != null)
2085 u = null;
2086 else if (s instanceof AltResult) {
2087 ex = ((AltResult)s).ex;
2088 u = null;
2089 }
2090 else {
2091 @SuppressWarnings("unchecked") U us = (U) s;
2092 u = us;
2093 }
2094 if (ex == null) {
2095 try {
2096 if (e != null)
2097 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2098 else
2099 fn.accept(t, u);
2100 } catch (Throwable rex) {
2101 ex = rex;
2102 }
2103 }
2104 if (e == null || ex != null)
2105 dst.internalComplete(null, ex);
2106 }
2107 helpPostComplete();
2108 other.helpPostComplete();
2109 return dst;
2110 }
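/*
 * Usage sketch (not part of the original source): thenAcceptBoth yields a
 * CompletableFuture<Void>, so the useful output is the side effect of the
 * action. This assumes the API as released in JDK 8; ThenAcceptBothSketch
 * and joinNames are hypothetical names.
 *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class ThenAcceptBothSketch {
    // Records "first second" once both futures have produced their values.
    static String joinNames(String first, String second) {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> first);
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> second);
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<Void> done =
            a.thenAcceptBoth(b, (x, y) -> seen.set(x + " " + y));
        done.join(); // yields null; it only signals that the action has run
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(joinNames("Doug", "Lea"));
    }
}
```
 */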
2111
2112 /**
2113 * Creates and returns a CompletableFuture that is completed when
2114 * this and the other given CompletableFuture both complete.
2115 * If this and/or the other CompletableFuture complete exceptionally,
2116 * then the returned CompletableFuture also does so, with a
2117 * CompletionException holding one of these exceptions as its cause.
2118 *
2119 * @param other the other CompletableFuture
2120 * @param action the action to perform before completing the
2121 * returned CompletableFuture
2122 * @return the new CompletableFuture
2123 */
2124 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2125 Runnable action) {
2126 return doThenBiRun(other, action, null);
2127 }
2128
2129 /**
2130 * Creates and returns a CompletableFuture that is completed
2131 * asynchronously using the {@link ForkJoinPool#commonPool()}
2132 * when this and the other given CompletableFuture both complete.
2133 * If this and/or the other CompletableFuture complete exceptionally,
2134 * then the returned CompletableFuture also does so, with a
2135 * CompletionException holding one of these exceptions as its cause.
2136 *
2137 * @param other the other CompletableFuture
2138 * @param action the action to perform before completing the
2139 * returned CompletableFuture
2140 * @return the new CompletableFuture
2141 */
2142 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2143 Runnable action) {
2144 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2145 }
2146
2147 /**
2148 * Creates and returns a CompletableFuture that is completed
2149 * asynchronously using the given executor when this and the
2150 * other given CompletableFuture both complete.
2151 * If this and/or the other CompletableFuture complete exceptionally,
2152 * then the returned CompletableFuture also does so, with a
2153 * CompletionException holding one of these exceptions as its cause.
2154 *
2155 * @param other the other CompletableFuture
2156 * @param action the action to perform before completing the
2157 * returned CompletableFuture
2158 * @param executor the executor to use for asynchronous execution
2159 * @return the new CompletableFuture
2160 */
2161 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2162 Runnable action,
2163 Executor executor) {
2164 if (executor == null) throw new NullPointerException();
2165 return doThenBiRun(other, action, executor);
2166 }
2167
2168 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2169 Runnable action,
2170 Executor e) {
2171 if (other == null || action == null) throw new NullPointerException();
2172 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2173 BiRunCompletion<T> d = null;
2174 Object r, s = null;
2175 if ((r = result) == null || (s = other.result) == null) {
2176 d = new BiRunCompletion<T>(this, other, action, dst, e);
2177 CompletionNode q = null, p = new CompletionNode(d);
2178 while ((r == null && (r = result) == null) ||
2179 (s == null && (s = other.result) == null)) {
2180 if (q != null) {
2181 if (s != null ||
2182 UNSAFE.compareAndSwapObject
2183 (other, COMPLETIONS, q.next = other.completions, q))
2184 break;
2185 }
2186 else if (r != null ||
2187 UNSAFE.compareAndSwapObject
2188 (this, COMPLETIONS, p.next = completions, p)) {
2189 if (s != null)
2190 break;
2191 q = new CompletionNode(d);
2192 }
2193 }
2194 }
2195 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2196 Throwable ex;
2197 if (r instanceof AltResult)
2198 ex = ((AltResult)r).ex;
2199 else
2200 ex = null;
2201 if (ex == null && (s instanceof AltResult))
2202 ex = ((AltResult)s).ex;
2203 if (ex == null) {
2204 try {
2205 if (e != null)
2206 e.execute(new AsyncRun(action, dst));
2207 else
2208 action.run();
2209 } catch (Throwable rex) {
2210 ex = rex;
2211 }
2212 }
2213 if (e == null || ex != null)
2214 dst.internalComplete(null, ex);
2215 }
2216 helpPostComplete();
2217 other.helpPostComplete();
2218 return dst;
2219 }
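/*
 * Usage sketch (not part of the original source): runAfterBoth ignores
 * both results and only waits for the two completions. The example
 * completes the sources by hand on the calling thread, so the ordering
 * is deterministic. It assumes the API as released in JDK 8;
 * RunAfterBothSketch and runsOnlyAfterBoth are hypothetical names.
 *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class RunAfterBothSketch {
    // Returns true if the action ran exactly once, and only after both completed.
    static boolean runsOnlyAfterBoth() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<Void> done = a.runAfterBoth(b, runs::incrementAndGet);

        a.complete("a");                       // one source is not enough
        boolean waited = !done.isDone() && runs.get() == 0;

        b.complete("b");                       // second completion triggers the action
        return waited && done.isDone() && runs.get() == 1;
    }

    public static void main(String[] args) {
        System.out.println(runsOnlyAfterBoth());
    }
}
```
 */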
2220
2221 /**
2222 * Creates and returns a CompletableFuture that is completed with
2223 * the result of the given function of either this or the other
2224 * given CompletableFuture's result when either completes.
2225 * If this and/or the other CompletableFuture complete exceptionally,
2226 * then the returned CompletableFuture may also do so, with a
2227 * CompletionException holding one of these exceptions as its cause.
2228 * No guarantees are made about which result or exception is used
2229 * in the returned CompletableFuture.
2230 *
2231 * @param other the other CompletableFuture
2232 * @param fn the function to use to compute the value of
2233 * the returned CompletableFuture
2234 * @return the new CompletableFuture
2235 */
2236 public <U> CompletableFuture<U> applyToEither
2237 (CompletableFuture<? extends T> other,
2238 Function<? super T, U> fn) {
2239 return doOrApply(other, fn, null);
2240 }
2241
2242 /**
2243 * Creates and returns a CompletableFuture that is completed
2244 * asynchronously using the {@link ForkJoinPool#commonPool()} with
2245 * the result of the given function of either this or the other
2246 * given CompletableFuture's result when either completes.
2247 * If this and/or the other CompletableFuture complete exceptionally,
2248 * then the returned CompletableFuture may also do so, with a
2249 * CompletionException holding one of these exceptions as its cause.
2250 * No guarantees are made about which result or exception is used
2251 * in the returned CompletableFuture.
2252 *
2253 * @param other the other CompletableFuture
2254 * @param fn the function to use to compute the value of
2255 * the returned CompletableFuture
2256 * @return the new CompletableFuture
2257 */
2258 public <U> CompletableFuture<U> applyToEitherAsync
2259 (CompletableFuture<? extends T> other,
2260 Function<? super T, U> fn) {
2261 return doOrApply(other, fn, ForkJoinPool.commonPool());
2262 }
2263
2264 /**
2265 * Creates and returns a CompletableFuture that is completed
2266 * asynchronously using the given executor with the result of the
2267 * given function of either this or the other given
2268 * CompletableFuture's result when either completes. If this
2269 * and/or the other CompletableFuture complete exceptionally, then
2270 * the returned CompletableFuture may also do so, with a
2271 * CompletionException holding one of these exceptions as its cause.
2272 * No guarantees are made about which result or exception is used
2273 * in the returned CompletableFuture.
2274 *
2275 * @param other the other CompletableFuture
2276 * @param fn the function to use to compute the value of
2277 * the returned CompletableFuture
2278 * @param executor the executor to use for asynchronous execution
2279 * @return the new CompletableFuture
2280 */
2281 public <U> CompletableFuture<U> applyToEitherAsync
2282 (CompletableFuture<? extends T> other,
2283 Function<? super T, U> fn,
2284 Executor executor) {
2285 if (executor == null) throw new NullPointerException();
2286 return doOrApply(other, fn, executor);
2287 }
2288
2289 private <U> CompletableFuture<U> doOrApply
2290 (CompletableFuture<? extends T> other,
2291 Function<? super T, U> fn,
2292 Executor e) {
2293 if (other == null || fn == null) throw new NullPointerException();
2294 CompletableFuture<U> dst = new CompletableFuture<U>();
2295 OrApplyCompletion<T,U> d = null;
2296 Object r;
2297 if ((r = result) == null && (r = other.result) == null) {
2298 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2299 CompletionNode q = null, p = new CompletionNode(d);
2300 while ((r = result) == null && (r = other.result) == null) {
2301 if (q != null) {
2302 if (UNSAFE.compareAndSwapObject
2303 (other, COMPLETIONS, q.next = other.completions, q))
2304 break;
2305 }
2306 else if (UNSAFE.compareAndSwapObject
2307 (this, COMPLETIONS, p.next = completions, p))
2308 q = new CompletionNode(d);
2309 }
2310 }
2311 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2312 T t; Throwable ex;
2313 if (r instanceof AltResult) {
2314 ex = ((AltResult)r).ex;
2315 t = null;
2316 }
2317 else {
2318 ex = null;
2319 @SuppressWarnings("unchecked") T tr = (T) r;
2320 t = tr;
2321 }
2322 U u = null;
2323 if (ex == null) {
2324 try {
2325 if (e != null)
2326 e.execute(new AsyncApply<T,U>(t, fn, dst));
2327 else
2328 u = fn.apply(t);
2329 } catch (Throwable rex) {
2330 ex = rex;
2331 }
2332 }
2333 if (e == null || ex != null)
2334 dst.internalComplete(u, ex);
2335 }
2336 helpPostComplete();
2337 other.helpPostComplete();
2338 return dst;
2339 }
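/*
 * Usage sketch (not part of the original source): applyToEither applies
 * the function to whichever source finishes first. Here one source is
 * deliberately never completed, so the outcome does not depend on
 * timing. It assumes the API as released in JDK 8; ApplyToEitherSketch
 * and firstUpper are hypothetical names.
 *
```java
import java.util.concurrent.CompletableFuture;

final class ApplyToEitherSketch {
    // Upper-cases the value of the only source that ever completes.
    static String firstUpper(String value) {
        CompletableFuture<String> never = new CompletableFuture<>(); // stays pending
        CompletableFuture<String> ready = CompletableFuture.completedFuture(value);
        return never.applyToEither(ready, String::toUpperCase).join();
    }

    public static void main(String[] args) {
        System.out.println(firstUpper("fast"));
    }
}
```
 * When both sources can complete, no guarantee is made about which
 * result reaches the function, so it should be valid for either one.
 */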
2340
2341 /**
2342 * Creates and returns a CompletableFuture that is completed after
2343 * performing the given action with the result of either this or the
2344 * other given CompletableFuture, when either completes.
2345 * If this and/or the other CompletableFuture complete exceptionally,
2346 * then the returned CompletableFuture may also do so, with a
2347 * CompletionException holding one of these exceptions as its cause.
2348 * No guarantees are made about which exception is used in the
2349 * returned CompletableFuture.
2350 *
2351 * @param other the other CompletableFuture
2352 * @param block the action to perform before completing the
2353 * returned CompletableFuture
2354 * @return the new CompletableFuture
2355 */
2356 public CompletableFuture<Void> acceptEither
2357 (CompletableFuture<? extends T> other,
2358 Consumer<? super T> block) {
2359 return doOrAccept(other, block, null);
2360 }
2361
2362 /**
2363 * Creates and returns a CompletableFuture that is completed
2364 * asynchronously using the {@link ForkJoinPool#commonPool()},
2365 * performing the given action with the result of either this or
2366 * the other given CompletableFuture, when either completes.
2367 * If this and/or the other CompletableFuture complete exceptionally,
2368 * then the returned CompletableFuture may also do so, with a
2369 * CompletionException holding one of these exceptions as its cause.
2370 * No guarantees are made about which exception is used in the
2371 * returned CompletableFuture.
2372 *
2373 * @param other the other CompletableFuture
2374 * @param block the action to perform before completing the
2375 * returned CompletableFuture
2376 * @return the new CompletableFuture
2377 */
2378 public CompletableFuture<Void> acceptEitherAsync
2379 (CompletableFuture<? extends T> other,
2380 Consumer<? super T> block) {
2381 return doOrAccept(other, block, ForkJoinPool.commonPool());
2382 }
2383
2384 /**
2385 * Creates and returns a CompletableFuture that is completed
2386 * asynchronously using the given executor, performing the given
2387 * action with the result of either this or the other given
2388 * CompletableFuture, when either completes.
2389 * If this and/or the other CompletableFuture complete exceptionally,
2390 * then the returned CompletableFuture may also do so, with a
2391 * CompletionException holding one of these exceptions as its cause.
2392 * No guarantees are made about which exception is used in the
2393 * returned CompletableFuture.
2394 *
2395 * @param other the other CompletableFuture
2396 * @param block the action to perform before completing the
2397 * returned CompletableFuture
2398 * @param executor the executor to use for asynchronous execution
2399 * @return the new CompletableFuture
2400 */
2401 public CompletableFuture<Void> acceptEitherAsync
2402 (CompletableFuture<? extends T> other,
2403 Consumer<? super T> block,
2404 Executor executor) {
2405 if (executor == null) throw new NullPointerException();
2406 return doOrAccept(other, block, executor);
2407 }
2408
2409 private CompletableFuture<Void> doOrAccept
2410 (CompletableFuture<? extends T> other,
2411 Consumer<? super T> fn,
2412 Executor e) {
2413 if (other == null || fn == null) throw new NullPointerException();
2414 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2415 OrAcceptCompletion<T> d = null;
2416 Object r;
2417 if ((r = result) == null && (r = other.result) == null) {
2418 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2419 CompletionNode q = null, p = new CompletionNode(d);
2420 while ((r = result) == null && (r = other.result) == null) {
2421 if (q != null) {
2422 if (UNSAFE.compareAndSwapObject
2423 (other, COMPLETIONS, q.next = other.completions, q))
2424 break;
2425 }
2426 else if (UNSAFE.compareAndSwapObject
2427 (this, COMPLETIONS, p.next = completions, p))
2428 q = new CompletionNode(d);
2429 }
2430 }
2431 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2432 T t; Throwable ex;
2433 if (r instanceof AltResult) {
2434 ex = ((AltResult)r).ex;
2435 t = null;
2436 }
2437 else {
2438 ex = null;
2439 @SuppressWarnings("unchecked") T tr = (T) r;
2440 t = tr;
2441 }
2442 if (ex == null) {
2443 try {
2444 if (e != null)
2445 e.execute(new AsyncAccept<T>(t, fn, dst));
2446 else
2447 fn.accept(t);
2448 } catch (Throwable rex) {
2449 ex = rex;
2450 }
2451 }
2452 if (e == null || ex != null)
2453 dst.internalComplete(null, ex);
2454 }
2455 helpPostComplete();
2456 other.helpPostComplete();
2457 return dst;
2458 }
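/*
 * Usage sketch (not part of the original source): acceptEither passes
 * the first available result to a consumer. As with the applyToEither
 * case, one source is left pending so the outcome is deterministic. It
 * assumes the API as released in JDK 8; AcceptEitherSketch and
 * firstSeen are hypothetical names.
 *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

final class AcceptEitherSketch {
    // Returns the value handed to the consumer by the source that completed.
    static String firstSeen(String value) {
        CompletableFuture<String> slow = new CompletableFuture<>(); // stays pending
        CompletableFuture<String> fast = CompletableFuture.completedFuture(value);
        AtomicReference<String> seen = new AtomicReference<>();
        slow.acceptEither(fast, seen::set).join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(firstSeen("fast"));
    }
}
```
 */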
2459
2460 /**
2461 * Creates and returns a CompletableFuture that is completed
2462 * after this or the other given CompletableFuture completes.
2463 * If this and/or the other CompletableFuture complete exceptionally,
2464 * then the returned CompletableFuture may also do so, with a
2465 * CompletionException holding one of these exceptions as its cause.
2466 * No guarantees are made about which exception is used in the
2467 * returned CompletableFuture.
2468 *
2469 * @param other the other CompletableFuture
2470 * @param action the action to perform before completing the
2471 * returned CompletableFuture
2472 * @return the new CompletableFuture
2473 */
2474 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2475 Runnable action) {
2476 return doOrRun(other, action, null);
2477 }
2478
2479 /**
2480 * Creates and returns a CompletableFuture that is completed
2481 * asynchronously using the {@link ForkJoinPool#commonPool()}
2482 * after this or the other given CompletableFuture completes.
2483 * If this and/or the other CompletableFuture complete exceptionally,
2484 * then the returned CompletableFuture may also do so, with a
2485 * CompletionException holding one of these exceptions as its cause.
2486 * No guarantees are made about which exception is used in the
2487 * returned CompletableFuture.
2488 *
2489 * @param other the other CompletableFuture
2490 * @param action the action to perform before completing the
2491 * returned CompletableFuture
2492 * @return the new CompletableFuture
2493 */
2494 public CompletableFuture<Void> runAfterEitherAsync
2495 (CompletableFuture<?> other,
2496 Runnable action) {
2497 return doOrRun(other, action, ForkJoinPool.commonPool());
2498 }
2499
2500 /**
2501 * Creates and returns a CompletableFuture that is completed
2502 * asynchronously using the given executor after this or the other
2503 * given CompletableFuture completes.
2504 * If this and/or the other CompletableFuture complete exceptionally,
2505 * then the returned CompletableFuture may also do so, with a
2506 * CompletionException holding one of these exceptions as its cause.
2507 * No guarantees are made about which exception is used in the
2508 * returned CompletableFuture.
2509 *
2510 * @param other the other CompletableFuture
2511 * @param action the action to perform before completing the
2512 * returned CompletableFuture
2513 * @param executor the executor to use for asynchronous execution
2514 * @return the new CompletableFuture
2515 */
2516 public CompletableFuture<Void> runAfterEitherAsync
2517 (CompletableFuture<?> other,
2518 Runnable action,
2519 Executor executor) {
2520 if (executor == null) throw new NullPointerException();
2521 return doOrRun(other, action, executor);
2522 }
2523
2524 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2525 Runnable action,
2526 Executor e) {
2527 if (other == null || action == null) throw new NullPointerException();
2528 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2529 OrRunCompletion<T> d = null;
2530 Object r;
2531 if ((r = result) == null && (r = other.result) == null) {
2532 d = new OrRunCompletion<T>(this, other, action, dst, e);
2533 CompletionNode q = null, p = new CompletionNode(d);
2534 while ((r = result) == null && (r = other.result) == null) {
2535 if (q != null) {
2536 if (UNSAFE.compareAndSwapObject
2537 (other, COMPLETIONS, q.next = other.completions, q))
2538 break;
2539 }
2540 else if (UNSAFE.compareAndSwapObject
2541 (this, COMPLETIONS, p.next = completions, p))
2542 q = new CompletionNode(d);
2543 }
2544 }
2545 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2546 Throwable ex;
2547 if (r instanceof AltResult)
2548 ex = ((AltResult)r).ex;
2549 else
2550 ex = null;
2551 if (ex == null) {
2552 try {
2553 if (e != null)
2554 e.execute(new AsyncRun(action, dst));
2555 else
2556 action.run();
2557 } catch (Throwable rex) {
2558 ex = rex;
2559 }
2560 }
2561 if (e == null || ex != null)
2562 dst.internalComplete(null, ex);
2563 }
2564 helpPostComplete();
2565 other.helpPostComplete();
2566 return dst;
2567 }
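/*
 * Usage sketch (not part of the original source): runAfterEither runs
 * its action once, on the first completion; the later completion of the
 * other source does not run it again. The sources are completed by hand
 * on the calling thread. It assumes the API as released in JDK 8;
 * RunAfterEitherSketch and runCount are hypothetical names.
 *
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class RunAfterEitherSketch {
    // Returns how many times the action ran after both sources completed.
    static int runCount() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<Void> done = a.runAfterEither(b, runs::incrementAndGet);
        b.complete("b");   // first completion triggers the action
        a.complete("a");   // second completion must not trigger it again
        done.join();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(runCount());
    }
}
```
 */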
2568
2569 /**
2570 * Returns a CompletableFuture (or an equivalent one) produced by
2571 * the given function of the result of this CompletableFuture when
2572 * completed. If this CompletableFuture completes exceptionally,
2573 * then the returned CompletableFuture also does so, with a
2574 * CompletionException holding this exception as its cause.
2575 *
2576 * @param fn the function returning a new CompletableFuture
2577 * @return the CompletableFuture, which {@code isDone()} upon
2578 * return if it was completed by the given function or if an
2579 * exception occurred
2580 */
2581 public <U> CompletableFuture<U> thenCompose
2582 (Function<? super T, CompletableFuture<U>> fn) {
2583 return doCompose(fn, null);
2584 }
2585
2586 /**
2587 * Returns a CompletableFuture (or an equivalent one) produced
2588 * asynchronously using the {@link ForkJoinPool#commonPool()} by
2589 * the given function of the result of this CompletableFuture when
2590 * completed. If this CompletableFuture completes exceptionally,
2591 * then the returned CompletableFuture also does so, with a
2592 * CompletionException holding this exception as its cause.
2593 *
2594 * @param fn the function returning a new CompletableFuture
2595 * @return the CompletableFuture, which {@code isDone()} upon
2596 * return if it was completed by the given function or if an
2597 * exception occurred
2598 */
2599 public <U> CompletableFuture<U> thenComposeAsync
2600 (Function<? super T, CompletableFuture<U>> fn) {
2601 return doCompose(fn, ForkJoinPool.commonPool());
2602 }
2603
2604 /**
2605 * Returns a CompletableFuture (or an equivalent one) produced
2606 * asynchronously using the given executor by the given function
2607 * of the result of this CompletableFuture when completed.
2608 * If this CompletableFuture completes exceptionally, then the
2609 * returned CompletableFuture also does so, with a
2610 * CompletionException holding this exception as its cause.
2611 *
2612 * @param fn the function returning a new CompletableFuture
2613 * @param executor the executor to use for asynchronous execution
2614 * @return the CompletableFuture, which {@code isDone()} upon
2615 * return if it was completed by the given function or if an
2616 * exception occurred
2617 */
2618 public <U> CompletableFuture<U> thenComposeAsync
2619 (Function<? super T, CompletableFuture<U>> fn,
2620 Executor executor) {
2621 if (executor == null) throw new NullPointerException();
2622 return doCompose(fn, executor);
2623 }
2624
2625 private <U> CompletableFuture<U> doCompose
2626 (Function<? super T, CompletableFuture<U>> fn,
2627 Executor e) {
2628 if (fn == null) throw new NullPointerException();
2629 CompletableFuture<U> dst = null;
2630 ComposeCompletion<T,U> d = null;
2631 Object r;
2632 if ((r = result) == null) {
2633 dst = new CompletableFuture<U>();
2634 CompletionNode p = new CompletionNode
2635 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2636 while ((r = result) == null) {
2637 if (UNSAFE.compareAndSwapObject
2638 (this, COMPLETIONS, p.next = completions, p))
2639 break;
2640 }
2641 }
2642 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2643 T t; Throwable ex;
2644 if (r instanceof AltResult) {
2645 ex = ((AltResult)r).ex;
2646 t = null;
2647 }
2648 else {
2649 ex = null;
2650 @SuppressWarnings("unchecked") T tr = (T) r;
2651 t = tr;
2652 }
2653 if (ex == null) {
2654 if (e != null) {
2655 if (dst == null)
2656 dst = new CompletableFuture<U>();
2657 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2658 }
2659 else {
2660 try {
2661 dst = fn.apply(t);
2662 } catch (Throwable rex) {
2663 ex = rex;
2664 }
2665 if (dst == null) {
2666 dst = new CompletableFuture<U>();
2667 if (ex == null)
2668 ex = new NullPointerException();
2669 }
2670 }
2671 }
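2672 if (dst == null) dst = new CompletableFuture<U>(); // source had already completed exceptionally
2673 if (ex != null) dst.internalComplete(null, ex); // propagate in both sync and async modes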
2674 }
2675 helpPostComplete();
2676 dst.helpPostComplete();
2677 return dst;
2678 }
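/*
 * Usage sketch (not part of the original source): thenCompose chains a
 * second asynchronous step whose input is the first step's result, and
 * flattens the nested future. It assumes the API as released in JDK 8;
 * ThenComposeSketch, lengthOf and lookupThenLength are hypothetical
 * names, and the "value-for-" lookup is a stand-in for real work.
 *
```java
import java.util.concurrent.CompletableFuture;

final class ThenComposeSketch {
    // Hypothetical second stage: an asynchronous computation that depends on v.
    static CompletableFuture<Integer> lengthOf(String v) {
        return CompletableFuture.supplyAsync(() -> v.length());
    }

    // First stage produces a string; the second stage is started from its result.
    static int lookupThenLength(String key) {
        CompletableFuture<String> lookup =
            CompletableFuture.supplyAsync(() -> "value-for-" + key);
        CompletableFuture<Integer> length =
            lookup.thenCompose(ThenComposeSketch::lengthOf);
        return length.join();
    }

    public static void main(String[] args) {
        System.out.println(lookupThenLength("abc"));
    }
}
```
 * With thenApply the same function would produce a nested
 * CompletableFuture<CompletableFuture<Integer>>; thenCompose avoids that.
 */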
2679
2680 /**
2681 * Creates and returns a CompletableFuture that is completed with
2682 * the result of the given function of the exception triggering
2683 * this CompletableFuture's completion when it completes
2684 * exceptionally; otherwise, if this CompletableFuture completes
2685 * normally, then the returned CompletableFuture also completes
2686 * normally with the same value.
2687 *
2688 * @param fn the function to use to compute the value of the
2689 * returned CompletableFuture if this CompletableFuture completed
2690 * exceptionally
2691 * @return the new CompletableFuture
2692 */
2693 public CompletableFuture<T> exceptionally
2694 (Function<Throwable, ? extends T> fn) {
2695 if (fn == null) throw new NullPointerException();
2696 CompletableFuture<T> dst = new CompletableFuture<T>();
2697 ExceptionCompletion<T> d = null;
2698 Object r;
2699 if ((r = result) == null) {
2700 CompletionNode p =
2701 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2702 while ((r = result) == null) {
2703 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2704 p.next = completions, p))
2705 break;
2706 }
2707 }
2708 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2709 T t = null; Throwable ex, dx = null;
2710 if (r instanceof AltResult) {
2711 if ((ex = ((AltResult)r).ex) != null) {
2712 try {
2713 t = fn.apply(ex);
2714 } catch (Throwable rex) {
2715 dx = rex;
2716 }
2717 }
2718 }
2719 else {
2720 @SuppressWarnings("unchecked") T tr = (T) r;
2721 t = tr;
2722 }
2723 dst.internalComplete(t, dx);
2724 }
2725 helpPostComplete();
2726 return dst;
2727 }
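/*
 * Usage sketch (not part of the original source): exceptionally maps a
 * failure to a fallback value and passes a normal result through
 * unchanged. It assumes the API as released in JDK 8;
 * ExceptionallySketch and recover are hypothetical names, and -1 is an
 * arbitrary fallback.
 *
```java
import java.util.concurrent.CompletableFuture;

final class ExceptionallySketch {
    // Completes a future normally with 42 or exceptionally, then recovers with -1.
    static int recover(boolean fail) {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        if (fail)
            source.completeExceptionally(new IllegalStateException("boom"));
        else
            source.complete(42);
        return source.exceptionally(ex -> -1).join();
    }

    public static void main(String[] args) {
        System.out.println(recover(false));
        System.out.println(recover(true));
    }
}
```
 */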
2728
2729 /**
2730 * Creates and returns a CompletableFuture that is completed with
2731 * the result of the given function of the result and exception of
2732 * this CompletableFuture's completion when it completes. The
2733 * given function is invoked with the result (or {@code null} if
2734 * none) and the exception (or {@code null} if none) of this
2735 * CompletableFuture when complete.
2736 *
2737 * @param fn the function to use to compute the value of the
2738 * returned CompletableFuture
2739 * @return the new CompletableFuture
2740 */
2741 public <U> CompletableFuture<U> handle
2742 (BiFunction<? super T, Throwable, ? extends U> fn) {
2743 if (fn == null) throw new NullPointerException();
2744 CompletableFuture<U> dst = new CompletableFuture<U>();
2745 HandleCompletion<T,U> d = null;
2746 Object r;
2747 if ((r = result) == null) {
2748 CompletionNode p =
2749 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2750 while ((r = result) == null) {
2751 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2752 p.next = completions, p))
2753 break;
2754 }
2755 }
2756 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2757 T t; Throwable ex;
2758 if (r instanceof AltResult) {
2759 ex = ((AltResult)r).ex;
2760 t = null;
2761 }
2762 else {
2763 ex = null;
2764 @SuppressWarnings("unchecked") T tr = (T) r;
2765 t = tr;
2766 }
2767 U u; Throwable dx;
2768 try {
2769 u = fn.apply(t, ex);
2770 dx = null;
2771 } catch (Throwable rex) {
2772 dx = rex;
2773 u = null;
2774 }
2775 dst.internalComplete(u, dx);
2776 }
2777 helpPostComplete();
2778 return dst;
2779 }
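/*
 * Usage sketch (not part of the original source): handle always invokes
 * its function, with exactly one of (result, exception) non-null unless
 * the result itself is null. It assumes the API as released in JDK 8;
 * HandleSketch and describe are hypothetical names.
 *
```java
import java.util.concurrent.CompletableFuture;

final class HandleSketch {
    // Builds a future that either holds value or failed with failure,
    // then turns either outcome into a description.
    static String describe(Integer value, Throwable failure) {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        if (failure != null)
            source.completeExceptionally(failure);
        else
            source.complete(value);
        return source
            .handle((v, ex) -> ex == null ? "ok:" + v : "failed:" + ex.getMessage())
            .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(7, null));
        System.out.println(describe(null, new IllegalStateException("boom")));
    }
}
```
 * Unlike exceptionally, handle may change the result type, because the
 * function sees both outcomes and produces the new value in either case.
 */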
2780
2781
2782 /* ------------- Arbitrary-arity constructions -------------- */
2783
2784 /*
2785 * The basic plan of attack is to recursively form binary
2786 * completion trees of elements. This can be overkill for small
2787 * sets, but scales nicely. The And/All vs Or/Any forms use the
2788 * same idea, but details differ.
2789 */
2790
2791 /**
2792 * Returns a new CompletableFuture that is completed when all of
2793 * the given CompletableFutures complete. If any of the component
2794 * CompletableFutures complete exceptionally, then so does the
2795 * returned CompletableFuture. Otherwise, the results, if any, of
2796 * the component CompletableFutures are not reflected in the
2797 * returned CompletableFuture, but may be obtained by inspecting
2798 * them individually. If the number of components is zero, returns
2799 * a CompletableFuture completed with the value {@code null}.
2800 *
2801 * <p>Among the applications of this method is to await completion
2802 * of a set of independent CompletableFutures before continuing a
2803 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2804 * c3).join();}.
2805 *
2806 * @param cfs the CompletableFutures
2807 * @return a CompletableFuture that is complete when all of the
2808 * given CompletableFutures complete
2809 * @throws NullPointerException if the array or any of its elements are
2810 * {@code null}
2811 */
2812 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2813 int len = cfs.length; // Directly handle empty and singleton cases
2814 if (len > 1)
2815 return allTree(cfs, 0, len - 1);
2816 else {
2817 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2818 CompletableFuture<?> f;
2819 if (len == 0)
2820 dst.result = NIL;
2821 else if ((f = cfs[0]) == null)
2822 throw new NullPointerException();
2823 else {
2824 ThenCopy d = null;
2825 CompletionNode p = null;
2826 Object r;
2827 while ((r = f.result) == null) {
2828 if (d == null)
2829 d = new ThenCopy(f, dst);
2830 else if (p == null)
2831 p = new CompletionNode(d);
2832 else if (UNSAFE.compareAndSwapObject
2833 (f, COMPLETIONS, p.next = f.completions, p))
2834 break;
2835 }
2836 if (r != null && (d == null || d.compareAndSet(0, 1)))
2837 dst.internalComplete(null, (r instanceof AltResult) ?
2838 ((AltResult)r).ex : null);
2839 f.helpPostComplete();
2840 }
2841 return dst;
2842 }
2843 }
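/*
 * Usage sketch (not part of the original source): allOf returns a
 * CompletableFuture<Void>, so results are read back from the individual
 * components after it completes, as the Javadoc above describes. It
 * assumes the API as released in JDK 8; AllOfSketch and squareAll are
 * hypothetical names.
 *
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AllOfSketch {
    // Squares each input asynchronously, waits for all, then collects in input order.
    static List<Integer> squareAll(int... inputs) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int n : inputs)
            futures.add(CompletableFuture.supplyAsync(() -> n * n));

        // Wait for every component; with zero components this is already complete.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();

        List<Integer> results = new ArrayList<>();
        for (CompletableFuture<Integer> f : futures)
            results.add(f.join()); // does not block: each component is done
        return results;
    }

    public static void main(String[] args) {
        System.out.println(squareAll(1, 2, 3));
    }
}
```
 */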
2844
2845 /**
2846 * Recursively constructs an And'ed tree of CompletableFutures.
2847 * Called only when array known to have at least two elements.
2848 */
2849 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2850 int lo, int hi) {
2851 CompletableFuture<?> fst, snd;
2852 int mid = (lo + hi) >>> 1;
2853 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2854 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2855 throw new NullPointerException();
2856 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2857 AndCompletion d = null;
2858 CompletionNode p = null, q = null;
2859 Object r = null, s = null;
2860 while ((r = fst.result) == null || (s = snd.result) == null) {
2861 if (d == null)
2862 d = new AndCompletion(fst, snd, dst);
2863 else if (p == null)
2864 p = new CompletionNode(d);
2865 else if (q == null) {
2866 if (UNSAFE.compareAndSwapObject
2867 (fst, COMPLETIONS, p.next = fst.completions, p))
2868 q = new CompletionNode(d);
2869 }
2870 else if (UNSAFE.compareAndSwapObject
2871 (snd, COMPLETIONS, q.next = snd.completions, q))
2872 break;
2873 }
2874 if ((r != null || (r = fst.result) != null) &&
2875 (s != null || (s = snd.result) != null) &&
2876 (d == null || d.compareAndSet(0, 1))) {
2877 Throwable ex;
2878 if (r instanceof AltResult)
2879 ex = ((AltResult)r).ex;
2880 else
2881 ex = null;
2882 if (ex == null && (s instanceof AltResult))
2883 ex = ((AltResult)s).ex;
2884 dst.internalComplete(null, ex);
2885 }
2886 fst.helpPostComplete();
2887 snd.helpPostComplete();
2888 return dst;
2889 }
2890
2891 /**
2892 * Returns a new CompletableFuture that is completed when any of
2893 * the component CompletableFutures completes, with the same result
2894 * if that component completed normally, otherwise exceptionally. If
2895 * the number of components is zero, returns an incomplete CompletableFuture.
2896 *
2897 * @param cfs the CompletableFutures
2898 * @return a CompletableFuture that is complete when any of the
2899 * given CompletableFutures complete
2900 * @throws NullPointerException if the array or any of its elements are
2901 * {@code null}
2902 */
2903 public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
2904 int len = cfs.length; // Same idea as allOf
2905 if (len > 1)
2906 return anyTree(cfs, 0, len - 1);
2907 else {
2908 CompletableFuture<?> dst = new CompletableFuture<Object>();
2909 CompletableFuture<?> f;
2910 if (len == 0)
2911 ; // skip
2912 else if ((f = cfs[0]) == null)
2913 throw new NullPointerException();
2914 else {
2915 ThenCopy d = null;
2916 CompletionNode p = null;
2917 Object r;
2918 while ((r = f.result) == null) {
2919 if (d == null)
2920 d = new ThenCopy(f, dst);
2921 else if (p == null)
2922 p = new CompletionNode(d);
2923 else if (UNSAFE.compareAndSwapObject
2924 (f, COMPLETIONS, p.next = f.completions, p))
2925 break;
2926 }
2927 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2928 Throwable ex; Object t;
2929 if (r instanceof AltResult) {
2930 ex = ((AltResult)r).ex;
2931 t = null;
2932 }
2933 else {
2934 ex = null;
2935 t = r;
2936 }
2937 dst.internalComplete(t, ex);
2938 }
2939 f.helpPostComplete();
2940 }
2941 return dst;
2942 }
2943 }
2944
2945 /**
2946 * Recursively constructs an Or'ed tree of CompletableFutures.
2947 */
2948 private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
2949 int lo, int hi) {
2950 CompletableFuture<?> fst, snd;
2951 int mid = (lo + hi) >>> 1;
2952 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
2953 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
2954 throw new NullPointerException();
2955 CompletableFuture<?> dst = new CompletableFuture<Object>();
2956 OrCompletion d = null;
2957 CompletionNode p = null, q = null;
2958 Object r;
2959 while ((r = fst.result) == null && (r = snd.result) == null) {
2960 if (d == null)
2961 d = new OrCompletion(fst, snd, dst);
2962 else if (p == null)
2963 p = new CompletionNode(d);
2964 else if (q == null) {
2965 if (UNSAFE.compareAndSwapObject
2966 (fst, COMPLETIONS, p.next = fst.completions, p))
2967 q = new CompletionNode(d);
2968 }
2969 else if (UNSAFE.compareAndSwapObject
2970 (snd, COMPLETIONS, q.next = snd.completions, q))
2971 break;
2972 }
2973 if ((r != null || (r = fst.result) != null ||
2974 (r = snd.result) != null) &&
2975 (d == null || d.compareAndSet(0, 1))) {
2976 Throwable ex; Object t;
2977 if (r instanceof AltResult) {
2978 ex = ((AltResult)r).ex;
2979 t = null;
2980 }
2981 else {
2982 ex = null;
2983 t = r;
2984 }
2985 dst.internalComplete(t, ex);
2986 }
2987 fst.helpPostComplete();
2988 snd.helpPostComplete();
2989 return dst;
2990 }
2991
2992
2993 /* ------------- Control and status methods -------------- */
2994
2995 /**
2996 * If not already completed, completes this CompletableFuture with
2997 * a {@link CancellationException}. Dependent CompletableFutures
2998 * that have not already completed will also complete
2999 * exceptionally, with a {@link CompletionException} caused by
3000 * this {@code CancellationException}.
3001 *
3002 * @param mayInterruptIfRunning this value has no effect in this
3003 * implementation because interrupts are not used to control
3004 * processing.
3005 *
3006 * @return {@code true} if this task is now cancelled
3007 */
3008 public boolean cancel(boolean mayInterruptIfRunning) {
3009 boolean cancelled = (result == null) &&
3010 UNSAFE.compareAndSwapObject
3011 (this, RESULT, null, new AltResult(new CancellationException()));
3012 postComplete();
3013 return cancelled || isCancelled();
3014 }
3015
3016 /**
3017 * Returns {@code true} if this CompletableFuture was cancelled
3018 * before it completed normally.
3019 *
3020 * @return {@code true} if this CompletableFuture was cancelled
3021 * before it completed normally
3022 */
3023 public boolean isCancelled() {
3024 Object r;
3025 return ((r = result) instanceof AltResult) &&
3026 (((AltResult)r).ex instanceof CancellationException);
3027 }
3028
3029 /**
3030 * Forcibly sets or resets the value subsequently returned by
3031 * method {@link #get()} and related methods, whether or not
3032 * already completed. This method is designed for use only in
3033 * error recovery actions, and even in such situations may leave
3034 * ongoing dependent completions using the previously established
3035 * outcome rather than the overwritten one.
3036 *
3037 * @param value the completion value
3038 */
3039 public void obtrudeValue(T value) {
3040 result = (value == null) ? NIL : value;
3041 postComplete();
3042 }
3043
3044 /**
3045 * Forcibly causes subsequent invocations of method {@link #get()}
3046 * and related methods to throw the given exception, whether or
3047 * not already completed. This method is designed for use only in
3048 * error recovery actions, and even in such situations may leave
3049 * ongoing dependent completions using the previously established
3050 * outcome rather than the overwritten one.
3051 *
3052 * @param ex the exception
3053 */
3054 public void obtrudeException(Throwable ex) {
3055 if (ex == null) throw new NullPointerException();
3056 result = new AltResult(ex);
3057 postComplete();
3058 }
3059
3060 /**
3061 * Returns the estimated number of CompletableFutures whose
3062 * completions are awaiting completion of this CompletableFuture.
3063 * This method is designed for use in monitoring system state, not
3064 * for synchronization control.
3065 *
3066 * @return the estimated number of dependent CompletableFutures
3067 */
3068 public int getNumberOfDependents() {
3069 int count = 0;
3070 for (CompletionNode p = completions; p != null; p = p.next)
3071 ++count;
3072 return count;
3073 }
3074
3075 /**
3076 * Returns a string identifying this CompletableFuture, as well as
3077 * its completion state. The state, in brackets, contains the
3078 * String {@code "Completed normally"} or the String {@code
3079 * "Completed exceptionally"}, or the String {@code "Not
3080 * completed"} followed by the number of CompletableFutures
3081 * dependent upon its completion, if any.
3082 *
3083 * @return a string identifying this CompletableFuture, as well as its state
3084 */
3085 public String toString() {
3086 Object r = result;
3087 int count;
3088 return super.toString() +
3089 ((r == null) ?
3090 (((count = getNumberOfDependents()) == 0) ?
3091 "[Not completed]" :
3092 "[Not completed, " + count + " dependents]") :
3093 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3094 "[Completed exceptionally]" :
3095 "[Completed normally]"));
3096 }
3097
3098 // Unsafe mechanics
3099 private static final sun.misc.Unsafe UNSAFE;
3100 private static final long RESULT;
3101 private static final long WAITERS;
3102 private static final long COMPLETIONS;
3103 static {
3104 try {
3105 UNSAFE = sun.misc.Unsafe.getUnsafe();
3106 Class<?> k = CompletableFuture.class;
3107 RESULT = UNSAFE.objectFieldOffset
3108 (k.getDeclaredField("result"));
3109 WAITERS = UNSAFE.objectFieldOffset
3110 (k.getDeclaredField("waiters"));
3111 COMPLETIONS = UNSAFE.objectFieldOffset
3112 (k.getDeclaredField("completions"));
3113 } catch (Exception e) {
3114 throw new Error(e);
3115 }
3116 }
3117 }