ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.66
Committed: Mon Mar 18 05:37:20 2013 UTC (11 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.65: +243 -188 lines
Log Message:
javadoc rewrite

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to
31 * {@link #complete complete},
32 * {@link #completeExceptionally completeExceptionally}, or
33 * {@link #cancel cancel}
34 * a CompletableFuture, only one of them succeeds.
35 *
36 * <p>Methods are available for adding dependents based on
37 * user-provided Functions, Consumers, or Runnables. The appropriate
38 * form to use depends on whether actions require arguments and/or
39 * produce results. Completion of a dependent action will trigger the
40 * completion of another CompletableFuture. Actions may also be
41 * triggered after either or both the current and another
42 * CompletableFuture complete. Multiple CompletableFutures may also
43 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
44 * {@link #allOf(CompletableFuture...)}.
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, actions supplied for dependent completions of another
48 * CompletableFuture may do so, depending on whether they are provided
49 * via one of the <em>async</em> methods (that is, methods with names
50 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
51 * methods provide a way to commence asynchronous processing of an
52 * action using either a given {@link Executor} or by default the
53 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
54 * debugging, and tracking, all generated asynchronous tasks are
55 * instances of the marker interface {@link AsynchronousCompletionTask}.
56 *
57 * <p>Actions supplied for dependent completions of <em>non-async</em>
58 * methods may be performed by the thread that completes the current
59 * CompletableFuture, or by any other caller of these methods. There
60 * are no guarantees about the order of processing completions unless
61 * constrained by these methods.
62 *
63 * <p>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional completion.
66 * Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}.
68 *
69 * <p>Upon exceptional completion (including cancellation), or when a
70 * completion entails an additional computation which terminates
71 * abruptly with an (unchecked) exception or error, then all of their
72 * dependent completions (and their dependents in turn) generally act
73 * as {@code completeExceptionally} with a {@link CompletionException}
74 * holding that exception as its cause. However, the {@link
75 * #exceptionally exceptionally} and {@link #handle handle}
76 * completions <em>are</em> able to handle exceptional completions of
77 * the CompletableFutures they depend on.
78 *
79 * <p>In case of exceptional completion with a CompletionException,
80 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
81 * {@link ExecutionException} with the same cause as held in the
82 * corresponding CompletionException. However, in these cases,
83 * methods {@link #join()} and {@link #getNow} throw the
84 * CompletionException, which simplifies usage.
85 *
86 * @author Doug Lea
87 * @since 1.8
88 */
89 public class CompletableFuture<T> implements Future<T> {
90
91 /*
92 * Overview:
93 *
94 * 1. Non-nullness of field result (set via CAS) indicates done.
95 * An AltResult is used to box null as a result, as well as to
96 * hold exceptions. Using a single field makes completion fast
97 * and simple to detect and trigger, at the expense of a lot of
98 * encoding and decoding that infiltrates many methods. One minor
99 * simplification relies on the (static) NIL (to box null results)
100 * being the only AltResult with a null exception field, so we
101 * don't usually need explicit comparisons with NIL. The CF
102 * exception propagation mechanics surrounding decoding rely on
103 * unchecked casts of decoded results really being unchecked,
104 * where user type errors are caught at point of use, as is
105 * currently the case in Java. These are highlighted by using
106 * SuppressWarnings-annotated temporaries.
107 *
108 * 2. Waiters are held in a Treiber stack similar to the one used
109 * in FutureTask, Phaser, and SynchronousQueue. See their
110 * internal documentation for algorithmic details.
111 *
112 * 3. Completions are also kept in a list/stack, and pulled off
113 * and run when completion is triggered. (We could even use the
114 * same stack as for waiters, but would give up the potential
115 * parallelism obtained because woken waiters help release/run
116 * others -- see method postComplete). Because post-processing
117 * may race with direct calls, class Completion opportunistically
118 * extends AtomicInteger so callers can claim the action via
119 * compareAndSet(0, 1). The Completion.run methods are all
120 * written in a boringly similar uniform way (that sometimes includes
121 * unnecessary-looking checks, kept to maintain uniformity).
122 * There are enough dimensions upon which they differ that
123 * attempts to factor commonalities while maintaining efficiency
124 * require more lines of code than they would save.
125 *
126 * 4. The exported then/and/or methods do support a bit of
127 * factoring (see doThenApply etc). They must cope with the
128 * intrinsic races surrounding addition of a dependent action
129 * versus performing the action directly because the task is
130 * already complete. For example, a CF may not be complete upon
131 * entry, so a dependent completion is added, but by the time it
132 * is added, the target CF is complete, so must be directly
133 * executed. This is all done while avoiding unnecessary object
134 * construction in safe-bypass cases.
135 */
136
137 // preliminaries
138
139 static final class AltResult {
140 final Throwable ex; // null only for NIL
141 AltResult(Throwable ex) { this.ex = ex; }
142 }
143
// Shared box for a null result. A null result field means "not yet
// complete", so a genuine null outcome is stored as NIL instead.
144 static final AltResult NIL = new AltResult(null);
145
146 // Fields
147
// All three fields are volatile; the code in this file updates them with
// UNSAFE.compareAndSwapObject rather than under a lock.
148 volatile Object result; // Either the result or boxed AltResult; non-null once done
149 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
150 volatile CompletionNode completions; // list (Treiber stack) of completions
151
152 // Basic utilities for triggering and processing completions
153
154 /**
155 * Removes and signals all waiting threads and runs all completions.
156 */
157 final void postComplete() {
158 WaitNode q; Thread t;
159 while ((q = waiters) != null) {
160 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
161 (t = q.thread) != null) {
162 q.thread = null;
163 LockSupport.unpark(t);
164 }
165 }
166
167 CompletionNode h; Completion c;
168 while ((h = completions) != null) {
169 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
170 (c = h.completion) != null)
171 c.run();
172 }
173 }
174
175 /**
176 * Triggers completion with the encoding of the given arguments:
177 * if the exception is non-null, encodes it as a wrapped
178 * CompletionException unless it is one already. Otherwise uses
179 * the given result, boxed as NIL if null.
180 */
181 final void internalComplete(Object v, Throwable ex) {
182 if (result == null)
183 UNSAFE.compareAndSwapObject
184 (this, RESULT, null,
185 (ex == null) ? (v == null) ? NIL : v :
186 new AltResult((ex instanceof CompletionException) ? ex :
187 new CompletionException(ex)));
188 postComplete(); // help out even if not triggered
189 }
190
191 /**
192 * If triggered, helps release and/or process completions.
193 */
194 final void helpPostComplete() {
195 if (result != null)
196 postComplete();
197 }
198
199 /* ------------- waiting for completions -------------- */
200
201 /** Number of processors, for spin control */
// Sampled once, when this class is initialized.
202 static final int NCPU = Runtime.getRuntime().availableProcessors();
203
204 /**
205 * Heuristic spin value for waitingGet() before blocking on
206 * multiprocessors: 1 << 8 (256) when more than one processor is
 * reported, otherwise 0 so that no spinning is attempted.
207 */
208 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
209
210 /**
211 * Linked nodes to record waiting threads in a Treiber stack. See
212 * other classes such as Phaser and SynchronousQueue for more
213 * detailed explanation. This class implements ManagedBlocker to
214 * avoid starvation when blocking actions pile up in
215 * ForkJoinPools.
216 */
217 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
218 long nanos; // wait time if timed
219 final long deadline; // non-zero if timed
220 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
221 volatile Thread thread;
222 volatile WaitNode next;
223 WaitNode(boolean interruptible, long nanos, long deadline) {
224 this.thread = Thread.currentThread();
225 this.interruptControl = interruptible ? 1 : 0;
226 this.nanos = nanos;
227 this.deadline = deadline;
228 }
229 public boolean isReleasable() {
230 if (thread == null)
231 return true;
232 if (Thread.interrupted()) {
233 int i = interruptControl;
234 interruptControl = -1;
235 if (i > 0)
236 return true;
237 }
238 if (deadline != 0L &&
239 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
240 thread = null;
241 return true;
242 }
243 return false;
244 }
245 public boolean block() {
246 if (isReleasable())
247 return true;
248 else if (deadline == 0L)
249 LockSupport.park(this);
250 else if (nanos > 0L)
251 LockSupport.parkNanos(this, nanos);
252 return isReleasable();
253 }
254 }
255
256 /**
257 * Returns raw result after waiting, or null if interruptible and
258 * interrupted.
 *
 * <p>Each pass of the loop takes exactly one step, tried in this
 * order: return if the result is set, spin, allocate a wait node, push
 * the node onto the waiters stack, give up on a recorded interrupt, or
 * block.
 *
 * @param interruptible whether a recorded interrupt aborts the wait
 * @return the raw (possibly AltResult-boxed) result, or null if the
 *         caller is interruptible and an interrupt was recorded
259 */
260 private Object waitingGet(boolean interruptible) {
261 WaitNode q = null;
262 boolean queued = false;
263 int spins = SPINS;
264 for (Object r;;) {
265 if ((r = result) != null) {
266 if (q != null) { // suppress unpark
267 q.thread = null;
268 if (q.interruptControl < 0) {
269 if (interruptible) {
270 removeWaiter(q);
271 return null;
272 }
// Non-interruptible caller: the interrupt was only recorded in
// interruptControl, so set the thread's interrupt status again
// before handing back the result.
273 Thread.currentThread().interrupt();
274 }
275 }
276 postComplete(); // help release others
277 return r;
278 }
279 else if (spins > 0) {
// Spin phase: the budget is decremented only when the pseudo-random
// value is non-negative.
280 int rnd = ThreadLocalRandom.nextSecondarySeed();
281 if (rnd == 0)
282 rnd = ThreadLocalRandom.current().nextInt();
283 if (rnd >= 0)
284 --spins;
285 }
// Spin budget used up: create an untimed node (nanos and deadline are 0).
286 else if (q == null)
287 q = new WaitNode(interruptible, 0L, 0L);
// Push the node onto the waiters stack; a failed CAS leaves queued false
// so the push is retried on the next pass.
288 else if (!queued)
289 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
290 q.next = waiters, q);
291 else if (interruptible && q.interruptControl < 0) {
292 removeWaiter(q);
293 return null;
294 }
// Block only while the node still names a thread and no result has
// appeared since the check at the top of the loop.
295 else if (q.thread != null && result == null) {
296 try {
297 ForkJoinPool.managedBlock(q);
298 } catch (InterruptedException ex) {
// Record the interrupt; the branches above decide what to do with it.
299 q.interruptControl = -1;
300 }
301 }
302 }
303 }
304
305 /**
306 * Awaits completion or aborts on interrupt or timeout.
307 *
308 * @param nanos time to wait
309 * @return raw result
 * @throws InterruptedException if an interrupt was recorded on the
 *         wait node (checked even when a result is already available)
 * @throws TimeoutException if no result is set and either
 *         {@code nanos <= 0} on entry or the wait time runs out
310 */
311 private Object timedAwaitDone(long nanos)
312 throws InterruptedException, TimeoutException {
313 WaitNode q = null;
314 boolean queued = false;
315 for (Object r;;) {
316 if ((r = result) != null) {
317 if (q != null) {
// Clear thread so releasers skip the unpark for this node.
318 q.thread = null;
319 if (q.interruptControl < 0) {
320 removeWaiter(q);
321 throw new InterruptedException();
322 }
323 }
// Help release other waiters and run completions before returning.
324 postComplete();
325 return r;
326 }
327 else if (q == null) {
328 if (nanos <= 0L)
329 throw new TimeoutException();
// WaitNode treats deadline == 0 as "untimed", so a computed deadline of
// exactly 0 is nudged to 1.
330 long d = System.nanoTime() + nanos;
331 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
332 }
// Push the node onto the waiters stack; retried next pass if the CAS fails.
333 else if (!queued)
334 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
335 q.next = waiters, q);
336 else if (q.interruptControl < 0) {
337 removeWaiter(q);
338 throw new InterruptedException();
339 }
// q.nanos is refreshed by WaitNode.isReleasable(); <= 0 means time is up.
340 else if (q.nanos <= 0L) {
// Re-check result so a completion that raced with the timeout still wins
// (the next pass then returns it).
341 if (result == null) {
342 removeWaiter(q);
343 throw new TimeoutException();
344 }
345 }
346 else if (q.thread != null && result == null) {
347 try {
348 ForkJoinPool.managedBlock(q);
349 } catch (InterruptedException ex) {
// Record the interrupt; it is turned into an exception on a later pass.
350 q.interruptControl = -1;
351 }
352 }
353 }
354 }
355
356 /**
357 * Tries to unlink a timed-out or interrupted wait node to avoid
358 * accumulating garbage. Internal nodes are simply unspliced
359 * without CAS since it is harmless if they are traversed anyway
360 * by releasers. To avoid effects of unsplicing from already
361 * removed nodes, the list is retraversed in case of an apparent
362 * race. This is slow when there are a lot of nodes, but we don't
363 * expect lists to be long enough to outweigh higher-overhead
364 * schemes.
 *
 * @param node the wait node to retire; a null argument is ignored
365 */
366 private void removeWaiter(WaitNode node) {
367 if (node != null) {
// A null thread field marks a node as removable. The traversal below
// unlinks every node it finds in that state, not only this one.
368 node.thread = null;
369 retry:
370 for (;;) { // restart on removeWaiter race
// pred = last node seen with a non-null thread, q = current, s = successor.
371 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
372 s = q.next;
373 if (q.thread != null)
374 pred = q;
375 else if (pred != null) {
// Unsplice q by pointing its predecessor past it (no CAS).
376 pred.next = s;
377 if (pred.thread == null) // check for race
378 continue retry;
379 }
// Removable node with no live predecessor: it is at the head of the
// stack, so it must be popped with CAS; restart if the head moved.
380 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
381 continue retry;
382 }
383 break;
384 }
385 }
386 }
387
388 /* ------------- Async tasks -------------- */
389
390 /**
391 * A marker interface identifying asynchronous tasks produced by
392 * {@code async} methods. This may be useful for monitoring,
393 * debugging, and tracking asynchronous activities.
 *
 * <p>The interface declares no methods. Within this file it is
 * implemented by the {@code Async} base class of the task wrappers.
394 *
395 * @since 1.8
396 */
397 public static interface AsynchronousCompletionTask {
398 }
399
400 /** Base class can act as either FJ or plain Runnable */
401 abstract static class Async extends ForkJoinTask<Void>
402 implements Runnable, AsynchronousCompletionTask {
403 public final Void getRawResult() { return null; }
404 public final void setRawResult(Void v) { }
405 public final void run() { exec(); }
406 }
407
408 static final class AsyncRun extends Async {
409 final Runnable fn;
410 final CompletableFuture<Void> dst;
411 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
412 this.fn = fn; this.dst = dst;
413 }
414 public final boolean exec() {
415 CompletableFuture<Void> d; Throwable ex;
416 if ((d = this.dst) != null && d.result == null) {
417 try {
418 fn.run();
419 ex = null;
420 } catch (Throwable rex) {
421 ex = rex;
422 }
423 d.internalComplete(null, ex);
424 }
425 return true;
426 }
427 private static final long serialVersionUID = 5232453952276885070L;
428 }
429
430 static final class AsyncSupply<U> extends Async {
431 final Supplier<U> fn;
432 final CompletableFuture<U> dst;
433 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
434 this.fn = fn; this.dst = dst;
435 }
436 public final boolean exec() {
437 CompletableFuture<U> d; U u; Throwable ex;
438 if ((d = this.dst) != null && d.result == null) {
439 try {
440 u = fn.get();
441 ex = null;
442 } catch (Throwable rex) {
443 ex = rex;
444 u = null;
445 }
446 d.internalComplete(u, ex);
447 }
448 return true;
449 }
450 private static final long serialVersionUID = 5232453952276885070L;
451 }
452
453 static final class AsyncApply<T,U> extends Async {
454 final T arg;
455 final Function<? super T,? extends U> fn;
456 final CompletableFuture<U> dst;
457 AsyncApply(T arg, Function<? super T,? extends U> fn,
458 CompletableFuture<U> dst) {
459 this.arg = arg; this.fn = fn; this.dst = dst;
460 }
461 public final boolean exec() {
462 CompletableFuture<U> d; U u; Throwable ex;
463 if ((d = this.dst) != null && d.result == null) {
464 try {
465 u = fn.apply(arg);
466 ex = null;
467 } catch (Throwable rex) {
468 ex = rex;
469 u = null;
470 }
471 d.internalComplete(u, ex);
472 }
473 return true;
474 }
475 private static final long serialVersionUID = 5232453952276885070L;
476 }
477
478 static final class AsyncBiApply<T,U,V> extends Async {
479 final T arg1;
480 final U arg2;
481 final BiFunction<? super T,? super U,? extends V> fn;
482 final CompletableFuture<V> dst;
483 AsyncBiApply(T arg1, U arg2,
484 BiFunction<? super T,? super U,? extends V> fn,
485 CompletableFuture<V> dst) {
486 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
487 }
488 public final boolean exec() {
489 CompletableFuture<V> d; V v; Throwable ex;
490 if ((d = this.dst) != null && d.result == null) {
491 try {
492 v = fn.apply(arg1, arg2);
493 ex = null;
494 } catch (Throwable rex) {
495 ex = rex;
496 v = null;
497 }
498 d.internalComplete(v, ex);
499 }
500 return true;
501 }
502 private static final long serialVersionUID = 5232453952276885070L;
503 }
504
505 static final class AsyncAccept<T> extends Async {
506 final T arg;
507 final Consumer<? super T> fn;
508 final CompletableFuture<Void> dst;
509 AsyncAccept(T arg, Consumer<? super T> fn,
510 CompletableFuture<Void> dst) {
511 this.arg = arg; this.fn = fn; this.dst = dst;
512 }
513 public final boolean exec() {
514 CompletableFuture<Void> d; Throwable ex;
515 if ((d = this.dst) != null && d.result == null) {
516 try {
517 fn.accept(arg);
518 ex = null;
519 } catch (Throwable rex) {
520 ex = rex;
521 }
522 d.internalComplete(null, ex);
523 }
524 return true;
525 }
526 private static final long serialVersionUID = 5232453952276885070L;
527 }
528
529 static final class AsyncBiAccept<T,U> extends Async {
530 final T arg1;
531 final U arg2;
532 final BiConsumer<? super T,? super U> fn;
533 final CompletableFuture<Void> dst;
534 AsyncBiAccept(T arg1, U arg2,
535 BiConsumer<? super T,? super U> fn,
536 CompletableFuture<Void> dst) {
537 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
538 }
539 public final boolean exec() {
540 CompletableFuture<Void> d; Throwable ex;
541 if ((d = this.dst) != null && d.result == null) {
542 try {
543 fn.accept(arg1, arg2);
544 ex = null;
545 } catch (Throwable rex) {
546 ex = rex;
547 }
548 d.internalComplete(null, ex);
549 }
550 return true;
551 }
552 private static final long serialVersionUID = 5232453952276885070L;
553 }
554
555 static final class AsyncCompose<T,U> extends Async {
556 final T arg;
557 final Function<? super T, CompletableFuture<U>> fn;
558 final CompletableFuture<U> dst;
559 AsyncCompose(T arg,
560 Function<? super T, CompletableFuture<U>> fn,
561 CompletableFuture<U> dst) {
562 this.arg = arg; this.fn = fn; this.dst = dst;
563 }
564 public final boolean exec() {
565 CompletableFuture<U> d, fr; U u; Throwable ex;
566 if ((d = this.dst) != null && d.result == null) {
567 try {
568 fr = fn.apply(arg);
569 ex = (fr == null) ? new NullPointerException() : null;
570 } catch (Throwable rex) {
571 ex = rex;
572 fr = null;
573 }
574 if (ex != null)
575 u = null;
576 else {
577 Object r = fr.result;
578 if (r instanceof AltResult) {
579 ex = ((AltResult)r).ex;
580 u = null;
581 }
582 else {
583 @SuppressWarnings("unchecked") U ur = (U) r;
584 u = ur;
585 }
586 }
587 d.internalComplete(u, ex);
588 }
589 return true;
590 }
591 private static final long serialVersionUID = 5232453952276885070L;
592 }
593
594 /* ------------- Completions -------------- */
595
596 /**
597 * Simple linked list nodes to record completions, used in
598 * basically the same way as WaitNodes. (We separate nodes from
599 * the Completions themselves mainly because for the And and Or
600 * methods, the same Completion object resides in two lists.)
601 */
602 static final class CompletionNode {
603 final Completion completion;
604 volatile CompletionNode next;
605 CompletionNode(Completion completion) { this.completion = completion; }
606 }
607
608 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
// The run() implementations visible in this file call compareAndSet(0, 1)
// before acting, so when several callers race to run the same Completion
// only the one whose CAS succeeds performs the action.
609 abstract static class Completion extends AtomicInteger implements Runnable {
610 }
611
612 static final class ApplyCompletion<T,U> extends Completion {
613 final CompletableFuture<? extends T> src;
614 final Function<? super T,? extends U> fn;
615 final CompletableFuture<U> dst;
616 final Executor executor;
617 ApplyCompletion(CompletableFuture<? extends T> src,
618 Function<? super T,? extends U> fn,
619 CompletableFuture<U> dst, Executor executor) {
620 this.src = src; this.fn = fn; this.dst = dst;
621 this.executor = executor;
622 }
623 public final void run() {
624 final CompletableFuture<? extends T> a;
625 final Function<? super T,? extends U> fn;
626 final CompletableFuture<U> dst;
627 Object r; T t; Throwable ex;
628 if ((dst = this.dst) != null &&
629 (fn = this.fn) != null &&
630 (a = this.src) != null &&
631 (r = a.result) != null &&
632 compareAndSet(0, 1)) {
633 if (r instanceof AltResult) {
634 ex = ((AltResult)r).ex;
635 t = null;
636 }
637 else {
638 ex = null;
639 @SuppressWarnings("unchecked") T tr = (T) r;
640 t = tr;
641 }
642 Executor e = executor;
643 U u = null;
644 if (ex == null) {
645 try {
646 if (e != null)
647 e.execute(new AsyncApply<T,U>(t, fn, dst));
648 else
649 u = fn.apply(t);
650 } catch (Throwable rex) {
651 ex = rex;
652 }
653 }
654 if (e == null || ex != null)
655 dst.internalComplete(u, ex);
656 }
657 }
658 private static final long serialVersionUID = 5232453952276885070L;
659 }
660
661 static final class AcceptCompletion<T> extends Completion {
662 final CompletableFuture<? extends T> src;
663 final Consumer<? super T> fn;
664 final CompletableFuture<Void> dst;
665 final Executor executor;
666 AcceptCompletion(CompletableFuture<? extends T> src,
667 Consumer<? super T> fn,
668 CompletableFuture<Void> dst, Executor executor) {
669 this.src = src; this.fn = fn; this.dst = dst;
670 this.executor = executor;
671 }
672 public final void run() {
673 final CompletableFuture<? extends T> a;
674 final Consumer<? super T> fn;
675 final CompletableFuture<Void> dst;
676 Object r; T t; Throwable ex;
677 if ((dst = this.dst) != null &&
678 (fn = this.fn) != null &&
679 (a = this.src) != null &&
680 (r = a.result) != null &&
681 compareAndSet(0, 1)) {
682 if (r instanceof AltResult) {
683 ex = ((AltResult)r).ex;
684 t = null;
685 }
686 else {
687 ex = null;
688 @SuppressWarnings("unchecked") T tr = (T) r;
689 t = tr;
690 }
691 Executor e = executor;
692 if (ex == null) {
693 try {
694 if (e != null)
695 e.execute(new AsyncAccept<T>(t, fn, dst));
696 else
697 fn.accept(t);
698 } catch (Throwable rex) {
699 ex = rex;
700 }
701 }
702 if (e == null || ex != null)
703 dst.internalComplete(null, ex);
704 }
705 }
706 private static final long serialVersionUID = 5232453952276885070L;
707 }
708
709 static final class RunCompletion<T> extends Completion {
710 final CompletableFuture<? extends T> src;
711 final Runnable fn;
712 final CompletableFuture<Void> dst;
713 final Executor executor;
714 RunCompletion(CompletableFuture<? extends T> src,
715 Runnable fn,
716 CompletableFuture<Void> dst,
717 Executor executor) {
718 this.src = src; this.fn = fn; this.dst = dst;
719 this.executor = executor;
720 }
721 public final void run() {
722 final CompletableFuture<? extends T> a;
723 final Runnable fn;
724 final CompletableFuture<Void> dst;
725 Object r; Throwable ex;
726 if ((dst = this.dst) != null &&
727 (fn = this.fn) != null &&
728 (a = this.src) != null &&
729 (r = a.result) != null &&
730 compareAndSet(0, 1)) {
731 if (r instanceof AltResult)
732 ex = ((AltResult)r).ex;
733 else
734 ex = null;
735 Executor e = executor;
736 if (ex == null) {
737 try {
738 if (e != null)
739 e.execute(new AsyncRun(fn, dst));
740 else
741 fn.run();
742 } catch (Throwable rex) {
743 ex = rex;
744 }
745 }
746 if (e == null || ex != null)
747 dst.internalComplete(null, ex);
748 }
749 }
750 private static final long serialVersionUID = 5232453952276885070L;
751 }
752
753 static final class BiApplyCompletion<T,U,V> extends Completion {
754 final CompletableFuture<? extends T> src;
755 final CompletableFuture<? extends U> snd;
756 final BiFunction<? super T,? super U,? extends V> fn;
757 final CompletableFuture<V> dst;
758 final Executor executor;
759 BiApplyCompletion(CompletableFuture<? extends T> src,
760 CompletableFuture<? extends U> snd,
761 BiFunction<? super T,? super U,? extends V> fn,
762 CompletableFuture<V> dst, Executor executor) {
763 this.src = src; this.snd = snd;
764 this.fn = fn; this.dst = dst;
765 this.executor = executor;
766 }
767 public final void run() {
768 final CompletableFuture<? extends T> a;
769 final CompletableFuture<? extends U> b;
770 final BiFunction<? super T,? super U,? extends V> fn;
771 final CompletableFuture<V> dst;
772 Object r, s; T t; U u; Throwable ex;
773 if ((dst = this.dst) != null &&
774 (fn = this.fn) != null &&
775 (a = this.src) != null &&
776 (r = a.result) != null &&
777 (b = this.snd) != null &&
778 (s = b.result) != null &&
779 compareAndSet(0, 1)) {
780 if (r instanceof AltResult) {
781 ex = ((AltResult)r).ex;
782 t = null;
783 }
784 else {
785 ex = null;
786 @SuppressWarnings("unchecked") T tr = (T) r;
787 t = tr;
788 }
789 if (ex != null)
790 u = null;
791 else if (s instanceof AltResult) {
792 ex = ((AltResult)s).ex;
793 u = null;
794 }
795 else {
796 @SuppressWarnings("unchecked") U us = (U) s;
797 u = us;
798 }
799 Executor e = executor;
800 V v = null;
801 if (ex == null) {
802 try {
803 if (e != null)
804 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
805 else
806 v = fn.apply(t, u);
807 } catch (Throwable rex) {
808 ex = rex;
809 }
810 }
811 if (e == null || ex != null)
812 dst.internalComplete(v, ex);
813 }
814 }
815 private static final long serialVersionUID = 5232453952276885070L;
816 }
817
818 static final class BiAcceptCompletion<T,U> extends Completion {
819 final CompletableFuture<? extends T> src;
820 final CompletableFuture<? extends U> snd;
821 final BiConsumer<? super T,? super U> fn;
822 final CompletableFuture<Void> dst;
823 final Executor executor;
824 BiAcceptCompletion(CompletableFuture<? extends T> src,
825 CompletableFuture<? extends U> snd,
826 BiConsumer<? super T,? super U> fn,
827 CompletableFuture<Void> dst, Executor executor) {
828 this.src = src; this.snd = snd;
829 this.fn = fn; this.dst = dst;
830 this.executor = executor;
831 }
832 public final void run() {
833 final CompletableFuture<? extends T> a;
834 final CompletableFuture<? extends U> b;
835 final BiConsumer<? super T,? super U> fn;
836 final CompletableFuture<Void> dst;
837 Object r, s; T t; U u; Throwable ex;
838 if ((dst = this.dst) != null &&
839 (fn = this.fn) != null &&
840 (a = this.src) != null &&
841 (r = a.result) != null &&
842 (b = this.snd) != null &&
843 (s = b.result) != null &&
844 compareAndSet(0, 1)) {
845 if (r instanceof AltResult) {
846 ex = ((AltResult)r).ex;
847 t = null;
848 }
849 else {
850 ex = null;
851 @SuppressWarnings("unchecked") T tr = (T) r;
852 t = tr;
853 }
854 if (ex != null)
855 u = null;
856 else if (s instanceof AltResult) {
857 ex = ((AltResult)s).ex;
858 u = null;
859 }
860 else {
861 @SuppressWarnings("unchecked") U us = (U) s;
862 u = us;
863 }
864 Executor e = executor;
865 if (ex == null) {
866 try {
867 if (e != null)
868 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
869 else
870 fn.accept(t, u);
871 } catch (Throwable rex) {
872 ex = rex;
873 }
874 }
875 if (e == null || ex != null)
876 dst.internalComplete(null, ex);
877 }
878 }
879 private static final long serialVersionUID = 5232453952276885070L;
880 }
881
882 static final class BiRunCompletion<T> extends Completion {
883 final CompletableFuture<? extends T> src;
884 final CompletableFuture<?> snd;
885 final Runnable fn;
886 final CompletableFuture<Void> dst;
887 final Executor executor;
888 BiRunCompletion(CompletableFuture<? extends T> src,
889 CompletableFuture<?> snd,
890 Runnable fn,
891 CompletableFuture<Void> dst, Executor executor) {
892 this.src = src; this.snd = snd;
893 this.fn = fn; this.dst = dst;
894 this.executor = executor;
895 }
896 public final void run() {
897 final CompletableFuture<? extends T> a;
898 final CompletableFuture<?> b;
899 final Runnable fn;
900 final CompletableFuture<Void> dst;
901 Object r, s; Throwable ex;
902 if ((dst = this.dst) != null &&
903 (fn = this.fn) != null &&
904 (a = this.src) != null &&
905 (r = a.result) != null &&
906 (b = this.snd) != null &&
907 (s = b.result) != null &&
908 compareAndSet(0, 1)) {
909 if (r instanceof AltResult)
910 ex = ((AltResult)r).ex;
911 else
912 ex = null;
913 if (ex == null && (s instanceof AltResult))
914 ex = ((AltResult)s).ex;
915 Executor e = executor;
916 if (ex == null) {
917 try {
918 if (e != null)
919 e.execute(new AsyncRun(fn, dst));
920 else
921 fn.run();
922 } catch (Throwable rex) {
923 ex = rex;
924 }
925 }
926 if (e == null || ex != null)
927 dst.internalComplete(null, ex);
928 }
929 }
930 private static final long serialVersionUID = 5232453952276885070L;
931 }
932
933 static final class AndCompletion extends Completion {
934 final CompletableFuture<?> src;
935 final CompletableFuture<?> snd;
936 final CompletableFuture<Void> dst;
937 AndCompletion(CompletableFuture<?> src,
938 CompletableFuture<?> snd,
939 CompletableFuture<Void> dst) {
940 this.src = src; this.snd = snd; this.dst = dst;
941 }
942 public final void run() {
943 final CompletableFuture<?> a;
944 final CompletableFuture<?> b;
945 final CompletableFuture<Void> dst;
946 Object r, s; Throwable ex;
947 if ((dst = this.dst) != null &&
948 (a = this.src) != null &&
949 (r = a.result) != null &&
950 (b = this.snd) != null &&
951 (s = b.result) != null &&
952 compareAndSet(0, 1)) {
953 if (r instanceof AltResult)
954 ex = ((AltResult)r).ex;
955 else
956 ex = null;
957 if (ex == null && (s instanceof AltResult))
958 ex = ((AltResult)s).ex;
959 dst.internalComplete(null, ex);
960 }
961 }
962 private static final long serialVersionUID = 5232453952276885070L;
963 }
964
965 static final class OrApplyCompletion<T,U> extends Completion {
966 final CompletableFuture<? extends T> src;
967 final CompletableFuture<? extends T> snd;
968 final Function<? super T,? extends U> fn;
969 final CompletableFuture<U> dst;
970 final Executor executor;
971 OrApplyCompletion(CompletableFuture<? extends T> src,
972 CompletableFuture<? extends T> snd,
973 Function<? super T,? extends U> fn,
974 CompletableFuture<U> dst, Executor executor) {
975 this.src = src; this.snd = snd;
976 this.fn = fn; this.dst = dst;
977 this.executor = executor;
978 }
979 public final void run() {
980 final CompletableFuture<? extends T> a;
981 final CompletableFuture<? extends T> b;
982 final Function<? super T,? extends U> fn;
983 final CompletableFuture<U> dst;
984 Object r; T t; Throwable ex;
985 if ((dst = this.dst) != null &&
986 (fn = this.fn) != null &&
987 (((a = this.src) != null && (r = a.result) != null) ||
988 ((b = this.snd) != null && (r = b.result) != null)) &&
989 compareAndSet(0, 1)) {
990 if (r instanceof AltResult) {
991 ex = ((AltResult)r).ex;
992 t = null;
993 }
994 else {
995 ex = null;
996 @SuppressWarnings("unchecked") T tr = (T) r;
997 t = tr;
998 }
999 Executor e = executor;
1000 U u = null;
1001 if (ex == null) {
1002 try {
1003 if (e != null)
1004 e.execute(new AsyncApply<T,U>(t, fn, dst));
1005 else
1006 u = fn.apply(t);
1007 } catch (Throwable rex) {
1008 ex = rex;
1009 }
1010 }
1011 if (e == null || ex != null)
1012 dst.internalComplete(u, ex);
1013 }
1014 }
1015 private static final long serialVersionUID = 5232453952276885070L;
1016 }
1017
1018 static final class OrAcceptCompletion<T> extends Completion {
1019 final CompletableFuture<? extends T> src;
1020 final CompletableFuture<? extends T> snd;
1021 final Consumer<? super T> fn;
1022 final CompletableFuture<Void> dst;
1023 final Executor executor;
1024 OrAcceptCompletion(CompletableFuture<? extends T> src,
1025 CompletableFuture<? extends T> snd,
1026 Consumer<? super T> fn,
1027 CompletableFuture<Void> dst, Executor executor) {
1028 this.src = src; this.snd = snd;
1029 this.fn = fn; this.dst = dst;
1030 this.executor = executor;
1031 }
1032 public final void run() {
1033 final CompletableFuture<? extends T> a;
1034 final CompletableFuture<? extends T> b;
1035 final Consumer<? super T> fn;
1036 final CompletableFuture<Void> dst;
1037 Object r; T t; Throwable ex;
1038 if ((dst = this.dst) != null &&
1039 (fn = this.fn) != null &&
1040 (((a = this.src) != null && (r = a.result) != null) ||
1041 ((b = this.snd) != null && (r = b.result) != null)) &&
1042 compareAndSet(0, 1)) {
1043 if (r instanceof AltResult) {
1044 ex = ((AltResult)r).ex;
1045 t = null;
1046 }
1047 else {
1048 ex = null;
1049 @SuppressWarnings("unchecked") T tr = (T) r;
1050 t = tr;
1051 }
1052 Executor e = executor;
1053 if (ex == null) {
1054 try {
1055 if (e != null)
1056 e.execute(new AsyncAccept<T>(t, fn, dst));
1057 else
1058 fn.accept(t);
1059 } catch (Throwable rex) {
1060 ex = rex;
1061 }
1062 }
1063 if (e == null || ex != null)
1064 dst.internalComplete(null, ex);
1065 }
1066 }
1067 private static final long serialVersionUID = 5232453952276885070L;
1068 }
1069
1070 static final class OrRunCompletion<T> extends Completion {
1071 final CompletableFuture<? extends T> src;
1072 final CompletableFuture<?> snd;
1073 final Runnable fn;
1074 final CompletableFuture<Void> dst;
1075 final Executor executor;
1076 OrRunCompletion(CompletableFuture<? extends T> src,
1077 CompletableFuture<?> snd,
1078 Runnable fn,
1079 CompletableFuture<Void> dst, Executor executor) {
1080 this.src = src; this.snd = snd;
1081 this.fn = fn; this.dst = dst;
1082 this.executor = executor;
1083 }
1084 public final void run() {
1085 final CompletableFuture<? extends T> a;
1086 final CompletableFuture<?> b;
1087 final Runnable fn;
1088 final CompletableFuture<Void> dst;
1089 Object r; Throwable ex;
1090 if ((dst = this.dst) != null &&
1091 (fn = this.fn) != null &&
1092 (((a = this.src) != null && (r = a.result) != null) ||
1093 ((b = this.snd) != null && (r = b.result) != null)) &&
1094 compareAndSet(0, 1)) {
1095 if (r instanceof AltResult)
1096 ex = ((AltResult)r).ex;
1097 else
1098 ex = null;
1099 Executor e = executor;
1100 if (ex == null) {
1101 try {
1102 if (e != null)
1103 e.execute(new AsyncRun(fn, dst));
1104 else
1105 fn.run();
1106 } catch (Throwable rex) {
1107 ex = rex;
1108 }
1109 }
1110 if (e == null || ex != null)
1111 dst.internalComplete(null, ex);
1112 }
1113 }
1114 private static final long serialVersionUID = 5232453952276885070L;
1115 }
1116
1117 static final class OrCompletion extends Completion {
1118 final CompletableFuture<?> src;
1119 final CompletableFuture<?> snd;
1120 final CompletableFuture<?> dst;
1121 OrCompletion(CompletableFuture<?> src,
1122 CompletableFuture<?> snd,
1123 CompletableFuture<?> dst) {
1124 this.src = src; this.snd = snd; this.dst = dst;
1125 }
1126 public final void run() {
1127 final CompletableFuture<?> a;
1128 final CompletableFuture<?> b;
1129 final CompletableFuture<?> dst;
1130 Object r, t; Throwable ex;
1131 if ((dst = this.dst) != null &&
1132 (((a = this.src) != null && (r = a.result) != null) ||
1133 ((b = this.snd) != null && (r = b.result) != null)) &&
1134 compareAndSet(0, 1)) {
1135 if (r instanceof AltResult) {
1136 ex = ((AltResult)r).ex;
1137 t = null;
1138 }
1139 else {
1140 ex = null;
1141 t = r;
1142 }
1143 dst.internalComplete(t, ex);
1144 }
1145 }
1146 private static final long serialVersionUID = 5232453952276885070L;
1147 }
1148
1149 static final class ExceptionCompletion<T> extends Completion {
1150 final CompletableFuture<? extends T> src;
1151 final Function<? super Throwable, ? extends T> fn;
1152 final CompletableFuture<T> dst;
1153 ExceptionCompletion(CompletableFuture<? extends T> src,
1154 Function<? super Throwable, ? extends T> fn,
1155 CompletableFuture<T> dst) {
1156 this.src = src; this.fn = fn; this.dst = dst;
1157 }
1158 public final void run() {
1159 final CompletableFuture<? extends T> a;
1160 final Function<? super Throwable, ? extends T> fn;
1161 final CompletableFuture<T> dst;
1162 Object r; T t = null; Throwable ex, dx = null;
1163 if ((dst = this.dst) != null &&
1164 (fn = this.fn) != null &&
1165 (a = this.src) != null &&
1166 (r = a.result) != null &&
1167 compareAndSet(0, 1)) {
1168 if ((r instanceof AltResult) &&
1169 (ex = ((AltResult)r).ex) != null) {
1170 try {
1171 t = fn.apply(ex);
1172 } catch (Throwable rex) {
1173 dx = rex;
1174 }
1175 }
1176 else {
1177 @SuppressWarnings("unchecked") T tr = (T) r;
1178 t = tr;
1179 }
1180 dst.internalComplete(t, dx);
1181 }
1182 }
1183 private static final long serialVersionUID = 5232453952276885070L;
1184 }
1185
1186 static final class ThenCopy extends Completion {
1187 final CompletableFuture<?> src;
1188 final CompletableFuture<?> dst;
1189 ThenCopy(CompletableFuture<?> src,
1190 CompletableFuture<?> dst) {
1191 this.src = src; this.dst = dst;
1192 }
1193 public final void run() {
1194 final CompletableFuture<?> a;
1195 final CompletableFuture<?> dst;
1196 Object r; Object t; Throwable ex;
1197 if ((dst = this.dst) != null &&
1198 (a = this.src) != null &&
1199 (r = a.result) != null &&
1200 compareAndSet(0, 1)) {
1201 if (r instanceof AltResult) {
1202 ex = ((AltResult)r).ex;
1203 t = null;
1204 }
1205 else {
1206 ex = null;
1207 t = r;
1208 }
1209 dst.internalComplete(t, ex);
1210 }
1211 }
1212 private static final long serialVersionUID = 5232453952276885070L;
1213 }
1214
1215 static final class HandleCompletion<T,U> extends Completion {
1216 final CompletableFuture<? extends T> src;
1217 final BiFunction<? super T, Throwable, ? extends U> fn;
1218 final CompletableFuture<U> dst;
1219 HandleCompletion(CompletableFuture<? extends T> src,
1220 BiFunction<? super T, Throwable, ? extends U> fn,
1221 CompletableFuture<U> dst) {
1222 this.src = src; this.fn = fn; this.dst = dst;
1223 }
1224 public final void run() {
1225 final CompletableFuture<? extends T> a;
1226 final BiFunction<? super T, Throwable, ? extends U> fn;
1227 final CompletableFuture<U> dst;
1228 Object r; T t; Throwable ex;
1229 if ((dst = this.dst) != null &&
1230 (fn = this.fn) != null &&
1231 (a = this.src) != null &&
1232 (r = a.result) != null &&
1233 compareAndSet(0, 1)) {
1234 if (r instanceof AltResult) {
1235 ex = ((AltResult)r).ex;
1236 t = null;
1237 }
1238 else {
1239 ex = null;
1240 @SuppressWarnings("unchecked") T tr = (T) r;
1241 t = tr;
1242 }
1243 U u = null; Throwable dx = null;
1244 try {
1245 u = fn.apply(t, ex);
1246 } catch (Throwable rex) {
1247 dx = rex;
1248 }
1249 dst.internalComplete(u, dx);
1250 }
1251 }
1252 private static final long serialVersionUID = 5232453952276885070L;
1253 }
1254
1255 static final class ComposeCompletion<T,U> extends Completion {
1256 final CompletableFuture<? extends T> src;
1257 final Function<? super T, CompletableFuture<U>> fn;
1258 final CompletableFuture<U> dst;
1259 final Executor executor;
1260 ComposeCompletion(CompletableFuture<? extends T> src,
1261 Function<? super T, CompletableFuture<U>> fn,
1262 CompletableFuture<U> dst, Executor executor) {
1263 this.src = src; this.fn = fn; this.dst = dst;
1264 this.executor = executor;
1265 }
1266 public final void run() {
1267 final CompletableFuture<? extends T> a;
1268 final Function<? super T, CompletableFuture<U>> fn;
1269 final CompletableFuture<U> dst;
1270 Object r; T t; Throwable ex; Executor e;
1271 if ((dst = this.dst) != null &&
1272 (fn = this.fn) != null &&
1273 (a = this.src) != null &&
1274 (r = a.result) != null &&
1275 compareAndSet(0, 1)) {
1276 if (r instanceof AltResult) {
1277 ex = ((AltResult)r).ex;
1278 t = null;
1279 }
1280 else {
1281 ex = null;
1282 @SuppressWarnings("unchecked") T tr = (T) r;
1283 t = tr;
1284 }
1285 CompletableFuture<U> c = null;
1286 U u = null;
1287 boolean complete = false;
1288 if (ex == null) {
1289 if ((e = executor) != null)
1290 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1291 else {
1292 try {
1293 if ((c = fn.apply(t)) == null)
1294 ex = new NullPointerException();
1295 } catch (Throwable rex) {
1296 ex = rex;
1297 }
1298 }
1299 }
1300 if (c != null) {
1301 ThenCopy d = null;
1302 Object s;
1303 if ((s = c.result) == null) {
1304 CompletionNode p = new CompletionNode
1305 (d = new ThenCopy(c, dst));
1306 while ((s = c.result) == null) {
1307 if (UNSAFE.compareAndSwapObject
1308 (c, COMPLETIONS, p.next = c.completions, p))
1309 break;
1310 }
1311 }
1312 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1313 complete = true;
1314 if (s instanceof AltResult) {
1315 ex = ((AltResult)s).ex; // no rewrap
1316 u = null;
1317 }
1318 else {
1319 @SuppressWarnings("unchecked") U us = (U) s;
1320 u = us;
1321 }
1322 }
1323 }
1324 if (complete || ex != null)
1325 dst.internalComplete(u, ex);
1326 if (c != null)
1327 c.helpPostComplete();
1328 }
1329 }
1330 private static final long serialVersionUID = 5232453952276885070L;
1331 }
1332
1333 // public methods
1334
1335 /**
1336 * Creates a new incomplete CompletableFuture.
1337 */
1338 public CompletableFuture() {
1339 }
1340
1341 /**
1342 * Returns a new CompletableFuture that is asynchronously completed
1343 * by a task running in the {@link ForkJoinPool#commonPool()} with
1344 * the value obtained by calling the given Supplier.
1345 *
1346 * @param supplier a function returning the value to be used
1347 * to complete the returned CompletableFuture
1348 * @return the new CompletableFuture
1349 */
1350 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1351 if (supplier == null) throw new NullPointerException();
1352 CompletableFuture<U> f = new CompletableFuture<U>();
1353 ForkJoinPool.commonPool().
1354 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1355 return f;
1356 }
1357
1358 /**
1359 * Returns a new CompletableFuture that is asynchronously completed
1360 * by a task running in the given executor with the value obtained
1361 * by calling the given Supplier.
1362 *
1363 * @param supplier a function returning the value to be used
1364 * to complete the returned CompletableFuture
1365 * @param executor the executor to use for asynchronous execution
1366 * @return the new CompletableFuture
1367 */
1368 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1369 Executor executor) {
1370 if (executor == null || supplier == null)
1371 throw new NullPointerException();
1372 CompletableFuture<U> f = new CompletableFuture<U>();
1373 executor.execute(new AsyncSupply<U>(supplier, f));
1374 return f;
1375 }
1376
1377 /**
1378 * Returns a new CompletableFuture that is asynchronously completed
1379 * by a task running in the {@link ForkJoinPool#commonPool()} after
1380 * it runs the given action.
1381 *
1382 * @param runnable the action to run before completing the
1383 * returned CompletableFuture
1384 * @return the new CompletableFuture
1385 */
1386 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1387 if (runnable == null) throw new NullPointerException();
1388 CompletableFuture<Void> f = new CompletableFuture<Void>();
1389 ForkJoinPool.commonPool().
1390 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1391 return f;
1392 }
1393
1394 /**
1395 * Returns a new CompletableFuture that is asynchronously completed
1396 * by a task running in the given executor after it runs the given
1397 * action.
1398 *
1399 * @param runnable the action to run before completing the
1400 * returned CompletableFuture
1401 * @param executor the executor to use for asynchronous execution
1402 * @return the new CompletableFuture
1403 */
1404 public static CompletableFuture<Void> runAsync(Runnable runnable,
1405 Executor executor) {
1406 if (executor == null || runnable == null)
1407 throw new NullPointerException();
1408 CompletableFuture<Void> f = new CompletableFuture<Void>();
1409 executor.execute(new AsyncRun(runnable, f));
1410 return f;
1411 }
1412
1413 /**
1414 * Returns {@code true} if completed in any fashion: normally,
1415 * exceptionally, or via cancellation.
1416 *
1417 * @return {@code true} if completed
1418 */
1419 public boolean isDone() {
1420 return result != null;
1421 }
1422
1423 /**
1424 * Waits if necessary for this future to complete, and then
1425 * returns its result.
1426 *
1427 * @return the result value
1428 * @throws CancellationException if this future was cancelled
1429 * @throws ExecutionException if this future completed exceptionally
1430 * @throws InterruptedException if the current thread was interrupted
1431 * while waiting
1432 */
1433 public T get() throws InterruptedException, ExecutionException {
1434 Object r; Throwable ex, cause;
1435 if ((r = result) == null && (r = waitingGet(true)) == null)
1436 throw new InterruptedException();
1437 if (!(r instanceof AltResult)) {
1438 @SuppressWarnings("unchecked") T tr = (T) r;
1439 return tr;
1440 }
1441 if ((ex = ((AltResult)r).ex) == null)
1442 return null;
1443 if (ex instanceof CancellationException)
1444 throw (CancellationException)ex;
1445 if ((ex instanceof CompletionException) &&
1446 (cause = ex.getCause()) != null)
1447 ex = cause;
1448 throw new ExecutionException(ex);
1449 }
1450
1451 /**
1452 * Waits if necessary for at most the given time for this future
1453 * to complete, and then returns its result, if available.
1454 *
1455 * @param timeout the maximum time to wait
1456 * @param unit the time unit of the timeout argument
1457 * @return the result value
1458 * @throws CancellationException if this future was cancelled
1459 * @throws ExecutionException if this future completed exceptionally
1460 * @throws InterruptedException if the current thread was interrupted
1461 * while waiting
1462 * @throws TimeoutException if the wait timed out
1463 */
1464 public T get(long timeout, TimeUnit unit)
1465 throws InterruptedException, ExecutionException, TimeoutException {
1466 Object r; Throwable ex, cause;
1467 long nanos = unit.toNanos(timeout);
1468 if (Thread.interrupted())
1469 throw new InterruptedException();
1470 if ((r = result) == null)
1471 r = timedAwaitDone(nanos);
1472 if (!(r instanceof AltResult)) {
1473 @SuppressWarnings("unchecked") T tr = (T) r;
1474 return tr;
1475 }
1476 if ((ex = ((AltResult)r).ex) == null)
1477 return null;
1478 if (ex instanceof CancellationException)
1479 throw (CancellationException)ex;
1480 if ((ex instanceof CompletionException) &&
1481 (cause = ex.getCause()) != null)
1482 ex = cause;
1483 throw new ExecutionException(ex);
1484 }
1485
1486 /**
1487 * Returns the result value when complete, or throws an
1488 * (unchecked) exception if completed exceptionally. To better
1489 * conform with the use of common functional forms, if a
1490 * computation involved in the completion of this
1491 * CompletableFuture threw an exception, this method throws an
1492 * (unchecked) {@link CompletionException} with the underlying
1493 * exception as its cause.
1494 *
1495 * @return the result value
1496 * @throws CancellationException if the computation was cancelled
1497 * @throws CompletionException if this future completed
1498 * exceptionally or a completion computation threw an exception
1499 */
1500 public T join() {
1501 Object r; Throwable ex;
1502 if ((r = result) == null)
1503 r = waitingGet(false);
1504 if (!(r instanceof AltResult)) {
1505 @SuppressWarnings("unchecked") T tr = (T) r;
1506 return tr;
1507 }
1508 if ((ex = ((AltResult)r).ex) == null)
1509 return null;
1510 if (ex instanceof CancellationException)
1511 throw (CancellationException)ex;
1512 if (ex instanceof CompletionException)
1513 throw (CompletionException)ex;
1514 throw new CompletionException(ex);
1515 }
1516
1517 /**
1518 * Returns the result value (or throws any encountered exception)
1519 * if completed, else returns the given valueIfAbsent.
1520 *
1521 * @param valueIfAbsent the value to return if not completed
1522 * @return the result value, if completed, else the given valueIfAbsent
1523 * @throws CancellationException if the computation was cancelled
1524 * @throws CompletionException if this future completed
1525 * exceptionally or a completion computation threw an exception
1526 */
1527 public T getNow(T valueIfAbsent) {
1528 Object r; Throwable ex;
1529 if ((r = result) == null)
1530 return valueIfAbsent;
1531 if (!(r instanceof AltResult)) {
1532 @SuppressWarnings("unchecked") T tr = (T) r;
1533 return tr;
1534 }
1535 if ((ex = ((AltResult)r).ex) == null)
1536 return null;
1537 if (ex instanceof CancellationException)
1538 throw (CancellationException)ex;
1539 if (ex instanceof CompletionException)
1540 throw (CompletionException)ex;
1541 throw new CompletionException(ex);
1542 }
1543
1544 /**
1545 * If not already completed, sets the value returned by {@link
1546 * #get()} and related methods to the given value.
1547 *
1548 * @param value the result value
1549 * @return {@code true} if this invocation caused this CompletableFuture
1550 * to transition to a completed state, else {@code false}
1551 */
1552 public boolean complete(T value) {
1553 boolean triggered = result == null &&
1554 UNSAFE.compareAndSwapObject(this, RESULT, null,
1555 value == null ? NIL : value);
1556 postComplete();
1557 return triggered;
1558 }
1559
1560 /**
1561 * If not already completed, causes invocations of {@link #get()}
1562 * and related methods to throw the given exception.
1563 *
1564 * @param ex the exception
1565 * @return {@code true} if this invocation caused this CompletableFuture
1566 * to transition to a completed state, else {@code false}
1567 */
1568 public boolean completeExceptionally(Throwable ex) {
1569 if (ex == null) throw new NullPointerException();
1570 boolean triggered = result == null &&
1571 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1572 postComplete();
1573 return triggered;
1574 }
1575
1576 /**
1577 * Returns a new CompletableFuture that is completed
1578 * when this CompletableFuture completes, with the result of the
1579 * given function of this CompletableFuture's result.
1580 *
1581 * <p>If this CompletableFuture completes exceptionally,
1582 * then the returned CompletableFuture also does so, with a
1583 * CompletionException holding this exception as its cause.
1584 *
1585 * @param fn the function to use to compute the value of
1586 * the returned CompletableFuture
1587 * @return the new CompletableFuture
1588 */
1589 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1590 return doThenApply(fn, null);
1591 }
1592
1593 /**
1594 * Returns a new CompletableFuture that is asynchronously completed
1595 * when this CompletableFuture completes, with the result of the
1596 * given function of this CompletableFuture's result, called from a
1597 * task running in the {@link ForkJoinPool#commonPool()}.
1598 *
1599 * <p>If this CompletableFuture completes exceptionally,
1600 * then the returned CompletableFuture also does so, with a
1601 * CompletionException holding this exception as its cause.
1602 *
1603 * @param fn the function to use to compute the value of
1604 * the returned CompletableFuture
1605 * @return the new CompletableFuture
1606 */
1607 public <U> CompletableFuture<U> thenApplyAsync
1608 (Function<? super T,? extends U> fn) {
1609 return doThenApply(fn, ForkJoinPool.commonPool());
1610 }
1611
1612 /**
1613 * Returns a new CompletableFuture that is asynchronously completed
1614 * when this CompletableFuture completes, with the result of the
1615 * given function of this CompletableFuture's result, called from a
1616 * task running in the given executor.
1617 *
1618 * <p>If this CompletableFuture completes exceptionally,
1619 * then the returned CompletableFuture also does so, with a
1620 * CompletionException holding this exception as its cause.
1621 *
1622 * @param fn the function to use to compute the value of
1623 * the returned CompletableFuture
1624 * @param executor the executor to use for asynchronous execution
1625 * @return the new CompletableFuture
1626 */
1627 public <U> CompletableFuture<U> thenApplyAsync
1628 (Function<? super T,? extends U> fn,
1629 Executor executor) {
1630 if (executor == null) throw new NullPointerException();
1631 return doThenApply(fn, executor);
1632 }
1633
1634 private <U> CompletableFuture<U> doThenApply
1635 (Function<? super T,? extends U> fn,
1636 Executor e) {
1637 if (fn == null) throw new NullPointerException();
1638 CompletableFuture<U> dst = new CompletableFuture<U>();
1639 ApplyCompletion<T,U> d = null;
1640 Object r;
1641 if ((r = result) == null) {
1642 CompletionNode p = new CompletionNode
1643 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1644 while ((r = result) == null) {
1645 if (UNSAFE.compareAndSwapObject
1646 (this, COMPLETIONS, p.next = completions, p))
1647 break;
1648 }
1649 }
1650 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1651 T t; Throwable ex;
1652 if (r instanceof AltResult) {
1653 ex = ((AltResult)r).ex;
1654 t = null;
1655 }
1656 else {
1657 ex = null;
1658 @SuppressWarnings("unchecked") T tr = (T) r;
1659 t = tr;
1660 }
1661 U u = null;
1662 if (ex == null) {
1663 try {
1664 if (e != null)
1665 e.execute(new AsyncApply<T,U>(t, fn, dst));
1666 else
1667 u = fn.apply(t);
1668 } catch (Throwable rex) {
1669 ex = rex;
1670 }
1671 }
1672 if (e == null || ex != null)
1673 dst.internalComplete(u, ex);
1674 }
1675 helpPostComplete();
1676 return dst;
1677 }
1678
1679 /**
1680 * Returns a new CompletableFuture that is completed
1681 * when this CompletableFuture completes, after performing the given
1682 * action with this CompletableFuture's result.
1683 *
1684 * <p>If this CompletableFuture completes exceptionally,
1685 * then the returned CompletableFuture also does so, with a
1686 * CompletionException holding this exception as its cause.
1687 *
1688 * @param block the action to perform before completing the
1689 * returned CompletableFuture
1690 * @return the new CompletableFuture
1691 */
1692 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1693 return doThenAccept(block, null);
1694 }
1695
1696 /**
1697 * Returns a new CompletableFuture that is asynchronously completed
1698 * when this CompletableFuture completes, after performing the given
1699 * action with this CompletableFuture's result from a task running
1700 * in the {@link ForkJoinPool#commonPool()}.
1701 *
1702 * <p>If this CompletableFuture completes exceptionally,
1703 * then the returned CompletableFuture also does so, with a
1704 * CompletionException holding this exception as its cause.
1705 *
1706 * @param block the action to perform before completing the
1707 * returned CompletableFuture
1708 * @return the new CompletableFuture
1709 */
1710 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1711 return doThenAccept(block, ForkJoinPool.commonPool());
1712 }
1713
1714 /**
1715 * Returns a new CompletableFuture that is asynchronously completed
1716 * when this CompletableFuture completes, after performing the given
1717 * action with this CompletableFuture's result from a task running
1718 * in the given executor.
1719 *
1720 * <p>If this CompletableFuture completes exceptionally,
1721 * then the returned CompletableFuture also does so, with a
1722 * CompletionException holding this exception as its cause.
1723 *
1724 * @param block the action to perform before completing the
1725 * returned CompletableFuture
1726 * @param executor the executor to use for asynchronous execution
1727 * @return the new CompletableFuture
1728 */
1729 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1730 Executor executor) {
1731 if (executor == null) throw new NullPointerException();
1732 return doThenAccept(block, executor);
1733 }
1734
1735 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1736 Executor e) {
1737 if (fn == null) throw new NullPointerException();
1738 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1739 AcceptCompletion<T> d = null;
1740 Object r;
1741 if ((r = result) == null) {
1742 CompletionNode p = new CompletionNode
1743 (d = new AcceptCompletion<T>(this, fn, dst, e));
1744 while ((r = result) == null) {
1745 if (UNSAFE.compareAndSwapObject
1746 (this, COMPLETIONS, p.next = completions, p))
1747 break;
1748 }
1749 }
1750 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1751 T t; Throwable ex;
1752 if (r instanceof AltResult) {
1753 ex = ((AltResult)r).ex;
1754 t = null;
1755 }
1756 else {
1757 ex = null;
1758 @SuppressWarnings("unchecked") T tr = (T) r;
1759 t = tr;
1760 }
1761 if (ex == null) {
1762 try {
1763 if (e != null)
1764 e.execute(new AsyncAccept<T>(t, fn, dst));
1765 else
1766 fn.accept(t);
1767 } catch (Throwable rex) {
1768 ex = rex;
1769 }
1770 }
1771 if (e == null || ex != null)
1772 dst.internalComplete(null, ex);
1773 }
1774 helpPostComplete();
1775 return dst;
1776 }
1777
1778 /**
1779 * Returns a new CompletableFuture that is completed
1780 * when this CompletableFuture completes, after performing the given
1781 * action.
1782 *
1783 * <p>If this CompletableFuture completes exceptionally,
1784 * then the returned CompletableFuture also does so, with a
1785 * CompletionException holding this exception as its cause.
1786 *
1787 * @param action the action to perform before completing the
1788 * returned CompletableFuture
1789 * @return the new CompletableFuture
1790 */
1791 public CompletableFuture<Void> thenRun(Runnable action) {
1792 return doThenRun(action, null);
1793 }
1794
1795 /**
1796 * Returns a new CompletableFuture that is asynchronously completed
1797 * when this CompletableFuture completes, after performing the given
1798 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1799 *
1800 * <p>If this CompletableFuture completes exceptionally,
1801 * then the returned CompletableFuture also does so, with a
1802 * CompletionException holding this exception as its cause.
1803 *
1804 * @param action the action to perform before completing the
1805 * returned CompletableFuture
1806 * @return the new CompletableFuture
1807 */
1808 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1809 return doThenRun(action, ForkJoinPool.commonPool());
1810 }
1811
1812 /**
1813 * Returns a new CompletableFuture that is asynchronously completed
1814 * when this CompletableFuture completes, after performing the given
1815 * action from a task running in the given executor.
1816 *
1817 * <p>If this CompletableFuture completes exceptionally,
1818 * then the returned CompletableFuture also does so, with a
1819 * CompletionException holding this exception as its cause.
1820 *
1821 * @param action the action to perform before completing the
1822 * returned CompletableFuture
1823 * @param executor the executor to use for asynchronous execution
1824 * @return the new CompletableFuture
1825 */
1826 public CompletableFuture<Void> thenRunAsync(Runnable action,
1827 Executor executor) {
1828 if (executor == null) throw new NullPointerException();
1829 return doThenRun(action, executor);
1830 }
1831
1832 private CompletableFuture<Void> doThenRun(Runnable action,
1833 Executor e) {
1834 if (action == null) throw new NullPointerException();
1835 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1836 RunCompletion<T> d = null;
1837 Object r;
1838 if ((r = result) == null) {
1839 CompletionNode p = new CompletionNode
1840 (d = new RunCompletion<T>(this, action, dst, e));
1841 while ((r = result) == null) {
1842 if (UNSAFE.compareAndSwapObject
1843 (this, COMPLETIONS, p.next = completions, p))
1844 break;
1845 }
1846 }
1847 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1848 Throwable ex;
1849 if (r instanceof AltResult)
1850 ex = ((AltResult)r).ex;
1851 else
1852 ex = null;
1853 if (ex == null) {
1854 try {
1855 if (e != null)
1856 e.execute(new AsyncRun(action, dst));
1857 else
1858 action.run();
1859 } catch (Throwable rex) {
1860 ex = rex;
1861 }
1862 }
1863 if (e == null || ex != null)
1864 dst.internalComplete(null, ex);
1865 }
1866 helpPostComplete();
1867 return dst;
1868 }
1869
1870 /**
1871 * Returns a new CompletableFuture that is completed
1872 * when both this and the other given CompletableFuture complete,
1873 * with the result of the given function of the results of the two
1874 * CompletableFutures.
1875 *
1876 * <p>If this and/or the other CompletableFuture complete exceptionally,
1877 * then the returned CompletableFuture also does so, with a
1878 * CompletionException holding the exception as its cause.
1879 *
1880 * @param other the other CompletableFuture
1881 * @param fn the function to use to compute the value of
1882 * the returned CompletableFuture
1883 * @return the new CompletableFuture
1884 */
1885 public <U,V> CompletableFuture<V> thenCombine
1886 (CompletableFuture<? extends U> other,
1887 BiFunction<? super T,? super U,? extends V> fn) {
1888 return doThenBiApply(other, fn, null);
1889 }
1890
1891 /**
1892 * Returns a new CompletableFuture that is asynchronously completed
1893 * when both this and the other given CompletableFuture complete,
1894 * with the result of the given function of the results of the two
1895 * CompletableFutures, called from a task running in the
1896 * {@link ForkJoinPool#commonPool()}.
1897 *
1898 * <p>If this and/or the other CompletableFuture complete exceptionally,
1899 * then the returned CompletableFuture also does so, with a
1900 * CompletionException holding the exception as its cause.
1901 *
1902 * @param other the other CompletableFuture
1903 * @param fn the function to use to compute the value of
1904 * the returned CompletableFuture
1905 * @return the new CompletableFuture
1906 */
1907 public <U,V> CompletableFuture<V> thenCombineAsync
1908 (CompletableFuture<? extends U> other,
1909 BiFunction<? super T,? super U,? extends V> fn) {
1910 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1911 }
1912
1913 /**
1914 * Returns a new CompletableFuture that is asynchronously completed
1915 * when both this and the other given CompletableFuture complete,
1916 * with the result of the given function of the results of the two
1917 * CompletableFutures, called from a task running in the
1918 * given executor.
1919 *
1920 * <p>If this and/or the other CompletableFuture complete exceptionally,
1921 * then the returned CompletableFuture also does so, with a
1922 * CompletionException holding the exception as its cause.
1923 *
1924 * @param other the other CompletableFuture
1925 * @param fn the function to use to compute the value of
1926 * the returned CompletableFuture
1927 * @param executor the executor to use for asynchronous execution
1928 * @return the new CompletableFuture
1929 */
1930 public <U,V> CompletableFuture<V> thenCombineAsync
1931 (CompletableFuture<? extends U> other,
1932 BiFunction<? super T,? super U,? extends V> fn,
1933 Executor executor) {
1934 if (executor == null) throw new NullPointerException();
1935 return doThenBiApply(other, fn, executor);
1936 }
1937
/**
 * Shared implementation behind thenCombine and thenCombineAsync:
 * arranges for {@code fn} to be applied to the results of this
 * future and {@code other} once both are available, and returns the
 * new future ({@code dst}) that receives the outcome.
 *
 * @param other the second source future
 * @param fn the function combining the two results
 * @param e the executor used to run {@code fn}, or {@code null} to
 *        apply {@code fn} without one (when both results are already
 *        available, that means directly in the calling thread)
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code fn} is null
 */
