ViewVC Help
View File | Revision Log | Show Annotations | Download File | Root Listing
root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.72
Committed: Tue Mar 19 14:43:07 2013 UTC (11 years, 2 months ago) by jsr166
Branch: MAIN
Changes since 1.71: +0 -3 lines
Log Message:
remove javadoc edit leftover

File Contents

# Content
1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to
31 * {@link #complete complete},
32 * {@link #completeExceptionally completeExceptionally}, or
33 * {@link #cancel cancel}
34 * a CompletableFuture, only one of them succeeds.
35 *
36 * <p>Methods are available for adding dependents based on
37 * user-provided Functions, Consumers, or Runnables. The appropriate
38 * form to use depends on whether actions require arguments and/or
39 * produce results. Completion of a dependent action will trigger the
40 * completion of another CompletableFuture. Actions may also be
41 * triggered after either or both the current and another
42 * CompletableFuture complete. Multiple CompletableFutures may also
43 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
44 * {@link #allOf(CompletableFuture...)}.
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, actions supplied for dependent completions of another
48 * CompletableFuture may do so, depending on whether they are provided
49 * via one of the <em>async</em> methods (that is, methods with names
50 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
51 * methods provide a way to commence asynchronous processing of an
52 * action using either a given {@link Executor} or by default the
53 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
54 * debugging, and tracking, all generated asynchronous tasks are
55 * instances of the marker interface {@link AsynchronousCompletionTask}.
56 *
57 * <p>Actions supplied for dependent completions of <em>non-async</em>
58 * methods may be performed by the thread that completes the current
59 * CompletableFuture, or by any other caller of these methods. There
60 * are no guarantees about the order of processing completions unless
61 * constrained by these methods.
62 *
63 * <p>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional completion.
66 * Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}.
68 *
69 * <p>Upon exceptional completion (including cancellation), or when a
70 * completion entails an additional computation which terminates
71 * abruptly with an (unchecked) exception or error, then all of their
72 * dependent completions (and their dependents in turn) generally act
73 * as {@code completeExceptionally} with a {@link CompletionException}
74 * holding that exception as its cause. However, the {@link
75 * #exceptionally exceptionally} and {@link #handle handle}
76 * completions <em>are</em> able to handle exceptional completions of
77 * the CompletableFutures they depend on.
78 *
79 * <p>In case of exceptional completion with a CompletionException,
80 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
81 * {@link ExecutionException} with the same cause as held in the
82 * corresponding CompletionException. However, in these cases,
83 * methods {@link #join()} and {@link #getNow} throw the
84 * CompletionException, which simplifies usage.
85 *
86 * @author Doug Lea
87 * @since 1.8
88 */
89 public class CompletableFuture<T> implements Future<T> {
90
91 /*
92 * Overview:
93 *
94 * 1. Non-nullness of field result (set via CAS) indicates done.
95 * An AltResult is used to box null as a result, as well as to
96 * hold exceptions. Using a single field makes completion fast
97 * and simple to detect and trigger, at the expense of a lot of
98 * encoding and decoding that infiltrates many methods. One minor
99 * simplification relies on the (static) NIL (to box null results)
100 * being the only AltResult with a null exception field, so we
101 * don't usually need explicit comparisons with NIL. The CF
102 * exception propagation mechanics surrounding decoding rely on
103 * unchecked casts of decoded results really being unchecked,
104 * where user type errors are caught at point of use, as is
105 * currently the case in Java. These are highlighted by using
106 * SuppressWarnings-annotated temporaries.
107 *
108 * 2. Waiters are held in a Treiber stack similar to the one used
109 * in FutureTask, Phaser, and SynchronousQueue. See their
110 * internal documentation for algorithmic details.
111 *
112 * 3. Completions are also kept in a list/stack, and pulled off
113 * and run when completion is triggered. (We could even use the
114 * same stack as for waiters, but would give up the potential
115 * parallelism obtained because woken waiters help release/run
116 * others -- see method postComplete). Because post-processing
117 * may race with direct calls, class Completion opportunistically
118 * extends AtomicInteger so callers can claim the action via
119 * compareAndSet(0, 1). The Completion.run methods are all
120 * written a boringly similar uniform way (that sometimes includes
121 * unnecessary-looking checks, kept to maintain uniformity).
122 * There are enough dimensions upon which they differ that
123 * attempts to factor commonalities while maintaining efficiency
124 * require more lines of code than they would save.
125 *
126 * 4. The exported then/and/or methods do support a bit of
127 * factoring (see doThenApply etc). They must cope with the
128 * intrinsic races surrounding addition of a dependent action
129 * versus performing the action directly because the task is
130 * already complete. For example, a CF may not be complete upon
131 * entry, so a dependent completion is added, but by the time it
132 * is added, the target CF is complete, so must be directly
133 * executed. This is all done while avoiding unnecessary object
134 * construction in safe-bypass cases.
135 */
136
137 // preliminaries
138
139 static final class AltResult {
140 final Throwable ex; // null only for NIL
141 AltResult(Throwable ex) { this.ex = ex; }
142 }
143
144 static final AltResult NIL = new AltResult(null);
145
146 // Fields
147
148 volatile Object result; // Either the result or boxed AltResult
149 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
150 volatile CompletionNode completions; // list (Treiber stack) of completions
151
152 // Basic utilities for triggering and processing completions
153
154 /**
155 * Removes and signals all waiting threads and runs all completions.
156 */
157 final void postComplete() {
158 WaitNode q; Thread t;
159 while ((q = waiters) != null) {
160 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
161 (t = q.thread) != null) {
162 q.thread = null;
163 LockSupport.unpark(t);
164 }
165 }
166
167 CompletionNode h; Completion c;
168 while ((h = completions) != null) {
169 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
170 (c = h.completion) != null)
171 c.run();
172 }
173 }
174
175 /**
176 * Triggers completion with the encoding of the given arguments:
177 * if the exception is non-null, encodes it as a wrapped
178 * CompletionException unless it is one already. Otherwise uses
179 * the given result, boxed as NIL if null.
180 */
181 final void internalComplete(Object v, Throwable ex) {
182 if (result == null)
183 UNSAFE.compareAndSwapObject
184 (this, RESULT, null,
185 (ex == null) ? (v == null) ? NIL : v :
186 new AltResult((ex instanceof CompletionException) ? ex :
187 new CompletionException(ex)));
188 postComplete(); // help out even if not triggered
189 }
190
191 /**
192 * If triggered, helps release and/or process completions.
193 */
194 final void helpPostComplete() {
195 if (result != null)
196 postComplete();
197 }
198
199 /* ------------- waiting for completions -------------- */
200
201 /** Number of processors, for spin control */
202 static final int NCPU = Runtime.getRuntime().availableProcessors();
203
204 /**
205 * Heuristic spin value for waitingGet() before blocking on
206 * multiprocessors
207 */
208 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
209
210 /**
211 * Linked nodes to record waiting threads in a Treiber stack. See
212 * other classes such as Phaser and SynchronousQueue for more
213 * detailed explanation. This class implements ManagedBlocker to
214 * avoid starvation when blocking actions pile up in
215 * ForkJoinPools.
216 */
217 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
218 long nanos; // wait time if timed
219 final long deadline; // non-zero if timed
220 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
221 volatile Thread thread;
222 volatile WaitNode next;
223 WaitNode(boolean interruptible, long nanos, long deadline) {
224 this.thread = Thread.currentThread();
225 this.interruptControl = interruptible ? 1 : 0;
226 this.nanos = nanos;
227 this.deadline = deadline;
228 }
229 public boolean isReleasable() {
230 if (thread == null)
231 return true;
232 if (Thread.interrupted()) {
233 int i = interruptControl;
234 interruptControl = -1;
235 if (i > 0)
236 return true;
237 }
238 if (deadline != 0L &&
239 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
240 thread = null;
241 return true;
242 }
243 return false;
244 }
245 public boolean block() {
246 if (isReleasable())
247 return true;
248 else if (deadline == 0L)
249 LockSupport.park(this);
250 else if (nanos > 0L)
251 LockSupport.parkNanos(this, nanos);
252 return isReleasable();
253 }
254 }
255
256 /**
257 * Returns raw result after waiting, or null if interruptible and
258 * interrupted.
259 */
260 private Object waitingGet(boolean interruptible) {
261 WaitNode q = null;
262 boolean queued = false;
263 int spins = SPINS;
264 for (Object r;;) {
265 if ((r = result) != null) {
266 if (q != null) { // suppress unpark
267 q.thread = null;
268 if (q.interruptControl < 0) {
269 if (interruptible) {
270 removeWaiter(q);
271 return null;
272 }
273 Thread.currentThread().interrupt();
274 }
275 }
276 postComplete(); // help release others
277 return r;
278 }
279 else if (spins > 0) {
280 int rnd = ThreadLocalRandom.nextSecondarySeed();
281 if (rnd == 0)
282 rnd = ThreadLocalRandom.current().nextInt();
283 if (rnd >= 0)
284 --spins;
285 }
286 else if (q == null)
287 q = new WaitNode(interruptible, 0L, 0L);
288 else if (!queued)
289 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
290 q.next = waiters, q);
291 else if (interruptible && q.interruptControl < 0) {
292 removeWaiter(q);
293 return null;
294 }
295 else if (q.thread != null && result == null) {
296 try {
297 ForkJoinPool.managedBlock(q);
298 } catch (InterruptedException ex) {
299 q.interruptControl = -1;
300 }
301 }
302 }
303 }
304
305 /**
306 * Awaits completion or aborts on interrupt or timeout.
307 *
308 * @param nanos time to wait
309 * @return raw result
310 */
311 private Object timedAwaitDone(long nanos)
312 throws InterruptedException, TimeoutException {
313 WaitNode q = null;
314 boolean queued = false;
315 for (Object r;;) {
316 if ((r = result) != null) {
317 if (q != null) {
318 q.thread = null;
319 if (q.interruptControl < 0) {
320 removeWaiter(q);
321 throw new InterruptedException();
322 }
323 }
324 postComplete();
325 return r;
326 }
327 else if (q == null) {
328 if (nanos <= 0L)
329 throw new TimeoutException();
330 long d = System.nanoTime() + nanos;
331 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
332 }
333 else if (!queued)
334 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
335 q.next = waiters, q);
336 else if (q.interruptControl < 0) {
337 removeWaiter(q);
338 throw new InterruptedException();
339 }
340 else if (q.nanos <= 0L) {
341 if (result == null) {
342 removeWaiter(q);
343 throw new TimeoutException();
344 }
345 }
346 else if (q.thread != null && result == null) {
347 try {
348 ForkJoinPool.managedBlock(q);
349 } catch (InterruptedException ex) {
350 q.interruptControl = -1;
351 }
352 }
353 }
354 }
355
356 /**
357 * Tries to unlink a timed-out or interrupted wait node to avoid
358 * accumulating garbage. Internal nodes are simply unspliced
359 * without CAS since it is harmless if they are traversed anyway
360 * by releasers. To avoid effects of unsplicing from already
361 * removed nodes, the list is retraversed in case of an apparent
362 * race. This is slow when there are a lot of nodes, but we don't
363 * expect lists to be long enough to outweigh higher-overhead
364 * schemes.
365 */
366 private void removeWaiter(WaitNode node) {
367 if (node != null) {
368 node.thread = null;
369 retry:
370 for (;;) { // restart on removeWaiter race
371 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
372 s = q.next;
373 if (q.thread != null)
374 pred = q;
375 else if (pred != null) {
376 pred.next = s;
377 if (pred.thread == null) // check for race
378 continue retry;
379 }
380 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
381 continue retry;
382 }
383 break;
384 }
385 }
386 }
387
388 /* ------------- Async tasks -------------- */
389
390 /**
391 * A marker interface identifying asynchronous tasks produced by
392 * {@code async} methods. This may be useful for monitoring,
393 * debugging, and tracking asynchronous activities.
394 *
395 * @since 1.8
396 */
397 public static interface AsynchronousCompletionTask {
398 }
399
400 /** Base class can act as either FJ or plain Runnable */
401 abstract static class Async extends ForkJoinTask<Void>
402 implements Runnable, AsynchronousCompletionTask {
403 public final Void getRawResult() { return null; }
404 public final void setRawResult(Void v) { }
405 public final void run() { exec(); }
406 }
407
408 static final class AsyncRun extends Async {
409 final Runnable fn;
410 final CompletableFuture<Void> dst;
411 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
412 this.fn = fn; this.dst = dst;
413 }
414 public final boolean exec() {
415 CompletableFuture<Void> d; Throwable ex;
416 if ((d = this.dst) != null && d.result == null) {
417 try {
418 fn.run();
419 ex = null;
420 } catch (Throwable rex) {
421 ex = rex;
422 }
423 d.internalComplete(null, ex);
424 }
425 return true;
426 }
427 private static final long serialVersionUID = 5232453952276885070L;
428 }
429
430 static final class AsyncSupply<U> extends Async {
431 final Supplier<U> fn;
432 final CompletableFuture<U> dst;
433 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
434 this.fn = fn; this.dst = dst;
435 }
436 public final boolean exec() {
437 CompletableFuture<U> d; U u; Throwable ex;
438 if ((d = this.dst) != null && d.result == null) {
439 try {
440 u = fn.get();
441 ex = null;
442 } catch (Throwable rex) {
443 ex = rex;
444 u = null;
445 }
446 d.internalComplete(u, ex);
447 }
448 return true;
449 }
450 private static final long serialVersionUID = 5232453952276885070L;
451 }
452
453 static final class AsyncApply<T,U> extends Async {
454 final T arg;
455 final Function<? super T,? extends U> fn;
456 final CompletableFuture<U> dst;
457 AsyncApply(T arg, Function<? super T,? extends U> fn,
458 CompletableFuture<U> dst) {
459 this.arg = arg; this.fn = fn; this.dst = dst;
460 }
461 public final boolean exec() {
462 CompletableFuture<U> d; U u; Throwable ex;
463 if ((d = this.dst) != null && d.result == null) {
464 try {
465 u = fn.apply(arg);
466 ex = null;
467 } catch (Throwable rex) {
468 ex = rex;
469 u = null;
470 }
471 d.internalComplete(u, ex);
472 }
473 return true;
474 }
475 private static final long serialVersionUID = 5232453952276885070L;
476 }
477
478 static final class AsyncBiApply<T,U,V> extends Async {
479 final T arg1;
480 final U arg2;
481 final BiFunction<? super T,? super U,? extends V> fn;
482 final CompletableFuture<V> dst;
483 AsyncBiApply(T arg1, U arg2,
484 BiFunction<? super T,? super U,? extends V> fn,
485 CompletableFuture<V> dst) {
486 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
487 }
488 public final boolean exec() {
489 CompletableFuture<V> d; V v; Throwable ex;
490 if ((d = this.dst) != null && d.result == null) {
491 try {
492 v = fn.apply(arg1, arg2);
493 ex = null;
494 } catch (Throwable rex) {
495 ex = rex;
496 v = null;
497 }
498 d.internalComplete(v, ex);
499 }
500 return true;
501 }
502 private static final long serialVersionUID = 5232453952276885070L;
503 }
504
505 static final class AsyncAccept<T> extends Async {
506 final T arg;
507 final Consumer<? super T> fn;
508 final CompletableFuture<Void> dst;
509 AsyncAccept(T arg, Consumer<? super T> fn,
510 CompletableFuture<Void> dst) {
511 this.arg = arg; this.fn = fn; this.dst = dst;
512 }
513 public final boolean exec() {
514 CompletableFuture<Void> d; Throwable ex;
515 if ((d = this.dst) != null && d.result == null) {
516 try {
517 fn.accept(arg);
518 ex = null;
519 } catch (Throwable rex) {
520 ex = rex;
521 }
522 d.internalComplete(null, ex);
523 }
524 return true;
525 }
526 private static final long serialVersionUID = 5232453952276885070L;
527 }
528
529 static final class AsyncBiAccept<T,U> extends Async {
530 final T arg1;
531 final U arg2;
532 final BiConsumer<? super T,? super U> fn;
533 final CompletableFuture<Void> dst;
534 AsyncBiAccept(T arg1, U arg2,
535 BiConsumer<? super T,? super U> fn,
536 CompletableFuture<Void> dst) {
537 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
538 }
539 public final boolean exec() {
540 CompletableFuture<Void> d; Throwable ex;
541 if ((d = this.dst) != null && d.result == null) {
542 try {
543 fn.accept(arg1, arg2);
544 ex = null;
545 } catch (Throwable rex) {
546 ex = rex;
547 }
548 d.internalComplete(null, ex);
549 }
550 return true;
551 }
552 private static final long serialVersionUID = 5232453952276885070L;
553 }
554
555 static final class AsyncCompose<T,U> extends Async {
556 final T arg;
557 final Function<? super T, CompletableFuture<U>> fn;
558 final CompletableFuture<U> dst;
559 AsyncCompose(T arg,
560 Function<? super T, CompletableFuture<U>> fn,
561 CompletableFuture<U> dst) {
562 this.arg = arg; this.fn = fn; this.dst = dst;
563 }
564 public final boolean exec() {
565 CompletableFuture<U> d, fr; U u; Throwable ex;
566 if ((d = this.dst) != null && d.result == null) {
567 try {
568 fr = fn.apply(arg);
569 ex = (fr == null) ? new NullPointerException() : null;
570 } catch (Throwable rex) {
571 ex = rex;
572 fr = null;
573 }
574 if (ex != null)
575 u = null;
576 else {
577 Object r = fr.result;
578 if (r == null)
579 r = fr.waitingGet(false);
580 if (r instanceof AltResult) {
581 ex = ((AltResult)r).ex;
582 u = null;
583 }
584 else {
585 @SuppressWarnings("unchecked") U ur = (U) r;
586 u = ur;
587 }
588 }
589 d.internalComplete(u, ex);
590 }
591 return true;
592 }
593 private static final long serialVersionUID = 5232453952276885070L;
594 }
595
596 /* ------------- Completions -------------- */
597
598 /**
599 * Simple linked list nodes to record completions, used in
600 * basically the same way as WaitNodes. (We separate nodes from
601 * the Completions themselves mainly because for the And and Or
602 * methods, the same Completion object resides in two lists.)
603 */
604 static final class CompletionNode {
605 final Completion completion;
606 volatile CompletionNode next;
607 CompletionNode(Completion completion) { this.completion = completion; }
608 }
609
610 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
611 abstract static class Completion extends AtomicInteger implements Runnable {
612 }
613
614 static final class ApplyCompletion<T,U> extends Completion {
615 final CompletableFuture<? extends T> src;
616 final Function<? super T,? extends U> fn;
617 final CompletableFuture<U> dst;
618 final Executor executor;
619 ApplyCompletion(CompletableFuture<? extends T> src,
620 Function<? super T,? extends U> fn,
621 CompletableFuture<U> dst, Executor executor) {
622 this.src = src; this.fn = fn; this.dst = dst;
623 this.executor = executor;
624 }
625 public final void run() {
626 final CompletableFuture<? extends T> a;
627 final Function<? super T,? extends U> fn;
628 final CompletableFuture<U> dst;
629 Object r; T t; Throwable ex;
630 if ((dst = this.dst) != null &&
631 (fn = this.fn) != null &&
632 (a = this.src) != null &&
633 (r = a.result) != null &&
634 compareAndSet(0, 1)) {
635 if (r instanceof AltResult) {
636 ex = ((AltResult)r).ex;
637 t = null;
638 }
639 else {
640 ex = null;
641 @SuppressWarnings("unchecked") T tr = (T) r;
642 t = tr;
643 }
644 Executor e = executor;
645 U u = null;
646 if (ex == null) {
647 try {
648 if (e != null)
649 e.execute(new AsyncApply<T,U>(t, fn, dst));
650 else
651 u = fn.apply(t);
652 } catch (Throwable rex) {
653 ex = rex;
654 }
655 }
656 if (e == null || ex != null)
657 dst.internalComplete(u, ex);
658 }
659 }
660 private static final long serialVersionUID = 5232453952276885070L;
661 }
662
663 static final class AcceptCompletion<T> extends Completion {
664 final CompletableFuture<? extends T> src;
665 final Consumer<? super T> fn;
666 final CompletableFuture<Void> dst;
667 final Executor executor;
668 AcceptCompletion(CompletableFuture<? extends T> src,
669 Consumer<? super T> fn,
670 CompletableFuture<Void> dst, Executor executor) {
671 this.src = src; this.fn = fn; this.dst = dst;
672 this.executor = executor;
673 }
674 public final void run() {
675 final CompletableFuture<? extends T> a;
676 final Consumer<? super T> fn;
677 final CompletableFuture<Void> dst;
678 Object r; T t; Throwable ex;
679 if ((dst = this.dst) != null &&
680 (fn = this.fn) != null &&
681 (a = this.src) != null &&
682 (r = a.result) != null &&
683 compareAndSet(0, 1)) {
684 if (r instanceof AltResult) {
685 ex = ((AltResult)r).ex;
686 t = null;
687 }
688 else {
689 ex = null;
690 @SuppressWarnings("unchecked") T tr = (T) r;
691 t = tr;
692 }
693 Executor e = executor;
694 if (ex == null) {
695 try {
696 if (e != null)
697 e.execute(new AsyncAccept<T>(t, fn, dst));
698 else
699 fn.accept(t);
700 } catch (Throwable rex) {
701 ex = rex;
702 }
703 }
704 if (e == null || ex != null)
705 dst.internalComplete(null, ex);
706 }
707 }
708 private static final long serialVersionUID = 5232453952276885070L;
709 }
710
711 static final class RunCompletion<T> extends Completion {
712 final CompletableFuture<? extends T> src;
713 final Runnable fn;
714 final CompletableFuture<Void> dst;
715 final Executor executor;
716 RunCompletion(CompletableFuture<? extends T> src,
717 Runnable fn,
718 CompletableFuture<Void> dst,
719 Executor executor) {
720 this.src = src; this.fn = fn; this.dst = dst;
721 this.executor = executor;
722 }
723 public final void run() {
724 final CompletableFuture<? extends T> a;
725 final Runnable fn;
726 final CompletableFuture<Void> dst;
727 Object r; Throwable ex;
728 if ((dst = this.dst) != null &&
729 (fn = this.fn) != null &&
730 (a = this.src) != null &&
731 (r = a.result) != null &&
732 compareAndSet(0, 1)) {
733 if (r instanceof AltResult)
734 ex = ((AltResult)r).ex;
735 else
736 ex = null;
737 Executor e = executor;
738 if (ex == null) {
739 try {
740 if (e != null)
741 e.execute(new AsyncRun(fn, dst));
742 else
743 fn.run();
744 } catch (Throwable rex) {
745 ex = rex;
746 }
747 }
748 if (e == null || ex != null)
749 dst.internalComplete(null, ex);
750 }
751 }
752 private static final long serialVersionUID = 5232453952276885070L;
753 }
754
755 static final class BiApplyCompletion<T,U,V> extends Completion {
756 final CompletableFuture<? extends T> src;
757 final CompletableFuture<? extends U> snd;
758 final BiFunction<? super T,? super U,? extends V> fn;
759 final CompletableFuture<V> dst;
760 final Executor executor;
761 BiApplyCompletion(CompletableFuture<? extends T> src,
762 CompletableFuture<? extends U> snd,
763 BiFunction<? super T,? super U,? extends V> fn,
764 CompletableFuture<V> dst, Executor executor) {
765 this.src = src; this.snd = snd;
766 this.fn = fn; this.dst = dst;
767 this.executor = executor;
768 }
769 public final void run() {
770 final CompletableFuture<? extends T> a;
771 final CompletableFuture<? extends U> b;
772 final BiFunction<? super T,? super U,? extends V> fn;
773 final CompletableFuture<V> dst;
774 Object r, s; T t; U u; Throwable ex;
775 if ((dst = this.dst) != null &&
776 (fn = this.fn) != null &&
777 (a = this.src) != null &&
778 (r = a.result) != null &&
779 (b = this.snd) != null &&
780 (s = b.result) != null &&
781 compareAndSet(0, 1)) {
782 if (r instanceof AltResult) {
783 ex = ((AltResult)r).ex;
784 t = null;
785 }
786 else {
787 ex = null;
788 @SuppressWarnings("unchecked") T tr = (T) r;
789 t = tr;
790 }
791 if (ex != null)
792 u = null;
793 else if (s instanceof AltResult) {
794 ex = ((AltResult)s).ex;
795 u = null;
796 }
797 else {
798 @SuppressWarnings("unchecked") U us = (U) s;
799 u = us;
800 }
801 Executor e = executor;
802 V v = null;
803 if (ex == null) {
804 try {
805 if (e != null)
806 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
807 else
808 v = fn.apply(t, u);
809 } catch (Throwable rex) {
810 ex = rex;
811 }
812 }
813 if (e == null || ex != null)
814 dst.internalComplete(v, ex);
815 }
816 }
817 private static final long serialVersionUID = 5232453952276885070L;
818 }
819
820 static final class BiAcceptCompletion<T,U> extends Completion {
821 final CompletableFuture<? extends T> src;
822 final CompletableFuture<? extends U> snd;
823 final BiConsumer<? super T,? super U> fn;
824 final CompletableFuture<Void> dst;
825 final Executor executor;
826 BiAcceptCompletion(CompletableFuture<? extends T> src,
827 CompletableFuture<? extends U> snd,
828 BiConsumer<? super T,? super U> fn,
829 CompletableFuture<Void> dst, Executor executor) {
830 this.src = src; this.snd = snd;
831 this.fn = fn; this.dst = dst;
832 this.executor = executor;
833 }
834 public final void run() {
835 final CompletableFuture<? extends T> a;
836 final CompletableFuture<? extends U> b;
837 final BiConsumer<? super T,? super U> fn;
838 final CompletableFuture<Void> dst;
839 Object r, s; T t; U u; Throwable ex;
840 if ((dst = this.dst) != null &&
841 (fn = this.fn) != null &&
842 (a = this.src) != null &&
843 (r = a.result) != null &&
844 (b = this.snd) != null &&
845 (s = b.result) != null &&
846 compareAndSet(0, 1)) {
847 if (r instanceof AltResult) {
848 ex = ((AltResult)r).ex;
849 t = null;
850 }
851 else {
852 ex = null;
853 @SuppressWarnings("unchecked") T tr = (T) r;
854 t = tr;
855 }
856 if (ex != null)
857 u = null;
858 else if (s instanceof AltResult) {
859 ex = ((AltResult)s).ex;
860 u = null;
861 }
862 else {
863 @SuppressWarnings("unchecked") U us = (U) s;
864 u = us;
865 }
866 Executor e = executor;
867 if (ex == null) {
868 try {
869 if (e != null)
870 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
871 else
872 fn.accept(t, u);
873 } catch (Throwable rex) {
874 ex = rex;
875 }
876 }
877 if (e == null || ex != null)
878 dst.internalComplete(null, ex);
879 }
880 }
881 private static final long serialVersionUID = 5232453952276885070L;
882 }
883
884 static final class BiRunCompletion<T> extends Completion {
885 final CompletableFuture<? extends T> src;
886 final CompletableFuture<?> snd;
887 final Runnable fn;
888 final CompletableFuture<Void> dst;
889 final Executor executor;
890 BiRunCompletion(CompletableFuture<? extends T> src,
891 CompletableFuture<?> snd,
892 Runnable fn,
893 CompletableFuture<Void> dst, Executor executor) {
894 this.src = src; this.snd = snd;
895 this.fn = fn; this.dst = dst;
896 this.executor = executor;
897 }
898 public final void run() {
899 final CompletableFuture<? extends T> a;
900 final CompletableFuture<?> b;
901 final Runnable fn;
902 final CompletableFuture<Void> dst;
903 Object r, s; Throwable ex;
904 if ((dst = this.dst) != null &&
905 (fn = this.fn) != null &&
906 (a = this.src) != null &&
907 (r = a.result) != null &&
908 (b = this.snd) != null &&
909 (s = b.result) != null &&
910 compareAndSet(0, 1)) {
911 if (r instanceof AltResult)
912 ex = ((AltResult)r).ex;
913 else
914 ex = null;
915 if (ex == null && (s instanceof AltResult))
916 ex = ((AltResult)s).ex;
917 Executor e = executor;
918 if (ex == null) {
919 try {
920 if (e != null)
921 e.execute(new AsyncRun(fn, dst));
922 else
923 fn.run();
924 } catch (Throwable rex) {
925 ex = rex;
926 }
927 }
928 if (e == null || ex != null)
929 dst.internalComplete(null, ex);
930 }
931 }
932 private static final long serialVersionUID = 5232453952276885070L;
933 }
934
935 static final class AndCompletion extends Completion {
936 final CompletableFuture<?> src;
937 final CompletableFuture<?> snd;
938 final CompletableFuture<Void> dst;
939 AndCompletion(CompletableFuture<?> src,
940 CompletableFuture<?> snd,
941 CompletableFuture<Void> dst) {
942 this.src = src; this.snd = snd; this.dst = dst;
943 }
944 public final void run() {
945 final CompletableFuture<?> a;
946 final CompletableFuture<?> b;
947 final CompletableFuture<Void> dst;
948 Object r, s; Throwable ex;
949 if ((dst = this.dst) != null &&
950 (a = this.src) != null &&
951 (r = a.result) != null &&
952 (b = this.snd) != null &&
953 (s = b.result) != null &&
954 compareAndSet(0, 1)) {
955 if (r instanceof AltResult)
956 ex = ((AltResult)r).ex;
957 else
958 ex = null;
959 if (ex == null && (s instanceof AltResult))
960 ex = ((AltResult)s).ex;
961 dst.internalComplete(null, ex);
962 }
963 }
964 private static final long serialVersionUID = 5232453952276885070L;
965 }
966
967 static final class OrApplyCompletion<T,U> extends Completion {
968 final CompletableFuture<? extends T> src;
969 final CompletableFuture<? extends T> snd;
970 final Function<? super T,? extends U> fn;
971 final CompletableFuture<U> dst;
972 final Executor executor;
973 OrApplyCompletion(CompletableFuture<? extends T> src,
974 CompletableFuture<? extends T> snd,
975 Function<? super T,? extends U> fn,
976 CompletableFuture<U> dst, Executor executor) {
977 this.src = src; this.snd = snd;
978 this.fn = fn; this.dst = dst;
979 this.executor = executor;
980 }
981 public final void run() {
982 final CompletableFuture<? extends T> a;
983 final CompletableFuture<? extends T> b;
984 final Function<? super T,? extends U> fn;
985 final CompletableFuture<U> dst;
986 Object r; T t; Throwable ex;
987 if ((dst = this.dst) != null &&
988 (fn = this.fn) != null &&
989 (((a = this.src) != null && (r = a.result) != null) ||
990 ((b = this.snd) != null && (r = b.result) != null)) &&
991 compareAndSet(0, 1)) {
992 if (r instanceof AltResult) {
993 ex = ((AltResult)r).ex;
994 t = null;
995 }
996 else {
997 ex = null;
998 @SuppressWarnings("unchecked") T tr = (T) r;
999 t = tr;
1000 }
1001 Executor e = executor;
1002 U u = null;
1003 if (ex == null) {
1004 try {
1005 if (e != null)
1006 e.execute(new AsyncApply<T,U>(t, fn, dst));
1007 else
1008 u = fn.apply(t);
1009 } catch (Throwable rex) {
1010 ex = rex;
1011 }
1012 }
1013 if (e == null || ex != null)
1014 dst.internalComplete(u, ex);
1015 }
1016 }
1017 private static final long serialVersionUID = 5232453952276885070L;
1018 }
1019
1020 static final class OrAcceptCompletion<T> extends Completion {
1021 final CompletableFuture<? extends T> src;
1022 final CompletableFuture<? extends T> snd;
1023 final Consumer<? super T> fn;
1024 final CompletableFuture<Void> dst;
1025 final Executor executor;
1026 OrAcceptCompletion(CompletableFuture<? extends T> src,
1027 CompletableFuture<? extends T> snd,
1028 Consumer<? super T> fn,
1029 CompletableFuture<Void> dst, Executor executor) {
1030 this.src = src; this.snd = snd;
1031 this.fn = fn; this.dst = dst;
1032 this.executor = executor;
1033 }
1034 public final void run() {
1035 final CompletableFuture<? extends T> a;
1036 final CompletableFuture<? extends T> b;
1037 final Consumer<? super T> fn;
1038 final CompletableFuture<Void> dst;
1039 Object r; T t; Throwable ex;
1040 if ((dst = this.dst) != null &&
1041 (fn = this.fn) != null &&
1042 (((a = this.src) != null && (r = a.result) != null) ||
1043 ((b = this.snd) != null && (r = b.result) != null)) &&
1044 compareAndSet(0, 1)) {
1045 if (r instanceof AltResult) {
1046 ex = ((AltResult)r).ex;
1047 t = null;
1048 }
1049 else {
1050 ex = null;
1051 @SuppressWarnings("unchecked") T tr = (T) r;
1052 t = tr;
1053 }
1054 Executor e = executor;
1055 if (ex == null) {
1056 try {
1057 if (e != null)
1058 e.execute(new AsyncAccept<T>(t, fn, dst));
1059 else
1060 fn.accept(t);
1061 } catch (Throwable rex) {
1062 ex = rex;
1063 }
1064 }
1065 if (e == null || ex != null)
1066 dst.internalComplete(null, ex);
1067 }
1068 }
1069 private static final long serialVersionUID = 5232453952276885070L;
1070 }
1071
1072 static final class OrRunCompletion<T> extends Completion {
1073 final CompletableFuture<? extends T> src;
1074 final CompletableFuture<?> snd;
1075 final Runnable fn;
1076 final CompletableFuture<Void> dst;
1077 final Executor executor;
1078 OrRunCompletion(CompletableFuture<? extends T> src,
1079 CompletableFuture<?> snd,
1080 Runnable fn,
1081 CompletableFuture<Void> dst, Executor executor) {
1082 this.src = src; this.snd = snd;
1083 this.fn = fn; this.dst = dst;
1084 this.executor = executor;
1085 }
1086 public final void run() {
1087 final CompletableFuture<? extends T> a;
1088 final CompletableFuture<?> b;
1089 final Runnable fn;
1090 final CompletableFuture<Void> dst;
1091 Object r; Throwable ex;
1092 if ((dst = this.dst) != null &&
1093 (fn = this.fn) != null &&
1094 (((a = this.src) != null && (r = a.result) != null) ||
1095 ((b = this.snd) != null && (r = b.result) != null)) &&
1096 compareAndSet(0, 1)) {
1097 if (r instanceof AltResult)
1098 ex = ((AltResult)r).ex;
1099 else
1100 ex = null;
1101 Executor e = executor;
1102 if (ex == null) {
1103 try {
1104 if (e != null)
1105 e.execute(new AsyncRun(fn, dst));
1106 else
1107 fn.run();
1108 } catch (Throwable rex) {
1109 ex = rex;
1110 }
1111 }
1112 if (e == null || ex != null)
1113 dst.internalComplete(null, ex);
1114 }
1115 }
1116 private static final long serialVersionUID = 5232453952276885070L;
1117 }
1118
1119 static final class OrCompletion extends Completion {
1120 final CompletableFuture<?> src;
1121 final CompletableFuture<?> snd;
1122 final CompletableFuture<?> dst;
1123 OrCompletion(CompletableFuture<?> src,
1124 CompletableFuture<?> snd,
1125 CompletableFuture<?> dst) {
1126 this.src = src; this.snd = snd; this.dst = dst;
1127 }
1128 public final void run() {
1129 final CompletableFuture<?> a;
1130 final CompletableFuture<?> b;
1131 final CompletableFuture<?> dst;
1132 Object r, t; Throwable ex;
1133 if ((dst = this.dst) != null &&
1134 (((a = this.src) != null && (r = a.result) != null) ||
1135 ((b = this.snd) != null && (r = b.result) != null)) &&
1136 compareAndSet(0, 1)) {
1137 if (r instanceof AltResult) {
1138 ex = ((AltResult)r).ex;
1139 t = null;
1140 }
1141 else {
1142 ex = null;
1143 t = r;
1144 }
1145 dst.internalComplete(t, ex);
1146 }
1147 }
1148 private static final long serialVersionUID = 5232453952276885070L;
1149 }
1150
1151 static final class ExceptionCompletion<T> extends Completion {
1152 final CompletableFuture<? extends T> src;
1153 final Function<? super Throwable, ? extends T> fn;
1154 final CompletableFuture<T> dst;
1155 ExceptionCompletion(CompletableFuture<? extends T> src,
1156 Function<? super Throwable, ? extends T> fn,
1157 CompletableFuture<T> dst) {
1158 this.src = src; this.fn = fn; this.dst = dst;
1159 }
1160 public final void run() {
1161 final CompletableFuture<? extends T> a;
1162 final Function<? super Throwable, ? extends T> fn;
1163 final CompletableFuture<T> dst;
1164 Object r; T t = null; Throwable ex, dx = null;
1165 if ((dst = this.dst) != null &&
1166 (fn = this.fn) != null &&
1167 (a = this.src) != null &&
1168 (r = a.result) != null &&
1169 compareAndSet(0, 1)) {
1170 if ((r instanceof AltResult) &&
1171 (ex = ((AltResult)r).ex) != null) {
1172 try {
1173 t = fn.apply(ex);
1174 } catch (Throwable rex) {
1175 dx = rex;
1176 }
1177 }
1178 else {
1179 @SuppressWarnings("unchecked") T tr = (T) r;
1180 t = tr;
1181 }
1182 dst.internalComplete(t, dx);
1183 }
1184 }
1185 private static final long serialVersionUID = 5232453952276885070L;
1186 }
1187
1188 static final class ThenCopy extends Completion {
1189 final CompletableFuture<?> src;
1190 final CompletableFuture<?> dst;
1191 ThenCopy(CompletableFuture<?> src,
1192 CompletableFuture<?> dst) {
1193 this.src = src; this.dst = dst;
1194 }
1195 public final void run() {
1196 final CompletableFuture<?> a;
1197 final CompletableFuture<?> dst;
1198 Object r; Object t; Throwable ex;
1199 if ((dst = this.dst) != null &&
1200 (a = this.src) != null &&
1201 (r = a.result) != null &&
1202 compareAndSet(0, 1)) {
1203 if (r instanceof AltResult) {
1204 ex = ((AltResult)r).ex;
1205 t = null;
1206 }
1207 else {
1208 ex = null;
1209 t = r;
1210 }
1211 dst.internalComplete(t, ex);
1212 }
1213 }
1214 private static final long serialVersionUID = 5232453952276885070L;
1215 }
1216
1217 static final class HandleCompletion<T,U> extends Completion {
1218 final CompletableFuture<? extends T> src;
1219 final BiFunction<? super T, Throwable, ? extends U> fn;
1220 final CompletableFuture<U> dst;
1221 HandleCompletion(CompletableFuture<? extends T> src,
1222 BiFunction<? super T, Throwable, ? extends U> fn,
1223 CompletableFuture<U> dst) {
1224 this.src = src; this.fn = fn; this.dst = dst;
1225 }
1226 public final void run() {
1227 final CompletableFuture<? extends T> a;
1228 final BiFunction<? super T, Throwable, ? extends U> fn;
1229 final CompletableFuture<U> dst;
1230 Object r; T t; Throwable ex;
1231 if ((dst = this.dst) != null &&
1232 (fn = this.fn) != null &&
1233 (a = this.src) != null &&
1234 (r = a.result) != null &&
1235 compareAndSet(0, 1)) {
1236 if (r instanceof AltResult) {
1237 ex = ((AltResult)r).ex;
1238 t = null;
1239 }
1240 else {
1241 ex = null;
1242 @SuppressWarnings("unchecked") T tr = (T) r;
1243 t = tr;
1244 }
1245 U u = null; Throwable dx = null;
1246 try {
1247 u = fn.apply(t, ex);
1248 } catch (Throwable rex) {
1249 dx = rex;
1250 }
1251 dst.internalComplete(u, dx);
1252 }
1253 }
1254 private static final long serialVersionUID = 5232453952276885070L;
1255 }
1256
1257 static final class ComposeCompletion<T,U> extends Completion {
1258 final CompletableFuture<? extends T> src;
1259 final Function<? super T, CompletableFuture<U>> fn;
1260 final CompletableFuture<U> dst;
1261 final Executor executor;
1262 ComposeCompletion(CompletableFuture<? extends T> src,
1263 Function<? super T, CompletableFuture<U>> fn,
1264 CompletableFuture<U> dst, Executor executor) {
1265 this.src = src; this.fn = fn; this.dst = dst;
1266 this.executor = executor;
1267 }
1268 public final void run() {
1269 final CompletableFuture<? extends T> a;
1270 final Function<? super T, CompletableFuture<U>> fn;
1271 final CompletableFuture<U> dst;
1272 Object r; T t; Throwable ex; Executor e;
1273 if ((dst = this.dst) != null &&
1274 (fn = this.fn) != null &&
1275 (a = this.src) != null &&
1276 (r = a.result) != null &&
1277 compareAndSet(0, 1)) {
1278 if (r instanceof AltResult) {
1279 ex = ((AltResult)r).ex;
1280 t = null;
1281 }
1282 else {
1283 ex = null;
1284 @SuppressWarnings("unchecked") T tr = (T) r;
1285 t = tr;
1286 }
1287 CompletableFuture<U> c = null;
1288 U u = null;
1289 boolean complete = false;
1290 if (ex == null) {
1291 if ((e = executor) != null)
1292 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1293 else {
1294 try {
1295 if ((c = fn.apply(t)) == null)
1296 ex = new NullPointerException();
1297 } catch (Throwable rex) {
1298 ex = rex;
1299 }
1300 }
1301 }
1302 if (c != null) {
1303 ThenCopy d = null;
1304 Object s;
1305 if ((s = c.result) == null) {
1306 CompletionNode p = new CompletionNode
1307 (d = new ThenCopy(c, dst));
1308 while ((s = c.result) == null) {
1309 if (UNSAFE.compareAndSwapObject
1310 (c, COMPLETIONS, p.next = c.completions, p))
1311 break;
1312 }
1313 }
1314 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1315 complete = true;
1316 if (s instanceof AltResult) {
1317 ex = ((AltResult)s).ex; // no rewrap
1318 u = null;
1319 }
1320 else {
1321 @SuppressWarnings("unchecked") U us = (U) s;
1322 u = us;
1323 }
1324 }
1325 }
1326 if (complete || ex != null)
1327 dst.internalComplete(u, ex);
1328 if (c != null)
1329 c.helpPostComplete();
1330 }
1331 }
1332 private static final long serialVersionUID = 5232453952276885070L;
1333 }
1334
1335 // public methods
1336
1337 /**
1338 * Creates a new incomplete CompletableFuture.
1339 */
1340 public CompletableFuture() {
1341 }
1342
1343 /**
1344 * Returns a new CompletableFuture that is asynchronously completed
1345 * by a task running in the {@link ForkJoinPool#commonPool()} with
1346 * the value obtained by calling the given Supplier.
1347 *
1348 * @param supplier a function returning the value to be used
1349 * to complete the returned CompletableFuture
1350 * @return the new CompletableFuture
1351 */
1352 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1353 if (supplier == null) throw new NullPointerException();
1354 CompletableFuture<U> f = new CompletableFuture<U>();
1355 ForkJoinPool.commonPool().
1356 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1357 return f;
1358 }
1359
1360 /**
1361 * Returns a new CompletableFuture that is asynchronously completed
1362 * by a task running in the given executor with the value obtained
1363 * by calling the given Supplier.
1364 *
1365 * @param supplier a function returning the value to be used
1366 * to complete the returned CompletableFuture
1367 * @param executor the executor to use for asynchronous execution
1368 * @return the new CompletableFuture
1369 */
1370 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1371 Executor executor) {
1372 if (executor == null || supplier == null)
1373 throw new NullPointerException();
1374 CompletableFuture<U> f = new CompletableFuture<U>();
1375 executor.execute(new AsyncSupply<U>(supplier, f));
1376 return f;
1377 }
1378
1379 /**
1380 * Returns a new CompletableFuture that is asynchronously completed
1381 * by a task running in the {@link ForkJoinPool#commonPool()} after
1382 * it runs the given action.
1383 *
1384 * @param runnable the action to run before completing the
1385 * returned CompletableFuture
1386 * @return the new CompletableFuture
1387 */
1388 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1389 if (runnable == null) throw new NullPointerException();
1390 CompletableFuture<Void> f = new CompletableFuture<Void>();
1391 ForkJoinPool.commonPool().
1392 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1393 return f;
1394 }
1395
1396 /**
1397 * Returns a new CompletableFuture that is asynchronously completed
1398 * by a task running in the given executor after it runs the given
1399 * action.
1400 *
1401 * @param runnable the action to run before completing the
1402 * returned CompletableFuture
1403 * @param executor the executor to use for asynchronous execution
1404 * @return the new CompletableFuture
1405 */
1406 public static CompletableFuture<Void> runAsync(Runnable runnable,
1407 Executor executor) {
1408 if (executor == null || runnable == null)
1409 throw new NullPointerException();
1410 CompletableFuture<Void> f = new CompletableFuture<Void>();
1411 executor.execute(new AsyncRun(runnable, f));
1412 return f;
1413 }
1414
1415 /**
1416 * Returns {@code true} if completed in any fashion: normally,
1417 * exceptionally, or via cancellation.
1418 *
1419 * @return {@code true} if completed
1420 */
1421 public boolean isDone() {
1422 return result != null;
1423 }
1424
1425 /**
1426 * Waits if necessary for this future to complete, and then
1427 * returns its result.
1428 *
1429 * @return the result value
1430 * @throws CancellationException if this future was cancelled
1431 * @throws ExecutionException if this future completed exceptionally
1432 * @throws InterruptedException if the current thread was interrupted
1433 * while waiting
1434 */
1435 public T get() throws InterruptedException, ExecutionException {
1436 Object r; Throwable ex, cause;
1437 if ((r = result) == null && (r = waitingGet(true)) == null)
1438 throw new InterruptedException();
1439 if (!(r instanceof AltResult)) {
1440 @SuppressWarnings("unchecked") T tr = (T) r;
1441 return tr;
1442 }
1443 if ((ex = ((AltResult)r).ex) == null)
1444 return null;
1445 if (ex instanceof CancellationException)
1446 throw (CancellationException)ex;
1447 if ((ex instanceof CompletionException) &&
1448 (cause = ex.getCause()) != null)
1449 ex = cause;
1450 throw new ExecutionException(ex);
1451 }
1452
1453 /**
1454 * Waits if necessary for at most the given time for this future
1455 * to complete, and then returns its result, if available.
1456 *
1457 * @param timeout the maximum time to wait
1458 * @param unit the time unit of the timeout argument
1459 * @return the result value
1460 * @throws CancellationException if this future was cancelled
1461 * @throws ExecutionException if this future completed exceptionally
1462 * @throws InterruptedException if the current thread was interrupted
1463 * while waiting
1464 * @throws TimeoutException if the wait timed out
1465 */
1466 public T get(long timeout, TimeUnit unit)
1467 throws InterruptedException, ExecutionException, TimeoutException {
1468 Object r; Throwable ex, cause;
1469 long nanos = unit.toNanos(timeout);
1470 if (Thread.interrupted())
1471 throw new InterruptedException();
1472 if ((r = result) == null)
1473 r = timedAwaitDone(nanos);
1474 if (!(r instanceof AltResult)) {
1475 @SuppressWarnings("unchecked") T tr = (T) r;
1476 return tr;
1477 }
1478 if ((ex = ((AltResult)r).ex) == null)
1479 return null;
1480 if (ex instanceof CancellationException)
1481 throw (CancellationException)ex;
1482 if ((ex instanceof CompletionException) &&
1483 (cause = ex.getCause()) != null)
1484 ex = cause;
1485 throw new ExecutionException(ex);
1486 }
1487
1488 /**
1489 * Returns the result value when complete, or throws an
1490 * (unchecked) exception if completed exceptionally. To better
1491 * conform with the use of common functional forms, if a
1492 * computation involved in the completion of this
1493 * CompletableFuture threw an exception, this method throws an
1494 * (unchecked) {@link CompletionException} with the underlying
1495 * exception as its cause.
1496 *
1497 * @return the result value
1498 * @throws CancellationException if the computation was cancelled
1499 * @throws CompletionException if this future completed
1500 * exceptionally or a completion computation threw an exception
1501 */
1502 public T join() {
1503 Object r; Throwable ex;
1504 if ((r = result) == null)
1505 r = waitingGet(false);
1506 if (!(r instanceof AltResult)) {
1507 @SuppressWarnings("unchecked") T tr = (T) r;
1508 return tr;
1509 }
1510 if ((ex = ((AltResult)r).ex) == null)
1511 return null;
1512 if (ex instanceof CancellationException)
1513 throw (CancellationException)ex;
1514 if (ex instanceof CompletionException)
1515 throw (CompletionException)ex;
1516 throw new CompletionException(ex);
1517 }
1518
1519 /**
1520 * Returns the result value (or throws any encountered exception)
1521 * if completed, else returns the given valueIfAbsent.
1522 *
1523 * @param valueIfAbsent the value to return if not completed
1524 * @return the result value, if completed, else the given valueIfAbsent
1525 * @throws CancellationException if the computation was cancelled
1526 * @throws CompletionException if this future completed
1527 * exceptionally or a completion computation threw an exception
1528 */
1529 public T getNow(T valueIfAbsent) {
1530 Object r; Throwable ex;
1531 if ((r = result) == null)
1532 return valueIfAbsent;
1533 if (!(r instanceof AltResult)) {
1534 @SuppressWarnings("unchecked") T tr = (T) r;
1535 return tr;
1536 }
1537 if ((ex = ((AltResult)r).ex) == null)
1538 return null;
1539 if (ex instanceof CancellationException)
1540 throw (CancellationException)ex;
1541 if (ex instanceof CompletionException)
1542 throw (CompletionException)ex;
1543 throw new CompletionException(ex);
1544 }
1545
1546 /**
1547 * If not already completed, sets the value returned by {@link
1548 * #get()} and related methods to the given value.
1549 *
1550 * @param value the result value
1551 * @return {@code true} if this invocation caused this CompletableFuture
1552 * to transition to a completed state, else {@code false}
1553 */
1554 public boolean complete(T value) {
1555 boolean triggered = result == null &&
1556 UNSAFE.compareAndSwapObject(this, RESULT, null,
1557 value == null ? NIL : value);
1558 postComplete();
1559 return triggered;
1560 }
1561
1562 /**
1563 * If not already completed, causes invocations of {@link #get()}
1564 * and related methods to throw the given exception.
1565 *
1566 * @param ex the exception
1567 * @return {@code true} if this invocation caused this CompletableFuture
1568 * to transition to a completed state, else {@code false}
1569 */
1570 public boolean completeExceptionally(Throwable ex) {
1571 if (ex == null) throw new NullPointerException();
1572 boolean triggered = result == null &&
1573 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1574 postComplete();
1575 return triggered;
1576 }
1577
1578 /**
1579 * Returns a new CompletableFuture that is completed
1580 * when this CompletableFuture completes, with the result of the
1581 * given function of this CompletableFuture's result.
1582 *
1583 * <p>If this CompletableFuture completes exceptionally, or the
1584 * supplied function throws an unchecked exception, then the
1585 * returned CompletableFuture completes exceptionally with a
1586 * CompletionException holding the exception as its cause.
1587 *
1588 * @param fn the function to use to compute the value of
1589 * the returned CompletableFuture
1590 * @return the new CompletableFuture
1591 */
1592 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1593 return doThenApply(fn, null);
1594 }
1595
1596 /**
1597 * Returns a new CompletableFuture that is asynchronously completed
1598 * when this CompletableFuture completes, with the result of the
1599 * given function of this CompletableFuture's result from a
1600 * task running in the {@link ForkJoinPool#commonPool()}.
1601 *
1602 * <p>If this CompletableFuture completes exceptionally, or the
1603 * supplied function throws an unchecked exception, then the
1604 * returned CompletableFuture completes exceptionally with a
1605 * CompletionException holding the exception as its cause.
1606 *
1607 * @param fn the function to use to compute the value of
1608 * the returned CompletableFuture
1609 * @return the new CompletableFuture
1610 */
1611 public <U> CompletableFuture<U> thenApplyAsync
1612 (Function<? super T,? extends U> fn) {
1613 return doThenApply(fn, ForkJoinPool.commonPool());
1614 }
1615
1616 /**
1617 * Returns a new CompletableFuture that is asynchronously completed
1618 * when this CompletableFuture completes, with the result of the
1619 * given function of this CompletableFuture's result from a
1620 * task running in the given executor.
1621 *
1622 * <p>If this CompletableFuture completes exceptionally, or the
1623 * supplied function throws an unchecked exception, then the
1624 * returned CompletableFuture completes exceptionally with a
1625 * CompletionException holding the exception as its cause.
1626 *
1627 * @param fn the function to use to compute the value of
1628 * the returned CompletableFuture
1629 * @param executor the executor to use for asynchronous execution
1630 * @return the new CompletableFuture
1631 */
1632 public <U> CompletableFuture<U> thenApplyAsync
1633 (Function<? super T,? extends U> fn,
1634 Executor executor) {
1635 if (executor == null) throw new NullPointerException();
1636 return doThenApply(fn, executor);
1637 }
1638
1639 private <U> CompletableFuture<U> doThenApply
1640 (Function<? super T,? extends U> fn,
1641 Executor e) {
1642 if (fn == null) throw new NullPointerException();
1643 CompletableFuture<U> dst = new CompletableFuture<U>();
1644 ApplyCompletion<T,U> d = null;
1645 Object r;
1646 if ((r = result) == null) {
1647 CompletionNode p = new CompletionNode
1648 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1649 while ((r = result) == null) {
1650 if (UNSAFE.compareAndSwapObject
1651 (this, COMPLETIONS, p.next = completions, p))
1652 break;
1653 }
1654 }
1655 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1656 T t; Throwable ex;
1657 if (r instanceof AltResult) {
1658 ex = ((AltResult)r).ex;
1659 t = null;
1660 }
1661 else {
1662 ex = null;
1663 @SuppressWarnings("unchecked") T tr = (T) r;
1664 t = tr;
1665 }
1666 U u = null;
1667 if (ex == null) {
1668 try {
1669 if (e != null)
1670 e.execute(new AsyncApply<T,U>(t, fn, dst));
1671 else
1672 u = fn.apply(t);
1673 } catch (Throwable rex) {
1674 ex = rex;
1675 }
1676 }
1677 if (e == null || ex != null)
1678 dst.internalComplete(u, ex);
1679 }
1680 helpPostComplete();
1681 return dst;
1682 }
1683
1684 /**
1685 * Returns a new CompletableFuture that is completed
1686 * when this CompletableFuture completes, after performing the given
1687 * action with this CompletableFuture's result.
1688 *
1689 * <p>If this CompletableFuture completes exceptionally, or the
1690 * supplied function throws an unchecked exception, then the
1691 * returned CompletableFuture completes exceptionally with a
1692 * CompletionException holding the exception as its cause.
1693 *
1694 * @param block the action to perform before completing the
1695 * returned CompletableFuture
1696 * @return the new CompletableFuture
1697 */
1698 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1699 return doThenAccept(block, null);
1700 }
1701
1702 /**
1703 * Returns a new CompletableFuture that is asynchronously completed
1704 * when this CompletableFuture completes, after performing the given
1705 * action with this CompletableFuture's result from a task running
1706 * in the {@link ForkJoinPool#commonPool()}.
1707 *
1708 * <p>If this CompletableFuture completes exceptionally, or the
1709 * supplied function throws an unchecked exception, then the
1710 * returned CompletableFuture completes exceptionally with a
1711 * CompletionException holding the exception as its cause.
1712 *
1713 * @param block the action to perform before completing the
1714 * returned CompletableFuture
1715 * @return the new CompletableFuture
1716 */
1717 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1718 return doThenAccept(block, ForkJoinPool.commonPool());
1719 }
1720
1721 /**
1722 * Returns a new CompletableFuture that is asynchronously completed
1723 * when this CompletableFuture completes, after performing the given
1724 * action with this CompletableFuture's result from a task running
1725 * in the given executor.
1726 *
1727 * <p>If this CompletableFuture completes exceptionally, or the
1728 * supplied function throws an unchecked exception, then the
1729 * returned CompletableFuture completes exceptionally with a
1730 * CompletionException holding the exception as its cause.
1731 *
1732 * @param block the action to perform before completing the
1733 * returned CompletableFuture
1734 * @param executor the executor to use for asynchronous execution
1735 * @return the new CompletableFuture
1736 */
1737 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1738 Executor executor) {
1739 if (executor == null) throw new NullPointerException();
1740 return doThenAccept(block, executor);
1741 }
1742
1743 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1744 Executor e) {
1745 if (fn == null) throw new NullPointerException();
1746 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1747 AcceptCompletion<T> d = null;
1748 Object r;
1749 if ((r = result) == null) {
1750 CompletionNode p = new CompletionNode
1751 (d = new AcceptCompletion<T>(this, fn, dst, e));
1752 while ((r = result) == null) {
1753 if (UNSAFE.compareAndSwapObject
1754 (this, COMPLETIONS, p.next = completions, p))
1755 break;
1756 }
1757 }
1758 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1759 T t; Throwable ex;
1760 if (r instanceof AltResult) {
1761 ex = ((AltResult)r).ex;
1762 t = null;
1763 }
1764 else {
1765 ex = null;
1766 @SuppressWarnings("unchecked") T tr = (T) r;
1767 t = tr;
1768 }
1769 if (ex == null) {
1770 try {
1771 if (e != null)
1772 e.execute(new AsyncAccept<T>(t, fn, dst));
1773 else
1774 fn.accept(t);
1775 } catch (Throwable rex) {
1776 ex = rex;
1777 }
1778 }
1779 if (e == null || ex != null)
1780 dst.internalComplete(null, ex);
1781 }
1782 helpPostComplete();
1783 return dst;
1784 }
1785
1786 /**
1787 * Returns a new CompletableFuture that is completed
1788 * when this CompletableFuture completes, after performing the given
1789 * action.
1790 *
1791 * <p>If this CompletableFuture completes exceptionally, or the
1792 * supplied function throws an unchecked exception, then the
1793 * returned CompletableFuture completes exceptionally with a
1794 * CompletionException holding the exception as its cause.
1795 *
1796 * @param action the action to perform before completing the
1797 * returned CompletableFuture
1798 * @return the new CompletableFuture
1799 */
1800 public CompletableFuture<Void> thenRun(Runnable action) {
1801 return doThenRun(action, null);
1802 }
1803
1804 /**
1805 * Returns a new CompletableFuture that is asynchronously completed
1806 * when this CompletableFuture completes, after performing the given
1807 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1808 *
1809 * <p>If this CompletableFuture completes exceptionally, or the
1810 * supplied function throws an unchecked exception, then the
1811 * returned CompletableFuture completes exceptionally with a
1812 * CompletionException holding the exception as its cause.
1813 *
1814 * @param action the action to perform before completing the
1815 * returned CompletableFuture
1816 * @return the new CompletableFuture
1817 */
1818 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1819 return doThenRun(action, ForkJoinPool.commonPool());
1820 }
1821
1822 /**
1823 * Returns a new CompletableFuture that is asynchronously completed
1824 * when this CompletableFuture completes, after performing the given
1825 * action from a task running in the given executor.
1826 *
1827 * <p>If this CompletableFuture completes exceptionally, or the
1828 * supplied function throws an unchecked exception, then the
1829 * returned CompletableFuture completes exceptionally with a
1830 * CompletionException holding the exception as its cause.
1831 *
1832 * @param action the action to perform before completing the
1833 * returned CompletableFuture
1834 * @param executor the executor to use for asynchronous execution
1835 * @return the new CompletableFuture
1836 */
1837 public CompletableFuture<Void> thenRunAsync(Runnable action,
1838 Executor executor) {
1839 if (executor == null) throw new NullPointerException();
1840 return doThenRun(action, executor);
1841 }
1842
1843 private CompletableFuture<Void> doThenRun(Runnable action,
1844 Executor e) {
1845 if (action == null) throw new NullPointerException();
1846 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1847 RunCompletion<T> d = null;
1848 Object r;
1849 if ((r = result) == null) {
1850 CompletionNode p = new CompletionNode
1851 (d = new RunCompletion<T>(this, action, dst, e));
1852 while ((r = result) == null) {
1853 if (UNSAFE.compareAndSwapObject
1854 (this, COMPLETIONS, p.next = completions, p))
1855 break;
1856 }
1857 }
1858 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1859 Throwable ex;
1860 if (r instanceof AltResult)
1861 ex = ((AltResult)r).ex;
1862 else
1863 ex = null;
1864 if (ex == null) {
1865 try {
1866 if (e != null)
1867 e.execute(new AsyncRun(action, dst));
1868 else
1869 action.run();
1870 } catch (Throwable rex) {
1871 ex = rex;
1872 }
1873 }
1874 if (e == null || ex != null)
1875 dst.internalComplete(null, ex);
1876 }
1877 helpPostComplete();
1878 return dst;
1879 }
1880
1881 /**
1882 * Returns a new CompletableFuture that is completed
1883 * when both this and the other given CompletableFuture complete,
1884 * with the result of the given function of the results of the two
1885 * CompletableFutures.
1886 *
1887 * <p>If this and/or the other CompletableFuture complete
1888 * exceptionally, or the supplied function throws an unchecked
1889 * exception, then the returned CompletableFuture completes
1890 * exceptionally with a CompletionException holding the exception
1891 * as its cause.
1892 *
1893 * @param other the other CompletableFuture
1894 * @param fn the function to use to compute the value of
1895 * the returned CompletableFuture
1896 * @return the new CompletableFuture
1897 */
1898 public <U,V> CompletableFuture<V> thenCombine
1899 (CompletableFuture<? extends U> other,
1900 BiFunction<? super T,? super U,? extends V> fn) {
1901 return doThenBiApply(other, fn, null);
1902 }
1903
1904 /**
1905 * Returns a new CompletableFuture that is asynchronously completed
1906 * when both this and the other given CompletableFuture complete,
1907 * with the result of the given function of the results of the two
1908 * CompletableFutures from a task running in the
1909 * {@link ForkJoinPool#commonPool()}.
1910 *
1911 * <p>If this and/or the other CompletableFuture complete
1912 * exceptionally, or the supplied function throws an unchecked
1913 * exception, then the returned CompletableFuture completes
1914 * exceptionally with a CompletionException holding the exception
1915 * as its cause.
1916 *
1917 * @param other the other CompletableFuture
1918 * @param fn the function to use to compute the value of
1919 * the returned CompletableFuture
1920 * @return the new CompletableFuture
1921 */
1922 public <U,V> CompletableFuture<V> thenCombineAsync
1923 (CompletableFuture<? extends U> other,
1924 BiFunction<? super T,? super U,? extends V> fn) {
1925 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1926 }
1927
1928 /**
1929 * Returns a new CompletableFuture that is asynchronously completed
1930 * when both this and the other given CompletableFuture complete,
1931 * with the result of the given function of the results of the two
1932 * CompletableFutures from a task running in the given executor.
1933 *
1934 * <p>If this and/or the other CompletableFuture complete
1935 * exceptionally, or the supplied function throws an unchecked
1936 * exception, then the returned CompletableFuture completes
1937 * exceptionally with a CompletionException holding the exception
1938 * as its cause.
1939 *
1940 * @param other the other CompletableFuture
1941 * @param fn the function to use to compute the value of
1942 * the returned CompletableFuture
1943 * @param executor the executor to use for asynchronous execution
1944 * @return the new CompletableFuture
1945 */
1946 public <U,V> CompletableFuture<V> thenCombineAsync
1947 (CompletableFuture<? extends U> other,
1948 BiFunction<? super T,? super U,? extends V> fn,
1949 Executor executor) {
1950 if (executor == null) throw new NullPointerException();
1951 return doThenBiApply(other, fn, executor);
1952 }
1953
1954 private <U,V> CompletableFuture<V> doThenBiApply
1955 (CompletableFuture<? extends U> other,
1956 BiFunction<? super T,? super U,? extends V> fn,
1957 Executor e) {
1958 if (other == null || fn == null) throw new NullPointerException();
1959 CompletableFuture<V> dst = new CompletableFuture<V>();
1960 BiApplyCompletion<T,U,V> d = null;
1961 Object r, s = null;
1962 if ((r = result) == null || (s = other.result) == null) {
1963 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1964 CompletionNode q = null, p = new CompletionNode(d);
1965 while ((r == null && (r = result) == null) ||
1966 (s == null && (s = other.result) == null)) {
1967 if (q != null) {
1968 if (s != null ||
1969 UNSAFE.compareAndSwapObject
1970 (other, COMPLETIONS, q.next = other.completions, q))
1971 break;
1972 }
1973 else if (r != null ||
1974 UNSAFE.compareAndSwapObject
1975 (this, COMPLETIONS, p.next = completions, p)) {
1976 if (s != null)
1977 break;
1978 q = new CompletionNode(d);
1979 }
1980 }
1981 }
1982 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1983 T t; U u; Throwable ex;
1984 if (r instanceof AltResult) {
1985 ex = ((AltResult)r).ex;
1986 t = null;
1987 }
1988 else {
1989 ex = null;
1990 @SuppressWarnings("unchecked") T tr = (T) r;
1991 t = tr;
1992 }
1993 if (ex != null)
1994 u = null;
1995 else if (s instanceof AltResult) {
1996 ex = ((AltResult)s).ex;
1997 u = null;
1998 }
1999 else {
2000 @SuppressWarnings("unchecked") U us = (U) s;
2001 u = us;
2002 }
2003 V v = null;
2004 if (ex == null) {
2005 try {
2006 if (e != null)
2007 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
2008 else
2009 v = fn.apply(t, u);
2010 } catch (Throwable rex) {
2011 ex = rex;
2012 }
2013 }
2014 if (e == null || ex != null)
2015 dst.internalComplete(v, ex);
2016 }
2017 helpPostComplete();
2018 other.helpPostComplete();
2019 return dst;
2020 }
2021
2022 /**
2023 * Returns a new CompletableFuture that is completed
2024 * when both this and the other given CompletableFuture complete,
2025 * after performing the given action with the results of the two
2026 * CompletableFutures.
2027 *
2028 * <p>If this and/or the other CompletableFuture complete
2029 * exceptionally, or the supplied action throws an unchecked
2030 * exception, then the returned CompletableFuture completes
2031 * exceptionally with a CompletionException holding the exception
2032 * as its cause.
2033 *
2034 * @param other the other CompletableFuture
2035 * @param block the action to perform before completing the
2036 * returned CompletableFuture
2037 * @return the new CompletableFuture
2038 */
2039 public <U> CompletableFuture<Void> thenAcceptBoth
2040 (CompletableFuture<? extends U> other,
2041 BiConsumer<? super T, ? super U> block) {
2042 return doThenBiAccept(other, block, null);
2043 }
2044
2045 /**
2046 * Returns a new CompletableFuture that is asynchronously completed
2047 * when both this and the other given CompletableFuture complete,
2048 * after performing the given action with the results of the two
2049 * CompletableFutures from a task running in the {@link
2050 * ForkJoinPool#commonPool()}.
2051 *
2052 * <p>If this and/or the other CompletableFuture complete
2053 * exceptionally, or the supplied action throws an unchecked
2054 * exception, then the returned CompletableFuture completes
2055 * exceptionally with a CompletionException holding the exception
2056 * as its cause.
2057 *
2058 * @param other the other CompletableFuture
2059 * @param block the action to perform before completing the
2060 * returned CompletableFuture
2061 * @return the new CompletableFuture
2062 */
2063 public <U> CompletableFuture<Void> thenAcceptBothAsync
2064 (CompletableFuture<? extends U> other,
2065 BiConsumer<? super T, ? super U> block) {
2066 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2067 }
2068
2069 /**
2070 * Returns a new CompletableFuture that is asynchronously completed
2071 * when both this and the other given CompletableFuture complete,
2072 * after performing the given action with the results of the two
2073 * CompletableFutures from a task running in the given executor.
2074 *
2075 * <p>If this and/or the other CompletableFuture complete
2076 * exceptionally, or the supplied action throws an unchecked
2077 * exception, then the returned CompletableFuture completes
2078 * exceptionally with a CompletionException holding the exception
2079 * as its cause.
2080 *
2081 * @param other the other CompletableFuture
2082 * @param block the action to perform before completing the
2083 * returned CompletableFuture
2084 * @param executor the executor to use for asynchronous execution
2085 * @return the new CompletableFuture
2086 */
2087 public <U> CompletableFuture<Void> thenAcceptBothAsync
2088 (CompletableFuture<? extends U> other,
2089 BiConsumer<? super T, ? super U> block,
2090 Executor executor) {
2091 if (executor == null) throw new NullPointerException();
2092 return doThenBiAccept(other, block, executor);
2093 }
2094
2095 private <U> CompletableFuture<Void> doThenBiAccept
2096 (CompletableFuture<? extends U> other,
2097 BiConsumer<? super T,? super U> fn,
2098 Executor e) {
2099 if (other == null || fn == null) throw new NullPointerException();
2100 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2101 BiAcceptCompletion<T,U> d = null;
2102 Object r, s = null;
2103 if ((r = result) == null || (s = other.result) == null) {
2104 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2105 CompletionNode q = null, p = new CompletionNode(d);
2106 while ((r == null && (r = result) == null) ||
2107 (s == null && (s = other.result) == null)) {
2108 if (q != null) {
2109 if (s != null ||
2110 UNSAFE.compareAndSwapObject
2111 (other, COMPLETIONS, q.next = other.completions, q))
2112 break;
2113 }
2114 else if (r != null ||
2115 UNSAFE.compareAndSwapObject
2116 (this, COMPLETIONS, p.next = completions, p)) {
2117 if (s != null)
2118 break;
2119 q = new CompletionNode(d);
2120 }
2121 }
2122 }
2123 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2124 T t; U u; Throwable ex;
2125 if (r instanceof AltResult) {
2126 ex = ((AltResult)r).ex;
2127 t = null;
2128 }
2129 else {
2130 ex = null;
2131 @SuppressWarnings("unchecked") T tr = (T) r;
2132 t = tr;
2133 }
2134 if (ex != null)
2135 u = null;
2136 else if (s instanceof AltResult) {
2137 ex = ((AltResult)s).ex;
2138 u = null;
2139 }
2140 else {
2141 @SuppressWarnings("unchecked") U us = (U) s;
2142 u = us;
2143 }
2144 if (ex == null) {
2145 try {
2146 if (e != null)
2147 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2148 else
2149 fn.accept(t, u);
2150 } catch (Throwable rex) {
2151 ex = rex;
2152 }
2153 }
2154 if (e == null || ex != null)
2155 dst.internalComplete(null, ex);
2156 }
2157 helpPostComplete();
2158 other.helpPostComplete();
2159 return dst;
2160 }
2161
2162 /**
2163 * Returns a new CompletableFuture that is completed
2164 * when both this and the other given CompletableFuture complete,
2165 * after performing the given action.
2166 *
2167 * <p>If this and/or the other CompletableFuture complete
2168 * exceptionally, or the supplied action throws an unchecked
2169 * exception, then the returned CompletableFuture completes
2170 * exceptionally with a CompletionException holding the exception
2171 * as its cause.
2172 *
2173 * @param other the other CompletableFuture
2174 * @param action the action to perform before completing the
2175 * returned CompletableFuture
2176 * @return the new CompletableFuture
2177 */
2178 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2179 Runnable action) {
2180 return doThenBiRun(other, action, null);
2181 }
2182
2183 /**
2184 * Returns a new CompletableFuture that is asynchronously completed
2185 * when both this and the other given CompletableFuture complete,
2186 * after performing the given action from a task running in the
2187 * {@link ForkJoinPool#commonPool()}.
2188 *
2189 * <p>If this and/or the other CompletableFuture complete
2190 * exceptionally, or the supplied action throws an unchecked
2191 * exception, then the returned CompletableFuture completes
2192 * exceptionally with a CompletionException holding the exception
2193 * as its cause.
2194 *
2195 * @param other the other CompletableFuture
2196 * @param action the action to perform before completing the
2197 * returned CompletableFuture
2198 * @return the new CompletableFuture
2199 */
2200 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2201 Runnable action) {
2202 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2203 }
2204
2205 /**
2206 * Returns a new CompletableFuture that is asynchronously completed
2207 * when both this and the other given CompletableFuture complete,
2208 * after performing the given action from a task running in the
2209 * given executor.
2210 *
2211 * <p>If this and/or the other CompletableFuture complete
2212 * exceptionally, or the supplied action throws an unchecked
2213 * exception, then the returned CompletableFuture completes
2214 * exceptionally with a CompletionException holding the exception
2215 * as its cause.
2216 *
2217 * @param other the other CompletableFuture
2218 * @param action the action to perform before completing the
2219 * returned CompletableFuture
2220 * @param executor the executor to use for asynchronous execution
2221 * @return the new CompletableFuture
2222 */
2223 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2224 Runnable action,
2225 Executor executor) {
2226 if (executor == null) throw new NullPointerException();
2227 return doThenBiRun(other, action, executor);
2228 }
2229
2230 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2231 Runnable action,
2232 Executor e) {
2233 if (other == null || action == null) throw new NullPointerException();
2234 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2235 BiRunCompletion<T> d = null;
2236 Object r, s = null;
2237 if ((r = result) == null || (s = other.result) == null) {
2238 d = new BiRunCompletion<T>(this, other, action, dst, e);
2239 CompletionNode q = null, p = new CompletionNode(d);
2240 while ((r == null && (r = result) == null) ||
2241 (s == null && (s = other.result) == null)) {
2242 if (q != null) {
2243 if (s != null ||
2244 UNSAFE.compareAndSwapObject
2245 (other, COMPLETIONS, q.next = other.completions, q))
2246 break;
2247 }
2248 else if (r != null ||
2249 UNSAFE.compareAndSwapObject
2250 (this, COMPLETIONS, p.next = completions, p)) {
2251 if (s != null)
2252 break;
2253 q = new CompletionNode(d);
2254 }
2255 }
2256 }
2257 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2258 Throwable ex;
2259 if (r instanceof AltResult)
2260 ex = ((AltResult)r).ex;
2261 else
2262 ex = null;
2263 if (ex == null && (s instanceof AltResult))
2264 ex = ((AltResult)s).ex;
2265 if (ex == null) {
2266 try {
2267 if (e != null)
2268 e.execute(new AsyncRun(action, dst));
2269 else
2270 action.run();
2271 } catch (Throwable rex) {
2272 ex = rex;
2273 }
2274 }
2275 if (e == null || ex != null)
2276 dst.internalComplete(null, ex);
2277 }
2278 helpPostComplete();
2279 other.helpPostComplete();
2280 return dst;
2281 }
2282
2283 /**
2284 * Returns a new CompletableFuture that is completed
2285 * when either this or the other given CompletableFuture completes,
2286 * with the result of the given function of either this or the other
2287 * CompletableFuture's result.
2288 *
2289 * <p>If this and/or the other CompletableFuture complete
2290 * exceptionally, exceptionally, or the supplied function throws
2291 * an unchecked exception, then the returned CompletableFuture may
2292 * also do so, with a CompletionException holding one of these
2293 * exceptions as its cause. No guarantees are made about which
2294 * result or exception is used in the returned CompletableFuture.
2295 *
2296 * @param other the other CompletableFuture
2297 * @param fn the function to use to compute the value of
2298 * the returned CompletableFuture
2299 * @return the new CompletableFuture
2300 */
2301 public <U> CompletableFuture<U> applyToEither
2302 (CompletableFuture<? extends T> other,
2303 Function<? super T, U> fn) {
2304 return doOrApply(other, fn, null);
2305 }
2306
2307 /**
2308 * Returns a new CompletableFuture that is asynchronously completed
2309 * when either this or the other given CompletableFuture completes,
2310 * with the result of the given function of either this or the other
2311 * CompletableFuture's result from a task running in the
2312 * {@link ForkJoinPool#commonPool()}.
2313 *
2314 * <p>If this and/or the other CompletableFuture complete
2315 * exceptionally, exceptionally, or the supplied function throws
2316 * an unchecked exception, then the returned CompletableFuture may
2317 * also do so, with a CompletionException holding one of these
2318 * exceptions as its cause. No guarantees are made about which
2319 * result or exception is used in the returned CompletableFuture.
2320 *
2321 * @param other the other CompletableFuture
2322 * @param fn the function to use to compute the value of
2323 * the returned CompletableFuture
2324 * @return the new CompletableFuture
2325 */
2326 public <U> CompletableFuture<U> applyToEitherAsync
2327 (CompletableFuture<? extends T> other,
2328 Function<? super T, U> fn) {
2329 return doOrApply(other, fn, ForkJoinPool.commonPool());
2330 }
2331
2332 /**
2333 * Returns a new CompletableFuture that is asynchronously completed
2334 * when either this or the other given CompletableFuture completes,
2335 * with the result of the given function of either this or the other
2336 * CompletableFuture's result from a task running in the
2337 * given executor.
2338 *
2339 * <p>If this and/or the other CompletableFuture complete
2340 * exceptionally, exceptionally, or the supplied function throws
2341 * an unchecked exception, then the returned CompletableFuture may
2342 * also do so, with a CompletionException holding one of these
2343 * exceptions as its cause. No guarantees are made about which
2344 * result or exception is used in the returned CompletableFuture.
2345 *
2346 * @param other the other CompletableFuture
2347 * @param fn the function to use to compute the value of
2348 * the returned CompletableFuture
2349 * @param executor the executor to use for asynchronous execution
2350 * @return the new CompletableFuture
2351 */
2352 public <U> CompletableFuture<U> applyToEitherAsync
2353 (CompletableFuture<? extends T> other,
2354 Function<? super T, U> fn,
2355 Executor executor) {
2356 if (executor == null) throw new NullPointerException();
2357 return doOrApply(other, fn, executor);
2358 }
2359
2360 private <U> CompletableFuture<U> doOrApply
2361 (CompletableFuture<? extends T> other,
2362 Function<? super T, U> fn,
2363 Executor e) {
2364 if (other == null || fn == null) throw new NullPointerException();
2365 CompletableFuture<U> dst = new CompletableFuture<U>();
2366 OrApplyCompletion<T,U> d = null;
2367 Object r;
2368 if ((r = result) == null && (r = other.result) == null) {
2369 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2370 CompletionNode q = null, p = new CompletionNode(d);
2371 while ((r = result) == null && (r = other.result) == null) {
2372 if (q != null) {
2373 if (UNSAFE.compareAndSwapObject
2374 (other, COMPLETIONS, q.next = other.completions, q))
2375 break;
2376 }
2377 else if (UNSAFE.compareAndSwapObject
2378 (this, COMPLETIONS, p.next = completions, p))
2379 q = new CompletionNode(d);
2380 }
2381 }
2382 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2383 T t; Throwable ex;
2384 if (r instanceof AltResult) {
2385 ex = ((AltResult)r).ex;
2386 t = null;
2387 }
2388 else {
2389 ex = null;
2390 @SuppressWarnings("unchecked") T tr = (T) r;
2391 t = tr;
2392 }
2393 U u = null;
2394 if (ex == null) {
2395 try {
2396 if (e != null)
2397 e.execute(new AsyncApply<T,U>(t, fn, dst));
2398 else
2399 u = fn.apply(t);
2400 } catch (Throwable rex) {
2401 ex = rex;
2402 }
2403 }
2404 if (e == null || ex != null)
2405 dst.internalComplete(u, ex);
2406 }
2407 helpPostComplete();
2408 other.helpPostComplete();
2409 return dst;
2410 }
2411
2412 /**
2413 * Returns a new CompletableFuture that is completed
2414 * when either this or the other given CompletableFuture completes,
2415 * after performing the given action with the result of either this
2416 * or the other CompletableFuture's result.
2417 *
2418 * <p>If this and/or the other CompletableFuture complete
2419 * exceptionally, exceptionally, or the supplied action throws
2420 * an unchecked exception, then the returned CompletableFuture may
2421 * also do so, with a CompletionException holding one of these
2422 * exceptions as its cause. No guarantees are made about which
2423 * result or exception is used in the returned CompletableFuture.
2424 *
2425 * @param other the other CompletableFuture
2426 * @param block the action to perform before completing the
2427 * returned CompletableFuture
2428 * @return the new CompletableFuture
2429 */
2430 public CompletableFuture<Void> acceptEither
2431 (CompletableFuture<? extends T> other,
2432 Consumer<? super T> block) {
2433 return doOrAccept(other, block, null);
2434 }
2435
2436 /**
2437 * Returns a new CompletableFuture that is asynchronously completed
2438 * when either this or the other given CompletableFuture completes,
2439 * after performing the given action with the result of either this
2440 * or the other CompletableFuture's result from a task running in
2441 * the {@link ForkJoinPool#commonPool()}.
2442 *
2443 * <p>If this and/or the other CompletableFuture complete
2444 * exceptionally, exceptionally, or the supplied action throws
2445 * an unchecked exception, then the returned CompletableFuture may
2446 * also do so, with a CompletionException holding one of these
2447 * exceptions as its cause. No guarantees are made about which
2448 * result or exception is used in the returned CompletableFuture.
2449 *
2450 * @param other the other CompletableFuture
2451 * @param block the action to perform before completing the
2452 * returned CompletableFuture
2453 * @return the new CompletableFuture
2454 */
2455 public CompletableFuture<Void> acceptEitherAsync
2456 (CompletableFuture<? extends T> other,
2457 Consumer<? super T> block) {
2458 return doOrAccept(other, block, ForkJoinPool.commonPool());
2459 }
2460
2461 /**
2462 * Returns a new CompletableFuture that is asynchronously completed
2463 * when either this or the other given CompletableFuture completes,
2464 * after performing the given action with the result of either this
2465 * or the other CompletableFuture's result from a task running in
2466 * the given executor.
2467 *
2468 * <p>If this and/or the other CompletableFuture complete
2469 * exceptionally, exceptionally, or the supplied action throws
2470 * an unchecked exception, then the returned CompletableFuture may
2471 * also do so, with a CompletionException holding one of these
2472 * exceptions as its cause. No guarantees are made about which
2473 * result or exception is used in the returned CompletableFuture.
2474 *
2475 * @param other the other CompletableFuture
2476 * @param block the action to perform before completing the
2477 * returned CompletableFuture
2478 * @param executor the executor to use for asynchronous execution
2479 * @return the new CompletableFuture
2480 */
2481 public CompletableFuture<Void> acceptEitherAsync
2482 (CompletableFuture<? extends T> other,
2483 Consumer<? super T> block,
2484 Executor executor) {
2485 if (executor == null) throw new NullPointerException();
2486 return doOrAccept(other, block, executor);
2487 }
2488
2489 private CompletableFuture<Void> doOrAccept
2490 (CompletableFuture<? extends T> other,
2491 Consumer<? super T> fn,
2492 Executor e) {
2493 if (other == null || fn == null) throw new NullPointerException();
2494 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2495 OrAcceptCompletion<T> d = null;
2496 Object r;
2497 if ((r = result) == null && (r = other.result) == null) {
2498 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2499 CompletionNode q = null, p = new CompletionNode(d);
2500 while ((r = result) == null && (r = other.result) == null) {
2501 if (q != null) {
2502 if (UNSAFE.compareAndSwapObject
2503 (other, COMPLETIONS, q.next = other.completions, q))
2504 break;
2505 }
2506 else if (UNSAFE.compareAndSwapObject
2507 (this, COMPLETIONS, p.next = completions, p))
2508 q = new CompletionNode(d);
2509 }
2510 }
2511 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2512 T t; Throwable ex;
2513 if (r instanceof AltResult) {
2514 ex = ((AltResult)r).ex;
2515 t = null;
2516 }
2517 else {
2518 ex = null;
2519 @SuppressWarnings("unchecked") T tr = (T) r;
2520 t = tr;
2521 }
2522 if (ex == null) {
2523 try {
2524 if (e != null)
2525 e.execute(new AsyncAccept<T>(t, fn, dst));
2526 else
2527 fn.accept(t);
2528 } catch (Throwable rex) {
2529 ex = rex;
2530 }
2531 }
2532 if (e == null || ex != null)
2533 dst.internalComplete(null, ex);
2534 }
2535 helpPostComplete();
2536 other.helpPostComplete();
2537 return dst;
2538 }
2539
2540 /**
2541 * Returns a new CompletableFuture that is completed
2542 * when either this or the other given CompletableFuture completes,
2543 * after performing the given action.
2544 *
2545 * <p>If this and/or the other CompletableFuture complete
2546 * exceptionally, exceptionally, or the supplied action throws
2547 * an unchecked exception, then the returned CompletableFuture may
2548 * also do so, with a CompletionException holding one of these
2549 * exceptions as its cause. No guarantees are made about which
2550 * result or exception is used in the returned CompletableFuture.
2551 *
2552 * @param other the other CompletableFuture
2553 * @param action the action to perform before completing the
2554 * returned CompletableFuture
2555 * @return the new CompletableFuture
2556 */
2557 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2558 Runnable action) {
2559 return doOrRun(other, action, null);
2560 }
2561
2562 /**
2563 * Returns a new CompletableFuture that is asynchronously completed
2564 * when either this or the other given CompletableFuture completes,
2565 * after performing the given action from a task running in the
2566 * {@link ForkJoinPool#commonPool()}.
2567 *
2568 * <p>If this and/or the other CompletableFuture complete
2569 * exceptionally, exceptionally, or the supplied action throws
2570 * an unchecked exception, then the returned CompletableFuture may
2571 * also do so, with a CompletionException holding one of these
2572 * exceptions as its cause. No guarantees are made about which
2573 * result or exception is used in the returned CompletableFuture.
2574 *
2575 * @param other the other CompletableFuture
2576 * @param action the action to perform before completing the
2577 * returned CompletableFuture
2578 * @return the new CompletableFuture
2579 */
2580 public CompletableFuture<Void> runAfterEitherAsync
2581 (CompletableFuture<?> other,
2582 Runnable action) {
2583 return doOrRun(other, action, ForkJoinPool.commonPool());
2584 }
2585
2586 /**
2587 * Returns a new CompletableFuture that is asynchronously completed
2588 * when either this or the other given CompletableFuture completes,
2589 * after performing the given action from a task running in the
2590 * given executor.
2591 *
2592 * <p>If this and/or the other CompletableFuture complete
2593 * exceptionally, exceptionally, or the supplied action throws
2594 * an unchecked exception, then the returned CompletableFuture may
2595 * also do so, with a CompletionException holding one of these
2596 * exceptions as its cause. No guarantees are made about which
2597 * result or exception is used in the returned CompletableFuture.
2598 *
2599 * @param other the other CompletableFuture
2600 * @param action the action to perform before completing the
2601 * returned CompletableFuture
2602 * @param executor the executor to use for asynchronous execution
2603 * @return the new CompletableFuture
2604 */
2605 public CompletableFuture<Void> runAfterEitherAsync
2606 (CompletableFuture<?> other,
2607 Runnable action,
2608 Executor executor) {
2609 if (executor == null) throw new NullPointerException();
2610 return doOrRun(other, action, executor);
2611 }
2612
2613 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2614 Runnable action,
2615 Executor e) {
2616 if (other == null || action == null) throw new NullPointerException();
2617 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2618 OrRunCompletion<T> d = null;
2619 Object r;
2620 if ((r = result) == null && (r = other.result) == null) {
2621 d = new OrRunCompletion<T>(this, other, action, dst, e);
2622 CompletionNode q = null, p = new CompletionNode(d);
2623 while ((r = result) == null && (r = other.result) == null) {
2624 if (q != null) {
2625 if (UNSAFE.compareAndSwapObject
2626 (other, COMPLETIONS, q.next = other.completions, q))
2627 break;
2628 }
2629 else if (UNSAFE.compareAndSwapObject
2630 (this, COMPLETIONS, p.next = completions, p))
2631 q = new CompletionNode(d);
2632 }
2633 }
2634 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2635 Throwable ex;
2636 if (r instanceof AltResult)
2637 ex = ((AltResult)r).ex;
2638 else
2639 ex = null;
2640 if (ex == null) {
2641 try {
2642 if (e != null)
2643 e.execute(new AsyncRun(action, dst));
2644 else
2645 action.run();
2646 } catch (Throwable rex) {
2647 ex = rex;
2648 }
2649 }
2650 if (e == null || ex != null)
2651 dst.internalComplete(null, ex);
2652 }
2653 helpPostComplete();
2654 other.helpPostComplete();
2655 return dst;
2656 }
2657
2658 /**
2659 * Returns a CompletableFuture that upon completion, has the same
2660 * value as produced by the given function of the result of this
2661 * CompletableFuture.
2662 *
2663 * <p>If this CompletableFuture completes exceptionally, then the
2664 * returned CompletableFuture also does so, with a
2665 * CompletionException holding this exception as its cause.
2666 * Similarly, if the computed CompletableFuture completes
2667 * exceptionally, then so does the returned CompletableFuture.
2668 *
2669 * @param fn the function returning a new CompletableFuture
2670 * @return the CompletableFuture
2671 */
2672 public <U> CompletableFuture<U> thenCompose
2673 (Function<? super T, CompletableFuture<U>> fn) {
2674 return doCompose(fn, null);
2675 }
2676
2677 /**
2678 * Returns a CompletableFuture that upon completion, has the same
2679 * value as that produced asynchronously using the {@link
2680 * ForkJoinPool#commonPool()} by the given function of the result
2681 * of this CompletableFuture.
2682 *
2683 * <p>If this CompletableFuture completes exceptionally, then the
2684 * returned CompletableFuture also does so, with a
2685 * CompletionException holding this exception as its cause.
2686 * Similarly, if the computed CompletableFuture completes
2687 * exceptionally, then so does the returned CompletableFuture.
2688 *
2689 * @param fn the function returning a new CompletableFuture
2690 * @return the CompletableFuture
2691 */
2692 public <U> CompletableFuture<U> thenComposeAsync
2693 (Function<? super T, CompletableFuture<U>> fn) {
2694 return doCompose(fn, ForkJoinPool.commonPool());
2695 }
2696
2697 /**
2698 * Returns a CompletableFuture that upon completion, has the same
2699 * value as that produced asynchronously using the given executor
2700 * by the given function of this CompletableFuture.
2701 *
2702 * <p>If this CompletableFuture completes exceptionally, then the
2703 * returned CompletableFuture also does so, with a
2704 * CompletionException holding this exception as its cause.
2705 * Similarly, if the computed CompletableFuture completes
2706 * exceptionally, then so does the returned CompletableFuture.
2707 *
2708 * @param fn the function returning a new CompletableFuture
2709 * @param executor the executor to use for asynchronous execution
2710 * @return the CompletableFuture
2711 */
2712 public <U> CompletableFuture<U> thenComposeAsync
2713 (Function<? super T, CompletableFuture<U>> fn,
2714 Executor executor) {
2715 if (executor == null) throw new NullPointerException();
2716 return doCompose(fn, executor);
2717 }
2718
2719 private <U> CompletableFuture<U> doCompose
2720 (Function<? super T, CompletableFuture<U>> fn,
2721 Executor e) {
2722 if (fn == null) throw new NullPointerException();
2723 CompletableFuture<U> dst = null;
2724 ComposeCompletion<T,U> d = null;
2725 Object r;
2726 if ((r = result) == null) {
2727 dst = new CompletableFuture<U>();
2728 CompletionNode p = new CompletionNode
2729 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2730 while ((r = result) == null) {
2731 if (UNSAFE.compareAndSwapObject
2732 (this, COMPLETIONS, p.next = completions, p))
2733 break;
2734 }
2735 }
2736 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2737 T t; Throwable ex;
2738 if (r instanceof AltResult) {
2739 ex = ((AltResult)r).ex;
2740 t = null;
2741 }
2742 else {
2743 ex = null;
2744 @SuppressWarnings("unchecked") T tr = (T) r;
2745 t = tr;
2746 }
2747 if (ex == null) {
2748 if (e != null) {
2749 if (dst == null)
2750 dst = new CompletableFuture<U>();
2751 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2752 }
2753 else {
2754 try {
2755 if ((dst = fn.apply(t)) == null)
2756 ex = new NullPointerException();
2757 } catch (Throwable rex) {
2758 ex = rex;
2759 }
2760 if (dst == null)
2761 dst = new CompletableFuture<U>();
2762 }
2763 }
2764 if (e == null && ex != null)
2765 dst.internalComplete(null, ex);
2766 }
2767 helpPostComplete();
2768 dst.helpPostComplete();
2769 return dst;
2770 }
2771
2772 /**
2773 * Returns a new CompletableFuture that is completed when this
2774 * CompletableFuture completes, with the result of the given
2775 * function of the exception triggering this CompletableFuture's
2776 * completion when it completes exceptionally; otherwise, if this
2777 * CompletableFuture completes normally, then the returned
2778 * CompletableFuture also completes normally with the same value.
2779 *
2780 * @param fn the function to use to compute the value of the
2781 * returned CompletableFuture if this CompletableFuture completed
2782 * exceptionally
2783 * @return the new CompletableFuture
2784 */
2785 public CompletableFuture<T> exceptionally
2786 (Function<Throwable, ? extends T> fn) {
2787 if (fn == null) throw new NullPointerException();
2788 CompletableFuture<T> dst = new CompletableFuture<T>();
2789 ExceptionCompletion<T> d = null;
2790 Object r;
2791 if ((r = result) == null) {
2792 CompletionNode p =
2793 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2794 while ((r = result) == null) {
2795 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2796 p.next = completions, p))
2797 break;
2798 }
2799 }
2800 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2801 T t = null; Throwable ex, dx = null;
2802 if (r instanceof AltResult) {
2803 if ((ex = ((AltResult)r).ex) != null) {
2804 try {
2805 t = fn.apply(ex);
2806 } catch (Throwable rex) {
2807 dx = rex;
2808 }
2809 }
2810 }
2811 else {
2812 @SuppressWarnings("unchecked") T tr = (T) r;
2813 t = tr;
2814 }
2815 dst.internalComplete(t, dx);
2816 }
2817 helpPostComplete();
2818 return dst;
2819 }
2820
2821 /**
2822 * Returns a new CompletableFuture that is completed when this
2823 * CompletableFuture completes, with the result of the given
2824 * function of the result and exception of this CompletableFuture's
2825 * completion. The given function is invoked with the result (or
2826 * {@code null} if none) and the exception (or {@code null} if none)
2827 * of this CompletableFuture when complete.
2828 *
2829 * @param fn the function to use to compute the value of the
2830 * returned CompletableFuture
2831 * @return the new CompletableFuture
2832 */
2833 public <U> CompletableFuture<U> handle
2834 (BiFunction<? super T, Throwable, ? extends U> fn) {
2835 if (fn == null) throw new NullPointerException();
2836 CompletableFuture<U> dst = new CompletableFuture<U>();
2837 HandleCompletion<T,U> d = null;
2838 Object r;
2839 if ((r = result) == null) {
2840 CompletionNode p =
2841 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2842 while ((r = result) == null) {
2843 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2844 p.next = completions, p))
2845 break;
2846 }
2847 }
2848 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2849 T t; Throwable ex;
2850 if (r instanceof AltResult) {
2851 ex = ((AltResult)r).ex;
2852 t = null;
2853 }
2854 else {
2855 ex = null;
2856 @SuppressWarnings("unchecked") T tr = (T) r;
2857 t = tr;
2858 }
2859 U u; Throwable dx;
2860 try {
2861 u = fn.apply(t, ex);
2862 dx = null;
2863 } catch (Throwable rex) {
2864 dx = rex;
2865 u = null;
2866 }
2867 dst.internalComplete(u, dx);
2868 }
2869 helpPostComplete();
2870 return dst;
2871 }
2872
2873
2874 /* ------------- Arbitrary-arity constructions -------------- */
2875
2876 /*
2877 * The basic plan of attack is to recursively form binary
2878 * completion trees of elements. This can be overkill for small
2879 * sets, but scales nicely. The And/All vs Or/Any forms use the
2880 * same idea, but details differ.
2881 */
2882
2883 /**
2884 * Returns a new CompletableFuture that is completed when all of
2885 * the given CompletableFutures complete. If any of the given
2886 * CompletableFutures complete exceptionally, then the returned
2887 * CompletableFuture also does so, with a CompletionException
2888 * holding this exception as its cause. Otherwise, the results,
2889 * if any, of the given CompletableFutures are not reflected in
2890 * the returned CompletableFuture, but may be obtained by
2891 * inspecting them individually. If no CompletableFutures are
2892 * provided, returns a CompletableFuture completed with the value
2893 * {@code null}.
2894 *
2895 * <p>Among the applications of this method is to await completion
2896 * of a set of independent CompletableFutures before continuing a
2897 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2898 * c3).join();}.
2899 *
2900 * @param cfs the CompletableFutures
2901 * @return a new CompletableFuture that is completed when all of the
2902 * given CompletableFutures complete
2903 * @throws NullPointerException if the array or any of its elements are
2904 * {@code null}
2905 */
2906 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2907 int len = cfs.length; // Directly handle empty and singleton cases
2908 if (len > 1)
2909 return allTree(cfs, 0, len - 1);
2910 else {
2911 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2912 CompletableFuture<?> f;
2913 if (len == 0)
2914 dst.result = NIL;
2915 else if ((f = cfs[0]) == null)
2916 throw new NullPointerException();
2917 else {
2918 ThenCopy d = null;
2919 CompletionNode p = null;
2920 Object r;
2921 while ((r = f.result) == null) {
2922 if (d == null)
2923 d = new ThenCopy(f, dst);
2924 else if (p == null)
2925 p = new CompletionNode(d);
2926 else if (UNSAFE.compareAndSwapObject
2927 (f, COMPLETIONS, p.next = f.completions, p))
2928 break;
2929 }
2930 if (r != null && (d == null || d.compareAndSet(0, 1)))
2931 dst.internalComplete(null, (r instanceof AltResult) ?
2932 ((AltResult)r).ex : null);
2933 f.helpPostComplete();
2934 }
2935 return dst;
2936 }
2937 }
2938
2939 /**
2940 * Recursively constructs an And'ed tree of CompletableFutures.
2941 * Called only when array known to have at least two elements.
2942 */
2943 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2944 int lo, int hi) {
2945 CompletableFuture<?> fst, snd;
2946 int mid = (lo + hi) >>> 1;
2947 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2948 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2949 throw new NullPointerException();
2950 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2951 AndCompletion d = null;
2952 CompletionNode p = null, q = null;
2953 Object r = null, s = null;
2954 while ((r = fst.result) == null || (s = snd.result) == null) {
2955 if (d == null)
2956 d = new AndCompletion(fst, snd, dst);
2957 else if (p == null)
2958 p = new CompletionNode(d);
2959 else if (q == null) {
2960 if (UNSAFE.compareAndSwapObject
2961 (fst, COMPLETIONS, p.next = fst.completions, p))
2962 q = new CompletionNode(d);
2963 }
2964 else if (UNSAFE.compareAndSwapObject
2965 (snd, COMPLETIONS, q.next = snd.completions, q))
2966 break;
2967 }
2968 if ((r != null || (r = fst.result) != null) &&
2969 (s != null || (s = snd.result) != null) &&
2970 (d == null || d.compareAndSet(0, 1))) {
2971 Throwable ex;
2972 if (r instanceof AltResult)
2973 ex = ((AltResult)r).ex;
2974 else
2975 ex = null;
2976 if (ex == null && (s instanceof AltResult))
2977 ex = ((AltResult)s).ex;
2978 dst.internalComplete(null, ex);
2979 }
2980 fst.helpPostComplete();
2981 snd.helpPostComplete();
2982 return dst;
2983 }
2984
2985 /**
2986 * Returns a new CompletableFuture that is completed when any of the
2987 * given CompletableFutures complete, with the same result if it
2988 * completed normally. Otherwise, if it completed exceptionally,
2989 * the returned CompletableFuture also does so, with a
2990 * CompletionException holding this exception as its cause. If no
2991 * CompletableFutures are provided, returns an incomplete
2992 * CompletableFuture.
2993 *
2994 * @param cfs the CompletableFutures
2995 * @return a new CompletableFuture that is completed when any of the
2996 * given CompletableFutures complete
2997 * @throws NullPointerException if the array or any of its elements are
2998 * {@code null}
2999 */
3000 public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
3001 int len = cfs.length; // Same idea as allOf
3002 if (len > 1)
3003 return anyTree(cfs, 0, len - 1);
3004 else {
3005 CompletableFuture<?> dst = new CompletableFuture<Object>();
3006 CompletableFuture<?> f;
3007 if (len == 0)
3008 ; // skip
3009 else if ((f = cfs[0]) == null)
3010 throw new NullPointerException();
3011 else {
3012 ThenCopy d = null;
3013 CompletionNode p = null;
3014 Object r;
3015 while ((r = f.result) == null) {
3016 if (d == null)
3017 d = new ThenCopy(f, dst);
3018 else if (p == null)
3019 p = new CompletionNode(d);
3020 else if (UNSAFE.compareAndSwapObject
3021 (f, COMPLETIONS, p.next = f.completions, p))
3022 break;
3023 }
3024 if (r != null && (d == null || d.compareAndSet(0, 1))) {
3025 Throwable ex; Object t;
3026 if (r instanceof AltResult) {
3027 ex = ((AltResult)r).ex;
3028 t = null;
3029 }
3030 else {
3031 ex = null;
3032 t = r;
3033 }
3034 dst.internalComplete(t, ex);
3035 }
3036 f.helpPostComplete();
3037 }
3038 return dst;
3039 }
3040 }
3041
3042 /**
3043 * Recursively constructs an Or'ed tree of CompletableFutures.
3044 */
3045 private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
3046 int lo, int hi) {
3047 CompletableFuture<?> fst, snd;
3048 int mid = (lo + hi) >>> 1;
3049 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3050 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3051 throw new NullPointerException();
3052 CompletableFuture<?> dst = new CompletableFuture<Object>();
3053 OrCompletion d = null;
3054 CompletionNode p = null, q = null;
3055 Object r;
3056 while ((r = fst.result) == null && (r = snd.result) == null) {
3057 if (d == null)
3058 d = new OrCompletion(fst, snd, dst);
3059 else if (p == null)
3060 p = new CompletionNode(d);
3061 else if (q == null) {
3062 if (UNSAFE.compareAndSwapObject
3063 (fst, COMPLETIONS, p.next = fst.completions, p))
3064 q = new CompletionNode(d);
3065 }
3066 else if (UNSAFE.compareAndSwapObject
3067 (snd, COMPLETIONS, q.next = snd.completions, q))
3068 break;
3069 }
3070 if ((r != null || (r = fst.result) != null ||
3071 (r = snd.result) != null) &&
3072 (d == null || d.compareAndSet(0, 1))) {
3073 Throwable ex; Object t;
3074 if (r instanceof AltResult) {
3075 ex = ((AltResult)r).ex;
3076 t = null;
3077 }
3078 else {
3079 ex = null;
3080 t = r;
3081 }
3082 dst.internalComplete(t, ex);
3083 }
3084 fst.helpPostComplete();
3085 snd.helpPostComplete();
3086 return dst;
3087 }
3088
3089
3090 /* ------------- Control and status methods -------------- */
3091
3092 /**
3093 * If not already completed, completes this CompletableFuture with
3094 * a {@link CancellationException}. Dependent CompletableFutures
3095 * that have not already completed will also complete
3096 * exceptionally, with a {@link CompletionException} caused by
3097 * this {@code CancellationException}.
3098 *
3099 * @param mayInterruptIfRunning this value has no effect in this
3100 * implementation because interrupts are not used to control
3101 * processing.
3102 *
3103 * @return {@code true} if this task is now cancelled
3104 */
3105 public boolean cancel(boolean mayInterruptIfRunning) {
3106 boolean cancelled = (result == null) &&
3107 UNSAFE.compareAndSwapObject
3108 (this, RESULT, null, new AltResult(new CancellationException()));
3109 postComplete();
3110 return cancelled || isCancelled();
3111 }
3112
3113 /**
3114 * Returns {@code true} if this CompletableFuture was cancelled
3115 * before it completed normally.
3116 *
3117 * @return {@code true} if this CompletableFuture was cancelled
3118 * before it completed normally
3119 */
3120 public boolean isCancelled() {
3121 Object r;
3122 return ((r = result) instanceof AltResult) &&
3123 (((AltResult)r).ex instanceof CancellationException);
3124 }
3125
3126 /**
3127 * Forcibly sets or resets the value subsequently returned by
3128 * method {@link #get()} and related methods, whether or not
3129 * already completed. This method is designed for use only in
3130 * error recovery actions, and even in such situations may result
3131 * in ongoing dependent completions using established versus
3132 * overwritten outcomes.
3133 *
3134 * @param value the completion value
3135 */
3136 public void obtrudeValue(T value) {
3137 result = (value == null) ? NIL : value;
3138 postComplete();
3139 }
3140
3141 /**
3142 * Forcibly causes subsequent invocations of method {@link #get()}
3143 * and related methods to throw the given exception, whether or
3144 * not already completed. This method is designed for use only in
3145 * recovery actions, and even in such situations may result in
3146 * ongoing dependent completions using established versus
3147 * overwritten outcomes.
3148 *
3149 * @param ex the exception
3150 */
3151 public void obtrudeException(Throwable ex) {
3152 if (ex == null) throw new NullPointerException();
3153 result = new AltResult(ex);
3154 postComplete();
3155 }
3156
3157 /**
3158 * Returns the estimated number of CompletableFutures whose
3159 * completions are awaiting completion of this CompletableFuture.
3160 * This method is designed for use in monitoring system state, not
3161 * for synchronization control.
3162 *
3163 * @return the number of dependent CompletableFutures
3164 */
3165 public int getNumberOfDependents() {
3166 int count = 0;
3167 for (CompletionNode p = completions; p != null; p = p.next)
3168 ++count;
3169 return count;
3170 }
3171
3172 /**
3173 * Returns a string identifying this CompletableFuture, as well as
3174 * its completion state. The state, in brackets, contains the
3175 * String {@code "Completed Normally"} or the String {@code
3176 * "Completed Exceptionally"}, or the String {@code "Not
3177 * completed"} followed by the number of CompletableFutures
3178 * dependent upon its completion, if any.
3179 *
3180 * @return a string identifying this CompletableFuture, as well as its state
3181 */
3182 public String toString() {
3183 Object r = result;
3184 int count;
3185 return super.toString() +
3186 ((r == null) ?
3187 (((count = getNumberOfDependents()) == 0) ?
3188 "[Not completed]" :
3189 "[Not completed, " + count + " dependents]") :
3190 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3191 "[Completed exceptionally]" :
3192 "[Completed normally]"));
3193 }
3194
3195 // Unsafe mechanics
3196 private static final sun.misc.Unsafe UNSAFE;
3197 private static final long RESULT;
3198 private static final long WAITERS;
3199 private static final long COMPLETIONS;
3200 static {
3201 try {
3202 UNSAFE = sun.misc.Unsafe.getUnsafe();
3203 Class<?> k = CompletableFuture.class;
3204 RESULT = UNSAFE.objectFieldOffset
3205 (k.getDeclaredField("result"));
3206 WAITERS = UNSAFE.objectFieldOffset
3207 (k.getDeclaredField("waiters"));
3208 COMPLETIONS = UNSAFE.objectFieldOffset
3209 (k.getDeclaredField("completions"));
3210 } catch (Exception e) {
3211 throw new Error(e);
3212 }
3213 }
3214 }