root/jsr166/jsr166/src/main/java/util/concurrent/CompletableFuture.java
Revision: 1.74
Committed: Tue Mar 19 17:14:34 2013 UTC by jsr166
Branch: MAIN
Changes since 1.73: +63 -45 lines
Log Message:
more precise exception specs for "either" methods

File Contents

1 /*
2 * Written by Doug Lea with assistance from members of JCP JSR-166
3 * Expert Group and released to the public domain, as explained at
4 * http://creativecommons.org/publicdomain/zero/1.0/
5 */
6
7 package java.util.concurrent;
8 import java.util.function.Supplier;
9 import java.util.function.Consumer;
10 import java.util.function.BiConsumer;
11 import java.util.function.Function;
12 import java.util.function.BiFunction;
13 import java.util.concurrent.Future;
14 import java.util.concurrent.TimeUnit;
15 import java.util.concurrent.ForkJoinPool;
16 import java.util.concurrent.ForkJoinTask;
17 import java.util.concurrent.Executor;
18 import java.util.concurrent.ThreadLocalRandom;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.TimeoutException;
21 import java.util.concurrent.CancellationException;
22 import java.util.concurrent.atomic.AtomicInteger;
23 import java.util.concurrent.locks.LockSupport;
24
25 /**
26 * A {@link Future} that may be explicitly completed (setting its
27 * value and status), and may include dependent functions and actions
28 * that trigger upon its completion.
29 *
30 * <p>When two or more threads attempt to
31 * {@link #complete complete},
32 * {@link #completeExceptionally completeExceptionally}, or
33 * {@link #cancel cancel}
34 * a CompletableFuture, only one of them succeeds.
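 *
 * <p>A minimal sketch of the "only one of them succeeds" rule, assuming a
 * released JDK 8 or later (whose API may differ from this revision). The
 * class name CompleteOnceDemo is illustrative, and the attempts are made
 * sequentially from one thread rather than racing, to keep the outcome
 * deterministic:

```java
import java.util.concurrent.CompletableFuture;

class CompleteOnceDemo {
    // Makes four completion attempts and reports which ones took effect.
    static String run() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("first");      // wins: f was incomplete
        boolean second = f.complete("second");    // loses: already completed
        boolean third = f.completeExceptionally(new IllegalStateException("late"));
        boolean fourth = f.cancel(true);          // loses as well
        return first + "," + second + "," + third + "," + fourth + "," + f.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

 * <p>Each completion method returns a boolean, so a caller can tell
 * whether its own attempt was the one that took effect.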
35 *
36 * <p>Methods are available for adding dependents based on
37 * user-provided Functions, Consumers, or Runnables. The appropriate
38 * form to use depends on whether actions require arguments and/or
39 * produce results. Completion of a dependent action will trigger the
40 * completion of another CompletableFuture. Actions may also be
41 * triggered after either or both the current and another
42 * CompletableFuture complete. Multiple CompletableFutures may also
43 * be grouped as one using {@link #anyOf(CompletableFuture...)} and
44 * {@link #allOf(CompletableFuture...)}.
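 *
 * <p>The forms described above can be sketched as follows, assuming the
 * method names of released JDK 8 and later (thenApply, thenAccept,
 * thenRun, thenCombine); the class name DependentsDemo is illustrative.
 * Everything runs on one thread, so the log order is deterministic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class DependentsDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Integer> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();

        // Function: consumes the result and produces a new one.
        CompletableFuture<Integer> doubled = a.thenApply(x -> x * 2);
        // Consumer: consumes the result, produces nothing.
        a.thenAccept(x -> log.add("accepted " + x));
        // Runnable: neither consumes nor produces.
        CompletableFuture<Void> ran = a.thenRun(() -> { });
        // Triggered only after both a and b complete.
        CompletableFuture<Integer> sum = a.thenCombine(b, Integer::sum);
        // Grouping several futures as one.
        CompletableFuture<Void> all = CompletableFuture.allOf(a, b);
        CompletableFuture<Object> any = CompletableFuture.anyOf(a, b);

        a.complete(20);
        log.add("doubled " + doubled.join());
        log.add("ran " + ran.isDone());
        log.add("any " + any.join());
        log.add("all " + all.isDone());   // b is still pending
        b.complete(22);
        log.add("sum " + sum.join());
        log.add("all " + all.isDone());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```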
45 *
46 * <p>CompletableFutures themselves do not execute asynchronously.
47 * However, actions supplied for dependent completions of another
48 * CompletableFuture may do so, depending on whether they are provided
49 * via one of the <em>async</em> methods (that is, methods with names
50 * of the form <tt><var>xxx</var>Async</tt>). The <em>async</em>
51 * methods provide a way to commence asynchronous processing of an
52 * action using either a given {@link Executor} or by default the
53 * {@link ForkJoinPool#commonPool()}. To simplify monitoring,
54 * debugging, and tracking, all generated asynchronous tasks are
55 * instances of the marker interface {@link AsynchronousCompletionTask}.
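 *
 * <p>A small sketch of how the marker interface could be used for
 * tracking, assuming released JDK 8 or later. The executor below is a
 * hypothetical same-thread executor written for this example; it counts
 * how many of the tasks handed to it carry the marker:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncMarkerDemo {
    static String run() {
        AtomicInteger marked = new AtomicInteger();
        AtomicInteger total = new AtomicInteger();
        // Illustrative executor: inspects each task, then runs it in the caller's thread.
        Executor inspecting = task -> {
            total.incrementAndGet();
            if (task instanceof CompletableFuture.AsynchronousCompletionTask)
                marked.incrementAndGet();
            task.run();
        };
        CompletableFuture<Integer> f = CompletableFuture
            .supplyAsync(() -> 6, inspecting)          // async source
            .thenApplyAsync(x -> x * 7, inspecting);   // async dependent
        return f.join() + " " + marked.get() + "/" + total.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

 * <p>Omitting the Executor argument would submit the same tasks to
 * {@link ForkJoinPool#commonPool()} instead.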
56 *
57 * <p>Actions supplied for dependent completions of <em>non-async</em>
58 * methods may be performed by the thread that completes the current
59 * CompletableFuture, or by any other caller of these methods. There
60 * are no guarantees about the order of processing completions unless
61 * constrained by these methods.
62 *
63 * <p>Since (unlike {@link FutureTask}) this class has no direct
64 * control over the computation that causes it to be completed,
65 * cancellation is treated as just another form of exceptional completion.
66 * Method {@link #cancel cancel} has the same effect as
67 * {@code completeExceptionally(new CancellationException())}.
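 *
 * <p>The equivalence can be checked with a sketch like the following,
 * assuming released JDK 8 or later; CancelDemo and its helper methods are
 * illustrative names:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

class CancelDemo {
    // Summarizes the observable state of a completed future.
    static String describe(CompletableFuture<String> f) {
        String thrown;
        try {
            f.join();
            thrown = "none";
        } catch (CancellationException ex) {
            thrown = "CancellationException";
        }
        return f.isCancelled() + "," + f.isCompletedExceptionally() + "," + thrown;
    }

    static String viaCancel() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.cancel(true);
        return describe(f);
    }

    static String viaCompleteExceptionally() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new CancellationException());
        return describe(f);
    }

    public static void main(String[] args) {
        System.out.println(viaCancel());
        System.out.println(viaCompleteExceptionally());
    }
}
```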
68 *
69 * <p>Upon exceptional completion (including cancellation), or when a
70 * completion entails an additional computation that terminates
71 * abruptly with an (unchecked) exception or error, all dependent
72 * completions (and their dependents in turn) generally act
73 * as {@code completeExceptionally} with a {@link CompletionException}
74 * holding that exception as its cause. However, the {@link
75 * #exceptionally exceptionally} and {@link #handle handle}
76 * completions <em>are</em> able to handle exceptional completions of
77 * the CompletableFutures they depend on.
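 *
 * <p>A sketch of this propagation, assuming released JDK 8 or later;
 * PropagationDemo is an illustrative name. The failure of src flows
 * through two levels of dependents, while exceptionally and handle
 * turn it back into a normal result:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class PropagationDemo {
    static String run() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dep = src.thenApply(x -> x + 1);
        CompletableFuture<Integer> depOfDep = dep.thenApply(x -> x * 2);
        // Recovers from the failure of the future it depends on.
        CompletableFuture<Integer> recovered = depOfDep.exceptionally(ex -> -1);
        // Sees either the value or the exception of src.
        CompletableFuture<String> handled =
            src.handle((v, ex) -> ex == null ? "value " + v : "failed: " + ex.getMessage());

        src.completeExceptionally(new IllegalStateException("boom"));

        String cause;
        try {
            depOfDep.join();
            cause = "none";
        } catch (CompletionException ex) {
            // The original exception is carried as the cause.
            cause = ex.getCause().getClass().getSimpleName();
        }
        return cause + "," + recovered.join() + "," + handled.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```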
78 *
79 * <p>In case of exceptional completion with a CompletionException,
80 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
81 * {@link ExecutionException} with the same cause as held in the
82 * corresponding CompletionException. However, in these cases,
83 * methods {@link #join()} and {@link #getNow} throw the
84 * CompletionException, which simplifies usage.
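 *
 * <p>The difference between the two families of methods can be seen in
 * a sketch such as this one, assuming released JDK 8 or later;
 * GetVersusJoinDemo is an illustrative name. The dependent future is
 * completed with a CompletionException wrapping the original cause:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class GetVersusJoinDemo {
    static String run() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> dep = src.thenApply(x -> x + 1);
        src.completeExceptionally(new IllegalArgumentException("bad input"));

        String viaGet;
        try {
            dep.get();
            viaGet = "none";
        } catch (ExecutionException ex) {
            // Checked exception; its cause is the original exception.
            viaGet = "ExecutionException(" + ex.getCause().getClass().getSimpleName() + ")";
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            viaGet = "interrupted";
        }

        String viaJoin;
        try {
            dep.join();
            viaJoin = "none";
        } catch (CompletionException ex) {
            // Unchecked exception; no throws clause or extra catch needed.
            viaJoin = "CompletionException(" + ex.getCause().getClass().getSimpleName() + ")";
        }
        return viaGet + " | " + viaJoin;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```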
85 *
86 * @author Doug Lea
87 * @since 1.8
88 */
89 public class CompletableFuture<T> implements Future<T> {
90
91 /*
92 * Overview:
93 *
94 * 1. Non-nullness of field result (set via CAS) indicates done.
95 * An AltResult is used to box null as a result, as well as to
96 * hold exceptions. Using a single field makes completion fast
97 * and simple to detect and trigger, at the expense of a lot of
98 * encoding and decoding that infiltrates many methods. One minor
99 * simplification relies on the (static) NIL (to box null results)
100 * being the only AltResult with a null exception field, so we
101 * don't usually need explicit comparisons with NIL. The CF
102 * exception propagation mechanics surrounding decoding rely on
103 * unchecked casts of decoded results really being unchecked,
104 * where user type errors are caught at point of use, as is
105 * currently the case in Java. These are highlighted by using
106 * SuppressWarnings-annotated temporaries.
107 *
108 * 2. Waiters are held in a Treiber stack similar to the one used
109 * in FutureTask, Phaser, and SynchronousQueue. See their
110 * internal documentation for algorithmic details.
111 *
112 * 3. Completions are also kept in a list/stack, and pulled off
113 * and run when completion is triggered. (We could even use the
114 * same stack as for waiters, but would give up the potential
115 * parallelism obtained because woken waiters help release/run
116 * others -- see method postComplete). Because post-processing
117 * may race with direct calls, class Completion opportunistically
118 * extends AtomicInteger so callers can claim the action via
119 * compareAndSet(0, 1). The Completion.run methods are all
120 * written in a boringly similar, uniform way (that sometimes includes
121 * unnecessary-looking checks, kept to maintain uniformity).
122 * There are enough dimensions upon which they differ that
123 * attempts to factor commonalities while maintaining efficiency
124 * require more lines of code than they would save.
125 *
126 * 4. The exported then/and/or methods do support a bit of
127 * factoring (see doThenApply etc). They must cope with the
128 * intrinsic races surrounding addition of a dependent action
129 * versus performing the action directly because the task is
130 * already complete. For example, a CF may not be complete upon
131 * entry, so a dependent completion is added, but by the time it
132 * is added, the target CF is complete, so must be directly
133 * executed. This is all done while avoiding unnecessary object
134 * construction in safe-bypass cases.
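 *
 * The single-field encoding from point 1 can be illustrated with a
 * simplified sketch. This is not the code used in this class: it
 * assumes an AtomicReference in place of the Unsafe-based CAS, and the
 * names ResultCell and Alt are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

class ResultCell {
    // Boxes outcomes that cannot be stored directly: null values and exceptions.
    static final class Alt {
        final Throwable ex;               // null only for NIL
        Alt(Throwable ex) { this.ex = ex; }
    }
    static final Alt NIL = new Alt(null);

    // A non-null value in this one field means "done".
    private final AtomicReference<Object> result = new AtomicReference<>();

    boolean complete(Object value) {
        return result.compareAndSet(null, value == null ? NIL : value);
    }

    boolean fail(Throwable ex) {
        return result.compareAndSet(null, new Alt(ex));
    }

    boolean isDone() { return result.get() != null; }

    // Decodes the raw field: rethrows a stored exception, unboxes NIL to null.
    Object getNow() {
        Object r = result.get();
        if (r == null)
            throw new IllegalStateException("not done");
        if (r instanceof Alt) {
            Throwable ex = ((Alt) r).ex;
            if (ex != null)
                throw new RuntimeException(ex);
            return null;                  // only NIL has a null exception field
        }
        return r;
    }
}
```

 * Because NIL is the only Alt with a null exception, the decoder never
 * has to compare against NIL explicitly, which mirrors the minor
 * simplification mentioned in point 1.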
135 */
136
137 // preliminaries
138
139 static final class AltResult {
140 final Throwable ex; // null only for NIL
141 AltResult(Throwable ex) { this.ex = ex; }
142 }
143
144 static final AltResult NIL = new AltResult(null);
145
146 // Fields
147
148 volatile Object result; // Either the result or boxed AltResult
149 volatile WaitNode waiters; // Treiber stack of threads blocked on get()
150 volatile CompletionNode completions; // list (Treiber stack) of completions
151
152 // Basic utilities for triggering and processing completions
153
154 /**
155 * Removes and signals all waiting threads and runs all completions.
156 */
157 final void postComplete() {
158 WaitNode q; Thread t;
159 while ((q = waiters) != null) {
160 if (UNSAFE.compareAndSwapObject(this, WAITERS, q, q.next) &&
161 (t = q.thread) != null) {
162 q.thread = null;
163 LockSupport.unpark(t);
164 }
165 }
166
167 CompletionNode h; Completion c;
168 while ((h = completions) != null) {
169 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS, h, h.next) &&
170 (c = h.completion) != null)
171 c.run();
172 }
173 }
174
175 /**
176 * Triggers completion with the encoding of the given arguments:
177 * if the exception is non-null, encodes it as a wrapped
178 * CompletionException unless it is one already. Otherwise uses
179 * the given result, boxed as NIL if null.
180 */
181 final void internalComplete(Object v, Throwable ex) {
182 if (result == null)
183 UNSAFE.compareAndSwapObject
184 (this, RESULT, null,
185 (ex == null) ? (v == null) ? NIL : v :
186 new AltResult((ex instanceof CompletionException) ? ex :
187 new CompletionException(ex)));
188 postComplete(); // help out even if not triggered
189 }
190
191 /**
192 * If triggered, helps release and/or process completions.
193 */
194 final void helpPostComplete() {
195 if (result != null)
196 postComplete();
197 }
198
199 /* ------------- waiting for completions -------------- */
200
201 /** Number of processors, for spin control */
202 static final int NCPU = Runtime.getRuntime().availableProcessors();
203
204 /**
205 * Heuristic spin value for waitingGet() before blocking on
206 * multiprocessors
207 */
208 static final int SPINS = (NCPU > 1) ? 1 << 8 : 0;
209
210 /**
211 * Linked nodes to record waiting threads in a Treiber stack. See
212 * other classes such as Phaser and SynchronousQueue for more
213 * detailed explanation. This class implements ManagedBlocker to
214 * avoid starvation when blocking actions pile up in
215 * ForkJoinPools.
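 *
 * <p>For readers unfamiliar with the ManagedBlocker protocol, here is a
 * minimal sketch that is independent of WaitNode. It assumes a
 * CountDownLatch as the thing being waited on; LatchBlocker is a
 * hypothetical name:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;

class LatchBlocker implements ForkJoinPool.ManagedBlocker {
    private final CountDownLatch latch;

    LatchBlocker(CountDownLatch latch) { this.latch = latch; }

    // Returns true when blocking is unnecessary.
    public boolean isReleasable() {
        return latch.getCount() == 0;
    }

    // Blocks the current thread; returning true means no further blocking is needed.
    public boolean block() throws InterruptedException {
        latch.await();
        return true;
    }

    // Waits for a latch that another thread opens; returns the remaining count.
    static long demo() {
        CountDownLatch latch = new CountDownLatch(1);
        new Thread(latch::countDown).start();
        try {
            ForkJoinPool.managedBlock(new LatchBlocker(latch));
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return latch.getCount();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

 * <p>When called from a ForkJoinPool worker, managedBlock may let the
 * pool compensate with a spare thread while the caller is blocked.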
216 */
217 static final class WaitNode implements ForkJoinPool.ManagedBlocker {
218 long nanos; // wait time if timed
219 final long deadline; // non-zero if timed
220 volatile int interruptControl; // > 0: interruptible, < 0: interrupted
221 volatile Thread thread;
222 volatile WaitNode next;
223 WaitNode(boolean interruptible, long nanos, long deadline) {
224 this.thread = Thread.currentThread();
225 this.interruptControl = interruptible ? 1 : 0;
226 this.nanos = nanos;
227 this.deadline = deadline;
228 }
229 public boolean isReleasable() {
230 if (thread == null)
231 return true;
232 if (Thread.interrupted()) {
233 int i = interruptControl;
234 interruptControl = -1;
235 if (i > 0)
236 return true;
237 }
238 if (deadline != 0L &&
239 (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) {
240 thread = null;
241 return true;
242 }
243 return false;
244 }
245 public boolean block() {
246 if (isReleasable())
247 return true;
248 else if (deadline == 0L)
249 LockSupport.park(this);
250 else if (nanos > 0L)
251 LockSupport.parkNanos(this, nanos);
252 return isReleasable();
253 }
254 }
255
256 /**
257 * Returns raw result after waiting, or null if interruptible and
258 * interrupted.
259 */
260 private Object waitingGet(boolean interruptible) {
261 WaitNode q = null;
262 boolean queued = false;
263 int spins = SPINS;
264 for (Object r;;) {
265 if ((r = result) != null) {
266 if (q != null) { // suppress unpark
267 q.thread = null;
268 if (q.interruptControl < 0) {
269 if (interruptible) {
270 removeWaiter(q);
271 return null;
272 }
273 Thread.currentThread().interrupt();
274 }
275 }
276 postComplete(); // help release others
277 return r;
278 }
279 else if (spins > 0) {
280 int rnd = ThreadLocalRandom.nextSecondarySeed();
281 if (rnd == 0)
282 rnd = ThreadLocalRandom.current().nextInt();
283 if (rnd >= 0)
284 --spins;
285 }
286 else if (q == null)
287 q = new WaitNode(interruptible, 0L, 0L);
288 else if (!queued)
289 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
290 q.next = waiters, q);
291 else if (interruptible && q.interruptControl < 0) {
292 removeWaiter(q);
293 return null;
294 }
295 else if (q.thread != null && result == null) {
296 try {
297 ForkJoinPool.managedBlock(q);
298 } catch (InterruptedException ex) {
299 q.interruptControl = -1;
300 }
301 }
302 }
303 }
304
305 /**
306 * Awaits completion or aborts on interrupt or timeout.
307 *
308 * @param nanos time to wait
309 * @return raw result
310 */
311 private Object timedAwaitDone(long nanos)
312 throws InterruptedException, TimeoutException {
313 WaitNode q = null;
314 boolean queued = false;
315 for (Object r;;) {
316 if ((r = result) != null) {
317 if (q != null) {
318 q.thread = null;
319 if (q.interruptControl < 0) {
320 removeWaiter(q);
321 throw new InterruptedException();
322 }
323 }
324 postComplete();
325 return r;
326 }
327 else if (q == null) {
328 if (nanos <= 0L)
329 throw new TimeoutException();
330 long d = System.nanoTime() + nanos;
331 q = new WaitNode(true, nanos, d == 0L ? 1L : d); // avoid 0
332 }
333 else if (!queued)
334 queued = UNSAFE.compareAndSwapObject(this, WAITERS,
335 q.next = waiters, q);
336 else if (q.interruptControl < 0) {
337 removeWaiter(q);
338 throw new InterruptedException();
339 }
340 else if (q.nanos <= 0L) {
341 if (result == null) {
342 removeWaiter(q);
343 throw new TimeoutException();
344 }
345 }
346 else if (q.thread != null && result == null) {
347 try {
348 ForkJoinPool.managedBlock(q);
349 } catch (InterruptedException ex) {
350 q.interruptControl = -1;
351 }
352 }
353 }
354 }
355
356 /**
357 * Tries to unlink a timed-out or interrupted wait node to avoid
358 * accumulating garbage. Internal nodes are simply unspliced
359 * without CAS since it is harmless if they are traversed anyway
360 * by releasers. To avoid effects of unsplicing from already
361 * removed nodes, the list is retraversed in case of an apparent
362 * race. This is slow when there are a lot of nodes, but we don't
363 * expect lists to be long enough to outweigh higher-overhead
364 * schemes.
365 */
366 private void removeWaiter(WaitNode node) {
367 if (node != null) {
368 node.thread = null;
369 retry:
370 for (;;) { // restart on removeWaiter race
371 for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
372 s = q.next;
373 if (q.thread != null)
374 pred = q;
375 else if (pred != null) {
376 pred.next = s;
377 if (pred.thread == null) // check for race
378 continue retry;
379 }
380 else if (!UNSAFE.compareAndSwapObject(this, WAITERS, q, s))
381 continue retry;
382 }
383 break;
384 }
385 }
386 }
387
388 /* ------------- Async tasks -------------- */
389
390 /**
391 * A marker interface identifying asynchronous tasks produced by
392 * {@code async} methods. This may be useful for monitoring,
393 * debugging, and tracking asynchronous activities.
394 *
395 * @since 1.8
396 */
397 public static interface AsynchronousCompletionTask {
398 }
399
400 /** Base class that can act as either a ForkJoinTask or a plain Runnable */
401 abstract static class Async extends ForkJoinTask<Void>
402 implements Runnable, AsynchronousCompletionTask {
403 public final Void getRawResult() { return null; }
404 public final void setRawResult(Void v) { }
405 public final void run() { exec(); }
406 }
407
408 static final class AsyncRun extends Async {
409 final Runnable fn;
410 final CompletableFuture<Void> dst;
411 AsyncRun(Runnable fn, CompletableFuture<Void> dst) {
412 this.fn = fn; this.dst = dst;
413 }
414 public final boolean exec() {
415 CompletableFuture<Void> d; Throwable ex;
416 if ((d = this.dst) != null && d.result == null) {
417 try {
418 fn.run();
419 ex = null;
420 } catch (Throwable rex) {
421 ex = rex;
422 }
423 d.internalComplete(null, ex);
424 }
425 return true;
426 }
427 private static final long serialVersionUID = 5232453952276885070L;
428 }
429
430 static final class AsyncSupply<U> extends Async {
431 final Supplier<U> fn;
432 final CompletableFuture<U> dst;
433 AsyncSupply(Supplier<U> fn, CompletableFuture<U> dst) {
434 this.fn = fn; this.dst = dst;
435 }
436 public final boolean exec() {
437 CompletableFuture<U> d; U u; Throwable ex;
438 if ((d = this.dst) != null && d.result == null) {
439 try {
440 u = fn.get();
441 ex = null;
442 } catch (Throwable rex) {
443 ex = rex;
444 u = null;
445 }
446 d.internalComplete(u, ex);
447 }
448 return true;
449 }
450 private static final long serialVersionUID = 5232453952276885070L;
451 }
452
453 static final class AsyncApply<T,U> extends Async {
454 final T arg;
455 final Function<? super T,? extends U> fn;
456 final CompletableFuture<U> dst;
457 AsyncApply(T arg, Function<? super T,? extends U> fn,
458 CompletableFuture<U> dst) {
459 this.arg = arg; this.fn = fn; this.dst = dst;
460 }
461 public final boolean exec() {
462 CompletableFuture<U> d; U u; Throwable ex;
463 if ((d = this.dst) != null && d.result == null) {
464 try {
465 u = fn.apply(arg);
466 ex = null;
467 } catch (Throwable rex) {
468 ex = rex;
469 u = null;
470 }
471 d.internalComplete(u, ex);
472 }
473 return true;
474 }
475 private static final long serialVersionUID = 5232453952276885070L;
476 }
477
478 static final class AsyncBiApply<T,U,V> extends Async {
479 final T arg1;
480 final U arg2;
481 final BiFunction<? super T,? super U,? extends V> fn;
482 final CompletableFuture<V> dst;
483 AsyncBiApply(T arg1, U arg2,
484 BiFunction<? super T,? super U,? extends V> fn,
485 CompletableFuture<V> dst) {
486 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
487 }
488 public final boolean exec() {
489 CompletableFuture<V> d; V v; Throwable ex;
490 if ((d = this.dst) != null && d.result == null) {
491 try {
492 v = fn.apply(arg1, arg2);
493 ex = null;
494 } catch (Throwable rex) {
495 ex = rex;
496 v = null;
497 }
498 d.internalComplete(v, ex);
499 }
500 return true;
501 }
502 private static final long serialVersionUID = 5232453952276885070L;
503 }
504
505 static final class AsyncAccept<T> extends Async {
506 final T arg;
507 final Consumer<? super T> fn;
508 final CompletableFuture<Void> dst;
509 AsyncAccept(T arg, Consumer<? super T> fn,
510 CompletableFuture<Void> dst) {
511 this.arg = arg; this.fn = fn; this.dst = dst;
512 }
513 public final boolean exec() {
514 CompletableFuture<Void> d; Throwable ex;
515 if ((d = this.dst) != null && d.result == null) {
516 try {
517 fn.accept(arg);
518 ex = null;
519 } catch (Throwable rex) {
520 ex = rex;
521 }
522 d.internalComplete(null, ex);
523 }
524 return true;
525 }
526 private static final long serialVersionUID = 5232453952276885070L;
527 }
528
529 static final class AsyncBiAccept<T,U> extends Async {
530 final T arg1;
531 final U arg2;
532 final BiConsumer<? super T,? super U> fn;
533 final CompletableFuture<Void> dst;
534 AsyncBiAccept(T arg1, U arg2,
535 BiConsumer<? super T,? super U> fn,
536 CompletableFuture<Void> dst) {
537 this.arg1 = arg1; this.arg2 = arg2; this.fn = fn; this.dst = dst;
538 }
539 public final boolean exec() {
540 CompletableFuture<Void> d; Throwable ex;
541 if ((d = this.dst) != null && d.result == null) {
542 try {
543 fn.accept(arg1, arg2);
544 ex = null;
545 } catch (Throwable rex) {
546 ex = rex;
547 }
548 d.internalComplete(null, ex);
549 }
550 return true;
551 }
552 private static final long serialVersionUID = 5232453952276885070L;
553 }
554
555 static final class AsyncCompose<T,U> extends Async {
556 final T arg;
557 final Function<? super T, CompletableFuture<U>> fn;
558 final CompletableFuture<U> dst;
559 AsyncCompose(T arg,
560 Function<? super T, CompletableFuture<U>> fn,
561 CompletableFuture<U> dst) {
562 this.arg = arg; this.fn = fn; this.dst = dst;
563 }
564 public final boolean exec() {
565 CompletableFuture<U> d, fr; U u; Throwable ex;
566 if ((d = this.dst) != null && d.result == null) {
567 try {
568 fr = fn.apply(arg);
569 ex = (fr == null) ? new NullPointerException() : null;
570 } catch (Throwable rex) {
571 ex = rex;
572 fr = null;
573 }
574 if (ex != null)
575 u = null;
576 else {
577 Object r = fr.result;
578 if (r == null)
579 r = fr.waitingGet(false);
580 if (r instanceof AltResult) {
581 ex = ((AltResult)r).ex;
582 u = null;
583 }
584 else {
585 @SuppressWarnings("unchecked") U ur = (U) r;
586 u = ur;
587 }
588 }
589 d.internalComplete(u, ex);
590 }
591 return true;
592 }
593 private static final long serialVersionUID = 5232453952276885070L;
594 }
595
596 /* ------------- Completions -------------- */
597
598 /**
599 * Simple linked list nodes to record completions, used in
600 * basically the same way as WaitNodes. (We separate nodes from
601 * the Completions themselves mainly because for the And and Or
602 * methods, the same Completion object resides in two lists.)
603 */
604 static final class CompletionNode {
605 final Completion completion;
606 volatile CompletionNode next;
607 CompletionNode(Completion completion) { this.completion = completion; }
608 }
609
610 // Opportunistically subclass AtomicInteger to use compareAndSet to claim.
611 abstract static class Completion extends AtomicInteger implements Runnable {
612 }
613
614 static final class ApplyCompletion<T,U> extends Completion {
615 final CompletableFuture<? extends T> src;
616 final Function<? super T,? extends U> fn;
617 final CompletableFuture<U> dst;
618 final Executor executor;
619 ApplyCompletion(CompletableFuture<? extends T> src,
620 Function<? super T,? extends U> fn,
621 CompletableFuture<U> dst, Executor executor) {
622 this.src = src; this.fn = fn; this.dst = dst;
623 this.executor = executor;
624 }
625 public final void run() {
626 final CompletableFuture<? extends T> a;
627 final Function<? super T,? extends U> fn;
628 final CompletableFuture<U> dst;
629 Object r; T t; Throwable ex;
630 if ((dst = this.dst) != null &&
631 (fn = this.fn) != null &&
632 (a = this.src) != null &&
633 (r = a.result) != null &&
634 compareAndSet(0, 1)) {
635 if (r instanceof AltResult) {
636 ex = ((AltResult)r).ex;
637 t = null;
638 }
639 else {
640 ex = null;
641 @SuppressWarnings("unchecked") T tr = (T) r;
642 t = tr;
643 }
644 Executor e = executor;
645 U u = null;
646 if (ex == null) {
647 try {
648 if (e != null)
649 e.execute(new AsyncApply<T,U>(t, fn, dst));
650 else
651 u = fn.apply(t);
652 } catch (Throwable rex) {
653 ex = rex;
654 }
655 }
656 if (e == null || ex != null)
657 dst.internalComplete(u, ex);
658 }
659 }
660 private static final long serialVersionUID = 5232453952276885070L;
661 }
662
663 static final class AcceptCompletion<T> extends Completion {
664 final CompletableFuture<? extends T> src;
665 final Consumer<? super T> fn;
666 final CompletableFuture<Void> dst;
667 final Executor executor;
668 AcceptCompletion(CompletableFuture<? extends T> src,
669 Consumer<? super T> fn,
670 CompletableFuture<Void> dst, Executor executor) {
671 this.src = src; this.fn = fn; this.dst = dst;
672 this.executor = executor;
673 }
674 public final void run() {
675 final CompletableFuture<? extends T> a;
676 final Consumer<? super T> fn;
677 final CompletableFuture<Void> dst;
678 Object r; T t; Throwable ex;
679 if ((dst = this.dst) != null &&
680 (fn = this.fn) != null &&
681 (a = this.src) != null &&
682 (r = a.result) != null &&
683 compareAndSet(0, 1)) {
684 if (r instanceof AltResult) {
685 ex = ((AltResult)r).ex;
686 t = null;
687 }
688 else {
689 ex = null;
690 @SuppressWarnings("unchecked") T tr = (T) r;
691 t = tr;
692 }
693 Executor e = executor;
694 if (ex == null) {
695 try {
696 if (e != null)
697 e.execute(new AsyncAccept<T>(t, fn, dst));
698 else
699 fn.accept(t);
700 } catch (Throwable rex) {
701 ex = rex;
702 }
703 }
704 if (e == null || ex != null)
705 dst.internalComplete(null, ex);
706 }
707 }
708 private static final long serialVersionUID = 5232453952276885070L;
709 }
710
711 static final class RunCompletion<T> extends Completion {
712 final CompletableFuture<? extends T> src;
713 final Runnable fn;
714 final CompletableFuture<Void> dst;
715 final Executor executor;
716 RunCompletion(CompletableFuture<? extends T> src,
717 Runnable fn,
718 CompletableFuture<Void> dst,
719 Executor executor) {
720 this.src = src; this.fn = fn; this.dst = dst;
721 this.executor = executor;
722 }
723 public final void run() {
724 final CompletableFuture<? extends T> a;
725 final Runnable fn;
726 final CompletableFuture<Void> dst;
727 Object r; Throwable ex;
728 if ((dst = this.dst) != null &&
729 (fn = this.fn) != null &&
730 (a = this.src) != null &&
731 (r = a.result) != null &&
732 compareAndSet(0, 1)) {
733 if (r instanceof AltResult)
734 ex = ((AltResult)r).ex;
735 else
736 ex = null;
737 Executor e = executor;
738 if (ex == null) {
739 try {
740 if (e != null)
741 e.execute(new AsyncRun(fn, dst));
742 else
743 fn.run();
744 } catch (Throwable rex) {
745 ex = rex;
746 }
747 }
748 if (e == null || ex != null)
749 dst.internalComplete(null, ex);
750 }
751 }
752 private static final long serialVersionUID = 5232453952276885070L;
753 }
754
755 static final class BiApplyCompletion<T,U,V> extends Completion {
756 final CompletableFuture<? extends T> src;
757 final CompletableFuture<? extends U> snd;
758 final BiFunction<? super T,? super U,? extends V> fn;
759 final CompletableFuture<V> dst;
760 final Executor executor;
761 BiApplyCompletion(CompletableFuture<? extends T> src,
762 CompletableFuture<? extends U> snd,
763 BiFunction<? super T,? super U,? extends V> fn,
764 CompletableFuture<V> dst, Executor executor) {
765 this.src = src; this.snd = snd;
766 this.fn = fn; this.dst = dst;
767 this.executor = executor;
768 }
769 public final void run() {
770 final CompletableFuture<? extends T> a;
771 final CompletableFuture<? extends U> b;
772 final BiFunction<? super T,? super U,? extends V> fn;
773 final CompletableFuture<V> dst;
774 Object r, s; T t; U u; Throwable ex;
775 if ((dst = this.dst) != null &&
776 (fn = this.fn) != null &&
777 (a = this.src) != null &&
778 (r = a.result) != null &&
779 (b = this.snd) != null &&
780 (s = b.result) != null &&
781 compareAndSet(0, 1)) {
782 if (r instanceof AltResult) {
783 ex = ((AltResult)r).ex;
784 t = null;
785 }
786 else {
787 ex = null;
788 @SuppressWarnings("unchecked") T tr = (T) r;
789 t = tr;
790 }
791 if (ex != null)
792 u = null;
793 else if (s instanceof AltResult) {
794 ex = ((AltResult)s).ex;
795 u = null;
796 }
797 else {
798 @SuppressWarnings("unchecked") U us = (U) s;
799 u = us;
800 }
801 Executor e = executor;
802 V v = null;
803 if (ex == null) {
804 try {
805 if (e != null)
806 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
807 else
808 v = fn.apply(t, u);
809 } catch (Throwable rex) {
810 ex = rex;
811 }
812 }
813 if (e == null || ex != null)
814 dst.internalComplete(v, ex);
815 }
816 }
817 private static final long serialVersionUID = 5232453952276885070L;
818 }
819
820 static final class BiAcceptCompletion<T,U> extends Completion {
821 final CompletableFuture<? extends T> src;
822 final CompletableFuture<? extends U> snd;
823 final BiConsumer<? super T,? super U> fn;
824 final CompletableFuture<Void> dst;
825 final Executor executor;
826 BiAcceptCompletion(CompletableFuture<? extends T> src,
827 CompletableFuture<? extends U> snd,
828 BiConsumer<? super T,? super U> fn,
829 CompletableFuture<Void> dst, Executor executor) {
830 this.src = src; this.snd = snd;
831 this.fn = fn; this.dst = dst;
832 this.executor = executor;
833 }
834 public final void run() {
835 final CompletableFuture<? extends T> a;
836 final CompletableFuture<? extends U> b;
837 final BiConsumer<? super T,? super U> fn;
838 final CompletableFuture<Void> dst;
839 Object r, s; T t; U u; Throwable ex;
840 if ((dst = this.dst) != null &&
841 (fn = this.fn) != null &&
842 (a = this.src) != null &&
843 (r = a.result) != null &&
844 (b = this.snd) != null &&
845 (s = b.result) != null &&
846 compareAndSet(0, 1)) {
847 if (r instanceof AltResult) {
848 ex = ((AltResult)r).ex;
849 t = null;
850 }
851 else {
852 ex = null;
853 @SuppressWarnings("unchecked") T tr = (T) r;
854 t = tr;
855 }
856 if (ex != null)
857 u = null;
858 else if (s instanceof AltResult) {
859 ex = ((AltResult)s).ex;
860 u = null;
861 }
862 else {
863 @SuppressWarnings("unchecked") U us = (U) s;
864 u = us;
865 }
866 Executor e = executor;
867 if (ex == null) {
868 try {
869 if (e != null)
870 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
871 else
872 fn.accept(t, u);
873 } catch (Throwable rex) {
874 ex = rex;
875 }
876 }
877 if (e == null || ex != null)
878 dst.internalComplete(null, ex);
879 }
880 }
881 private static final long serialVersionUID = 5232453952276885070L;
882 }
883
884 static final class BiRunCompletion<T> extends Completion {
885 final CompletableFuture<? extends T> src;
886 final CompletableFuture<?> snd;
887 final Runnable fn;
888 final CompletableFuture<Void> dst;
889 final Executor executor;
890 BiRunCompletion(CompletableFuture<? extends T> src,
891 CompletableFuture<?> snd,
892 Runnable fn,
893 CompletableFuture<Void> dst, Executor executor) {
894 this.src = src; this.snd = snd;
895 this.fn = fn; this.dst = dst;
896 this.executor = executor;
897 }
898 public final void run() {
899 final CompletableFuture<? extends T> a;
900 final CompletableFuture<?> b;
901 final Runnable fn;
902 final CompletableFuture<Void> dst;
903 Object r, s; Throwable ex;
904 if ((dst = this.dst) != null &&
905 (fn = this.fn) != null &&
906 (a = this.src) != null &&
907 (r = a.result) != null &&
908 (b = this.snd) != null &&
909 (s = b.result) != null &&
910 compareAndSet(0, 1)) {
911 if (r instanceof AltResult)
912 ex = ((AltResult)r).ex;
913 else
914 ex = null;
915 if (ex == null && (s instanceof AltResult))
916 ex = ((AltResult)s).ex;
917 Executor e = executor;
918 if (ex == null) {
919 try {
920 if (e != null)
921 e.execute(new AsyncRun(fn, dst));
922 else
923 fn.run();
924 } catch (Throwable rex) {
925 ex = rex;
926 }
927 }
928 if (e == null || ex != null)
929 dst.internalComplete(null, ex);
930 }
931 }
932 private static final long serialVersionUID = 5232453952276885070L;
933 }
934
935 static final class AndCompletion extends Completion {
936 final CompletableFuture<?> src;
937 final CompletableFuture<?> snd;
938 final CompletableFuture<Void> dst;
939 AndCompletion(CompletableFuture<?> src,
940 CompletableFuture<?> snd,
941 CompletableFuture<Void> dst) {
942 this.src = src; this.snd = snd; this.dst = dst;
943 }
944 public final void run() {
945 final CompletableFuture<?> a;
946 final CompletableFuture<?> b;
947 final CompletableFuture<Void> dst;
948 Object r, s; Throwable ex;
949 if ((dst = this.dst) != null &&
950 (a = this.src) != null &&
951 (r = a.result) != null &&
952 (b = this.snd) != null &&
953 (s = b.result) != null &&
954 compareAndSet(0, 1)) {
955 if (r instanceof AltResult)
956 ex = ((AltResult)r).ex;
957 else
958 ex = null;
959 if (ex == null && (s instanceof AltResult))
960 ex = ((AltResult)s).ex;
961 dst.internalComplete(null, ex);
962 }
963 }
964 private static final long serialVersionUID = 5232453952276885070L;
965 }
966
967 static final class OrApplyCompletion<T,U> extends Completion {
968 final CompletableFuture<? extends T> src;
969 final CompletableFuture<? extends T> snd;
970 final Function<? super T,? extends U> fn;
971 final CompletableFuture<U> dst;
972 final Executor executor;
973 OrApplyCompletion(CompletableFuture<? extends T> src,
974 CompletableFuture<? extends T> snd,
975 Function<? super T,? extends U> fn,
976 CompletableFuture<U> dst, Executor executor) {
977 this.src = src; this.snd = snd;
978 this.fn = fn; this.dst = dst;
979 this.executor = executor;
980 }
981 public final void run() {
982 final CompletableFuture<? extends T> a;
983 final CompletableFuture<? extends T> b;
984 final Function<? super T,? extends U> fn;
985 final CompletableFuture<U> dst;
986 Object r; T t; Throwable ex;
987 if ((dst = this.dst) != null &&
988 (fn = this.fn) != null &&
989 (((a = this.src) != null && (r = a.result) != null) ||
990 ((b = this.snd) != null && (r = b.result) != null)) &&
991 compareAndSet(0, 1)) {
992 if (r instanceof AltResult) {
993 ex = ((AltResult)r).ex;
994 t = null;
995 }
996 else {
997 ex = null;
998 @SuppressWarnings("unchecked") T tr = (T) r;
999 t = tr;
1000 }
1001 Executor e = executor;
1002 U u = null;
1003 if (ex == null) {
1004 try {
1005 if (e != null)
1006 e.execute(new AsyncApply<T,U>(t, fn, dst));
1007 else
1008 u = fn.apply(t);
1009 } catch (Throwable rex) {
1010 ex = rex;
1011 }
1012 }
1013 if (e == null || ex != null)
1014 dst.internalComplete(u, ex);
1015 }
1016 }
1017 private static final long serialVersionUID = 5232453952276885070L;
1018 }
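For orientation, here is a minimal sketch of the "either" behaviour that OrApplyCompletion appears to support: the function is applied to the result of whichever source completes first. It assumes the public `applyToEither` method (not shown in this part of the file) is the entry point; the class and method names in the sketch are illustrative only.

```java
import java.util.concurrent.CompletableFuture;

final class EitherSketch {
    // Applies the function to the result of whichever source completes first.
    static String firstWins() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> first = a.applyToEither(b, s -> s.toUpperCase());
        b.complete("b");   // b finishes first, so the dependent fires now
        a.complete("a");   // too late to affect the dependent
        return first.join();
    }

    public static void main(String[] args) {
        System.out.println(firstWins());
    }
}
```

The later completion of `a` is ignored by the dependent, which is consistent with the `compareAndSet(0, 1)` claim in `run()`.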
1019
1020 static final class OrAcceptCompletion<T> extends Completion {
1021 final CompletableFuture<? extends T> src;
1022 final CompletableFuture<? extends T> snd;
1023 final Consumer<? super T> fn;
1024 final CompletableFuture<Void> dst;
1025 final Executor executor;
1026 OrAcceptCompletion(CompletableFuture<? extends T> src,
1027 CompletableFuture<? extends T> snd,
1028 Consumer<? super T> fn,
1029 CompletableFuture<Void> dst, Executor executor) {
1030 this.src = src; this.snd = snd;
1031 this.fn = fn; this.dst = dst;
1032 this.executor = executor;
1033 }
1034 public final void run() {
1035 final CompletableFuture<? extends T> a;
1036 final CompletableFuture<? extends T> b;
1037 final Consumer<? super T> fn;
1038 final CompletableFuture<Void> dst;
1039 Object r; T t; Throwable ex;
1040 if ((dst = this.dst) != null &&
1041 (fn = this.fn) != null &&
1042 (((a = this.src) != null && (r = a.result) != null) ||
1043 ((b = this.snd) != null && (r = b.result) != null)) &&
1044 compareAndSet(0, 1)) {
1045 if (r instanceof AltResult) {
1046 ex = ((AltResult)r).ex;
1047 t = null;
1048 }
1049 else {
1050 ex = null;
1051 @SuppressWarnings("unchecked") T tr = (T) r;
1052 t = tr;
1053 }
1054 Executor e = executor;
1055 if (ex == null) {
1056 try {
1057 if (e != null)
1058 e.execute(new AsyncAccept<T>(t, fn, dst));
1059 else
1060 fn.accept(t);
1061 } catch (Throwable rex) {
1062 ex = rex;
1063 }
1064 }
1065 if (e == null || ex != null)
1066 dst.internalComplete(null, ex);
1067 }
1068 }
1069 private static final long serialVersionUID = 5232453952276885070L;
1070 }
1071
1072 static final class OrRunCompletion<T> extends Completion {
1073 final CompletableFuture<? extends T> src;
1074 final CompletableFuture<?> snd;
1075 final Runnable fn;
1076 final CompletableFuture<Void> dst;
1077 final Executor executor;
1078 OrRunCompletion(CompletableFuture<? extends T> src,
1079 CompletableFuture<?> snd,
1080 Runnable fn,
1081 CompletableFuture<Void> dst, Executor executor) {
1082 this.src = src; this.snd = snd;
1083 this.fn = fn; this.dst = dst;
1084 this.executor = executor;
1085 }
1086 public final void run() {
1087 final CompletableFuture<? extends T> a;
1088 final CompletableFuture<?> b;
1089 final Runnable fn;
1090 final CompletableFuture<Void> dst;
1091 Object r; Throwable ex;
1092 if ((dst = this.dst) != null &&
1093 (fn = this.fn) != null &&
1094 (((a = this.src) != null && (r = a.result) != null) ||
1095 ((b = this.snd) != null && (r = b.result) != null)) &&
1096 compareAndSet(0, 1)) {
1097 if (r instanceof AltResult)
1098 ex = ((AltResult)r).ex;
1099 else
1100 ex = null;
1101 Executor e = executor;
1102 if (ex == null) {
1103 try {
1104 if (e != null)
1105 e.execute(new AsyncRun(fn, dst));
1106 else
1107 fn.run();
1108 } catch (Throwable rex) {
1109 ex = rex;
1110 }
1111 }
1112 if (e == null || ex != null)
1113 dst.internalComplete(null, ex);
1114 }
1115 }
1116 private static final long serialVersionUID = 5232453952276885070L;
1117 }
1118
1119 static final class OrCompletion extends Completion {
1120 final CompletableFuture<?> src;
1121 final CompletableFuture<?> snd;
1122 final CompletableFuture<?> dst;
1123 OrCompletion(CompletableFuture<?> src,
1124 CompletableFuture<?> snd,
1125 CompletableFuture<?> dst) {
1126 this.src = src; this.snd = snd; this.dst = dst;
1127 }
1128 public final void run() {
1129 final CompletableFuture<?> a;
1130 final CompletableFuture<?> b;
1131 final CompletableFuture<?> dst;
1132 Object r, t; Throwable ex;
1133 if ((dst = this.dst) != null &&
1134 (((a = this.src) != null && (r = a.result) != null) ||
1135 ((b = this.snd) != null && (r = b.result) != null)) &&
1136 compareAndSet(0, 1)) {
1137 if (r instanceof AltResult) {
1138 ex = ((AltResult)r).ex;
1139 t = null;
1140 }
1141 else {
1142 ex = null;
1143 t = r;
1144 }
1145 dst.internalComplete(t, ex);
1146 }
1147 }
1148 private static final long serialVersionUID = 5232453952276885070L;
1149 }
1150
1151 static final class ExceptionCompletion<T> extends Completion {
1152 final CompletableFuture<? extends T> src;
1153 final Function<? super Throwable, ? extends T> fn;
1154 final CompletableFuture<T> dst;
1155 ExceptionCompletion(CompletableFuture<? extends T> src,
1156 Function<? super Throwable, ? extends T> fn,
1157 CompletableFuture<T> dst) {
1158 this.src = src; this.fn = fn; this.dst = dst;
1159 }
1160 public final void run() {
1161 final CompletableFuture<? extends T> a;
1162 final Function<? super Throwable, ? extends T> fn;
1163 final CompletableFuture<T> dst;
1164 Object r; T t = null; Throwable ex, dx = null;
1165 if ((dst = this.dst) != null &&
1166 (fn = this.fn) != null &&
1167 (a = this.src) != null &&
1168 (r = a.result) != null &&
1169 compareAndSet(0, 1)) {
1170 if ((r instanceof AltResult) &&
1171 (ex = ((AltResult)r).ex) != null) {
1172 try {
1173 t = fn.apply(ex);
1174 } catch (Throwable rex) {
1175 dx = rex;
1176 }
1177 }
1178 else {
1179 @SuppressWarnings("unchecked") T tr = (T) r;
1180 t = tr;
1181 }
1182 dst.internalComplete(t, dx);
1183 }
1184 }
1185 private static final long serialVersionUID = 5232453952276885070L;
1186 }
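A small sketch of the recovery behaviour implemented above: the function runs only when the source failed, and a normal result passes through unchanged. It assumes ExceptionCompletion backs the public `exceptionally` method, which is not shown in this excerpt; `ExceptionallySketch` and `recover` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

final class ExceptionallySketch {
    // Returns the value seen by the dependent stage: the original value on
    // success, or the fallback (-1) when the source completes exceptionally.
    static int recover(boolean fail) {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<Integer> safe = src.exceptionally(ex -> -1);
        if (fail)
            src.completeExceptionally(new IllegalStateException("boom"));
        else
            src.complete(10);
        return safe.join();
    }

    public static void main(String[] args) {
        System.out.println(recover(false));
        System.out.println(recover(true));
    }
}
```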
1187
1188 static final class ThenCopy extends Completion {
1189 final CompletableFuture<?> src;
1190 final CompletableFuture<?> dst;
1191 ThenCopy(CompletableFuture<?> src,
1192 CompletableFuture<?> dst) {
1193 this.src = src; this.dst = dst;
1194 }
1195 public final void run() {
1196 final CompletableFuture<?> a;
1197 final CompletableFuture<?> dst;
1198 Object r; Object t; Throwable ex;
1199 if ((dst = this.dst) != null &&
1200 (a = this.src) != null &&
1201 (r = a.result) != null &&
1202 compareAndSet(0, 1)) {
1203 if (r instanceof AltResult) {
1204 ex = ((AltResult)r).ex;
1205 t = null;
1206 }
1207 else {
1208 ex = null;
1209 t = r;
1210 }
1211 dst.internalComplete(t, ex);
1212 }
1213 }
1214 private static final long serialVersionUID = 5232453952276885070L;
1215 }
1216
1217 static final class HandleCompletion<T,U> extends Completion {
1218 final CompletableFuture<? extends T> src;
1219 final BiFunction<? super T, Throwable, ? extends U> fn;
1220 final CompletableFuture<U> dst;
1221 HandleCompletion(CompletableFuture<? extends T> src,
1222 BiFunction<? super T, Throwable, ? extends U> fn,
1223 CompletableFuture<U> dst) {
1224 this.src = src; this.fn = fn; this.dst = dst;
1225 }
1226 public final void run() {
1227 final CompletableFuture<? extends T> a;
1228 final BiFunction<? super T, Throwable, ? extends U> fn;
1229 final CompletableFuture<U> dst;
1230 Object r; T t; Throwable ex;
1231 if ((dst = this.dst) != null &&
1232 (fn = this.fn) != null &&
1233 (a = this.src) != null &&
1234 (r = a.result) != null &&
1235 compareAndSet(0, 1)) {
1236 if (r instanceof AltResult) {
1237 ex = ((AltResult)r).ex;
1238 t = null;
1239 }
1240 else {
1241 ex = null;
1242 @SuppressWarnings("unchecked") T tr = (T) r;
1243 t = tr;
1244 }
1245 U u = null; Throwable dx = null;
1246 try {
1247 u = fn.apply(t, ex);
1248 } catch (Throwable rex) {
1249 dx = rex;
1250 }
1251 dst.internalComplete(u, dx);
1252 }
1253 }
1254 private static final long serialVersionUID = 5232453952276885070L;
1255 }
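Unlike the exception-only case, the completion above always calls its function, passing either a value or a Throwable. The sketch below illustrates this through the public `handle` method, on the assumption that it is the method HandleCompletion serves; the names `HandleSketch` and `describe` are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

final class HandleSketch {
    // The BiFunction sees (value, null) on success and (null, exception) on failure.
    static String describe(boolean fail) {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<String> out = src.handle(
            (v, ex) -> ex == null ? "ok:" + v : "failed:" + ex.getMessage());
        if (fail)
            src.completeExceptionally(new IllegalStateException("boom"));
        else
            src.complete(5);
        return out.join();
    }

    public static void main(String[] args) {
        System.out.println(describe(false));
        System.out.println(describe(true));
    }
}
```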
1256
1257 static final class ComposeCompletion<T,U> extends Completion {
1258 final CompletableFuture<? extends T> src;
1259 final Function<? super T, CompletableFuture<U>> fn;
1260 final CompletableFuture<U> dst;
1261 final Executor executor;
1262 ComposeCompletion(CompletableFuture<? extends T> src,
1263 Function<? super T, CompletableFuture<U>> fn,
1264 CompletableFuture<U> dst, Executor executor) {
1265 this.src = src; this.fn = fn; this.dst = dst;
1266 this.executor = executor;
1267 }
1268 public final void run() {
1269 final CompletableFuture<? extends T> a;
1270 final Function<? super T, CompletableFuture<U>> fn;
1271 final CompletableFuture<U> dst;
1272 Object r; T t; Throwable ex; Executor e;
1273 if ((dst = this.dst) != null &&
1274 (fn = this.fn) != null &&
1275 (a = this.src) != null &&
1276 (r = a.result) != null &&
1277 compareAndSet(0, 1)) {
1278 if (r instanceof AltResult) {
1279 ex = ((AltResult)r).ex;
1280 t = null;
1281 }
1282 else {
1283 ex = null;
1284 @SuppressWarnings("unchecked") T tr = (T) r;
1285 t = tr;
1286 }
1287 CompletableFuture<U> c = null;
1288 U u = null;
1289 boolean complete = false;
1290 if (ex == null) {
1291 if ((e = executor) != null)
1292 e.execute(new AsyncCompose<T,U>(t, fn, dst));
1293 else {
1294 try {
1295 if ((c = fn.apply(t)) == null)
1296 ex = new NullPointerException();
1297 } catch (Throwable rex) {
1298 ex = rex;
1299 }
1300 }
1301 }
1302 if (c != null) {
1303 ThenCopy d = null;
1304 Object s;
1305 if ((s = c.result) == null) {
1306 CompletionNode p = new CompletionNode
1307 (d = new ThenCopy(c, dst));
1308 while ((s = c.result) == null) {
1309 if (UNSAFE.compareAndSwapObject
1310 (c, COMPLETIONS, p.next = c.completions, p))
1311 break;
1312 }
1313 }
1314 if (s != null && (d == null || d.compareAndSet(0, 1))) {
1315 complete = true;
1316 if (s instanceof AltResult) {
1317 ex = ((AltResult)s).ex; // no rewrap
1318 u = null;
1319 }
1320 else {
1321 @SuppressWarnings("unchecked") U us = (U) s;
1322 u = us;
1323 }
1324 }
1325 }
1326 if (complete || ex != null)
1327 dst.internalComplete(u, ex);
1328 if (c != null)
1329 c.helpPostComplete();
1330 }
1331 }
1332 private static final long serialVersionUID = 5232453952276885070L;
1333 }
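The composition logic above has two notable cases: the function's returned future is relayed to the destination, and a null return is turned into a NullPointerException. A hedged sketch of both, assuming the public `thenCompose` method is the entry point (it is not shown in this excerpt); class and method names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ComposeSketch {
    // The inner future's result becomes the result of the returned future.
    static String chained() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<String> out =
            src.thenCompose(n -> CompletableFuture.supplyAsync(() -> "n=" + n));
        src.complete(7);
        return out.join();
    }

    // A function that returns null makes the dependent fail; this reports
    // the class name of the cause observed via join().
    static String nullStageCause() {
        CompletableFuture<Integer> src = new CompletableFuture<>();
        CompletableFuture<String> out =
            src.thenCompose(n -> (CompletableFuture<String>) null);
        src.complete(7);
        try {
            out.join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(chained());
        System.out.println(nullStageCause());
    }
}
```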
1334
1335 // public methods
1336
1337 /**
1338 * Creates a new incomplete CompletableFuture.
1339 */
1340 public CompletableFuture() {
1341 }
1342
1343 /**
1344 * Returns a new CompletableFuture that is asynchronously completed
1345 * by a task running in the {@link ForkJoinPool#commonPool()} with
1346 * the value obtained by calling the given Supplier.
1347 *
1348 * @param supplier a function returning the value to be used
1349 * to complete the returned CompletableFuture
1350 * @return the new CompletableFuture
1351 */
1352 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
1353 if (supplier == null) throw new NullPointerException();
1354 CompletableFuture<U> f = new CompletableFuture<U>();
1355 ForkJoinPool.commonPool().
1356 execute((ForkJoinTask<?>)new AsyncSupply<U>(supplier, f));
1357 return f;
1358 }
1359
1360 /**
1361 * Returns a new CompletableFuture that is asynchronously completed
1362 * by a task running in the given executor with the value obtained
1363 * by calling the given Supplier.
1364 *
1365 * @param supplier a function returning the value to be used
1366 * to complete the returned CompletableFuture
1367 * @param executor the executor to use for asynchronous execution
1368 * @return the new CompletableFuture
1369 */
1370 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
1371 Executor executor) {
1372 if (executor == null || supplier == null)
1373 throw new NullPointerException();
1374 CompletableFuture<U> f = new CompletableFuture<U>();
1375 executor.execute(new AsyncSupply<U>(supplier, f));
1376 return f;
1377 }
1378
1379 /**
1380 * Returns a new CompletableFuture that is asynchronously completed
1381 * by a task running in the {@link ForkJoinPool#commonPool()} after
1382 * it runs the given action.
1383 *
1384 * @param runnable the action to run before completing the
1385 * returned CompletableFuture
1386 * @return the new CompletableFuture
1387 */
1388 public static CompletableFuture<Void> runAsync(Runnable runnable) {
1389 if (runnable == null) throw new NullPointerException();
1390 CompletableFuture<Void> f = new CompletableFuture<Void>();
1391 ForkJoinPool.commonPool().
1392 execute((ForkJoinTask<?>)new AsyncRun(runnable, f));
1393 return f;
1394 }
1395
1396 /**
1397 * Returns a new CompletableFuture that is asynchronously completed
1398 * by a task running in the given executor after it runs the given
1399 * action.
1400 *
1401 * @param runnable the action to run before completing the
1402 * returned CompletableFuture
1403 * @param executor the executor to use for asynchronous execution
1404 * @return the new CompletableFuture
1405 */
1406 public static CompletableFuture<Void> runAsync(Runnable runnable,
1407 Executor executor) {
1408 if (executor == null || runnable == null)
1409 throw new NullPointerException();
1410 CompletableFuture<Void> f = new CompletableFuture<Void>();
1411 executor.execute(new AsyncRun(runnable, f));
1412 return f;
1413 }
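A usage sketch for the four factory methods above. The single-thread pool stands in for any caller-supplied Executor; `AsyncFactorySketch` and its method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

final class AsyncFactorySketch {
    // supplyAsync on the common pool: the supplier's value completes the future.
    static int supplyOnCommonPool() {
        return CompletableFuture.supplyAsync(() -> 6 * 7).join();
    }

    // supplyAsync on an explicit executor.
    static String supplyOnExecutor() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture.supplyAsync(() -> "done", pool).join();
        } finally {
            pool.shutdown();
        }
    }

    // runAsync: the future completes (with null) only after the action has run.
    static boolean runThenComplete() {
        AtomicBoolean ran = new AtomicBoolean();
        CompletableFuture<Void> f = CompletableFuture.runAsync(() -> ran.set(true));
        f.join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(supplyOnCommonPool());
        System.out.println(supplyOnExecutor());
        System.out.println(runThenComplete());
    }
}
```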
1414
1415 /**
1416 * Returns {@code true} if completed in any fashion: normally,
1417 * exceptionally, or via cancellation.
1418 *
1419 * @return {@code true} if completed
1420 */
1421 public boolean isDone() {
1422 return result != null;
1423 }
1424
1425 /**
1426 * Waits if necessary for this future to complete, and then
1427 * returns its result.
1428 *
1429 * @return the result value
1430 * @throws CancellationException if this future was cancelled
1431 * @throws ExecutionException if this future completed exceptionally
1432 * @throws InterruptedException if the current thread was interrupted
1433 * while waiting
1434 */
1435 public T get() throws InterruptedException, ExecutionException {
1436 Object r; Throwable ex, cause;
1437 if ((r = result) == null && (r = waitingGet(true)) == null)
1438 throw new InterruptedException();
1439 if (!(r instanceof AltResult)) {
1440 @SuppressWarnings("unchecked") T tr = (T) r;
1441 return tr;
1442 }
1443 if ((ex = ((AltResult)r).ex) == null)
1444 return null;
1445 if (ex instanceof CancellationException)
1446 throw (CancellationException)ex;
1447 if ((ex instanceof CompletionException) &&
1448 (cause = ex.getCause()) != null)
1449 ex = cause;
1450 throw new ExecutionException(ex);
1451 }
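The unwrapping step at the end of `get()` means callers see the original exception as the cause of the ExecutionException, whether the failure was set directly or propagated through a dependent stage. A minimal sketch (helper and class names are illustrative):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class GetSketch {
    // Returns the simple class name of the cause reported by get().
    static String causeOf(CompletableFuture<?> f) {
        try {
            f.get();
            return "none";
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // Failure set directly on the future.
    static String direct() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        return causeOf(f);
    }

    // Failure propagated to a dependent; the intermediate CompletionException
    // is not what get() reports as the cause.
    static String throughDependent() {
        CompletableFuture<String> f = new CompletableFuture<>();
        CompletableFuture<Integer> g = f.thenApply(String::length);
        f.completeExceptionally(new IllegalStateException("boom"));
        return causeOf(g);
    }

    public static void main(String[] args) {
        System.out.println(direct());
        System.out.println(throughDependent());
    }
}
```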
1452
1453 /**
1454 * Waits if necessary for at most the given time for this future
1455 * to complete, and then returns its result, if available.
1456 *
1457 * @param timeout the maximum time to wait
1458 * @param unit the time unit of the timeout argument
1459 * @return the result value
1460 * @throws CancellationException if this future was cancelled
1461 * @throws ExecutionException if this future completed exceptionally
1462 * @throws InterruptedException if the current thread was interrupted
1463 * while waiting
1464 * @throws TimeoutException if the wait timed out
1465 */
1466 public T get(long timeout, TimeUnit unit)
1467 throws InterruptedException, ExecutionException, TimeoutException {
1468 Object r; Throwable ex, cause;
1469 long nanos = unit.toNanos(timeout);
1470 if (Thread.interrupted())
1471 throw new InterruptedException();
1472 if ((r = result) == null)
1473 r = timedAwaitDone(nanos);
1474 if (!(r instanceof AltResult)) {
1475 @SuppressWarnings("unchecked") T tr = (T) r;
1476 return tr;
1477 }
1478 if ((ex = ((AltResult)r).ex) == null)
1479 return null;
1480 if (ex instanceof CancellationException)
1481 throw (CancellationException)ex;
1482 if ((ex instanceof CompletionException) &&
1483 (cause = ex.getCause()) != null)
1484 ex = cause;
1485 throw new ExecutionException(ex);
1486 }
1487
1488 /**
1489 * Returns the result value when complete, or throws an
1490 * (unchecked) exception if completed exceptionally. To better
1491 * conform with the use of common functional forms, if a
1492 * computation involved in the completion of this
1493 * CompletableFuture threw an exception, this method throws an
1494 * (unchecked) {@link CompletionException} with the underlying
1495 * exception as its cause.
1496 *
1497 * @return the result value
1498 * @throws CancellationException if the computation was cancelled
1499 * @throws CompletionException if this future completed
1500 * exceptionally or a completion computation threw an exception
1501 */
1502 public T join() {
1503 Object r; Throwable ex;
1504 if ((r = result) == null)
1505 r = waitingGet(false);
1506 if (!(r instanceof AltResult)) {
1507 @SuppressWarnings("unchecked") T tr = (T) r;
1508 return tr;
1509 }
1510 if ((ex = ((AltResult)r).ex) == null)
1511 return null;
1512 if (ex instanceof CancellationException)
1513 throw (CancellationException)ex;
1514 if (ex instanceof CompletionException)
1515 throw (CompletionException)ex;
1516 throw new CompletionException(ex);
1517 }
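In contrast to `get()`, `join()` reports failures with unchecked exceptions. The sketch below shows the two branches visible in the method body: a failure surfaces as a CompletionException carrying the original cause, and a cancellation surfaces as a CancellationException. Names are hypothetical.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class JoinSketch {
    // A failed future: join() throws CompletionException wrapping the cause.
    static String failureMessage() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("boom"));
        try {
            f.join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    // A cancelled future: join() throws CancellationException directly.
    static String cancelled() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.cancel(true);
        try {
            f.join();
            return "no exception";
        } catch (CancellationException e) {
            return "cancelled";
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage());
        System.out.println(cancelled());
    }
}
```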
1518
1519 /**
1520 * Returns the result value (or throws any encountered exception)
1521 * if completed, else returns the given valueIfAbsent.
1522 *
1523 * @param valueIfAbsent the value to return if not completed
1524 * @return the result value, if completed, else the given valueIfAbsent
1525 * @throws CancellationException if the computation was cancelled
1526 * @throws CompletionException if this future completed
1527 * exceptionally or a completion computation threw an exception
1528 */
1529 public T getNow(T valueIfAbsent) {
1530 Object r; Throwable ex;
1531 if ((r = result) == null)
1532 return valueIfAbsent;
1533 if (!(r instanceof AltResult)) {
1534 @SuppressWarnings("unchecked") T tr = (T) r;
1535 return tr;
1536 }
1537 if ((ex = ((AltResult)r).ex) == null)
1538 return null;
1539 if (ex instanceof CancellationException)
1540 throw (CancellationException)ex;
1541 if (ex instanceof CompletionException)
1542 throw (CompletionException)ex;
1543 throw new CompletionException(ex);
1544 }
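A short sketch of the non-blocking read above: before completion the supplied default comes back, afterwards the real result does. The class and method names are illustrative.

```java
import java.util.concurrent.CompletableFuture;

final class GetNowSketch {
    // Reads the future before and after completion without blocking.
    static String beforeAndAfter() {
        CompletableFuture<String> f = new CompletableFuture<>();
        String before = f.getNow("fallback"); // not completed yet
        f.complete("value");
        String after = f.getNow("fallback");  // completed, default ignored
        return before + "," + after;
    }

    public static void main(String[] args) {
        System.out.println(beforeAndAfter());
    }
}
```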
1545
1546 /**
1547 * If not already completed, sets the value returned by {@link
1548 * #get()} and related methods to the given value.
1549 *
1550 * @param value the result value
1551 * @return {@code true} if this invocation caused this CompletableFuture
1552 * to transition to a completed state, else {@code false}
1553 */
1554 public boolean complete(T value) {
1555 boolean triggered = result == null &&
1556 UNSAFE.compareAndSwapObject(this, RESULT, null,
1557 value == null ? NIL : value); // a null value is stored as NIL, so a non-null result field always means "completed"

1558 postComplete();
1559 return triggered;
1560 }
1561
1562 /**
1563 * If not already completed, causes invocations of {@link #get()}
1564 * and related methods to throw the given exception.
1565 *
1566 * @param ex the exception
1567 * @return {@code true} if this invocation caused this CompletableFuture
1568 * to transition to a completed state, else {@code false}
1569 */
1570 public boolean completeExceptionally(Throwable ex) {
1571 if (ex == null) throw new NullPointerException();
1572 boolean triggered = result == null &&
1573 UNSAFE.compareAndSwapObject(this, RESULT, null, new AltResult(ex));
1574 postComplete();
1575 return triggered;
1576 }
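Both completion methods above return whether this particular call performed the transition. A minimal sketch of the "only one attempt succeeds" rule, plus completion with a null value; the names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class CompleteSketch {
    // Only the first completion attempt wins; later attempts return false
    // and leave the stored result untouched.
    static String firstWins() {
        CompletableFuture<String> f = new CompletableFuture<>();
        boolean first = f.complete("a");
        boolean second = f.complete("b");
        boolean third = f.completeExceptionally(new RuntimeException("late"));
        return first + "," + second + "," + third + "," + f.join();
    }

    // Completing with null still marks the future as done.
    static boolean nullCountsAsDone() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.complete(null);
        return f.isDone() && f.join() == null;
    }

    public static void main(String[] args) {
        System.out.println(firstWins());
        System.out.println(nullCountsAsDone());
    }
}
```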
1577
1578 /**
1579 * Returns a new CompletableFuture that is completed
1580 * when this CompletableFuture completes, with the result of the
1581 * given function of this CompletableFuture's result.
1582 *
1583 * <p>If this CompletableFuture completes exceptionally, or the
1584 * supplied function throws an exception, then the returned
1585 * CompletableFuture completes exceptionally with a
1586 * CompletionException holding the exception as its cause.
1587 *
1588 * @param fn the function to use to compute the value of
1589 * the returned CompletableFuture
1590 * @return the new CompletableFuture
1591 */
1592 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {
1593 return doThenApply(fn, null);
1594 }
1595
1596 /**
1597 * Returns a new CompletableFuture that is asynchronously completed
1598 * when this CompletableFuture completes, with the result of the
1599 * given function of this CompletableFuture's result from a
1600 * task running in the {@link ForkJoinPool#commonPool()}.
1601 *
1602 * <p>If this CompletableFuture completes exceptionally, or the
1603 * supplied function throws an exception, then the returned
1604 * CompletableFuture completes exceptionally with a
1605 * CompletionException holding the exception as its cause.
1606 *
1607 * @param fn the function to use to compute the value of
1608 * the returned CompletableFuture
1609 * @return the new CompletableFuture
1610 */
1611 public <U> CompletableFuture<U> thenApplyAsync
1612 (Function<? super T,? extends U> fn) {
1613 return doThenApply(fn, ForkJoinPool.commonPool());
1614 }
1615
1616 /**
1617 * Returns a new CompletableFuture that is asynchronously completed
1618 * when this CompletableFuture completes, with the result of the
1619 * given function of this CompletableFuture's result from a
1620 * task running in the given executor.
1621 *
1622 * <p>If this CompletableFuture completes exceptionally, or the
1623 * supplied function throws an exception, then the returned
1624 * CompletableFuture completes exceptionally with a
1625 * CompletionException holding the exception as its cause.
1626 *
1627 * @param fn the function to use to compute the value of
1628 * the returned CompletableFuture
1629 * @param executor the executor to use for asynchronous execution
1630 * @return the new CompletableFuture
1631 */
1632 public <U> CompletableFuture<U> thenApplyAsync
1633 (Function<? super T,? extends U> fn,
1634 Executor executor) {
1635 if (executor == null) throw new NullPointerException();
1636 return doThenApply(fn, executor);
1637 }
1638
1639 private <U> CompletableFuture<U> doThenApply
1640 (Function<? super T,? extends U> fn,
1641 Executor e) {
1642 if (fn == null) throw new NullPointerException();
1643 CompletableFuture<U> dst = new CompletableFuture<U>();
1644 ApplyCompletion<T,U> d = null;
1645 Object r;
1646 if ((r = result) == null) {
1647 CompletionNode p = new CompletionNode
1648 (d = new ApplyCompletion<T,U>(this, fn, dst, e));
1649 while ((r = result) == null) { // retry until p is pushed onto the completions stack or this future completes
1650 if (UNSAFE.compareAndSwapObject
1651 (this, COMPLETIONS, p.next = completions, p))
1652 break;
1653 }
1654 }
1655 if (r != null && (d == null || d.compareAndSet(0, 1))) { // already complete: run here unless another thread claimed d first
1656 T t; Throwable ex;
1657 if (r instanceof AltResult) {
1658 ex = ((AltResult)r).ex;
1659 t = null;
1660 }
1661 else {
1662 ex = null;
1663 @SuppressWarnings("unchecked") T tr = (T) r;
1664 t = tr;
1665 }
1666 U u = null;
1667 if (ex == null) {
1668 try {
1669 if (e != null)
1670 e.execute(new AsyncApply<T,U>(t, fn, dst));
1671 else
1672 u = fn.apply(t);
1673 } catch (Throwable rex) {
1674 ex = rex;
1675 }
1676 }
1677 if (e == null || ex != null)
1678 dst.internalComplete(u, ex);
1679 }
1680 helpPostComplete();
1681 return dst;
1682 }
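A usage sketch for the thenApply family above, covering the normal path and the case where the supplied function throws. The class and method names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ThenApplySketch {
    // The dependent is registered first and fires when the source completes.
    static int lengthOf(String s) {
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Integer> len = src.thenApply(String::length);
        src.complete(s);
        return len.join();
    }

    // If the function throws, the dependent completes exceptionally with a
    // CompletionException whose cause is the thrown exception.
    static String parseFailureCause() {
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Integer> parsed = src.thenApply(s -> Integer.parseInt(s));
        src.complete("abc");
        try {
            parsed.join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(lengthOf("hello"));
        System.out.println(parseFailureCause());
    }
}
```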
1683
1684 /**
1685 * Returns a new CompletableFuture that is completed
1686 * when this CompletableFuture completes, after performing the given
1687 * action with this CompletableFuture's result.
1688 *
1689 * <p>If this CompletableFuture completes exceptionally, or the
1690 * supplied action throws an exception, then the returned
1691 * CompletableFuture completes exceptionally with a
1692 * CompletionException holding the exception as its cause.
1693 *
1694 * @param block the action to perform before completing the
1695 * returned CompletableFuture
1696 * @return the new CompletableFuture
1697 */
1698 public CompletableFuture<Void> thenAccept(Consumer<? super T> block) {
1699 return doThenAccept(block, null);
1700 }
1701
1702 /**
1703 * Returns a new CompletableFuture that is asynchronously completed
1704 * when this CompletableFuture completes, after performing the given
1705 * action with this CompletableFuture's result from a task running
1706 * in the {@link ForkJoinPool#commonPool()}.
1707 *
1708 * <p>If this CompletableFuture completes exceptionally, or the
1709 * supplied action throws an exception, then the returned
1710 * CompletableFuture completes exceptionally with a
1711 * CompletionException holding the exception as its cause.
1712 *
1713 * @param block the action to perform before completing the
1714 * returned CompletableFuture
1715 * @return the new CompletableFuture
1716 */
1717 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block) {
1718 return doThenAccept(block, ForkJoinPool.commonPool());
1719 }
1720
1721 /**
1722 * Returns a new CompletableFuture that is asynchronously completed
1723 * when this CompletableFuture completes, after performing the given
1724 * action with this CompletableFuture's result from a task running
1725 * in the given executor.
1726 *
1727 * <p>If this CompletableFuture completes exceptionally, or the
1728 * supplied action throws an exception, then the returned
1729 * CompletableFuture completes exceptionally with a
1730 * CompletionException holding the exception as its cause.
1731 *
1732 * @param block the action to perform before completing the
1733 * returned CompletableFuture
1734 * @param executor the executor to use for asynchronous execution
1735 * @return the new CompletableFuture
1736 */
1737 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> block,
1738 Executor executor) {
1739 if (executor == null) throw new NullPointerException();
1740 return doThenAccept(block, executor);
1741 }
1742
1743 private CompletableFuture<Void> doThenAccept(Consumer<? super T> fn,
1744 Executor e) {
1745 if (fn == null) throw new NullPointerException();
1746 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1747 AcceptCompletion<T> d = null;
1748 Object r;
1749 if ((r = result) == null) {
1750 CompletionNode p = new CompletionNode
1751 (d = new AcceptCompletion<T>(this, fn, dst, e));
1752 while ((r = result) == null) {
1753 if (UNSAFE.compareAndSwapObject
1754 (this, COMPLETIONS, p.next = completions, p))
1755 break;
1756 }
1757 }
1758 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1759 T t; Throwable ex;
1760 if (r instanceof AltResult) {
1761 ex = ((AltResult)r).ex;
1762 t = null;
1763 }
1764 else {
1765 ex = null;
1766 @SuppressWarnings("unchecked") T tr = (T) r;
1767 t = tr;
1768 }
1769 if (ex == null) {
1770 try {
1771 if (e != null)
1772 e.execute(new AsyncAccept<T>(t, fn, dst));
1773 else
1774 fn.accept(t);
1775 } catch (Throwable rex) {
1776 ex = rex;
1777 }
1778 }
1779 if (e == null || ex != null)
1780 dst.internalComplete(null, ex);
1781 }
1782 helpPostComplete();
1783 return dst;
1784 }
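A sketch of thenAccept in use: the consumer receives the source's result, and it is skipped entirely when the source completes exceptionally (the `ex == null` guard above). Names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ThenAcceptSketch {
    // The consumer is handed the source's result.
    static List<String> delivered() {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Void> done = src.thenAccept(s -> seen.add(s));
        src.complete("x");
        done.join();
        return seen;
    }

    // On failure the consumer never runs and the dependent fails too.
    static boolean skippedOnFailure() {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> src = new CompletableFuture<>();
        CompletableFuture<Void> done = src.thenAccept(s -> seen.add(s));
        src.completeExceptionally(new IllegalStateException("boom"));
        return seen.isEmpty() && done.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(delivered());
        System.out.println(skippedOnFailure());
    }
}
```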
1785
1786 /**
1787 * Returns a new CompletableFuture that is completed
1788 * when this CompletableFuture completes, after performing the given
1789 * action.
1790 *
1791 * <p>If this CompletableFuture completes exceptionally, or the
1792 * supplied action throws an exception, then the returned
1793 * CompletableFuture completes exceptionally with a
1794 * CompletionException holding the exception as its cause.
1795 *
1796 * @param action the action to perform before completing the
1797 * returned CompletableFuture
1798 * @return the new CompletableFuture
1799 */
1800 public CompletableFuture<Void> thenRun(Runnable action) {
1801 return doThenRun(action, null);
1802 }
1803
1804 /**
1805 * Returns a new CompletableFuture that is asynchronously completed
1806 * when this CompletableFuture completes, after performing the given
1807 * action from a task running in the {@link ForkJoinPool#commonPool()}.
1808 *
1809 * <p>If this CompletableFuture completes exceptionally, or the
1810 * supplied action throws an exception, then the returned
1811 * CompletableFuture completes exceptionally with a
1812 * CompletionException holding the exception as its cause.
1813 *
1814 * @param action the action to perform before completing the
1815 * returned CompletableFuture
1816 * @return the new CompletableFuture
1817 */
1818 public CompletableFuture<Void> thenRunAsync(Runnable action) {
1819 return doThenRun(action, ForkJoinPool.commonPool());
1820 }
1821
1822 /**
1823 * Returns a new CompletableFuture that is asynchronously completed
1824 * when this CompletableFuture completes, after performing the given
1825 * action from a task running in the given executor.
1826 *
1827 * <p>If this CompletableFuture completes exceptionally, or the
1828 * supplied action throws an exception, then the returned
1829 * CompletableFuture completes exceptionally with a
1830 * CompletionException holding the exception as its cause.
1831 *
1832 * @param action the action to perform before completing the
1833 * returned CompletableFuture
1834 * @param executor the executor to use for asynchronous execution
1835 * @return the new CompletableFuture
1836 */
1837 public CompletableFuture<Void> thenRunAsync(Runnable action,
1838 Executor executor) {
1839 if (executor == null) throw new NullPointerException();
1840 return doThenRun(action, executor);
1841 }
1842
1843 private CompletableFuture<Void> doThenRun(Runnable action,
1844 Executor e) {
1845 if (action == null) throw new NullPointerException();
1846 CompletableFuture<Void> dst = new CompletableFuture<Void>();
1847 RunCompletion<T> d = null;
1848 Object r;
1849 if ((r = result) == null) {
1850 CompletionNode p = new CompletionNode
1851 (d = new RunCompletion<T>(this, action, dst, e));
1852 while ((r = result) == null) {
1853 if (UNSAFE.compareAndSwapObject
1854 (this, COMPLETIONS, p.next = completions, p))
1855 break;
1856 }
1857 }
1858 if (r != null && (d == null || d.compareAndSet(0, 1))) {
1859 Throwable ex;
1860 if (r instanceof AltResult)
1861 ex = ((AltResult)r).ex;
1862 else
1863 ex = null;
1864 if (ex == null) {
1865 try {
1866 if (e != null)
1867 e.execute(new AsyncRun(action, dst));
1868 else
1869 action.run();
1870 } catch (Throwable rex) {
1871 ex = rex;
1872 }
1873 }
1874 if (e == null || ex != null)
1875 dst.internalComplete(null, ex);
1876 }
1877 helpPostComplete();
1878 return dst;
1879 }
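A brief sketch of thenRun and thenRunAsync attached to the same source; each action runs once after the source completes and ignores its value. The single-thread pool is only an example executor, and the names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class ThenRunSketch {
    // Registers one synchronous and one asynchronous action, then counts runs.
    static int runCount() {
        AtomicInteger runs = new AtomicInteger();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> src = new CompletableFuture<>();
            CompletableFuture<Void> sync = src.thenRun(() -> runs.incrementAndGet());
            CompletableFuture<Void> async =
                src.thenRunAsync(() -> runs.incrementAndGet(), pool);
            src.complete("ignored");
            sync.join();   // wait for both dependents before reading the counter
            async.join();
            return runs.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runCount());
    }
}
```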
1880
1881 /**
1882 * Returns a new CompletableFuture that is completed
1883 * when both this and the other given CompletableFuture complete,
1884 * with the result of the given function of the results of the two
1885 * CompletableFutures.
1886 *
1887 * <p>If this and/or the other CompletableFuture complete
1888 * exceptionally, or the supplied function throws an exception,
1889 * then the returned CompletableFuture completes exceptionally
1890 * with a CompletionException holding the exception as its cause.
1891 *
1892 * @param other the other CompletableFuture
1893 * @param fn the function to use to compute the value of
1894 * the returned CompletableFuture
1895 * @return the new CompletableFuture
1896 */
1897 public <U,V> CompletableFuture<V> thenCombine
1898 (CompletableFuture<? extends U> other,
1899 BiFunction<? super T,? super U,? extends V> fn) {
1900 return doThenBiApply(other, fn, null);
1901 }
1902
1903 /**
1904 * Returns a new CompletableFuture that is asynchronously completed
1905 * when both this and the other given CompletableFuture complete,
1906 * with the result of the given function of the results of the two
1907 * CompletableFutures from a task running in the
1908 * {@link ForkJoinPool#commonPool()}.
1909 *
1910 * <p>If this and/or the other CompletableFuture complete
1911 * exceptionally, or the supplied function throws an exception,
1912 * then the returned CompletableFuture completes exceptionally
1913 * with a CompletionException holding the exception as its cause.
1914 *
1915 * @param other the other CompletableFuture
1916 * @param fn the function to use to compute the value of
1917 * the returned CompletableFuture
1918 * @return the new CompletableFuture
1919 */
1920 public <U,V> CompletableFuture<V> thenCombineAsync
1921 (CompletableFuture<? extends U> other,
1922 BiFunction<? super T,? super U,? extends V> fn) {
1923 return doThenBiApply(other, fn, ForkJoinPool.commonPool());
1924 }
1925
1926 /**
1927 * Returns a new CompletableFuture that is asynchronously completed
1928 * when both this and the other given CompletableFuture complete,
1929 * with the result of the given function of the results of the two
1930 * CompletableFutures from a task running in the given executor.
1931 *
1932 * <p>If this and/or the other CompletableFuture complete
1933 * exceptionally, or the supplied function throws an exception,
1934 * then the returned CompletableFuture completes exceptionally
1935 * with a CompletionException holding the exception as its cause.
1936 *
1937 * @param other the other CompletableFuture
1938 * @param fn the function to use to compute the value of
1939 * the returned CompletableFuture
1940 * @param executor the executor to use for asynchronous execution
1941 * @return the new CompletableFuture
1942 */
1943 public <U,V> CompletableFuture<V> thenCombineAsync
1944 (CompletableFuture<? extends U> other,
1945 BiFunction<? super T,? super U,? extends V> fn,
1946 Executor executor) {
1947 if (executor == null) throw new NullPointerException();
1948 return doThenBiApply(other, fn, executor);
1949 }
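
Usage illustration for the executor overload of thenCombineAsync (not part of this source file). A hedged sketch: CombineAsyncSketch, the thread name "combine-worker", and the helper methods are hypothetical names chosen for illustration. It assumes the combining function is always dispatched through the supplied executor, as the Javadoc above states.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CombineAsyncSketch {
    // Returns the name of the thread that ran the combining function.
    static String combiningThreadName() {
        ExecutorService pool = Executors.newSingleThreadExecutor(
            r -> new Thread(r, "combine-worker"));
        try {
            CompletableFuture<String> a = new CompletableFuture<>();
            CompletableFuture<String> b = new CompletableFuture<>();
            CompletableFuture<String> ran = a.thenCombineAsync(
                b, (x, y) -> Thread.currentThread().getName(), pool);
            a.complete("left");
            b.complete("right");
            return ran.join();
        } finally {
            pool.shutdown(); // lets the worker thread exit once idle
        }
    }

    // A null executor is rejected when the method is called.
    static boolean rejectsNullExecutor() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        try {
            a.thenCombineAsync(b, (x, y) -> x + y, null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }
}
```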
1950
1951 private <U,V> CompletableFuture<V> doThenBiApply
1952 (CompletableFuture<? extends U> other,
1953 BiFunction<? super T,? super U,? extends V> fn,
1954 Executor e) {
1955 if (other == null || fn == null) throw new NullPointerException();
1956 CompletableFuture<V> dst = new CompletableFuture<V>();
1957 BiApplyCompletion<T,U,V> d = null;
1958 Object r, s = null;
1959 if ((r = result) == null || (s = other.result) == null) {
1960 d = new BiApplyCompletion<T,U,V>(this, other, fn, dst, e);
1961 CompletionNode q = null, p = new CompletionNode(d);
1962 while ((r == null && (r = result) == null) ||
1963 (s == null && (s = other.result) == null)) {
1964 if (q != null) {
1965 if (s != null ||
1966 UNSAFE.compareAndSwapObject
1967 (other, COMPLETIONS, q.next = other.completions, q))
1968 break;
1969 }
1970 else if (r != null ||
1971 UNSAFE.compareAndSwapObject
1972 (this, COMPLETIONS, p.next = completions, p)) {
1973 if (s != null)
1974 break;
1975 q = new CompletionNode(d);
1976 }
1977 }
1978 }
1979 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
1980 T t; U u; Throwable ex;
1981 if (r instanceof AltResult) {
1982 ex = ((AltResult)r).ex;
1983 t = null;
1984 }
1985 else {
1986 ex = null;
1987 @SuppressWarnings("unchecked") T tr = (T) r;
1988 t = tr;
1989 }
1990 if (ex != null)
1991 u = null;
1992 else if (s instanceof AltResult) {
1993 ex = ((AltResult)s).ex;
1994 u = null;
1995 }
1996 else {
1997 @SuppressWarnings("unchecked") U us = (U) s;
1998 u = us;
1999 }
2000 V v = null;
2001 if (ex == null) {
2002 try {
2003 if (e != null)
2004 e.execute(new AsyncBiApply<T,U,V>(t, u, fn, dst));
2005 else
2006 v = fn.apply(t, u);
2007 } catch (Throwable rex) {
2008 ex = rex;
2009 }
2010 }
2011 if (e == null || ex != null)
2012 dst.internalComplete(v, ex);
2013 }
2014 helpPostComplete();
2015 other.helpPostComplete();
2016 return dst;
2017 }
2018
2019 /**
2020 * Returns a new CompletableFuture that is completed
2021 * when both this and the other given CompletableFuture complete,
2022 * after performing the given action with the results of the two
2023 * CompletableFutures.
2024 *
2025 * <p>If this and/or the other CompletableFuture complete
2026 * exceptionally, or the supplied action throws an exception,
2027 * then the returned CompletableFuture completes exceptionally
2028 * with a CompletionException holding the exception as its cause.
2029 *
2030 * @param other the other CompletableFuture
2031 * @param block the action to perform before completing the
2032 * returned CompletableFuture
2033 * @return the new CompletableFuture
2034 */
2035 public <U> CompletableFuture<Void> thenAcceptBoth
2036 (CompletableFuture<? extends U> other,
2037 BiConsumer<? super T, ? super U> block) {
2038 return doThenBiAccept(other, block, null);
2039 }
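
Usage illustration for thenAcceptBoth (not part of this source file). A minimal sketch with hypothetical names (AcceptBothSketch, collectBoth); it assumes the action receives the results of this future and the other future, in that order, regardless of which completes first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AcceptBothSketch {
    // Records one entry built from both results once both futures complete.
    static List<String> collectBoth() {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> name = new CompletableFuture<>();
        CompletableFuture<Integer> age = new CompletableFuture<>();
        CompletableFuture<Void> done =
            name.thenAcceptBoth(age, (n, a) -> seen.add(n + ":" + a));
        age.complete(36);      // completion order does not affect argument order
        name.complete("Ada");
        done.join();           // the returned future carries no value
        return seen;
    }
}
```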
2040
2041 /**
2042 * Returns a new CompletableFuture that is asynchronously completed
2043 * when both this and the other given CompletableFuture complete,
2044 * after performing the given action with the results of the two
2045 * CompletableFutures from a task running in the {@link
2046 * ForkJoinPool#commonPool()}.
2047 *
2048 * <p>If this and/or the other CompletableFuture complete
2049 * exceptionally, or the supplied action throws an exception,
2050 * then the returned CompletableFuture completes exceptionally
2051 * with a CompletionException holding the exception as its cause.
2052 *
2053 * @param other the other CompletableFuture
2054 * @param block the action to perform before completing the
2055 * returned CompletableFuture
2056 * @return the new CompletableFuture
2057 */
2058 public <U> CompletableFuture<Void> thenAcceptBothAsync
2059 (CompletableFuture<? extends U> other,
2060 BiConsumer<? super T, ? super U> block) {
2061 return doThenBiAccept(other, block, ForkJoinPool.commonPool());
2062 }
2063
2064 /**
2065 * Returns a new CompletableFuture that is asynchronously completed
2066 * when both this and the other given CompletableFuture complete,
2067 * after performing the given action with the results of the two
2068 * CompletableFutures from a task running in the given executor.
2069 *
2070 * <p>If this and/or the other CompletableFuture complete
2071 * exceptionally, or the supplied action throws an exception,
2072 * then the returned CompletableFuture completes exceptionally
2073 * with a CompletionException holding the exception as its cause.
2074 *
2075 * @param other the other CompletableFuture
2076 * @param block the action to perform before completing the
2077 * returned CompletableFuture
2078 * @param executor the executor to use for asynchronous execution
2079 * @return the new CompletableFuture
2080 */
2081 public <U> CompletableFuture<Void> thenAcceptBothAsync
2082 (CompletableFuture<? extends U> other,
2083 BiConsumer<? super T, ? super U> block,
2084 Executor executor) {
2085 if (executor == null) throw new NullPointerException();
2086 return doThenBiAccept(other, block, executor);
2087 }
2088
2089 private <U> CompletableFuture<Void> doThenBiAccept
2090 (CompletableFuture<? extends U> other,
2091 BiConsumer<? super T,? super U> fn,
2092 Executor e) {
2093 if (other == null || fn == null) throw new NullPointerException();
2094 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2095 BiAcceptCompletion<T,U> d = null;
2096 Object r, s = null;
2097 if ((r = result) == null || (s = other.result) == null) {
2098 d = new BiAcceptCompletion<T,U>(this, other, fn, dst, e);
2099 CompletionNode q = null, p = new CompletionNode(d);
2100 while ((r == null && (r = result) == null) ||
2101 (s == null && (s = other.result) == null)) {
2102 if (q != null) {
2103 if (s != null ||
2104 UNSAFE.compareAndSwapObject
2105 (other, COMPLETIONS, q.next = other.completions, q))
2106 break;
2107 }
2108 else if (r != null ||
2109 UNSAFE.compareAndSwapObject
2110 (this, COMPLETIONS, p.next = completions, p)) {
2111 if (s != null)
2112 break;
2113 q = new CompletionNode(d);
2114 }
2115 }
2116 }
2117 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2118 T t; U u; Throwable ex;
2119 if (r instanceof AltResult) {
2120 ex = ((AltResult)r).ex;
2121 t = null;
2122 }
2123 else {
2124 ex = null;
2125 @SuppressWarnings("unchecked") T tr = (T) r;
2126 t = tr;
2127 }
2128 if (ex != null)
2129 u = null;
2130 else if (s instanceof AltResult) {
2131 ex = ((AltResult)s).ex;
2132 u = null;
2133 }
2134 else {
2135 @SuppressWarnings("unchecked") U us = (U) s;
2136 u = us;
2137 }
2138 if (ex == null) {
2139 try {
2140 if (e != null)
2141 e.execute(new AsyncBiAccept<T,U>(t, u, fn, dst));
2142 else
2143 fn.accept(t, u);
2144 } catch (Throwable rex) {
2145 ex = rex;
2146 }
2147 }
2148 if (e == null || ex != null)
2149 dst.internalComplete(null, ex);
2150 }
2151 helpPostComplete();
2152 other.helpPostComplete();
2153 return dst;
2154 }
2155
2156 /**
2157 * Returns a new CompletableFuture that is completed
2158 * when both this and the other given CompletableFuture complete,
2159 * after performing the given action.
2160 *
2161 * <p>If this and/or the other CompletableFuture complete
2162 * exceptionally, or the supplied action throws an exception,
2163 * then the returned CompletableFuture completes exceptionally
2164 * with a CompletionException holding the exception as its cause.
2165 *
2166 * @param other the other CompletableFuture
2167 * @param action the action to perform before completing the
2168 * returned CompletableFuture
2169 * @return the new CompletableFuture
2170 */
2171 public CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other,
2172 Runnable action) {
2173 return doThenBiRun(other, action, null);
2174 }
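
Usage illustration for runAfterBoth (not part of this source file). A hedged sketch with hypothetical names (RunAfterBothSketch, runCounts); it assumes single-threaded use, so the action is expected to run in the thread that supplies the second completion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class RunAfterBothSketch {
    // Returns {runs after the first completion, runs after the second}.
    static int[] runCounts() {
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Void> done = a.runAfterBoth(b, runs::incrementAndGet);
        a.complete("ready");
        int afterFirst = runs.get(); // only one source is complete here
        b.complete(1);
        done.join();
        return new int[] { afterFirst, runs.get() };
    }
}
```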
2175
2176 /**
2177 * Returns a new CompletableFuture that is asynchronously completed
2178 * when both this and the other given CompletableFuture complete,
2179 * after performing the given action from a task running in the
2180 * {@link ForkJoinPool#commonPool()}.
2181 *
2182 * <p>If this and/or the other CompletableFuture complete
2183 * exceptionally, or the supplied action throws an exception,
2184 * then the returned CompletableFuture completes exceptionally
2185 * with a CompletionException holding the exception as its cause.
2186 *
2187 * @param other the other CompletableFuture
2188 * @param action the action to perform before completing the
2189 * returned CompletableFuture
2190 * @return the new CompletableFuture
2191 */
2192 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2193 Runnable action) {
2194 return doThenBiRun(other, action, ForkJoinPool.commonPool());
2195 }
2196
2197 /**
2198 * Returns a new CompletableFuture that is asynchronously completed
2199 * when both this and the other given CompletableFuture complete,
2200 * after performing the given action from a task running in the
2201 * given executor.
2202 *
2203 * <p>If this and/or the other CompletableFuture complete
2204 * exceptionally, or the supplied action throws an exception,
2205 * then the returned CompletableFuture completes exceptionally
2206 * with a CompletionException holding the exception as its cause.
2207 *
2208 * @param other the other CompletableFuture
2209 * @param action the action to perform before completing the
2210 * returned CompletableFuture
2211 * @param executor the executor to use for asynchronous execution
2212 * @return the new CompletableFuture
2213 */
2214 public CompletableFuture<Void> runAfterBothAsync(CompletableFuture<?> other,
2215 Runnable action,
2216 Executor executor) {
2217 if (executor == null) throw new NullPointerException();
2218 return doThenBiRun(other, action, executor);
2219 }
2220
2221 private CompletableFuture<Void> doThenBiRun(CompletableFuture<?> other,
2222 Runnable action,
2223 Executor e) {
2224 if (other == null || action == null) throw new NullPointerException();
2225 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2226 BiRunCompletion<T> d = null;
2227 Object r, s = null;
2228 if ((r = result) == null || (s = other.result) == null) {
2229 d = new BiRunCompletion<T>(this, other, action, dst, e);
2230 CompletionNode q = null, p = new CompletionNode(d);
2231 while ((r == null && (r = result) == null) ||
2232 (s == null && (s = other.result) == null)) {
2233 if (q != null) {
2234 if (s != null ||
2235 UNSAFE.compareAndSwapObject
2236 (other, COMPLETIONS, q.next = other.completions, q))
2237 break;
2238 }
2239 else if (r != null ||
2240 UNSAFE.compareAndSwapObject
2241 (this, COMPLETIONS, p.next = completions, p)) {
2242 if (s != null)
2243 break;
2244 q = new CompletionNode(d);
2245 }
2246 }
2247 }
2248 if (r != null && s != null && (d == null || d.compareAndSet(0, 1))) {
2249 Throwable ex;
2250 if (r instanceof AltResult)
2251 ex = ((AltResult)r).ex;
2252 else
2253 ex = null;
2254 if (ex == null && (s instanceof AltResult))
2255 ex = ((AltResult)s).ex;
2256 if (ex == null) {
2257 try {
2258 if (e != null)
2259 e.execute(new AsyncRun(action, dst));
2260 else
2261 action.run();
2262 } catch (Throwable rex) {
2263 ex = rex;
2264 }
2265 }
2266 if (e == null || ex != null)
2267 dst.internalComplete(null, ex);
2268 }
2269 helpPostComplete();
2270 other.helpPostComplete();
2271 return dst;
2272 }
2273
2274 /**
2275 * Returns a new CompletableFuture that is completed
2276 * when either this or the other given CompletableFuture completes,
2277 * with the result of the given function of either this or the other
2278 * CompletableFuture's result.
2279 *
2280 * <p>If this and/or the other CompletableFuture complete
2281 * exceptionally, then the returned CompletableFuture may also do so,
2282 * with a CompletionException holding one of these exceptions as its
2283 * cause. No guarantees are made about which result or exception is
2284 * used in the returned CompletableFuture. If the supplied function
2285 * throws an exception, then the returned CompletableFuture completes
2286 * exceptionally with a CompletionException holding the exception as
2287 * its cause.
2288 *
2289 * @param other the other CompletableFuture
2290 * @param fn the function to use to compute the value of
2291 * the returned CompletableFuture
2292 * @return the new CompletableFuture
2293 */
2294 public <U> CompletableFuture<U> applyToEither
2295 (CompletableFuture<? extends T> other,
2296 Function<? super T, U> fn) {
2297 return doOrApply(other, fn, null);
2298 }
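
Usage illustration for applyToEither (not part of this source file). A minimal sketch with hypothetical names (ApplyToEitherSketch, firstUpperCased). To keep the outcome deterministic, only one of the two sources is ever completed; when both complete, the Javadoc above makes no guarantee about which result is used.

```java
import java.util.concurrent.CompletableFuture;

final class ApplyToEitherSketch {
    // Applies the function to whichever source completes; here only one does.
    static String firstUpperCased() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> ready = new CompletableFuture<>();
        CompletableFuture<String> result =
            pending.applyToEither(ready, s -> s.toUpperCase());
        ready.complete("mirror");
        return result.join();
    }
}
```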
2299
2300 /**
2301 * Returns a new CompletableFuture that is asynchronously completed
2302 * when either this or the other given CompletableFuture completes,
2303 * with the result of the given function of either this or the other
2304 * CompletableFuture's result from a task running in the
2305 * {@link ForkJoinPool#commonPool()}.
2306 *
2307 * <p>If this and/or the other CompletableFuture complete
2308 * exceptionally, then the returned CompletableFuture may also do so,
2309 * with a CompletionException holding one of these exceptions as its
2310 * cause. No guarantees are made about which result or exception is
2311 * used in the returned CompletableFuture. If the supplied function
2312 * throws an exception, then the returned CompletableFuture completes
2313 * exceptionally with a CompletionException holding the exception as
2314 * its cause.
2315 *
2316 * @param other the other CompletableFuture
2317 * @param fn the function to use to compute the value of
2318 * the returned CompletableFuture
2319 * @return the new CompletableFuture
2320 */
2321 public <U> CompletableFuture<U> applyToEitherAsync
2322 (CompletableFuture<? extends T> other,
2323 Function<? super T, U> fn) {
2324 return doOrApply(other, fn, ForkJoinPool.commonPool());
2325 }
2326
2327 /**
2328 * Returns a new CompletableFuture that is asynchronously completed
2329 * when either this or the other given CompletableFuture completes,
2330 * with the result of the given function of either this or the other
2331 * CompletableFuture's result from a task running in the
2332 * given executor.
2333 *
2334 * <p>If this and/or the other CompletableFuture complete
2335 * exceptionally, then the returned CompletableFuture may also do so,
2336 * with a CompletionException holding one of these exceptions as its
2337 * cause. No guarantees are made about which result or exception is
2338 * used in the returned CompletableFuture. If the supplied function
2339 * throws an exception, then the returned CompletableFuture completes
2340 * exceptionally with a CompletionException holding the exception as
2341 * its cause.
2342 *
2343 * @param other the other CompletableFuture
2344 * @param fn the function to use to compute the value of
2345 * the returned CompletableFuture
2346 * @param executor the executor to use for asynchronous execution
2347 * @return the new CompletableFuture
2348 */
2349 public <U> CompletableFuture<U> applyToEitherAsync
2350 (CompletableFuture<? extends T> other,
2351 Function<? super T, U> fn,
2352 Executor executor) {
2353 if (executor == null) throw new NullPointerException();
2354 return doOrApply(other, fn, executor);
2355 }
2356
2357 private <U> CompletableFuture<U> doOrApply
2358 (CompletableFuture<? extends T> other,
2359 Function<? super T, U> fn,
2360 Executor e) {
2361 if (other == null || fn == null) throw new NullPointerException();
2362 CompletableFuture<U> dst = new CompletableFuture<U>();
2363 OrApplyCompletion<T,U> d = null;
2364 Object r;
2365 if ((r = result) == null && (r = other.result) == null) {
2366 d = new OrApplyCompletion<T,U>(this, other, fn, dst, e);
2367 CompletionNode q = null, p = new CompletionNode(d);
2368 while ((r = result) == null && (r = other.result) == null) {
2369 if (q != null) {
2370 if (UNSAFE.compareAndSwapObject
2371 (other, COMPLETIONS, q.next = other.completions, q))
2372 break;
2373 }
2374 else if (UNSAFE.compareAndSwapObject
2375 (this, COMPLETIONS, p.next = completions, p))
2376 q = new CompletionNode(d);
2377 }
2378 }
2379 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2380 T t; Throwable ex;
2381 if (r instanceof AltResult) {
2382 ex = ((AltResult)r).ex;
2383 t = null;
2384 }
2385 else {
2386 ex = null;
2387 @SuppressWarnings("unchecked") T tr = (T) r;
2388 t = tr;
2389 }
2390 U u = null;
2391 if (ex == null) {
2392 try {
2393 if (e != null)
2394 e.execute(new AsyncApply<T,U>(t, fn, dst));
2395 else
2396 u = fn.apply(t);
2397 } catch (Throwable rex) {
2398 ex = rex;
2399 }
2400 }
2401 if (e == null || ex != null)
2402 dst.internalComplete(u, ex);
2403 }
2404 helpPostComplete();
2405 other.helpPostComplete();
2406 return dst;
2407 }
2408
2409 /**
2410 * Returns a new CompletableFuture that is completed
2411 * when either this or the other given CompletableFuture completes,
2412 * after performing the given action with the result of either this
2413 * or the other CompletableFuture.
2414 *
2415 * <p>If this and/or the other CompletableFuture complete
2416 * exceptionally, then the returned CompletableFuture may also do so,
2417 * with a CompletionException holding one of these exceptions as its
2418 * cause. No guarantees are made about which result or exception is
2419 * used in the returned CompletableFuture. If the supplied action
2420 * throws an exception, then the returned CompletableFuture completes
2421 * exceptionally with a CompletionException holding the exception as
2422 * its cause.
2423 *
2424 * @param other the other CompletableFuture
2425 * @param block the action to perform before completing the
2426 * returned CompletableFuture
2427 * @return the new CompletableFuture
2428 */
2429 public CompletableFuture<Void> acceptEither
2430 (CompletableFuture<? extends T> other,
2431 Consumer<? super T> block) {
2432 return doOrAccept(other, block, null);
2433 }
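
Usage illustration for acceptEither (not part of this source file). A hedged sketch with hypothetical names (AcceptEitherSketch, acceptedValue, failureCause). In each helper only one source is completed, which sidesteps the "no guarantees" clause in the Javadoc above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

final class AcceptEitherSketch {
    // Returns the value handed to the consumer when only one source completes.
    static String acceptedValue() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> ready = new CompletableFuture<>();
        CompletableFuture<Void> done = pending.acceptEither(ready, seen::set);
        ready.complete("cache-hit");
        done.join();
        return seen.get();
    }

    // Returns the cause reported when the only completed source failed.
    static Throwable failureCause() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> failed = new CompletableFuture<>();
        CompletableFuture<Void> done = pending.acceptEither(failed, s -> { });
        failed.completeExceptionally(new IllegalStateException("down"));
        try {
            done.join();
            return null; // not expected: the returned future should fail
        } catch (CompletionException ce) {
            return ce.getCause();
        }
    }
}
```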
2434
2435 /**
2436 * Returns a new CompletableFuture that is asynchronously completed
2437 * when either this or the other given CompletableFuture completes,
2438 * after performing the given action with the result of either this
2439 * or the other CompletableFuture, from a task running in
2440 * the {@link ForkJoinPool#commonPool()}.
2441 *
2442 * <p>If this and/or the other CompletableFuture complete
2443 * exceptionally, then the returned CompletableFuture may also do so,
2444 * with a CompletionException holding one of these exceptions as its
2445 * cause. No guarantees are made about which result or exception is
2446 * used in the returned CompletableFuture. If the supplied action
2447 * throws an exception, then the returned CompletableFuture completes
2448 * exceptionally with a CompletionException holding the exception as
2449 * its cause.
2450 *
2451 * @param other the other CompletableFuture
2452 * @param block the action to perform before completing the
2453 * returned CompletableFuture
2454 * @return the new CompletableFuture
2455 */
2456 public CompletableFuture<Void> acceptEitherAsync
2457 (CompletableFuture<? extends T> other,
2458 Consumer<? super T> block) {
2459 return doOrAccept(other, block, ForkJoinPool.commonPool());
2460 }
2461
2462 /**
2463 * Returns a new CompletableFuture that is asynchronously completed
2464 * when either this or the other given CompletableFuture completes,
2465 * after performing the given action with the result of either this
2466 * or the other CompletableFuture, from a task running in
2467 * the given executor.
2468 *
2469 * <p>If this and/or the other CompletableFuture complete
2470 * exceptionally, then the returned CompletableFuture may also do so,
2471 * with a CompletionException holding one of these exceptions as its
2472 * cause. No guarantees are made about which result or exception is
2473 * used in the returned CompletableFuture. If the supplied action
2474 * throws an exception, then the returned CompletableFuture completes
2475 * exceptionally with a CompletionException holding the exception as
2476 * its cause.
2477 *
2478 * @param other the other CompletableFuture
2479 * @param block the action to perform before completing the
2480 * returned CompletableFuture
2481 * @param executor the executor to use for asynchronous execution
2482 * @return the new CompletableFuture
2483 */
2484 public CompletableFuture<Void> acceptEitherAsync
2485 (CompletableFuture<? extends T> other,
2486 Consumer<? super T> block,
2487 Executor executor) {
2488 if (executor == null) throw new NullPointerException();
2489 return doOrAccept(other, block, executor);
2490 }
2491
2492 private CompletableFuture<Void> doOrAccept
2493 (CompletableFuture<? extends T> other,
2494 Consumer<? super T> fn,
2495 Executor e) {
2496 if (other == null || fn == null) throw new NullPointerException();
2497 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2498 OrAcceptCompletion<T> d = null;
2499 Object r;
2500 if ((r = result) == null && (r = other.result) == null) {
2501 d = new OrAcceptCompletion<T>(this, other, fn, dst, e);
2502 CompletionNode q = null, p = new CompletionNode(d);
2503 while ((r = result) == null && (r = other.result) == null) {
2504 if (q != null) {
2505 if (UNSAFE.compareAndSwapObject
2506 (other, COMPLETIONS, q.next = other.completions, q))
2507 break;
2508 }
2509 else if (UNSAFE.compareAndSwapObject
2510 (this, COMPLETIONS, p.next = completions, p))
2511 q = new CompletionNode(d);
2512 }
2513 }
2514 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2515 T t; Throwable ex;
2516 if (r instanceof AltResult) {
2517 ex = ((AltResult)r).ex;
2518 t = null;
2519 }
2520 else {
2521 ex = null;
2522 @SuppressWarnings("unchecked") T tr = (T) r;
2523 t = tr;
2524 }
2525 if (ex == null) {
2526 try {
2527 if (e != null)
2528 e.execute(new AsyncAccept<T>(t, fn, dst));
2529 else
2530 fn.accept(t);
2531 } catch (Throwable rex) {
2532 ex = rex;
2533 }
2534 }
2535 if (e == null || ex != null)
2536 dst.internalComplete(null, ex);
2537 }
2538 helpPostComplete();
2539 other.helpPostComplete();
2540 return dst;
2541 }
2542
2543 /**
2544 * Returns a new CompletableFuture that is completed
2545 * when either this or the other given CompletableFuture completes,
2546 * after performing the given action.
2547 *
2548 * <p>If this and/or the other CompletableFuture complete
2549 * exceptionally, then the returned CompletableFuture may also do so,
2550 * with a CompletionException holding one of these exceptions as its
2551 * cause. No guarantees are made about which result or exception is
2552 * used in the returned CompletableFuture. If the supplied action
2553 * throws an exception, then the returned CompletableFuture completes
2554 * exceptionally with a CompletionException holding the exception as
2555 * its cause.
2556 *
2557 * @param other the other CompletableFuture
2558 * @param action the action to perform before completing the
2559 * returned CompletableFuture
2560 * @return the new CompletableFuture
2561 */
2562 public CompletableFuture<Void> runAfterEither(CompletableFuture<?> other,
2563 Runnable action) {
2564 return doOrRun(other, action, null);
2565 }
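
Usage illustration for runAfterEither (not part of this source file). A minimal sketch with hypothetical names (RunAfterEitherSketch, runsAfterBothComplete); it assumes single-threaded use and checks that the action runs once, even though both sources eventually complete.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class RunAfterEitherSketch {
    // Returns how many times the action ran after both sources completed.
    static int runsAfterBothComplete() {
        AtomicInteger runs = new AtomicInteger();
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<Void> done = a.runAfterEither(b, runs::incrementAndGet);
        a.complete("first");
        done.join();      // already complete after the first source finishes
        b.complete(2);    // a later completion must not trigger the action again
        return runs.get();
    }
}
```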
2566
2567 /**
2568 * Returns a new CompletableFuture that is asynchronously completed
2569 * when either this or the other given CompletableFuture completes,
2570 * after performing the given action from a task running in the
2571 * {@link ForkJoinPool#commonPool()}.
2572 *
2573 * <p>If this and/or the other CompletableFuture complete
2574 * exceptionally, then the returned CompletableFuture may also do so,
2575 * with a CompletionException holding one of these exceptions as its
2576 * cause. No guarantees are made about which result or exception is
2577 * used in the returned CompletableFuture. If the supplied action
2578 * throws an exception, then the returned CompletableFuture completes
2579 * exceptionally with a CompletionException holding the exception as
2580 * its cause.
2581 *
2582 * @param other the other CompletableFuture
2583 * @param action the action to perform before completing the
2584 * returned CompletableFuture
2585 * @return the new CompletableFuture
2586 */
2587 public CompletableFuture<Void> runAfterEitherAsync
2588 (CompletableFuture<?> other,
2589 Runnable action) {
2590 return doOrRun(other, action, ForkJoinPool.commonPool());
2591 }
2592
2593 /**
2594 * Returns a new CompletableFuture that is asynchronously completed
2595 * when either this or the other given CompletableFuture completes,
2596 * after performing the given action from a task running in the
2597 * given executor.
2598 *
2599 * <p>If this and/or the other CompletableFuture complete
2600 * exceptionally, then the returned CompletableFuture may also do so,
2601 * with a CompletionException holding one of these exceptions as its
2602 * cause. No guarantees are made about which result or exception is
2603 * used in the returned CompletableFuture. If the supplied action
2604 * throws an exception, then the returned CompletableFuture completes
2605 * exceptionally with a CompletionException holding the exception as
2606 * its cause.
2607 *
2608 * @param other the other CompletableFuture
2609 * @param action the action to perform before completing the
2610 * returned CompletableFuture
2611 * @param executor the executor to use for asynchronous execution
2612 * @return the new CompletableFuture
2613 */
2614 public CompletableFuture<Void> runAfterEitherAsync
2615 (CompletableFuture<?> other,
2616 Runnable action,
2617 Executor executor) {
2618 if (executor == null) throw new NullPointerException();
2619 return doOrRun(other, action, executor);
2620 }
2621
2622 private CompletableFuture<Void> doOrRun(CompletableFuture<?> other,
2623 Runnable action,
2624 Executor e) {
2625 if (other == null || action == null) throw new NullPointerException();
2626 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2627 OrRunCompletion<T> d = null;
2628 Object r;
2629 if ((r = result) == null && (r = other.result) == null) {
2630 d = new OrRunCompletion<T>(this, other, action, dst, e);
2631 CompletionNode q = null, p = new CompletionNode(d);
2632 while ((r = result) == null && (r = other.result) == null) {
2633 if (q != null) {
2634 if (UNSAFE.compareAndSwapObject
2635 (other, COMPLETIONS, q.next = other.completions, q))
2636 break;
2637 }
2638 else if (UNSAFE.compareAndSwapObject
2639 (this, COMPLETIONS, p.next = completions, p))
2640 q = new CompletionNode(d);
2641 }
2642 }
2643 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2644 Throwable ex;
2645 if (r instanceof AltResult)
2646 ex = ((AltResult)r).ex;
2647 else
2648 ex = null;
2649 if (ex == null) {
2650 try {
2651 if (e != null)
2652 e.execute(new AsyncRun(action, dst));
2653 else
2654 action.run();
2655 } catch (Throwable rex) {
2656 ex = rex;
2657 }
2658 }
2659 if (e == null || ex != null)
2660 dst.internalComplete(null, ex);
2661 }
2662 helpPostComplete();
2663 other.helpPostComplete();
2664 return dst;
2665 }
2666
2667 /**
2668 * Returns a CompletableFuture that, upon completion, has the same
2669 * value as produced by the given function of the result of this
2670 * CompletableFuture.
2671 *
2672 * <p>If this CompletableFuture completes exceptionally, then the
2673 * returned CompletableFuture also does so, with a
2674 * CompletionException holding this exception as its cause.
2675 * Similarly, if the computed CompletableFuture completes
2676 * exceptionally, then so does the returned CompletableFuture.
2677 *
2678 * @param fn the function returning a new CompletableFuture
2679 * @return the CompletableFuture
2680 */
2681 public <U> CompletableFuture<U> thenCompose
2682 (Function<? super T, CompletableFuture<U>> fn) {
2683 return doCompose(fn, null);
2684 }
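
Usage illustration for thenCompose (not part of this source file). A hedged sketch: ComposeSketch and its helpers are hypothetical, and lengthOf stands in for any step that itself returns a CompletableFuture. The second helper covers a source that has already completed exceptionally before thenCompose is called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ComposeSketch {
    // Hypothetical dependent step that produces its own future.
    static CompletableFuture<Integer> lengthOf(String s) {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        f.complete(s.length());
        return f;
    }

    // Chains the dependent step onto the source and waits for its value.
    static int composedLength(String input) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> length = source.thenCompose(ComposeSketch::lengthOf);
        source.complete(input);
        return length.join();
    }

    // Returns the cause reported when the source failed before composition.
    static Throwable composedFailure() {
        CompletableFuture<String> source = new CompletableFuture<>();
        source.completeExceptionally(new IllegalArgumentException("bad input"));
        CompletableFuture<Integer> length = source.thenCompose(ComposeSketch::lengthOf);
        try {
            length.join();
            return null; // not expected: the function should be skipped
        } catch (CompletionException ce) {
            return ce.getCause();
        }
    }
}
```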
2685
2686 /**
2687 * Returns a CompletableFuture that, upon completion, has the same
2688 * value as that produced asynchronously using the {@link
2689 * ForkJoinPool#commonPool()} by the given function of the result
2690 * of this CompletableFuture.
2691 *
2692 * <p>If this CompletableFuture completes exceptionally, then the
2693 * returned CompletableFuture also does so, with a
2694 * CompletionException holding this exception as its cause.
2695 * Similarly, if the computed CompletableFuture completes
2696 * exceptionally, then so does the returned CompletableFuture.
2697 *
2698 * @param fn the function returning a new CompletableFuture
2699 * @return the CompletableFuture
2700 */
2701 public <U> CompletableFuture<U> thenComposeAsync
2702 (Function<? super T, CompletableFuture<U>> fn) {
2703 return doCompose(fn, ForkJoinPool.commonPool());
2704 }
2705
2706 /**
2707 * Returns a CompletableFuture that, upon completion, has the same
2708 * value as that produced asynchronously using the given executor
2709 * by the given function of the result of this CompletableFuture.
2710 *
2711 * <p>If this CompletableFuture completes exceptionally, then the
2712 * returned CompletableFuture also does so, with a
2713 * CompletionException holding this exception as its cause.
2714 * Similarly, if the computed CompletableFuture completes
2715 * exceptionally, then so does the returned CompletableFuture.
2716 *
2717 * @param fn the function returning a new CompletableFuture
2718 * @param executor the executor to use for asynchronous execution
2719 * @return the CompletableFuture
2720 */
2721 public <U> CompletableFuture<U> thenComposeAsync
2722 (Function<? super T, CompletableFuture<U>> fn,
2723 Executor executor) {
2724 if (executor == null) throw new NullPointerException();
2725 return doCompose(fn, executor);
2726 }
2727
2728 private <U> CompletableFuture<U> doCompose
2729 (Function<? super T, CompletableFuture<U>> fn,
2730 Executor e) {
2731 if (fn == null) throw new NullPointerException();
2732 CompletableFuture<U> dst = null;
2733 ComposeCompletion<T,U> d = null;
2734 Object r;
2735 if ((r = result) == null) {
2736 dst = new CompletableFuture<U>();
2737 CompletionNode p = new CompletionNode
2738 (d = new ComposeCompletion<T,U>(this, fn, dst, e));
2739 while ((r = result) == null) {
2740 if (UNSAFE.compareAndSwapObject
2741 (this, COMPLETIONS, p.next = completions, p))
2742 break;
2743 }
2744 }
2745 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2746 T t; Throwable ex;
2747 if (r instanceof AltResult) {
2748 ex = ((AltResult)r).ex;
2749 t = null;
2750 }
2751 else {
2752 ex = null;
2753 @SuppressWarnings("unchecked") T tr = (T) r;
2754 t = tr;
2755 }
2756 if (ex == null) {
2757 if (e != null) {
2758 if (dst == null)
2759 dst = new CompletableFuture<U>();
2760 e.execute(new AsyncCompose<T,U>(t, fn, dst));
2761 }
2762 else {
2763 try {
2764 if ((dst = fn.apply(t)) == null)
2765 ex = new NullPointerException();
2766 } catch (Throwable rex) {
2767 ex = rex;
2768 }
2769 }
2770 }
2771 if (dst == null) // source had already failed, or fn returned null
2772 dst = new CompletableFuture<U>();
2773 if (ex != null) // propagate the failure in sync and async modes alike
2774 dst.internalComplete(null, ex);
2775 }
2776 helpPostComplete();
2777 dst.helpPostComplete();
2778 return dst;
2779 }
2780
2781 /**
2782 * Returns a new CompletableFuture that is completed when this
2783 * CompletableFuture completes, with the result of the given
2784 * function of the exception triggering this CompletableFuture's
2785 * completion when it completes exceptionally; otherwise, if this
2786 * CompletableFuture completes normally, then the returned
2787 * CompletableFuture also completes normally with the same value.
2788 *
2789 * @param fn the function to use to compute the value of the
2790 * returned CompletableFuture if this CompletableFuture completed
2791 * exceptionally
2792 * @return the new CompletableFuture
2793 */
2794 public CompletableFuture<T> exceptionally
2795 (Function<Throwable, ? extends T> fn) {
2796 if (fn == null) throw new NullPointerException();
2797 CompletableFuture<T> dst = new CompletableFuture<T>();
2798 ExceptionCompletion<T> d = null;
2799 Object r;
2800 if ((r = result) == null) {
2801 CompletionNode p =
2802 new CompletionNode(d = new ExceptionCompletion<T>(this, fn, dst));
2803 while ((r = result) == null) {
2804 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2805 p.next = completions, p))
2806 break;
2807 }
2808 }
2809 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2810 T t = null; Throwable ex, dx = null;
2811 if (r instanceof AltResult) {
2812 if ((ex = ((AltResult)r).ex) != null) {
2813 try {
2814 t = fn.apply(ex);
2815 } catch (Throwable rex) {
2816 dx = rex;
2817 }
2818 }
2819 }
2820 else {
2821 @SuppressWarnings("unchecked") T tr = (T) r;
2822 t = tr;
2823 }
2824 dst.internalComplete(t, dx);
2825 }
2826 helpPostComplete();
2827 return dst;
2828 }
2829
2830 /**
2831 * Returns a new CompletableFuture that is completed when this
2832 * CompletableFuture completes, with the result of the given
2833 * function of the result and exception of this CompletableFuture's
2834 * completion. The given function is invoked with the result (or
2835 * {@code null} if none) and the exception (or {@code null} if none)
2836 * of this CompletableFuture when complete.
2837 *
2838 * @param fn the function to use to compute the value of the
2839 * returned CompletableFuture
2840 * @return the new CompletableFuture
2841 */
2842 public <U> CompletableFuture<U> handle
2843 (BiFunction<? super T, Throwable, ? extends U> fn) {
2844 if (fn == null) throw new NullPointerException();
2845 CompletableFuture<U> dst = new CompletableFuture<U>();
2846 HandleCompletion<T,U> d = null;
2847 Object r;
2848 if ((r = result) == null) {
2849 CompletionNode p =
2850 new CompletionNode(d = new HandleCompletion<T,U>(this, fn, dst));
2851 while ((r = result) == null) {
2852 if (UNSAFE.compareAndSwapObject(this, COMPLETIONS,
2853 p.next = completions, p))
2854 break;
2855 }
2856 }
2857 if (r != null && (d == null || d.compareAndSet(0, 1))) {
2858 T t; Throwable ex;
2859 if (r instanceof AltResult) {
2860 ex = ((AltResult)r).ex;
2861 t = null;
2862 }
2863 else {
2864 ex = null;
2865 @SuppressWarnings("unchecked") T tr = (T) r;
2866 t = tr;
2867 }
2868 U u; Throwable dx;
2869 try {
2870 u = fn.apply(t, ex);
2871 dx = null;
2872 } catch (Throwable rex) {
2873 dx = rex;
2874 u = null;
2875 }
2876 dst.internalComplete(u, dx);
2877 }
2878 helpPostComplete();
2879 return dst;
2880 }
2881
2882
2883 /* ------------- Arbitrary-arity constructions -------------- */
2884
2885 /*
2886 * The basic plan of attack is to recursively form binary
2887 * completion trees of elements. This can be overkill for small
2888 * sets, but scales nicely. The And/All vs Or/Any forms use the
2889 * same idea, but details differ.
2890 */
2891
2892 /**
2893 * Returns a new CompletableFuture that is completed when all of
2894 * the given CompletableFutures complete. If any of the given
2895 * CompletableFutures complete exceptionally, then the returned
2896 * CompletableFuture also does so, with a CompletionException
2897 * holding this exception as its cause. Otherwise, the results,
2898 * if any, of the given CompletableFutures are not reflected in
2899 * the returned CompletableFuture, but may be obtained by
2900 * inspecting them individually. If no CompletableFutures are
2901 * provided, returns a CompletableFuture completed with the value
2902 * {@code null}.
2903 *
2904 * <p>Among the applications of this method is to await completion
2905 * of a set of independent CompletableFutures before continuing a
2906 * program, as in: {@code CompletableFuture.allOf(c1, c2,
2907 * c3).join();}.
2908 *
2909 * @param cfs the CompletableFutures
2910 * @return a new CompletableFuture that is completed when all of the
2911 * given CompletableFutures complete
2912 * @throws NullPointerException if the array or any of its elements are
2913 * {@code null}
2914 */
2915 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
2916 int len = cfs.length; // Directly handle empty and singleton cases
2917 if (len > 1)
2918 return allTree(cfs, 0, len - 1);
2919 else {
2920 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2921 CompletableFuture<?> f;
2922 if (len == 0)
2923 dst.result = NIL;
2924 else if ((f = cfs[0]) == null)
2925 throw new NullPointerException();
2926 else {
2927 ThenCopy d = null;
2928 CompletionNode p = null;
2929 Object r;
2930 while ((r = f.result) == null) {
2931 if (d == null)
2932 d = new ThenCopy(f, dst);
2933 else if (p == null)
2934 p = new CompletionNode(d);
2935 else if (UNSAFE.compareAndSwapObject
2936 (f, COMPLETIONS, p.next = f.completions, p))
2937 break;
2938 }
2939 if (r != null && (d == null || d.compareAndSet(0, 1)))
2940 dst.internalComplete(null, (r instanceof AltResult) ?
2941 ((AltResult)r).ex : null);
2942 f.helpPostComplete();
2943 }
2944 return dst;
2945 }
2946 }
2947
2948 /**
2949 * Recursively constructs an And'ed tree of CompletableFutures.
2950 * Called only when array known to have at least two elements.
2951 */
2952 private static CompletableFuture<Void> allTree(CompletableFuture<?>[] cfs,
2953 int lo, int hi) {
2954 CompletableFuture<?> fst, snd;
2955 int mid = (lo + hi) >>> 1;
2956 if ((fst = (lo == mid ? cfs[lo] : allTree(cfs, lo, mid))) == null ||
2957 (snd = (hi == mid+1 ? cfs[hi] : allTree(cfs, mid+1, hi))) == null)
2958 throw new NullPointerException();
2959 CompletableFuture<Void> dst = new CompletableFuture<Void>();
2960 AndCompletion d = null;
2961 CompletionNode p = null, q = null;
2962 Object r = null, s = null;
2963 while ((r = fst.result) == null || (s = snd.result) == null) {
2964 if (d == null)
2965 d = new AndCompletion(fst, snd, dst);
2966 else if (p == null)
2967 p = new CompletionNode(d);
2968 else if (q == null) {
2969 if (UNSAFE.compareAndSwapObject
2970 (fst, COMPLETIONS, p.next = fst.completions, p))
2971 q = new CompletionNode(d);
2972 }
2973 else if (UNSAFE.compareAndSwapObject
2974 (snd, COMPLETIONS, q.next = snd.completions, q))
2975 break;
2976 }
2977 if ((r != null || (r = fst.result) != null) &&
2978 (s != null || (s = snd.result) != null) &&
2979 (d == null || d.compareAndSet(0, 1))) {
2980 Throwable ex;
2981 if (r instanceof AltResult)
2982 ex = ((AltResult)r).ex;
2983 else
2984 ex = null;
2985 if (ex == null && (s instanceof AltResult))
2986 ex = ((AltResult)s).ex;
2987 dst.internalComplete(null, ex);
2988 }
2989 fst.helpPostComplete();
2990 snd.helpPostComplete();
2991 return dst;
2992 }
2993
2994 /**
2995 * Returns a new CompletableFuture that is completed when any of the
2996 * given CompletableFutures completes, with the same result if that
2997 * one completed normally. Otherwise, if it completed exceptionally,
2998 * the returned CompletableFuture also does so, with a
2999 * CompletionException holding this exception as its cause. If no
3000 * CompletableFutures are provided, returns an incomplete
3001 * CompletableFuture.
3002 *
3003 * @param cfs the CompletableFutures
3004 * @return a new CompletableFuture that is completed when any of the
3005 * given CompletableFutures complete
3006 * @throws NullPointerException if the array or any of its elements are
3007 * {@code null}
3008 */
3009 public static CompletableFuture<?> anyOf(CompletableFuture<?>... cfs) {
3010 int len = cfs.length; // Same idea as allOf
3011 if (len > 1)
3012 return anyTree(cfs, 0, len - 1);
3013 else {
3014 CompletableFuture<?> dst = new CompletableFuture<Object>();
3015 CompletableFuture<?> f;
3016 if (len == 0)
3017 ; // skip
3018 else if ((f = cfs[0]) == null)
3019 throw new NullPointerException();
3020 else {
3021 ThenCopy d = null;
3022 CompletionNode p = null;
3023 Object r;
3024 while ((r = f.result) == null) {
3025 if (d == null)
3026 d = new ThenCopy(f, dst);
3027 else if (p == null)
3028 p = new CompletionNode(d);
3029 else if (UNSAFE.compareAndSwapObject
3030 (f, COMPLETIONS, p.next = f.completions, p))
3031 break;
3032 }
3033 if (r != null && (d == null || d.compareAndSet(0, 1))) {
3034 Throwable ex; Object t;
3035 if (r instanceof AltResult) {
3036 ex = ((AltResult)r).ex;
3037 t = null;
3038 }
3039 else {
3040 ex = null;
3041 t = r;
3042 }
3043 dst.internalComplete(t, ex);
3044 }
3045 f.helpPostComplete();
3046 }
3047 return dst;
3048 }
3049 }
3050
3051 /**
3052 * Recursively constructs an Or'ed tree of CompletableFutures.
3053 */
3054 private static CompletableFuture<?> anyTree(CompletableFuture<?>[] cfs,
3055 int lo, int hi) {
3056 CompletableFuture<?> fst, snd;
3057 int mid = (lo + hi) >>> 1;
3058 if ((fst = (lo == mid ? cfs[lo] : anyTree(cfs, lo, mid))) == null ||
3059 (snd = (hi == mid+1 ? cfs[hi] : anyTree(cfs, mid+1, hi))) == null)
3060 throw new NullPointerException();
3061 CompletableFuture<?> dst = new CompletableFuture<Object>();
3062 OrCompletion d = null;
3063 CompletionNode p = null, q = null;
3064 Object r;
3065 while ((r = fst.result) == null && (r = snd.result) == null) {
3066 if (d == null)
3067 d = new OrCompletion(fst, snd, dst);
3068 else if (p == null)
3069 p = new CompletionNode(d);
3070 else if (q == null) {
3071 if (UNSAFE.compareAndSwapObject
3072 (fst, COMPLETIONS, p.next = fst.completions, p))
3073 q = new CompletionNode(d);
3074 }
3075 else if (UNSAFE.compareAndSwapObject
3076 (snd, COMPLETIONS, q.next = snd.completions, q))
3077 break;
3078 }
3079 if ((r != null || (r = fst.result) != null ||
3080 (r = snd.result) != null) &&
3081 (d == null || d.compareAndSet(0, 1))) {
3082 Throwable ex; Object t;
3083 if (r instanceof AltResult) {
3084 ex = ((AltResult)r).ex;
3085 t = null;
3086 }
3087 else {
3088 ex = null;
3089 t = r;
3090 }
3091 dst.internalComplete(t, ex);
3092 }
3093 fst.helpPostComplete();
3094 snd.helpPostComplete();
3095 return dst;
3096 }
3097
3098
3099 /* ------------- Control and status methods -------------- */
3100
3101 /**
3102 * If not already completed, completes this CompletableFuture with
3103 * a {@link CancellationException}. Dependent CompletableFutures
3104 * that have not already completed will also complete
3105 * exceptionally, with a {@link CompletionException} caused by
3106 * this {@code CancellationException}.
3107 *
3108 * @param mayInterruptIfRunning this value has no effect in this
3109 * implementation because interrupts are not used to control
3110 * processing.
3111 *
3112 * @return {@code true} if this task is now cancelled
3113 */
3114 public boolean cancel(boolean mayInterruptIfRunning) {
3115 boolean cancelled = (result == null) &&
3116 UNSAFE.compareAndSwapObject
3117 (this, RESULT, null, new AltResult(new CancellationException()));
3118 postComplete();
3119 return cancelled || isCancelled();
3120 }
3121
3122 /**
3123 * Returns {@code true} if this CompletableFuture was cancelled
3124 * before it completed normally.
3125 *
3126 * @return {@code true} if this CompletableFuture was cancelled
3127 * before it completed normally
3128 */
3129 public boolean isCancelled() {
3130 Object r;
3131 return ((r = result) instanceof AltResult) &&
3132 (((AltResult)r).ex instanceof CancellationException);
3133 }
3134
3135 /**
3136 * Forcibly sets or resets the value subsequently returned by
3137 * method {@link #get()} and related methods, whether or not
3138 * already completed. This method is designed for use only in
3139 * error recovery actions, and even in such situations may result
3140 * in ongoing dependent completions using established versus
3141 * overwritten outcomes.
3142 *
3143 * @param value the completion value
3144 */
3145 public void obtrudeValue(T value) {
3146 result = (value == null) ? NIL : value;
3147 postComplete();
3148 }
3149
3150 /**
3151 * Forcibly causes subsequent invocations of method {@link #get()}
3152 * and related methods to throw the given exception, whether or
3153 * not already completed. This method is designed for use only in
3154 * error recovery actions, and even in such situations may result in
3155 * ongoing dependent completions using established versus
3156 * overwritten outcomes.
3157 *
3158 * @param ex the exception
3159 */
3160 public void obtrudeException(Throwable ex) {
3161 if (ex == null) throw new NullPointerException();
3162 result = new AltResult(ex);
3163 postComplete();
3164 }
3165
3166 /**
3167 * Returns the estimated number of CompletableFutures whose
3168 * completions are awaiting completion of this CompletableFuture.
3169 * This method is designed for use in monitoring system state, not
3170 * for synchronization control.
3171 *
3172 * @return the number of dependent CompletableFutures
3173 */
3174 public int getNumberOfDependents() {
3175 int count = 0;
3176 for (CompletionNode p = completions; p != null; p = p.next)
3177 ++count;
3178 return count;
3179 }
3180
3181 /**
3182 * Returns a string identifying this CompletableFuture, as well as
3183 * its completion state. The state, in brackets, contains the
3184 * String {@code "Completed normally"} or the String {@code
3185 * "Completed exceptionally"}, or the String {@code "Not
3186 * completed"} followed by the number of CompletableFutures
3187 * dependent upon its completion, if any.
3188 *
3189 * @return a string identifying this CompletableFuture, as well as its state
3190 */
3191 public String toString() {
3192 Object r = result;
3193 int count;
3194 return super.toString() +
3195 ((r == null) ?
3196 (((count = getNumberOfDependents()) == 0) ?
3197 "[Not completed]" :
3198 "[Not completed, " + count + " dependents]") :
3199 (((r instanceof AltResult) && ((AltResult)r).ex != null) ?
3200 "[Completed exceptionally]" :
3201 "[Completed normally]"));
3202 }
3203
3204 // Unsafe mechanics
3205 private static final sun.misc.Unsafe UNSAFE;
3206 private static final long RESULT;
3207 private static final long WAITERS;
3208 private static final long COMPLETIONS;
3209 static {
3210 try {
3211 UNSAFE = sun.misc.Unsafe.getUnsafe();
3212 Class<?> k = CompletableFuture.class;
3213 RESULT = UNSAFE.objectFieldOffset
3214 (k.getDeclaredField("result"));
3215 WAITERS = UNSAFE.objectFieldOffset
3216 (k.getDeclaredField("waiters"));
3217 COMPLETIONS = UNSAFE.objectFieldOffset
3218 (k.getDeclaredField("completions"));
3219 } catch (Exception e) {
3220 throw new Error(e);
3221 }
3222 }
3223 }
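
The Javadoc for exceptionally, handle, allOf, anyOf and cancel above describes behavior that is easier to see from the caller's side. What follows is a minimal usage sketch, not part of this file: it assumes the class as released in JDK 8 and later (including completedFuture and the lambda-friendly signatures), and the class CompletionSketch and all of its method names are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; none of these names appear in CompletableFuture.java.
final class CompletionSketch {

    // exceptionally: the function runs only if the source completed exceptionally.
    static String recover() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        return failed.exceptionally(ex -> "fallback: " + ex.getMessage()).join();
    }

    // handle: the function always runs, with (value, null) or (null, exception).
    static String describe(CompletableFuture<Integer> source) {
        return source
            .handle((v, ex) -> ex == null ? "value " + v : "error " + ex.getMessage())
            .join();
    }

    // allOf: the combined future carries no value, so read the inputs individually.
    static int sumAll() {
        CompletableFuture<Integer> a = CompletableFuture.completedFuture(1);
        CompletableFuture<Integer> b = CompletableFuture.completedFuture(2);
        CompletableFuture.allOf(a, b).join();
        return a.join() + b.join();
    }

    // anyOf: completes as soon as one input completes, here the already-done one.
    static Object firstDone() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("done");
        return CompletableFuture.anyOf(pending, done).join();
    }

    // cancel: a dependent sees a CompletionException caused by the
    // CancellationException, and is not itself reported as cancelled.
    static boolean dependentSeesCancellation() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(String::length);
        source.cancel(true);
        try {
            dependent.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof CancellationException
                && source.isCancelled()
                && !dependent.isCancelled();
        }
    }

    public static void main(String[] args) {
        System.out.println(recover());
        System.out.println(sumAll());
        System.out.println(firstDone());
        System.out.println(dependentSeesCancellation());
    }
}
```

In this sketch the exception passed to exceptionally and handle is the original one because the functions are attached directly to the future that failed; a stage further down a chain would instead receive it wrapped in a CompletionException, as the allOf and cancel documentation describes.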