1938 private <U,V> CompletableFuture<V> doThenBiApply
1939 (CompletableFuture<? extends U> other,
1940 BiFunction<? super T,? super U,? extends V> fn,
1941 Executor e) {
1942 if (other == null || fn == null) throw new NullPointerException();
1943 CompletableFuture<V> dst = new CompletableFuture<V>();
1944 BiApplyCompletion<T,U,V> d = null;
1945 Object r, s = null;
// If either result field is still null, a completion object must be
// registered with the source(s) that have no result yet.
1946 if ((r = result) == null || (s = other.result) == null) {
1947 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
// p is the node pushed onto this.completions; q (allocated lazily) is
// the node pushed onto other.completions. Both wrap the same d.
1948 CompletionNode q = null, p = new CompletionNode(d);
// r and s are re-read only while still null; the loop ends when both
// results have been seen or when a break below is taken.
1949 while ((r == null && (r = result) == null) ||
1950 (s == null && (s = other.result) == null)) {
// Second phase (q exists): this side is dealt with; stop once other has
// a result or q has been CASed in as the head of other.completions.
1951 if (q != null) {
1952 if (s != null ||
1953 UNSAFE.compareAndSwapObject
1954 (other, COMPLETIONS, q.next = other.completions, q))
1955 break;
1956 }
// First phase: skip the push when this future already has a result,
// otherwise CAS p in as the new head of this.completions (retrying on
// the next iteration if the CAS fails).
1957 else if (r != null ||
1958 UNSAFE.compareAndSwapObject
1959 (this, COMPLETIONS, p.next = completions, p)) {
1960 if (s != null)
1961 break;
1962 q = new CompletionNode(d);
1963 }
1964 }
1965 }
// Both results are in hand. If d was created it may also be reachable
// from the sources' completion lists, so compareAndSet(0, 1) must
// succeed before going on -- presumably so that fn runs at most once
// (TODO confirm against BiApplyCompletion).
1966 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1967 T t; U u; Throwable ex;
// An AltResult carries a Throwable (possibly null) in place of a value.
1968 if (r instanceof AltResult) {
1969 ex = ((AltResult)r).ex;
1970 t = null;
1971 }
1972 else {
1973 ex = null;
1974 @SuppressWarnings("unchecked") T tr = (T) r;
1975 t = tr;
1976 }
// An exception from this future takes precedence; other's result is
// inspected only when none was found above.
1977 if (ex != null)
1978 u = null;
1979 else if (s instanceof AltResult) {
1980 ex = ((AltResult)s).ex;
1981 u = null;
1982 }
1983 else {
1984 @SuppressWarnings("unchecked") U us = (U) s;
1985 u = us;
1986 }
1987 V v = null;
1988 if (ex == null) {
// Anything thrown by execute() or by fn itself is captured and reported
// through dst instead of propagating to the caller.
1989 try {
1990 if (e != null)
1991 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
1992 else
1993 v = fn.apply(t, u);
1994 } catch (Throwable rex) {
1995 ex = rex;
1996 }
1997 }
// With an executor and no exception, dst is not completed here; it was
// handed to the AsyncBiApply task, which presumably completes it.
1998 if (e == null || ex != null)
1999 dst.internalComplete(v, ex);
2000 }
// NOTE(review): helpPostComplete is defined elsewhere; presumably it
// processes completions of a source that finished while the nodes were
// being registered -- confirm.
2001 helpPostComplete();
2002 other.helpPostComplete();
2003 return dst;
2004 }
2005
2006 /**
2007 * Returns a new CompletableFuture that is completed
2008 * when both this and the other given CompletableFuture complete,
2009 * after performing the given action with the results of the two
2010 * CompletableFutures.
2011 *
2012 * <p>If this and/or the other CompletableFuture complete exceptionally,
2013 * then the returned CompletableFuture also does so, with a
2014 * CompletionException holding one of these exceptions as its cause.
2015 *
2016 * @param other the other CompletableFuture
2017 * @param block the action to perform before completing the
2018 * returned CompletableFuture
2019 * @return the new CompletableFuture
2020 */
2021 public <U> CompletableFuture<Void> thenAcceptBoth
2022 (CompletableFuture<? extends U> other,
2023 BiConsumer<? super T, ? super U> block) {
2024 return doThenBiAccept(other, block, null);
2025 }
2026
2027 /**
2028 * Returns a new CompletableFuture that is asynchronously completed
2029 * when both this and the other given CompletableFuture complete,
2030 * after performing the given action with the results of the two
2031 * CompletableFutures from a task running in the {@link
2032 * ForkJoinPool#commonPool()}.
2033 *
2034 * <p>If this and/or the other CompletableFuture complete exceptionally,
2035 * then the returned CompletableFuture also does so, with a
2036 * CompletionException holding one of these exceptions as its cause.
2037 *
2038 * @param other the other CompletableFuture
2039 * @param block the action to perform before completing the
2040 * returned CompletableFuture
2041 * @return the new CompletableFuture
2042 */
2043 public <U> CompletableFuture<Void> thenAcceptBothAsync
2044 (CompletableFuture<? extends U> other,
2045 BiConsumer<? super T, ? super U> block) {
2046 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2047 }
2048
2049 /**
2050 * Returns a new CompletableFuture that is asynchronously completed
2051 * when both this and the other given CompletableFuture complete,
2052 * after performing the given action with the results of the two
2053 * CompletableFutures from a task running in the given executor.
2054 *
2055 * <p>If this and/or the other CompletableFuture complete exceptionally,
2056 * then the returned CompletableFuture also does so, with a
2057 * CompletionException holding one of these exceptions as its cause.
2058 *
2059 * @param other the other CompletableFuture
2060 * @param block the action to perform before completing the
2061 * returned CompletableFuture
2062 * @param executor the executor to use for asynchronous execution
2063 * @return the new CompletableFuture
2064 */
2065 public <U> CompletableFuture<Void> thenAcceptBothAsync
2066 (CompletableFuture<? extends U> other,
2067 BiConsumer<? super T, ? super U> block,
2068 Executor executor) {
2069 if (executor == null) throw new NullPointerException();
2070 return doThenBiAccept(other, block, executor);
2071 }
2072
/**
 * Shared implementation behind thenAcceptBoth and
 * thenAcceptBothAsync: arranges for {@code fn} to be invoked with the
 * results of this future and {@code other} once both are available,
 * and returns the new future ({@code dst}) that is completed
 * afterwards.
 *
 * @param other the second source future
 * @param fn the action consuming the two results
 * @param e the executor used to run {@code fn}, or {@code null} to
 *        invoke {@code fn} without one (when both results are already
 *        available, that means directly in the calling thread)
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code fn} is null
 */
2073 private <U> CompletableFuture<Void> doThenBiAccept
2074 (CompletableFuture<? extends U> other,
2075 BiConsumer<? super T,? super U> fn,
2076 Executor e) {
2077 if (other == null || fn == null) throw new NullPointerException();
2078 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2079 BiAcceptCompletion<T,U> d = null;
2080 Object r, s = null;
// If either result field is still null, register a completion object
// with the source(s) that have no result yet.
2081 if ((r = result) == null || (s = other.result) == null) {
2082 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
// p goes onto this.completions, q (allocated lazily) onto
// other.completions; both nodes wrap the same d.
2083 CompletionNode q = null, p = new CompletionNode(d);
// r and s are re-read only while they are still null.
2084 while ((r == null && (r = result) == null) ||
2085 (s == null && (s = other.result) == null)) {
// Second phase: stop once other has a result or q has been CASed in as
// the head of other.completions.
2086 if (q != null) {
2087 if (s != null ||
2088 UNSAFE.compareAndSwapObject
2089 (other, COMPLETIONS, q.next = other.completions, q))
2090 break;
2091 }
// First phase: nothing to push if this future already has a result;
// otherwise CAS p in as the head of this.completions.
2092 else if (r != null ||
2093 UNSAFE.compareAndSwapObject
2094 (this, COMPLETIONS, p.next = completions, p)) {
2095 if (s != null)
2096 break;
2097 q = new CompletionNode(d);
2098 }
2099 }
2100 }
// Both results are available. A registered d must first be claimed via
// compareAndSet(0, 1) -- presumably so the action runs at most once
// (TODO confirm against BiAcceptCompletion).
2101 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2102 T t; U u; Throwable ex;
// An AltResult carries a Throwable (possibly null) in place of a value.
2103 if (r instanceof AltResult) {
2104 ex = ((AltResult)r).ex;
2105 t = null;
2106 }
2107 else {
2108 ex = null;
2109 @SuppressWarnings("unchecked") T tr = (T) r;
2110 t = tr;
2111 }
// An exception from this future wins; other's result is inspected only
// when none was found above.
2112 if (ex != null)
2113 u = null;
2114 else if (s instanceof AltResult) {
2115 ex = ((AltResult)s).ex;
2116 u = null;
2117 }
2118 else {
2119 @SuppressWarnings("unchecked") U us = (U) s;
2120 u = us;
2121 }
2122 if (ex == null) {
// Anything thrown by execute() or by fn is captured and reported through
// dst instead of propagating to the caller.
2123 try {
2124 if (e != null)
2125 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2126 else
2127 fn.accept(t, u);
2128 } catch (Throwable rex) {
2129 ex = rex;
2130 }
2131 }
// With an executor and no exception, dst is left to the AsyncBiAccept
// task it was handed to (presumably completed there).
2132 if (e == null || ex != null)
2133 dst.internalComplete(null, ex);
2134 }
// NOTE(review): helpPostComplete is defined elsewhere; presumably it
// processes completions of a source that finished during registration.
2135 helpPostComplete();
2136 other.helpPostComplete();
2137 return dst;
2138 }
2139
2140 /**
2141 * Returns a new CompletableFuture that is completed
2142 * when both this and the other given CompletableFuture complete,
2143 * after performing the given action.
2144 *
2145 * <p>If this and/or the other CompletableFuture complete exceptionally,
2146 * then the returned CompletableFuture also does so, with a
2147 * CompletionException holding one of these exceptions as its cause.
2148 *
2149 * @param other the other CompletableFuture
2150 * @param action the action to perform before completing the
2151 * returned CompletableFuture
2152 * @return the new CompletableFuture
2153 */
2154 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2155 Runnable action) {
2156 return doThenBiRun(other, action, null);
2157 }
2158
2159 /**
2160 * Returns a new CompletableFuture that is asynchronously completed
2161 * when both this and the other given CompletableFuture complete,
2162 * after performing the given action from a task running in the
2163 * {@link ForkJoinPool#commonPool()}.
2164 *
2165 * <p>If this and/or the other CompletableFuture complete exceptionally,
2166 * then the returned CompletableFuture also does so, with a
2167 * CompletionException holding one of these exceptions as its cause.
2168 *
2169 * @param other the other CompletableFuture
2170 * @param action the action to perform before completing the
2171 * returned CompletableFuture
2172 * @return the new CompletableFuture
2173 */
2174 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2175 Runnable action) {
2176 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2177 }
2178
2179 /**
2180 * Returns a new CompletableFuture that is asynchronously completed
2181 * when both this and the other given CompletableFuture complete,
2182 * after performing the given action from a task running in the
2183 * given executor.
2184 *
2185 * <p>If this and/or the other CompletableFuture complete exceptionally,
2186 * then the returned CompletableFuture also does so, with a
2187 * CompletionException holding one of these exceptions as its cause.
2188 *
2189 * @param other the other CompletableFuture
2190 * @param action the action to perform before completing the
2191 * returned CompletableFuture
2192 * @param executor the executor to use for asynchronous execution
2193 * @return the new CompletableFuture
2194 */
2195 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2196 Runnable action,
2197 Executor executor) {
2198 if (executor == null) throw new NullPointerException();
2199 return doThenBiRun(other, action, executor);
2200 }
2201
/**
 * Shared implementation behind runAfterBoth and runAfterBothAsync:
 * arranges for {@code action} to be run once both this future and
 * {@code other} have a result, and returns the new future
 * ({@code dst}) that is completed afterwards. The result values
 * themselves are only inspected for exceptions.
 *
 * @param other the second source future
 * @param action the action to run
 * @param e the executor used to run {@code action}, or {@code null}
 *        to run it without one (when both results are already
 *        available, that means directly in the calling thread)
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code action} is
 *         null
 */
2202 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2203 Runnable action,
2204 Executor e) {
2205 if (other == null || action == null) throw new NullPointerException();
2206 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2207 BiRunCompletion<T> d = null;
2208 Object r, s = null;
// If either result field is still null, register a completion object
// with the source(s) that have no result yet.
2209 if ((r = result) == null || (s = other.result) == null) {
2210 d = new BiRunCompletion<T>(this, other, action, dst, e);
// p goes onto this.completions, q (allocated lazily) onto
// other.completions; both nodes wrap the same d.
2211 CompletionNode q = null, p = new CompletionNode(d);
// r and s are re-read only while they are still null.
2212 while ((r == null && (r = result) == null) ||
2213 (s == null && (s = other.result) == null)) {
// Second phase: stop once other has a result or q has been CASed in as
// the head of other.completions.
2214 if (q != null) {
2215 if (s != null ||
2216 UNSAFE.compareAndSwapObject
2217 (other, COMPLETIONS, q.next = other.completions, q))
2218 break;
2219 }
// First phase: nothing to push if this future already has a result;
// otherwise CAS p in as the head of this.completions.
2220 else if (r != null ||
2221 UNSAFE.compareAndSwapObject
2222 (this, COMPLETIONS, p.next = completions, p)) {
2223 if (s != null)
2224 break;
2225 q = new CompletionNode(d);
2226 }
2227 }
2228 }
// Both results are available. A registered d must first be claimed via
// compareAndSet(0, 1) -- presumably so the action runs at most once
// (TODO confirm against BiRunCompletion).
2229 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2230 Throwable ex;
// Take the exception from this future's result if there is one,
// otherwise from other's.
2231 if (r instanceof AltResult)
2232 ex = ((AltResult)r).ex;
2233 else
2234 ex = null;
2235 if (ex == null && (s instanceof AltResult))
2236 ex = ((AltResult)s).ex;
2237 if (ex == null) {
// Anything thrown by execute() or by the action is captured and
// reported through dst instead of propagating to the caller.
2238 try {
2239 if (e != null)
2240 e.execute(new AsyncRun(action, dst));
2241 else
2242 action.run();
2243 } catch (Throwable rex) {
2244 ex = rex;
2245 }
2246 }
// With an executor and no exception, dst is left to the AsyncRun task
// it was handed to (presumably completed there).
2247 if (e == null || ex != null)
2248 dst.internalComplete(null, ex);
2249 }
// NOTE(review): helpPostComplete is defined elsewhere; presumably it
// processes completions of a source that finished during registration.
2250 helpPostComplete();
2251 other.helpPostComplete();
2252 return dst;
2253 }
2254
2255 /**
2256 * Returns a new CompletableFuture that is completed
2257 * when either this or the other given CompletableFuture completes,
2258 * with the result of the given function of either this or the other
2259 * CompletableFuture's result.
2260 *
2261 * <p>If this and/or the other CompletableFuture complete exceptionally,
2262 * then the returned CompletableFuture may also do so, with a
2263 * CompletionException holding one of these exceptions as its cause.
2264 * No guarantees are made about which result or exception is used in
2265 * the returned CompletableFuture.
2266 *
2267 * @param other the other CompletableFuture
2268 * @param fn the function to use to compute the value of
2269 * the returned CompletableFuture
2270 * @return the new CompletableFuture
2271 */
2272 public <U> CompletableFuture<U> applyToEither
2273 (CompletableFuture<? extends T> other,
2274 Function<? super T, U> fn) {
2275 return doOrApply(other, fn, null);
2276 }
2277
2278 /**
2279 * Returns a new CompletableFuture that is asynchronously completed
2280 * when either this or the other given CompletableFuture completes,
2281 * with the result of the given function of either this or the other
2282 * CompletableFuture's result, called from a task running in the
2283 * {@link ForkJoinPool#commonPool()}.
2284 *
2285 * <p>If this and/or the other CompletableFuture complete exceptionally,
2286 * then the returned CompletableFuture may also do so, with a
2287 * CompletionException holding one of these exceptions as its cause.
2288 * No guarantees are made about which result or exception is used in
2289 * the returned CompletableFuture.
2290 *
2291 * @param other the other CompletableFuture
2292 * @param fn the function to use to compute the value of
2293 * the returned CompletableFuture
2294 * @return the new CompletableFuture
2295 */
2296 public <U> CompletableFuture<U> applyToEitherAsync
2297 (CompletableFuture<? extends T> other,
2298 Function<? super T, U> fn) {
2299 return doOrApply(other, fn, ForkJoinPool.commonPool());
2300 }
2301
2302 /**
2303 * Returns a new CompletableFuture that is asynchronously completed
2304 * when either this or the other given CompletableFuture completes,
2305 * with the result of the given function of either this or the other
2306 * CompletableFuture's result, called from a task running in the
2307 * given executor.
2308 *
2309 * <p>If this and/or the other CompletableFuture complete exceptionally,
2310 * then the returned CompletableFuture may also do so, with a
2311 * CompletionException holding one of these exceptions as its cause.
2312 * No guarantees are made about which result or exception is used in
2313 * the returned CompletableFuture.
2314 *
2315 * @param other the other CompletableFuture
2316 * @param fn the function to use to compute the value of
2317 * the returned CompletableFuture
2318 * @param executor the executor to use for asynchronous execution
2319 * @return the new CompletableFuture
2320 */
2321 public <U> CompletableFuture<U> applyToEitherAsync
2322 (CompletableFuture<? extends T> other,
2323 Function<? super T, U> fn,
2324 Executor executor) {
2325 if (executor == null) throw new NullPointerException();
2326 return doOrApply(other, fn, executor);
2327 }
2328
/**
 * Shared implementation behind applyToEither and applyToEitherAsync:
 * arranges for {@code fn} to be applied to the result of whichever
 * of this future or {@code other} is found to have a result, and
 * returns the new future ({@code dst}) that receives the outcome.
 *
 * @param other the alternative source future
 * @param fn the function to apply to the chosen result
 * @param e the executor used to run {@code fn}, or {@code null} to
 *        apply {@code fn} without one (when a result is already
 *        available, that means directly in the calling thread)
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code fn} is null
 */
2329 private <U> CompletableFuture<U> doOrApply
2330 (CompletableFuture<? extends T> other,
2331 Function<? super T, U> fn,
2332 Executor e) {
2333 if (other == null || fn == null) throw new NullPointerException();
2334 CompletableFuture<U> dst = new CompletableFuture<U>();
2335 OrApplyCompletion<T,U> d = null;
2336 Object r;
// r receives the first non-null result found, checking this future
// before other. Only when both are null is a completion registered.
2337 if ((r = result) == null && (r = other.result) == null) {
2338 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
// p goes onto this.completions first; q is allocated afterwards and
// goes onto other.completions. Both nodes wrap the same d.
2339 CompletionNode q = null, p = new CompletionNode(d);
// Both result fields are re-read on every iteration, so the loop also
// ends as soon as either source gets a result.
2340 while ((r = result) == null && (r = other.result) == null) {
2341 if (q != null) {
2342 if (UNSAFE.compareAndSwapObject
2343 (other, COMPLETIONS, q.next = other.completions, q))
2344 break;
2345 }
2346 else if (UNSAFE.compareAndSwapObject
2347 (this, COMPLETIONS, p.next = completions, p))
2348 q = new CompletionNode(d);
2349 }
2350 }
// A result is in hand. A registered d must first be claimed via
// compareAndSet(0, 1) -- presumably so fn runs at most once (TODO
// confirm against OrApplyCompletion).
2351 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2352 T t; Throwable ex;
// An AltResult carries a Throwable (possibly null) in place of a value.
2353 if (r instanceof AltResult) {
2354 ex = ((AltResult)r).ex;
2355 t = null;
2356 }
2357 else {
2358 ex = null;
2359 @SuppressWarnings("unchecked") T tr = (T) r;
2360 t = tr;
2361 }
2362 U u = null;
2363 if (ex == null) {
// Anything thrown by execute() or by fn is captured and reported through
// dst instead of propagating to the caller.
2364 try {
2365 if (e != null)
2366 e.execute(new AsyncApply<T,U>(t, fn, dst));
2367 else
2368 u = fn.apply(t);
2369 } catch (Throwable rex) {
2370 ex = rex;
2371 }
2372 }
// With an executor and no exception, dst is left to the AsyncApply task
// it was handed to (presumably completed there).
2373 if (e == null || ex != null)
2374 dst.internalComplete(u, ex);
2375 }
// NOTE(review): helpPostComplete is defined elsewhere; presumably it
// processes completions of a source that finished during registration.
2376 helpPostComplete();
2377 other.helpPostComplete();
2378 return dst;
2379 }
2380
2381 /**
2382 * Returns a new CompletableFuture that is completed
2383 * when either this or the other given CompletableFuture completes,
2384 * after performing the given action with the result of either this
2385 * or the other CompletableFuture's result.
2386 *
2387 * <p>If this and/or the other CompletableFuture complete exceptionally,
2388 * then the returned CompletableFuture may also do so, with a
2389 * CompletionException holding one of these exceptions as its cause.
2390 * No guarantees are made about which result or exception is used in
2391 * the returned CompletableFuture.
2392 *
2393 * @param other the other CompletableFuture
2394 * @param block the action to perform before completing the
2395 * returned CompletableFuture
2396 * @return the new CompletableFuture
2397 */
2398 public CompletableFuture<Void> acceptEither
2399 (CompletableFuture<? extends T> other,
2400 Consumer<? super T> block) {
2401 return doOrAccept(other, block, null);
2402 }
2403
2404 /**
2405 * Returns a new CompletableFuture that is asynchronously completed
2406 * when either this or the other given CompletableFuture completes,
2407 * after performing the given action with the result of either this
2408 * or the other CompletableFuture's result from a task running in
2409 * the {@link ForkJoinPool#commonPool()}.
2410 *
2411 * <p>If this and/or the other CompletableFuture complete exceptionally,
2412 * then the returned CompletableFuture may also do so, with a
2413 * CompletionException holding one of these exceptions as its cause.
2414 * No guarantees are made about which result or exception is used in
2415 * the returned CompletableFuture.
2416 *
2417 * @param other the other CompletableFuture
2418 * @param block the action to perform before completing the
2419 * returned CompletableFuture
2420 * @return the new CompletableFuture
2421 */
2422 public CompletableFuture<Void> acceptEitherAsync
2423 (CompletableFuture<? extends T> other,
2424 Consumer<? super T> block) {
2425 return doOrAccept(other, block, ForkJoinPool.commonPool());
2426 }
2427
2428 /**
2429 * Returns a new CompletableFuture that is asynchronously completed
2430 * when either this or the other given CompletableFuture completes,
2431 * after performing the given action with the result of either this
2432 * or the other CompletableFuture's result from a task running in
2433 * the given executor.
2434 *
2435 * <p>If this and/or the other CompletableFuture complete exceptionally,
2436 * then the returned CompletableFuture may also do so, with a
2437 * CompletionException holding one of these exceptions as its cause.
2438 * No guarantees are made about which result or exception is used in
2439 * the returned CompletableFuture.
2440 *
2441 * @param other the other CompletableFuture
2442 * @param block the action to perform before completing the
2443 * returned CompletableFuture
2444 * @param executor the executor to use for asynchronous execution
2445 * @return the new CompletableFuture
2446 */
2447 public CompletableFuture<Void> acceptEitherAsync
2448 (CompletableFuture<? extends T> other,
2449 Consumer<? super T> block,
2450 Executor executor) {
2451 if (executor == null) throw new NullPointerException();
2452 return doOrAccept(other, block, executor);
2453 }
2454
/**
 * Shared implementation behind acceptEither and acceptEitherAsync:
 * arranges for {@code fn} to be invoked with the result of whichever
 * of this future or {@code other} is found to have a result, and
 * returns the new future ({@code dst}) that is completed afterwards.
 *
 * @param other the alternative source future
 * @param fn the action consuming the chosen result
 * @param e the executor used to run {@code fn}, or {@code null} to
 *        invoke {@code fn} without one (when a result is already
 *        available, that means directly in the calling thread)
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code fn} is null
 */
2455 private CompletableFuture<Void> doOrAccept
2456 (CompletableFuture<? extends T> other,
2457 Consumer<? super T> fn,
2458 Executor e) {
2459 if (other == null || fn == null) throw new NullPointerException();
2460 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2461 OrAcceptCompletion<T> d = null;
2462 Object r;
// r receives the first non-null result found, checking this future
// before other. Only when both are null is a completion registered.
2463 if ((r = result) == null && (r = other.result) == null) {
2464 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
// p goes onto this.completions first; q is allocated afterwards and
// goes onto other.completions. Both nodes wrap the same d.
2465 CompletionNode q = null, p = new CompletionNode(d);
// Both result fields are re-read on every iteration, so the loop also
// ends as soon as either source gets a result.
2466 while ((r = result) == null && (r = other.result) == null) {
2467 if (q != null) {
2468 if (UNSAFE.compareAndSwapObject
2469 (other, COMPLETIONS, q.next = other.completions, q))
2470 break;
2471 }
2472 else if (UNSAFE.compareAndSwapObject
2473 (this, COMPLETIONS, p.next = completions, p))
2474 q = new CompletionNode(d);
2475 }
2476 }
// A result is in hand. A registered d must first be claimed via
// compareAndSet(0, 1) -- presumably so fn runs at most once (TODO
// confirm against OrAcceptCompletion).
2477 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2478 T t; Throwable ex;
// An AltResult carries a Throwable (possibly null) in place of a value.
2479 if (r instanceof AltResult) {
2480 ex = ((AltResult)r).ex;
2481 t = null;
2482 }
2483 else {
2484 ex = null;
2485 @SuppressWarnings("unchecked") T tr = (T) r;
2486 t = tr;
2487 }
2488 if (ex == null) {
// Anything thrown by execute() or by fn is captured and reported through
// dst instead of propagating to the caller.
2489 try {
2490 if (e != null)
2491 e.execute(new AsyncAccept<T>(t, fn, dst));
2492 else
2493 fn.accept(t);
2494 } catch (Throwable rex) {
2495 ex = rex;
2496 }
2497 }
// With an executor and no exception, dst is left to the AsyncAccept task
// it was handed to (presumably completed there).
2498 if (e == null || ex != null)
2499 dst.internalComplete(null, ex);
2500 }
// NOTE(review): helpPostComplete is defined elsewhere; presumably it
// processes completions of a source that finished during registration.
2501 helpPostComplete();
2502 other.helpPostComplete();
2503 return dst;
2504 }
2505
2506 /**
2507 * Returns a new CompletableFuture that is completed
2508 * when either this or the other given CompletableFuture completes,
2509 * after performing the given action.
2510 *
2511 * <p>If this and/or the other CompletableFuture complete exceptionally,
2512 * then the returned CompletableFuture may also do so, with a
2513 * CompletionException holding one of these exceptions as its cause.
2514 * No guarantees are made about which result or exception is used in
2515 * the returned CompletableFuture.
2516 *
2517 * @param other the other CompletableFuture
2518 * @param action the action to perform before completing the
2519 * returned CompletableFuture
2520 * @return the new CompletableFuture
2521 */
2522 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2523 Runnable action) {
2524 return doOrRun(other, action, null);
2525 }
2526
2527 /**
2528 * Returns a new CompletableFuture that is asynchronously completed
2529 * when either this or the other given CompletableFuture completes,
2530 * after performing the given action from a task running in the
2531 * {@link ForkJoinPool#commonPool()}.
2532 *
2533 * <p>If this and/or the other CompletableFuture complete exceptionally,
2534 * then the returned CompletableFuture may also do so, with a
2535 * CompletionException holding one of these exceptions as its cause.
2536 * No guarantees are made about which result or exception is used in
2537 * the returned CompletableFuture.
2538 *
2539 * @param other the other CompletableFuture
2540 * @param action the action to perform before completing the
2541 * returned CompletableFuture
2542 * @return the new CompletableFuture
2543 */
2544 public CompletableFuture<Void> runAfterEitherAsync
2545 (CompletableFuture<?> other,
2546 Runnable action) {
2547 return doOrRun(other, action, ForkJoinPool.commonPool());
2548 }
2549
2550 /**
2551 * Returns a new CompletableFuture that is asynchronously completed
2552 * when either this or the other given CompletableFuture completes,
2553 * after performing the given action from a task running in the
2554 * given executor.
2555 *
2556 * <p>If this and/or the other CompletableFuture complete exceptionally,
2557 * then the returned CompletableFuture may also do so, with a
2558 * CompletionException holding one of these exceptions as its cause.
2559 * No guarantees are made about which result or exception is used in
2560 * the returned CompletableFuture.
2561 *
2562 * @param other the other CompletableFuture
2563 * @param action the action to perform before completing the
2564 * returned CompletableFuture
2565 * @param executor the executor to use for asynchronous execution
2566 * @return the new CompletableFuture
2567 */
2568 public CompletableFuture<Void> runAfterEitherAsync
2569 (CompletableFuture<?> other,
2570 Runnable action,
2571 Executor executor) {
2572 if (executor == null) throw new NullPointerException();
2573 return doOrRun(other, action, executor);
2574 }
2575
/**
 * Shared implementation behind runAfterEither and
 * runAfterEitherAsync: arranges for {@code action} to be run once
 * this future or {@code other} is found to have a result, and
 * returns the new future ({@code dst}) that is completed afterwards.
 * The result value is only inspected for an exception.
 *
 * @param other the alternative source future
 * @param action the action to run
 * @param e the executor used to run {@code action}, or {@code null}
 *        to run it without one (when a result is already available,
 *        that means directly in the calling thread)
 * @return the new dependent future
 * @throws NullPointerException if {@code other} or {@code action} is
 *         null
 */
2576 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2577 Runnable action,
2578 Executor e) {
2579 if (other == null || action == null) throw new NullPointerException();
2580 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2581 OrRunCompletion<T> d = null;
2582 Object r;
// r receives the first non-null result found, checking this future
// before other. Only when both are null is a completion registered.
2583 if ((r = result) == null && (r = other.result) == null) {
2584 d = new OrRunCompletion<T>(this, other, action, dst, e);
// p goes onto this.completions first; q is allocated afterwards and
// goes onto other.completions. Both nodes wrap the same d.
2585 CompletionNode q = null, p = new CompletionNode(d);
// Both result fields are re-read on every iteration, so the loop also
// ends as soon as either source gets a result.
2586 while ((r = result) == null && (r = other.result) == null) {
2587 if (q != null) {
2588 if (UNSAFE.compareAndSwapObject
2589 (other, COMPLETIONS, q.next = other.completions, q))
2590 break;
2591 }
2592 else if (UNSAFE.compareAndSwapObject
2593 (this, COMPLETIONS, p.next = completions, p))
2594 q = new CompletionNode(d);
2595 }
2596 }
// A result is in hand. A registered d must first be claimed via
// compareAndSet(0, 1) -- presumably so the action runs at most once
// (TODO confirm against OrRunCompletion).
2597 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2598 Throwable ex;
2599 if (r instanceof AltResult)
2600 ex = ((AltResult)r).ex;
2601 else
2602 ex = null;
2603 if (ex == null) {
// Anything thrown by execute() or by the action is captured and
// reported through dst instead of propagating to the caller.
2604 try {
2605 if (e != null)
2606 e.execute(new AsyncRun(action, dst));
2607 else
2608 action.run();
2609 } catch (Throwable rex) {
2610 ex = rex;
2611 }
2612 }
// With an executor and no exception, dst is left to the AsyncRun task
// it was handed to (presumably completed there).
2613 if (e == null || ex != null)
2614 dst.internalComplete(null, ex);
2615 }
// NOTE(review): helpPostComplete is defined elsewhere; presumably it
// processes completions of a source that finished during registration.
2616 helpPostComplete();
2617 other.helpPostComplete();
2618 return dst;
2619 }
2620
2621 /**
2622 * Returns a CompletableFuture (or an equivalent one) produced by the
2623 * given function of the result of this CompletableFuture when completed.
2624 *
2625 * <p>If this CompletableFuture completes exceptionally,
2626 * then the returned CompletableFuture also does so, with a
2627 * CompletionException holding this exception as its cause.
2628 *
2629 * @param fn the function returning a new CompletableFuture
2630 * @return the CompletableFuture, that {@code isDone()} upon
2631 * return if completed by the given function, or an exception
2632 * occurs
2633 */
2634 public <U> CompletableFuture<U> thenCompose
2635 (Function<? super T, CompletableFuture<U>> fn) {
2636 return doCompose(fn, null);
2637 }
2638
2639 /**
2640 * Returns a CompletableFuture (or an equivalent one) produced
2641 * asynchronously using the {@link ForkJoinPool#commonPool()} by
2642 * the given function of the result of this CompletableFuture when
2643 * completed.
2644 *
2645 * <p>If this CompletableFuture completes exceptionally,
2646 * then the returned CompletableFuture also does so, with a
2647 * CompletionException holding this exception as its cause.
2648 *
2649 * @param fn the function returning a new CompletableFuture
2650 * @return the CompletableFuture, that {@code isDone()} upon
2651 * return if completed by the given function, or an exception
2652 * occurs
2653 */
2654 public <U> CompletableFuture<U> thenComposeAsync
2655 (Function<? super T, CompletableFuture<U>> fn) {
2656 return doCompose(fn, ForkJoinPool.commonPool());
2657 }
2658
2659 /**
2660 * Returns a CompletableFuture (or an equivalent one) produced
2661 * asynchronously using the given executor by the given function
2662 * of the result of this CompletableFuture when completed.
2663 *
2664 * <p>If this CompletableFuture completes exceptionally,
2665 * then the returned CompletableFuture also does so, with a
2666 * CompletionException holding this exception as its cause.
2667 *
2668 * @param fn the function returning a new CompletableFuture
2669 * @param executor the executor to use for asynchronous execution
2670 * @return the CompletableFuture, that {@code isDone()} upon
2671 * return if completed by the given function, or an exception
2672 * occurs
2673 */
2674 public <U> CompletableFuture<U> thenComposeAsync
2675 (Function<? super T, CompletableFuture<U>> fn,
2676 Executor executor) {
2677 if (executor == null) throw new NullPointerException();
2678 return doCompose(fn, executor);
2679 }
2680
2681 private <U> CompletableFuture<U> doCompose
2682 (Function<? super T, CompletableFuture<U>> fn,
2683 Executor e) {
2684 if (fn == null) throw new NullPointerException();
2685 CompletableFuture<U> dst = null;
2686 ComposeCompletion<T,U> d = null;
2687 Object r;
2688 if ((r = result) == null) {
2689 dst = new CompletableFuture<U>();
2690 CompletionNode p = new CompletionNode
2691 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2692 while ((r = result) == null) {
2693 if (UNSAFE.compareAndSwapObject
2694 (this, COMPLETIONS, p.next = completions, p))
2695 break;
2696 }
2697 }
2698 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2699 T t; Throwable ex;
2700 if (r instanceof AltResult) {
2701 ex = ((AltResult)r).ex;
2702 t = null;
2703 }
2704 else {
2705 ex = null;
2706 @SuppressWarnings("unchecked") T tr = (T) r;
2707 t = tr;
2708 }
2709 if (ex == null) {
2710 if (e != null) {
2711 if (dst == null)
2712 dst = new CompletableFuture<U>();
2713 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2714 }
2715 else {
2716 try {
2717 if ((dst = fn.apply(t)) == null)
2718 ex = new NullPointerException();
2719 } catch (Throwable rex) {
2720 ex = rex;
2721 }
2722 if (dst == null)
2723 dst = new CompletableFuture<U>();
2724 }
2725 }
2726 if (e == null && ex != null)
2727 dst.internalComplete(null, ex);
2728 }
2729 helpPostComplete();
2730 dst.helpPostComplete();
2731 return dst;
2732 }
2733
2734 /**
2735 * Returns a new CompletableFuture that is completed when this
2736 * CompletableFuture completes, with the result of the given
2737 * function of the exception triggering this CompletableFuture's
2738 * completion when it completes exceptionally; otherwise, if this
2739 * CompletableFuture completes normally, then the returned
2740 * CompletableFuture also completes normally with the same value.
2741 *
2742 * @param fn the function to use to compute the value of the
2743 * returned CompletableFuture if this CompletableFuture completed
2744 * exceptionally
2745 * @return the new CompletableFuture
2746 */
2747 public CompletableFuture<T> exceptionally
2748 (Function<Throwable, ? extends T> fn) {
2749 if (fn == null) throw new NullPointerException();
2750 CompletableFuture<T> dst = new CompletableFuture<T>();
2751 ExceptionCompletion<T> d = null;
2752 Object r;
// No result yet: push a node wrapping an ExceptionCompletion onto
// this.completions, retrying the CAS until it succeeds or a result
// shows up.
2753 if ((r = result) == null) {
2754 CompletionNode p =
2755 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2756 while ((r = result) == null) {
2757 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2758 p.next = completions, p))
2759 break;
2760 }
2761 }
// A result is in hand. A registered d must first be claimed via
// compareAndSet(0, 1) -- presumably so fn runs at most once (TODO
// confirm against ExceptionCompletion).
2762 if (r != null && (d == null || d.compareAndSet(0, 1))) {
// t is the value passed on to dst; dx is an exception thrown by fn.
2763 T t = null; Throwable ex, dx = null;
2764 if (r instanceof AltResult) {
// fn is consulted only for a non-null exception. An AltResult whose ex
// is null leaves t as null, so dst is completed with (null, null).
2765 if ((ex = ((AltResult)r).ex) != null) {
2766 try {
2767 t = fn.apply(ex);
2768 } catch (Throwable rex) {
2769 dx = rex;
2770 }
2771 }
2772 }
2773 else {
// Ordinary value: passed through to dst without calling fn.
2774 @SuppressWarnings("unchecked") T tr = (T) r;
2775 t = tr;
2776 }
2777 dst.internalComplete(t, dx);
2778 }
// NOTE(review): helpPostComplete is defined elsewhere; presumably it
// processes completions if this future finished during registration.
2779 helpPostComplete();
2780 return dst;
2781 }
2782
2783 /**
2784 * Returns a new CompletableFuture that is completed when this
2785 * CompletableFuture completes, with the result of the given
2786 * function of the result and exception of this CompletableFuture's
2787 * completion. The given function is invoked with the result (or
2788 * {@code null} if none) and the exception (or {@code null} if none)
2789 * of this CompletableFuture when complete.
2790 *
2791 * @param fn the function to use to compute the value of the
2792 * returned CompletableFuture
2793 * @return the new CompletableFuture
2794 */
2795 public <U> CompletableFuture<U> handle
2796 (BiFunction<? super T, Throwable, ? extends U> fn) {
2797 if (fn == null) throw new NullPointerException();
2798 CompletableFuture<U> dst = new CompletableFuture<U>();
2799 HandleCompletion<T,U> d = null;
2800 Object r;
// No result yet: push a node wrapping a HandleCompletion onto
// this.completions, retrying the CAS until it succeeds or a result
// shows up.
2801 if ((r = result) == null) {
2802 CompletionNode p =
2803 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2804 while ((r = result) == null) {
2805 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2806 p.next = completions, p))
2807 break;
2808 }
2809 }
// A result is in hand. A registered d must first be claimed via
// compareAndSet(0, 1) -- presumably so fn runs at most once (TODO
// confirm against HandleCompletion).
2810 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2811 T t; Throwable ex;
// Split the raw result into a (value, exception) pair: an AltResult
// supplies the exception (possibly null) with a null value; anything
// else is the value with a null exception.
2812 if (r instanceof AltResult) {
2813 ex = ((AltResult)r).ex;
2814 t = null;
2815 }
2816 else {
2817 ex = null;
2818 @SuppressWarnings("unchecked") T tr = (T) r;
2819 t = tr;
2820 }
2821 U u; Throwable dx;
// fn is always invoked, whether or not an exception is present; if fn
// itself throws, that throwable is passed to dst in place of a value.
2822 try {
2823 u = fn.apply(t, ex);
2824 dx = null;
2825 } catch (Throwable rex) {
2826 dx = rex;
2827 u = null;
2828 }
2829 dst.internalComplete(u, dx);
2830 }
// NOTE(review): helpPostComplete is defined elsewhere; presumably it
// processes completions if this future finished during registration.
2831 helpPostComplete();
2832 return dst;
2833 }
2834
2835
2836 /* ------------- Arbitrary-arity constructions -------------- */
2837
2838 /*
2839 * The basic plan of attack is to recursively form binary
2840 * completion trees of elements. This can be overkill for small
2841 * sets, but scales nicely. The And/All vs Or/Any forms use the
2842 * same idea, but details differ.
2843 */
2844
2845 /**
2846 * Returns a new CompletableFuture that is completed when all of
2847 * the given CompletableFutures complete. If any of the given
2848 * CompletableFutures complete exceptionally, then so does the
2849 * returned CompletableFuture. Otherwise, the results, if any, of
2850 * the given CompletableFutures are not reflected in the returned
2851 * CompletableFuture, but may be obtained by inspecting them
2852 * individually. If no CompletableFutures are provided, returns a
2853 * CompletableFuture completed with the value {@code null}.
2854 *
2855 * <p>Among the applications of this method is to await completion
2856 * of a set of independent CompletableFutures before continuing a
2857 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2858 * c3).join();}.
2859 *
2860 * @param cfs the CompletableFutures
2861 * @return a new CompletableFuture that is completed when all of the
2862 * given CompletableFutures complete
2863 * @throws NullPointerException if the array or any of its elements are
2864 * {@code null}
2865 */
2866 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2867 int len = cfs.length; // Directly handle empty and singleton cases
2868 if (len > 1)
2869 return allTree(cfs, 0, len - 1);
2870 else {
2871 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2872 CompletableFuture<?> f;
2873 if (len == 0)
2874 dst.result = NIL;
2875 else if ((f = cfs[0]) == null)
2876 throw new NullPointerException();
2877 else {
2878 ThenCopy d = null;
2879 CompletionNode p = null;
2880 Object r;
2881 while ((r = f.result) == null) {
2882 if (d == null)
2883 d = new ThenCopy(f, dst);
2884 else if (p == null)
2885 p = new CompletionNode(d);
2886 else if (UNSAFE.compareAndSwapObject
2887 (f, COMPLETIONS, p.next = f.completions, p))
2888 break;
2889 }
2890 if (r != null && (d == null || d.compareAndSet(0, 1)))
2891 dst.internalComplete(null, (r instanceof AltResult) ?
2892 ((AltResult)r).ex : null);
2893 f.helpPostComplete();
2894 }
2895 return dst;
2896 }
2897 }
2898
2899 /**
2900 * Recursively constructs an And'ed tree of CompletableFutures.
2901 * Called only when array known to have at least two elements.
2902 */
2903 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2904 int lo, int hi) {
2905 CompletableFuture<?> fst, snd;
2906 int mid = (lo + hi) >>> 1;
2907 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2908 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2909 throw new NullPointerException();
2910 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2911 AndCompletion d = null;
2912 CompletionNode p = null, q = null;
2913 Object r = null, s = null;
2914 while ((r = fst.result) == null || (s = snd.result) == null) {
2915 if (d == null)
2916 d = new AndCompletion(fst, snd, dst);
2917 else if (p == null)
2918 p = new CompletionNode(d);
2919 else if (q == null) {
2920 if (UNSAFE.compareAndSwapObject
2921 (fst, COMPLETIONS, p.next = fst.completions, p))
2922 q = new CompletionNode(d);
2923 }
2924 else if (UNSAFE.compareAndSwapObject
2925 (snd, COMPLETIONS, q.next = snd.completions, q))
2926 break;
2927 }
2928 if ((r != null || (r = fst.result) != null) &&
2929 (s != null || (s = snd.result) != null) &&
2930 (d == null || d.compareAndSet(0, 1))) {
2931 Throwable ex;
2932 if (r instanceof AltResult)
2933 ex = ((AltResult)r).ex;
2934 else
2935 ex = null;
2936 if (ex == null && (s instanceof AltResult))
2937 ex = ((AltResult)s).ex;
2938 dst.internalComplete(null, ex);
2939 }
2940 fst.helpPostComplete();
2941 snd.helpPostComplete();
2942 return dst;
2943 }
2944
2945 /**
2946 * Returns a new CompletableFuture that is completed when any of
2947 * the given CompletableFutures complete; with the same result if
2948 * it completed normally, otherwise exceptionally. If no
2949 * CompletableFutures are provided, returns an incomplete
2950 * CompletableFuture.
2951 *
2952 * @param cfs the CompletableFutures
2953 * @return a new CompletableFuture that is completed when any of the
2954 * given CompletableFutures complete
2955 * @throws NullPointerException if the array or any of its elements are
2956 * {@code null}
2957 */
2958 public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
2959 int len = cfs.length; // Same idea as allOf
2960 if (len > 1)
2961 return anyTree(cfs, 0, len - 1);
2962 else {
2963 CompletableFuture<?> dst = new CompletableFuture<Object>();
2964 CompletableFuture<?> f;
2965 if (len == 0)
2966 ; // skip
2967 else if ((f = cfs[0]) == null)
2968 throw new NullPointerException();
2969 else {
2970 ThenCopy d = null;
2971 CompletionNode p = null;
2972 Object r;
2973 while ((r = f.result) == null) {
2974 if (d == null)
2975 d = new ThenCopy(f, dst);
2976 else if (p == null)
2977 p = new CompletionNode(d);
2978 else if (UNSAFE.compareAndSwapObject
2979 (f, COMPLETIONS, p.next = f.completions, p))
2980 break;
2981 }
2982 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2983 Throwable ex; Object t;
2984 if (r instanceof AltResult) {
2985 ex = ((AltResult)r).ex;
2986 t = null;
2987 }
2988 else {
2989 ex = null;
2990 t = r;
2991 }
2992 dst.internalComplete(t, ex);
2993 }
2994 f.helpPostComplete();
2995 }
2996 return dst;
2997 }
2998 }
2999
3000 /**
3001 * Recursively constructs an Or'ed tree of CompletableFutures.
3002 */
3003 private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
3004 int lo, int hi) {
3005 CompletableFuture<?> fst, snd;
3006 int mid = (lo + hi) >>> 1;
3007 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3008 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3009 throw new NullPointerException();
3010 CompletableFuture<?> dst = new CompletableFuture<Object>();
3011 OrCompletion d = null;
3012 CompletionNode p = null, q = null;
3013 Object r;
3014 while ((r = fst.result) == null && (r = snd.result) == null) {
3015 if (d == null)
3016 d = new OrCompletion(fst, snd, dst);
3017 else if (p == null)
3018 p = new CompletionNode(d);
3019 else if (q == null) {
3020 if (UNSAFE.compareAndSwapObject
3021 (fst, COMPLETIONS, p.next = fst.completions, p))
3022 q = new CompletionNode(d);
3023 }
3024 else if (UNSAFE.compareAndSwapObject
3025 (snd, COMPLETIONS, q.next = snd.completions, q))
3026 break;
3027 }
3028 if ((r != null || (r = fst.result) != null ||
3029 (r = snd.result) != null) &&
3030 (d == null || d.compareAndSet(0, 1))) {
3031 Throwable ex; Object t;
3032 if (r instanceof AltResult) {
3033 ex = ((AltResult)r).ex;
3034 t = null;
3035 }
3036 else {
3037 ex = null;
3038 t = r;
3039 }
3040 dst.internalComplete(t, ex);
3041 }
3042 fst.helpPostComplete();
3043 snd.helpPostComplete();
3044 return dst;
3045 }
3046
3047
3048 /* ------------- Control and status methods -------------- */
3049
3050 /**
3051 * If not already completed, completes this CompletableFuture with
3052 * a {@link CancellationException}. Dependent CompletableFutures
3053 * that have not already completed will also complete
3054 * exceptionally, with a {@link CompletionException} caused by
3055 * this {@code CancellationException}.
3056 *
3057 * @param mayInterruptIfRunning this value has no effect in this
3058 * implementation because interrupts are not used to control
3059 * processing.
3060 *
3061 * @return {@code true} if this task is now cancelled
3062 */
3063 public boolean cancel(boolean mayInterruptIfRunning) {
3064 boolean cancelled = (result == null) &&
3065 UNSAFE.compareAndSwapObject
3066 (this, RESULT, null, new AltResult(new CancellationException()));
3067 postComplete();
3068 return cancelled || isCancelled();
3069 }
3070
3071 /**
3072 * Returns {@code true} if this CompletableFuture was cancelled
3073 * before it completed normally.
3074 *
3075 * @return {@code true} if this CompletableFuture was cancelled
3076 * before it completed normally
3077 */
3078 public boolean isCancelled() {
3079 Object r;
3080 return ((r = result) instanceof AltResult) &&
3081 (((AltResult)r).ex instanceof CancellationException);
3082 }
3083
3084 /**
3085 * Forcibly sets or resets the value subsequently returned by
3086 * method {@link #get()} and related methods, whether or not
3087 * already completed. This method is designed for use only in
3088 * error recovery actions, and even in such situations may result
3089 * in ongoing dependent completions using established versus
3090 * overwritten outcomes.
3091 *
3092 * @param value the completion value
3093 */
3094 public void obtrudeValue(T value) {
3095 result = (value == null) ? NIL : value;
3096 postComplete();
3097 }
3098
3099 /**
3100 * Forcibly causes subsequent invocations of method {@link #get()}
3101 * and related methods to throw the given exception, whether or
3102 * not already completed. This method is designed for use only in
3103 * recovery actions, and even in such situations may result in
3104 * ongoing dependent completions using established versus
3105 * overwritten outcomes.
3106 *
3107 * @param ex the exception
3108 */
3109 public void obtrudeException(Throwable ex) {
3110 if (ex == null) throw new NullPointerException();
3111 result = new AltResult(ex);
3112 postComplete();
3113 }
3114
3115 /**
3116 * Returns the estimated number of CompletableFutures whose
3117 * completions are awaiting completion of this CompletableFuture.
3118 * This method is designed for use in monitoring system state, not
3119 * for synchronization control.
3120 *
3121 * @return the number of dependent CompletableFutures
3122 */
3123 public int getNumberOfDependents() {
3124 int count = 0;
3125 for (CompletionNode p = completions; p != null; p = p.next)
3126 ++count;
3127 return count;
3128 }
3129
3130 /**
3131 * Returns a string identifying this CompletableFuture, as well as
3132 * its completion state. The state, in brackets, contains the
3133 * String {@code "Completed Normally"} or the String {@code
3134 * "Completed Exceptionally"}, or the String {@code "Not
3135 * completed"} followed by the number of CompletableFutures
3136 * dependent upon its completion, if any.
3137 *
3138 * @return a string identifying this CompletableFuture, as well as its state
3139 */
3140 public String toString() {
3141 Object r = result;
3142 int count;
3143 return super.toString() +
3144 ((r == null) ?
3145 (((count = getNumberOfDependents()) == 0) ?
3146 "[Not completed]" :
3147 "[Not completed, " + count + " dependents]") :
3148 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3149 "[Completed exceptionally]" :
3150 "[Completed normally]"));
3151 }
3152
// Unsafe mechanics: the Unsafe instance plus the field offsets of
// "result", "waiters" and "completions", resolved once at class
// initialization.
private static final sun.misc.Unsafe UNSAFE;
private static final long RESULT;
private static final long WAITERS;
private static final long COMPLETIONS;
static {
    try {
        sun.misc.Unsafe unsafe = sun.misc.Unsafe.getUnsafe();
        UNSAFE = unsafe;
        Class<?> self = CompletableFuture.class;
        RESULT = unsafe.objectFieldOffset
            (self.getDeclaredField("result"));
        WAITERS = unsafe.objectFieldOffset
            (self.getDeclaredField("waiters"));
        COMPLETIONS = unsafe.objectFieldOffset
            (self.getDeclaredField("completions"));
    } catch (Exception cause) {
        // Any failure during lookup is rethrown as an Error.
        throw new Error(cause);
    }
}
3172 }